Overall Statistics
Total Orders
Average Win
Average Loss
Compounding Annual Return
Start Equity
End Equity
Net Profit
Sharpe Ratio
Sortino Ratio
Probabilistic Sharpe Ratio
Loss Rate
Win Rate
Profit-Loss Ratio
Annual Standard Deviation
Annual Variance
Information Ratio
Tracking Error
Treynor Ratio
Total Fees
Estimated Strategy Capacity
Lowest Capacity Asset
Portfolio Turnover
from AlgorithmImports import *
from collections import deque
import numpy as np

class GAPMRatios:
    def __init__(self, upGaps=0, downGaps=0):
        self.UpGaps     = upGaps
        self.DownGaps   = downGaps

class GAPM(PythonIndicator):
    The GAPM (Gap Measure) Indicator:

        - Measures the ratio of upward price gaps to downward price gaps over a specified period.
        - Utilizes price gaps between the current open and the previous close to identify potential trends.

        1. Iterates over a defined 'period', calculating the gap between the opening price of the current day
           and the closing price of the previous day.
        2. Accumulates total upward gaps (positive differences) and total downward gaps (negative differences).
        3. Calculates the Gap Ratio as follows:
            - If there are no downward gaps (dn_gaps == 0), the Gap Ratio is set to 1 to avoid division by zero.
            - Otherwise, the Gap Ratio is calculated as 100 times the sum of upward gaps divided by the absolute sum
              of downward gaps, highlighting the dominance of upward or downward gaps within the period.
        4. Applies a Simple Moving Average (SMA) to the Gap Ratio over a 'signal_period' to smooth the indicator,
           producing the final GAPM value.

        - The GAPM indicator can be used to identify potential bullish or bearish trends based on the prevalence of
          gap movements in one direction over another.
        - A higher GAPM value indicates a dominance of upward gaps, suggesting potential bullish trends, while
          a lower value (closer to 1) suggests a dominance of downward gaps or an equal presence of both, potentially
          indicating bearish trends or a lack of strong directional movement.

        - 'period': The lookback period over which to calculate price gaps.
        - 'signal_period': The period over which to apply the SMA to the Gap Ratio, smoothing the final indicator value.

        - The smoothed GAPM indicator, providing a measure of the relative presence of upward to downward gaps,
          useful for trend identification and trading decisions.

    def __init__(self, period, signal_period):
        Initialize the GAPM indicator.

        :param period: The lookback period over which to calculate price gaps.
        :param signal_period: The period over which to apply the SMA to the Gap Ratio.
        self.period = period
        self.signal_period = signal_period
        # self.sma            = SimpleMovingAverage(signal_period)
        # self.sma_short      = SimpleMovingAverage(signal_period)
        self.sma            = ExponentialMovingAverage(signal_period)
        self.sma_short      = ExponentialMovingAverage(signal_period)

        self.prev_close     = None
        self.gaps           = deque(maxlen=period)
        self.Value          = 0
        self.WarmUpPeriod   = max(period, signal_period)
        self.GapSMAs        = GAPMRatios()

    def IsReady(self) -> bool:
        Check if the indicator is ready.

        :return: True if the indicator is ready, False otherwise.
        return self.sma.IsReady and self.sma_short.IsReady and \
                len(self.gaps) == self.gaps.maxlen

    def Reset(self):
        """Reset the indicator to its initial state."""
        self.prev_close = None
        self.Value      = 0

    def Update(self, input_data):
        Update the GAPM indicator with the latest price data.

        :param input_data: The input price data (bar).
        :return: True if the indicator is ready, False otherwise.
        if input_data is None:
            return False

        if self.prev_close is not None:
            gap = input_data.Open - self.prev_close

            up_gaps = sum(g for g in self.gaps if g > 0)
            dn_gaps = sum(abs(g) for g in self.gaps if g < 0)

            # Get Up-Gap Ratio and take SMA
            up_gap_ratio = 1 if dn_gaps == 0 else 100 * up_gaps / dn_gaps
            self.sma.Update(input_data.Time, up_gap_ratio)

            # Get Down-Gap Ratio and take SMA
            down_gap_ratio = 1 if up_gaps == 0 else 100 * dn_gaps / up_gaps
            self.sma_short.Update(input_data.Time, down_gap_ratio)

            self.GapSMAs = GAPMRatios()
            self.GapSMAs.UpGaps   = self.sma.Current.Value
            self.GapSMAs.DownGaps = self.sma_short.Current.Value
            self.Current.Value    = self.sma.Current.Value
            self.Value            = self.Current.Value 

        self.prev_close = input_data.Close

        return self.IsReady
#region imports
from AlgorithmImports import *

class TrailingStopHelper():
    def __init__(self, algo, symbol, lookback=14, volIndicator=None, trailStopCoeff=4, initialStopCoeff=1, activationCoeff=2):
        self.algo           = algo
        self.symbol         = symbol 
        self.atr            = volIndicator
        self.atr.Updated   += self.OnATRIndicatorUpdated

        # Track Price 
        # -----------
        self.entryPrice         = 0
        # Messages
        # ---------
        self.ExitMessage        = ""
        self.EntryMessage       = ""

        # Stop Loss State
        # -------------------------------
        self.initialStopLoss    = 0
        self.baseTrailStopCoeff = trailStopCoeff
        self.initialStopCoeff   = initialStopCoeff
        self.activationCoeff    = activationCoeff
    ## ====================================
    def OnData(self, tradeBar):
        # self.atr.Update(tradeBar)

    def lastPrice(self):
        if (self.symbol in self.algo.Securities \
            and self.algo.Securities[self.symbol] is not None):

            return self.algo.Securities[self.symbol].Price
        return 0

    ## ====================================
    def OnATRIndicatorUpdated(self, sender, updated):

    # Trailing Stop Exit    
    # This method also updates the trailing stop
    # ============================================
    def TrailingExitSignalFired(self):
        if( not self.atr.IsReady ):
            return False   

        # If trailing stop is NOT set, get last price, and set it
        # --------------------------------------------------------
        if( not self.stopsLossActivated ):
            self.highestPrice       = self.lastPrice
            self.trailingStopLoss   = self.lastPrice - (self.atr.Current.Value * self.trailStopCoef)

            self.stopsLossActivated = True
            # Recursively call this function to check for stops
            # again, now that the trailing stop has been activated
            # and the stop loss value has been updated.
            # --------------------------------------------------
            return self.TrailingExitSignalFired()            
        # If trailing stop loss is activated, check if price closed below it.
        # If it did, then exit. If not, update the trailing stop loss. 
        # -------------------------------------------------------------------
            if self.PriceIsBelowTrailingStop():              
                return True

                # If price has gone up
                # -----------------------------------------------
                if self.lastPrice > self.highestPrice:

                    # If price is above the trail activation price, update trailing stop
                    # --------------------------------------------------------
                    if self.lastPrice > self.activationPrice:
                        self.highestPrice = self.lastPrice
                        newTrailingStopLoss = self.highestPrice -  (self.atr.Current.Value * self.trailStopCoef)                           
                        self.trailingStopLoss  = max (self.trailingStopLoss, newTrailingStopLoss)
                    # if( self.symbol.Value == "BOOT"): #ADMA
                    #     self.algo.Log(f"[{self.symbol}][Trailing Stop Updated] ${self.trailingStopLoss:.2f} (Price: ${self.lastPrice:.2f})")
                    # check again just in case price ends up below the new trailing stop level
                    if self.PriceIsBelowTrailingStop():
                        return True

        return False 

    # ------------------------------------------------
    def PriceIsBelowTrailingStop(self):
        if( self.lastPrice < self.trailingStopLoss ):
            # self.algo.LogUtil.Info(f"[{self.symbol}][Trailing Stop Triggered] : ${self.lastPrice:.2f}")
            if self.lastPrice > self.activationPrice:
                self.ExitMessage = "Trail Stop Activiated"
                self.ExitMessage = "Initial Stop Triggered"

            return True

    ## Logic to run immediately after a new position is opened.
    ## ---------------------------------------------------------
    def OnPositionOpened(self, entryPrice):            
        # self.algo.LogUtil.Info(f"Bought {self.symbol.Value}] @ approx ${self.lastPrice:.2f}") 
        self.EntryMessage   = ""
        self.entryPrice = entryPrice
        # For trailing stop
        # --------------------

    ## Logic to run immediately after a position is closed
    ## ---------------------------------------------------------
    def OnPositionClosed(self):            
        # self.algo.Log(f"Sold {self.symbol.Value}] @ approx ${self.lastPrice:.2f}")         
        self.ExitMessage  = "No Exit Message"
    # ========================================================================
    def ResetStopLosses(self):
        self.stopsLossActivated   = False
        self.initialStopLoss      = 0
        self.activationPrice        = 0
        self.trailingStopLoss     = 0
    # ========================================================================
    # Set initial stop and activation level. Called after new position opened.
    # ========================================================================
    def SetInitialStops(self):
        ## TODO: Use onOrderEvent to set this, because the actual price may be different
        self.entryPrice        = self.lastPrice
        # self.initialStopLoss = self.lastPrice - (self.atr.Current.Value * self.trailStopCoef)  
        self.initialStopLoss   = self.entryPrice - (self.atr.Current.Value * self.initialStopCoeff)  
        self.activationPrice   = self.entryPrice + (self.atr.Current.Value * self.activationCoeff)  

    def trailStopCoef(self):
        # Volatility Test
        # --------------------------------------------------------------------
        # testing: adjusting based on relative volatility (per atr percentile)
        # return (self.baseTrailStopCoeff * (self.atr.Current.Value / self.algo.atr_percentile))
        # Logic to Use Activation.
        # If price above activation price, make trailstop Coeff Wider
        if self.lastPrice > self.activationPrice: # takeProfitCoeff)initialStopCoeff
            return self.activationCoeff
            return self.baseTrailStopCoeff

    # ====================
    def PlotCharts(self):    
        # return
        # self.algo.Plot("relative volatility", "relvol", (self.atr.Current.Value / self.algo.atr_percentile))
        # Plot Price on a single chart for this symbol 
        # ------------------------------------------------
        self.algo.Plot(f"{self.symbol.Value}-charts", "Price", self.lastPrice)        
        # Stop losses & activation levels
        # ------------------------------------
        self.algo.Plot(f"{self.symbol}-charts", "Initial Stop", self.initialStopLoss)
        self.algo.Plot(f"{self.symbol}-charts", "Acivation Pt", self.activationPrice)
        self.algo.Plot(f"{self.symbol}-charts", "TrailingStop", self.trailingStopLoss)
from AlgorithmImports import *
from collections import deque
import numpy as np

class WilliamsVixFix(PythonIndicator):

    Williams VixFix Indicator
    A synthetic VIX that measures the price volatility over the last N days.
    This indicator is designed to provide a VIX for any asset, overcoming the limitation
    of the traditional VIX, which is only available for select indices.

    - lookback_period: The number of days for the highest closing price lookback.
    - bollinger_length: The length of the Bollinger Bands.
    - bollinger_multiplier: Multiplier for the Bollinger Band's standard deviation.
    - percentile_lookback: Lookback period for calculating high and low percentiles.
    - high_percentile: The high percentile factor.
    - low_percentile: The low percentile factor.

    def __init__(self, algo, lookback_period=22, bollinger_length=20, bollinger_multiplier=2.0, 
                 percentile_lookback=50, high_percentile=0.85):
        # Initialization of parameters and storing algorithm reference
        self.algo = algo
        self.dynamic_period     = 0 # if this gets set, it gets used
        self.lookback_period    = lookback_period
        self.WarmUpPeriod       = lookback_period + max(bollinger_length,percentile_lookback)
        self.bollinger_length   = bollinger_length
        self.bollinger_multiplier = bollinger_multiplier
        self.percentile_lookback = percentile_lookback
        self.high_percentile = high_percentile
        # self.low_percentile = low_percentile # Not used
        self.lastInput = None
        self.current_low = 0

        # Queues to store historical closing prices and WVF values
        self.all_historical_closes = deque(maxlen=lookback_period)
        self.wvf_values = deque(maxlen=percentile_lookback)

        # Rolling windows for calculating moving averages and standard deviations
        # self.midLine_window = RollingWindow[float](bollinger_length)
        # self.standard_deviation_window = RollingWindow[float](bollinger_length)

        self.IsReady = False
        self.metrics = None

    def adjusted_lookback_period(self):
        if self.dynamic_period > 0:
            return self.dynamic_period 
            return self.lookback_period

    def adjusted_historical_closes(self):
        count = self.adjusted_lookback_period
        recent_closes_subset = deque(list(self.all_historical_closes)[:count], maxlen=count)
        return recent_closes_subset

    def SetDynamicPeriod(self, newPeriod):
        if(self.dynamic_period == newPeriod):

        self.dynamic_period = newPeriod

    # =========================
    def Update(self, input):

        # Updating the historical closes queue
        self.current_low = input.Low

        self.lastInput = input
        return self.ReCalculateIndicatorValue()
    def ReCalculateIndicatorValue(self):
        # Check if enough data is available for calculation
        if len(self.all_historical_closes) < self.adjusted_lookback_period:
            return False

        highest_close = max(self.adjusted_historical_closes)

        # Avoid division by zero
        if highest_close == 0:
            self.algo.Debug("Highest close is zero, skipping calculation")
            return False

        # Calculating the Williams Vix Fix value
        wvf = ((highest_close - self.current_low) / highest_close) * 100

        if len(self.wvf_values) < self.percentile_lookback:
            return False

        # Calculate the mid-line and standard deviation for Bollinger Bands
        mid_line = np.mean(self.wvf_values)
        sd = self.bollinger_multiplier * np.std(self.wvf_values)

        # Updating the rolling windows
        # self.midLine_window.Add(mid_line)
        # self.standard_deviation_window.Add(sd)

        # Calculating the upper band and percentile ranges
        upper_band = mid_line + sd
        # Logic for 'percentage of max high' -- like donchian with a buffer
        # ----------------------------------------------------------------------
        range_high = max(self.wvf_values) * self.high_percentile
        # Logic for proper percentile value
        # ----------------------------------------------------------------------
        # range_high = np.percentile(self.wvf_values, self.high_percentile * 100)

        # range_low = min(self.wvf_values) * self.low_percentile # Not used currently
        # extreme = 1 if (wvf >= upper_band) else 0
        extreme = 1 if (wvf >= upper_band or wvf >= range_high) else 0

        # Setting the output value
        self.metrics = {
            'wvf': wvf,
            'upper_band': upper_band,
            'range_high': range_high,
            # 'range_low' : range_low,          # Currently not used
            'extreme'   : extreme
        self.Value = wvf
        # Trigger the dispatch of the 
        self.OnUpdated(IndicatorDataPoint(self.lastInput.Symbol, self.lastInput.Time, wvf))
        self.IsReady = True
        return True

    def IsReady(self) -> bool:
        # The indicator is ready if the value has been set
        return self.metrics is not None 

# region imports
from AlgorithmImports import *
from GAPM import *
from TrailingStopHelper import *
# endregion

class GapSignals(QCAlgorithm):
    def Initialize(self):
        # inspect march 13 2013
        # self.Time < datetime(2013, 3, 14) and self.Time > datetime(2013, 3, 9)
        self.SetStartDate(2011, 1, 1)
        self.SetEndDate(2023, 12, 1)
        # tickerIndex = int(self.GetParameter("tickerIndex"))
        # self.ticker = ["QQQ","GLD","TSLA","SPY","DBB","IGE","SHY","UUP","XLU", 
        #                "AMZN","AAPL","MSFT","ADBE","V", "CME", "SPY", "GOOG", "NVDA", 
        #                "UNH", "JNJ", "DIS"][tickerIndex]

        self.ticker = "QQQ"
        self.symbol = self.AddEquity(self.ticker, Resolution.Daily).Symbol
        self.period         = int(self.GetParameter("period"))
        self.signal_period  = int(self.GetParameter("signalperiod"))
        self.rising_period  = int(self.GetParameter("risingperiod"))

        self.atr       = AverageTrueRange(14)
        self.ema_fast  = ExponentialMovingAverage(int(self.GetParameter("emafast")))
        self.ema_slow  = ExponentialMovingAverage(int(self.GetParameter("emaslow")))        
        ## Initialize the GAPM indicator & GAPM Rolling Windows
        ## -----------------------------------------------------
        self.gapm = GAPM(self.period, self.signal_period)
        self.gapmWindow      = RollingWindow[float](self.rising_period) 
        self.gapmShortWindow = RollingWindow[float](self.rising_period) 
        self.gapm.Updated += self.OnGapmUpdated

        ## Init Trailing stop helper
        self.trailingStop  = TrailingStopHelper(self,self.symbol,14,self.atr)

        ## Initialize the Volume ROC indicator
        ## -------------------------------------
        # self.vrocPeriod = int(self.GetParameter("vrocPeriod"))
        # self.vroc    = RateOfChange(self.vrocPeriod)
        # self.vrocWin = RollingWindow[float](3) # VROC Rolling window for tracking rise/fall
        # self.vroc.Updated += (lambda indicator, _ : self.vrocWin.Add(indicator.Current.Value)) 

        ## Register Indicators
        ## ------------------------------ 
        # self.RegisterIndicator(self.symbol,self.vroc, Resolution.Daily, Field.Volume)
        self.RegisterIndicator(self.symbol, self.atr, Resolution.Daily)
        self.RegisterIndicator(self.symbol, self.gapm, Resolution.Daily)
        self.RegisterIndicator(self.symbol, self.ema_fast, Resolution.Daily)
        self.RegisterIndicator(self.symbol, self.ema_slow, Resolution.Daily)
        ## Initialize the VixFix Indicator
        ## --------------------------------
        # self.vixfix = WilliamsVixFix(self, self.vfxLookback,self.vfxBBLookback,self.vfxBBCoeff,self.vfxPctLookback,self.vfxPctThresh)
        # self.RegisterIndicator(self.symbol, self.vixfix, Resolution.Daily)

    def OnGapmUpdated(self, indicator, data):
        # self.Plot("GAPM Value", "Gapm", self.gapm.GapSMAs.UpGaps)
        # # self.Plot("GAPM Value", "GapmShort", self.gapm.GapSMAs.DownGaps)
        # self.Plot("EMAs", "ema_fast", self.ema_fast.Current.Value)
        # self.Plot("EMAs", "ema_slow", self.ema_slow.Current.Value)

        # self.gapmShortWindow.Add(self.gapm.GapSMAs.DownGaps)

    def OnData(self, data):        
        if (self.ticker not in data ) or (data[self.ticker] is None):

        if not (self.gapm.IsReady and self.gapmWindow.IsReady and self.ema_fast.IsReady and self.ema_slow.IsReady):
        if not self.Portfolio.Invested:
            if self.GapmLongSignalFired() and (self.ema_fast.Current.Value >= self.ema_slow.Current.Value) :
            # if self.GapmLongSignalFired() and (self.ema_fast.Current.Value >= self.ema_slow.Current.Value) and (self.vrocWin[0] > 0) :
            # if self.GapmLongSignalFired() and (self.vrocWin[0] > self.vrocWin[1] > 0):
            # if self.GapmLongSignalFired() and (self.ema_fast.Current.Value >= self.ema_slow.Current.Value) \
            #     and (self.Time.weekday() != 4):
                # self.takeProfitPrice = data[self.ticker].Close + self.atr.Current.Value 
                self.SetHoldings(self.symbol, 1)

            # if self.GapmShortSignalFired():
            #     self.takeProfitPrice = data[self.ticker].Close - self.atr.Current.Value 
            #     self.SetHoldings(self.symbol, -1)
            if( self.Portfolio[self.ticker].IsLong ): 
                # if self.trailingStop.TrailingExitSignalFired():
                if self.GapmLongExitSignalFired():

            # if( self.Portfolio[self.ticker].IsShort ):
            #     # if(data[self.ticker].Close <= self.takeProfitPrice):
            #     #     self.Liquidate(self.symbol)                        
            #     if self.GapmShortExitSignalFired():
            #         self.Liquidate(self.symbol)        

# Try with vixfix in multi time frame algo (on daily)

    ## LONG: Multiple GAPM based exit signals
    ## ---------------------------------
    def GapmLongSignalFired(self):
        # Positive GAPM Falling
        # ----------------------
        upGapsReversedList = list(self.gapmWindow)
        upGapsFalling = all(upGapsReversedList[i] < upGapsReversedList[i+1] for i in range(len(upGapsReversedList)-1))
        # Positive GAPM Rising
        # ---------------------
        upGapsOrderedList = list(self.gapmWindow)[::-1]
        upGapsRising = all(upGapsOrderedList[i] < upGapsOrderedList[i+1] for i in range(len(upGapsOrderedList)-1))
        # Positive GAPM Turned Up (was falling now rising)
        # first half are falling, and last half are rising. 
        # if odd number, put more in first half
        # -------------------------------------------------
        # requires window of at least 4
        # if len(upGapsOrderedList) < 4:
        #     raise Exception("GAPM TurnedUp signal reqiures GAPM window to have at least 4 values")
        # else:
        #     mid = len(upGapsOrderedList) // 2
        #     firstHalfDescending = all(upGapsOrderedList[i] > upGapsOrderedList[i+1] for i in range(mid))
        #     secondHalfAscending = all(upGapsOrderedList[i] < upGapsOrderedList[i+1] for i in range(mid, len(upGapsOrderedList)-1))
        #     upGapsTurnedUp = firstHalfDescending and secondHalfAscending
        # return upGapsTurnedUp
        # return upGapsFalling
        return upGapsRising

    def TrailExitFired(self):
        if  self.trailingStop.TrailingExitSignalFired():
            self.exitMessage = self.trailingStop.ExitMessage
            return True

    ## SHORT: Multiple GAPM based exit signals
    ## Note that the gapmShortWindow has values in reverse.
    ## ----------------------------------------------------
    # def GapmShortSignalFired(self):
    #     # Negative GAPM Falling
    #     # ----------------------
    #     downGapsReversedList = list(self.gapmShortWindow)
    #     downGapsFalling      = all(downGapsReversedList[i] < downGapsReversedList[i+1] for i in range(len(downGapsReversedList)-1))
    #     # Negative GAPM Rising
    #     # ---------------------
    #     downGapsOrderedList = list(self.gapmShortWindow)[::-1]
    #     downGapsRising = all(downGapsOrderedList[i] < downGapsOrderedList[i+1] for i in range(len(downGapsOrderedList)-1))
    #     # Negative GAPM Turned Up (was falling now rising)
    #     # -------------------------------------------------
    #     # requires window of at least 4
    #     # if len(self.gapmWindow) < 4
    #     #     raise Exception("GAPM TurnedUp signal reqiures GAPM window to have at least 4 values")
    #     # gapmList = list(self.gapmWindow)
    #     # # gapmTurnedUp = ... #logic to check if first half are falling, and last half are rising. if odd number, put more in first half
    #     return downGapsRising
    #     # return downGapsRising

    ## EXITS

    def GapmLongExitSignalFired(self):
        # Check if the GAPM value is rising

        # Positive GAPM Falling
        # ----------------------
        upGapsReversedList = list(self.gapmWindow)
        upGapsFalling = all(upGapsReversedList[i] < upGapsReversedList[i+1] for i in range(len(upGapsReversedList)-1))
        # Positive GAPM Rising
        # ---------------------
        upGapsOrderedList = list(self.gapmWindow)[::-1]
        upGapsRising = all(upGapsOrderedList[i] < upGapsOrderedList[i+1] for i in range(len(upGapsOrderedList)-1))
        # gapmTurnedUp 
        # return not upGapsFalling
        return upGapsFalling
        # return not upGapsRising

    # def GapmShortExitSignalFired(self):

    #     downGapsReversedList = list(self.gapmShortWindow)
    #     downGapsFalling      = all(downGapsReversedList[i] < downGapsReversedList[i+1] for i in range(len(downGapsReversedList)-1))
    #     downGapsOrderedList = list(self.gapmShortWindow)[::-1]
    #     downGapsRising = all(downGapsOrderedList[i] < downGapsOrderedList[i+1] for i in range(len(downGapsOrderedList)-1))

    #     return not downGapsRising