Overall Statistics |
Total Orders 389 Average Win 1.74% Average Loss -1.44% Compounding Annual Return 9.329% Drawdown 14.200% Expectancy 0.409 Start Equity 100000 End Equity 316746.11 Net Profit 216.746% Sharpe Ratio 0.675 Sortino Ratio 0.47 Probabilistic Sharpe Ratio 24.866% Loss Rate 36% Win Rate 64% Profit-Loss Ratio 1.20 Alpha 0.028 Beta 0.202 Annual Standard Deviation 0.077 Annual Variance 0.006 Information Ratio -0.443 Tracking Error 0.153 Treynor Ratio 0.257 Total Fees $2186.73 Estimated Strategy Capacity $350000000.00 Lowest Capacity Asset QQQ RIWIV7K5Z9LX Portfolio Turnover 8.22% |
from AlgorithmImports import * from collections import deque import numpy as np class GAPMRatios: def __init__(self, upGaps=0, downGaps=0): self.UpGaps = upGaps self.DownGaps = downGaps pass class GAPM(PythonIndicator): """ The GAPM (Gap Measure) Indicator: Purpose: - Measures the ratio of upward price gaps to downward price gaps over a specified period. - Utilizes price gaps between the current open and the previous close to identify potential trends. Calculation: 1. Iterates over a defined 'period', calculating the gap between the opening price of the current day and the closing price of the previous day. 2. Accumulates total upward gaps (positive differences) and total downward gaps (negative differences). 3. Calculates the Gap Ratio as follows: - If there are no downward gaps (dn_gaps == 0), the Gap Ratio is set to 1 to avoid division by zero. - Otherwise, the Gap Ratio is calculated as 100 times the sum of upward gaps divided by the absolute sum of downward gaps, highlighting the dominance of upward or downward gaps within the period. 4. Applies a Simple Moving Average (SMA) to the Gap Ratio over a 'signal_period' to smooth the indicator, producing the final GAPM value. Usage: - The GAPM indicator can be used to identify potential bullish or bearish trends based on the prevalence of gap movements in one direction over another. - A higher GAPM value indicates a dominance of upward gaps, suggesting potential bullish trends, while a lower value (closer to 1) suggests a dominance of downward gaps or an equal presence of both, potentially indicating bearish trends or a lack of strong directional movement. Parameters: - 'period': The lookback period over which to calculate price gaps. - 'signal_period': The period over which to apply the SMA to the Gap Ratio, smoothing the final indicator value. Returns: - The smoothed GAPM indicator, providing a measure of the relative presence of upward to downward gaps, useful for trend identification and trading decisions. """ def __init__(self, period, signal_period): """ Initialize the GAPM indicator. :param period: The lookback period over which to calculate price gaps. :param signal_period: The period over which to apply the SMA to the Gap Ratio. """ self.period = period self.signal_period = signal_period # self.sma = SimpleMovingAverage(signal_period) # self.sma_short = SimpleMovingAverage(signal_period) self.sma = ExponentialMovingAverage(signal_period) self.sma_short = ExponentialMovingAverage(signal_period) self.prev_close = None self.gaps = deque(maxlen=period) self.Value = 0 self.WarmUpPeriod = max(period, signal_period) self.GapSMAs = GAPMRatios() @property def IsReady(self) -> bool: """ Check if the indicator is ready. :return: True if the indicator is ready, False otherwise. """ return self.sma.IsReady and self.sma_short.IsReady and \ len(self.gaps) == self.gaps.maxlen def Reset(self): """Reset the indicator to its initial state.""" self.sma.Reset() self.sma_short.Reset() self.prev_close = None self.gaps.clear() self.Value = 0 def Update(self, input_data): """ Update the GAPM indicator with the latest price data. :param input_data: The input price data (bar). :return: True if the indicator is ready, False otherwise. """ if input_data is None: return False if self.prev_close is not None: gap = input_data.Open - self.prev_close self.gaps.append(gap) up_gaps = sum(g for g in self.gaps if g > 0) dn_gaps = sum(abs(g) for g in self.gaps if g < 0) # Get Up-Gap Ratio and take SMA up_gap_ratio = 1 if dn_gaps == 0 else 100 * up_gaps / dn_gaps self.sma.Update(input_data.Time, up_gap_ratio) # Get Down-Gap Ratio and take SMA down_gap_ratio = 1 if up_gaps == 0 else 100 * dn_gaps / up_gaps self.sma_short.Update(input_data.Time, down_gap_ratio) self.GapSMAs = GAPMRatios() self.GapSMAs.UpGaps = self.sma.Current.Value self.GapSMAs.DownGaps = self.sma_short.Current.Value self.Current.Value = self.sma.Current.Value self.Value = self.Current.Value self.prev_close = input_data.Close return self.IsReady
#region imports from AlgorithmImports import * #endregion class TrailingStopHelper(): def __init__(self, algo, symbol, lookback=14, volIndicator=None, trailStopCoeff=4, initialStopCoeff=1, activationCoeff=2): self.algo = algo self.symbol = symbol self.atr = volIndicator self.atr.Updated += self.OnATRIndicatorUpdated # Track Price # ----------- self.entryPrice = 0 # Messages # --------- self.ExitMessage = "" self.EntryMessage = "" # Stop Loss State # ------------------------------- self.initialStopLoss = 0 self.baseTrailStopCoeff = trailStopCoeff self.initialStopCoeff = initialStopCoeff self.activationCoeff = activationCoeff self.ResetStopLosses() ## ==================================== def OnData(self, tradeBar): # self.atr.Update(tradeBar) pass @property def lastPrice(self): if (self.symbol in self.algo.Securities \ and self.algo.Securities[self.symbol] is not None): return self.algo.Securities[self.symbol].Price return 0 ## ==================================== def OnATRIndicatorUpdated(self, sender, updated): self.PlotCharts() # Trailing Stop Exit # This method also updates the trailing stop # ============================================ def TrailingExitSignalFired(self): if( not self.atr.IsReady ): return False # If trailing stop is NOT set, get last price, and set it # -------------------------------------------------------- if( not self.stopsLossActivated ): self.highestPrice = self.lastPrice self.trailingStopLoss = self.lastPrice - (self.atr.Current.Value * self.trailStopCoef) self.stopsLossActivated = True # Recursively call this function to check for stops # again, now that the trailing stop has been activated # and the stop loss value has been updated. # -------------------------------------------------- return self.TrailingExitSignalFired() # If trailing stop loss is activated, check if price closed below it. # If it did, then exit. If not, update the trailing stop loss. # ------------------------------------------------------------------- else: if self.PriceIsBelowTrailingStop(): return True else: # If price has gone up # ----------------------------------------------- if self.lastPrice > self.highestPrice: # If price is above the trail activation price, update trailing stop # -------------------------------------------------------- if self.lastPrice > self.activationPrice: self.highestPrice = self.lastPrice newTrailingStopLoss = self.highestPrice - (self.atr.Current.Value * self.trailStopCoef) self.trailingStopLoss = max (self.trailingStopLoss, newTrailingStopLoss) # if( self.symbol.Value == "BOOT"): #ADMA # self.algo.Log(f"[{self.symbol}][Trailing Stop Updated] ${self.trailingStopLoss:.2f} (Price: ${self.lastPrice:.2f})") # check again just in case price ends up below the new trailing stop level if self.PriceIsBelowTrailingStop(): return True return False # ------------------------------------------------ def PriceIsBelowTrailingStop(self): if( self.lastPrice < self.trailingStopLoss ): # self.algo.LogUtil.Info(f"[{self.symbol}][Trailing Stop Triggered] : ${self.lastPrice:.2f}") if self.lastPrice > self.activationPrice: self.ExitMessage = "Trail Stop Activiated" else: self.ExitMessage = "Initial Stop Triggered" self.PlotCharts() return True ## Logic to run immediately after a new position is opened. ## --------------------------------------------------------- def OnPositionOpened(self, entryPrice): # self.algo.LogUtil.Info(f"Bought {self.symbol.Value}] @ approx ${self.lastPrice:.2f}") self.EntryMessage = "" self.entryPrice = entryPrice # For trailing stop # -------------------- self.SetInitialStops() return ## Logic to run immediately after a position is closed ## --------------------------------------------------------- def OnPositionClosed(self): self.PlotCharts() # self.algo.Log(f"Sold {self.symbol.Value}] @ approx ${self.lastPrice:.2f}") self.ExitMessage = "No Exit Message" self.ResetStopLosses() # ======================================================================== def ResetStopLosses(self): self.stopsLossActivated = False self.initialStopLoss = 0 self.activationPrice = 0 self.trailingStopLoss = 0 # ======================================================================== # Set initial stop and activation level. Called after new position opened. # ======================================================================== def SetInitialStops(self): ## TODO: Use onOrderEvent to set this, because the actual price may be different self.entryPrice = self.lastPrice # self.initialStopLoss = self.lastPrice - (self.atr.Current.Value * self.trailStopCoef) self.initialStopLoss = self.entryPrice - (self.atr.Current.Value * self.initialStopCoeff) self.activationPrice = self.entryPrice + (self.atr.Current.Value * self.activationCoeff) @property def trailStopCoef(self): # Volatility Test # -------------------------------------------------------------------- # testing: adjusting based on relative volatility (per atr percentile) # return (self.baseTrailStopCoeff * (self.atr.Current.Value / self.algo.atr_percentile)) # Logic to Use Activation. # If price above activation price, make trailstop Coeff Wider if self.lastPrice > self.activationPrice: # takeProfitCoeff)initialStopCoeff return self.activationCoeff else: return self.baseTrailStopCoeff # ==================== def PlotCharts(self): # return # self.algo.Plot("relative volatility", "relvol", (self.atr.Current.Value / self.algo.atr_percentile)) # Plot Price on a single chart for this symbol # ------------------------------------------------ self.algo.Plot(f"{self.symbol.Value}-charts", "Price", self.lastPrice) # Stop losses & activation levels # ------------------------------------ self.algo.Plot(f"{self.symbol}-charts", "Initial Stop", self.initialStopLoss) self.algo.Plot(f"{self.symbol}-charts", "Acivation Pt", self.activationPrice) self.algo.Plot(f"{self.symbol}-charts", "TrailingStop", self.trailingStopLoss) return
from AlgorithmImports import * from collections import deque import numpy as np class WilliamsVixFix(PythonIndicator): """ Williams VixFix Indicator ========================= A synthetic VIX that measures the price volatility over the last N days. This indicator is designed to provide a VIX for any asset, overcoming the limitation of the traditional VIX, which is only available for select indices. Attributes: ----------- - lookback_period: The number of days for the highest closing price lookback. - bollinger_length: The length of the Bollinger Bands. - bollinger_multiplier: Multiplier for the Bollinger Band's standard deviation. - percentile_lookback: Lookback period for calculating high and low percentiles. - high_percentile: The high percentile factor. - low_percentile: The low percentile factor. """ def __init__(self, algo, lookback_period=22, bollinger_length=20, bollinger_multiplier=2.0, percentile_lookback=50, high_percentile=0.85): # Initialization of parameters and storing algorithm reference self.algo = algo self.dynamic_period = 0 # if this gets set, it gets used self.lookback_period = lookback_period self.WarmUpPeriod = lookback_period + max(bollinger_length,percentile_lookback) self.bollinger_length = bollinger_length self.bollinger_multiplier = bollinger_multiplier self.percentile_lookback = percentile_lookback self.high_percentile = high_percentile # self.low_percentile = low_percentile # Not used self.lastInput = None self.current_low = 0 # Queues to store historical closing prices and WVF values self.all_historical_closes = deque(maxlen=lookback_period) self.wvf_values = deque(maxlen=percentile_lookback) # Rolling windows for calculating moving averages and standard deviations # self.midLine_window = RollingWindow[float](bollinger_length) # self.standard_deviation_window = RollingWindow[float](bollinger_length) self.IsReady = False self.metrics = None @property def adjusted_lookback_period(self): if self.dynamic_period > 0: return self.dynamic_period else: return self.lookback_period @property def adjusted_historical_closes(self): count = self.adjusted_lookback_period recent_closes_subset = deque(list(self.all_historical_closes)[:count], maxlen=count) return recent_closes_subset def SetDynamicPeriod(self, newPeriod): if(self.dynamic_period == newPeriod): return self.dynamic_period = newPeriod self.ReCalculateIndicatorValue() # ========================= def Update(self, input): # Updating the historical closes queue self.current_low = input.Low self.all_historical_closes.appendleft(input.Close) self.lastInput = input return self.ReCalculateIndicatorValue() def ReCalculateIndicatorValue(self): # Check if enough data is available for calculation if len(self.all_historical_closes) < self.adjusted_lookback_period: return False highest_close = max(self.adjusted_historical_closes) # Avoid division by zero if highest_close == 0: self.algo.Debug("Highest close is zero, skipping calculation") return False # Calculating the Williams Vix Fix value wvf = ((highest_close - self.current_low) / highest_close) * 100 self.wvf_values.appendleft(wvf) if len(self.wvf_values) < self.percentile_lookback: return False # Calculate the mid-line and standard deviation for Bollinger Bands mid_line = np.mean(self.wvf_values) sd = self.bollinger_multiplier * np.std(self.wvf_values) # Updating the rolling windows # self.midLine_window.Add(mid_line) # self.standard_deviation_window.Add(sd) # Calculating the upper band and percentile ranges upper_band = mid_line + sd # Logic for 'percentage of max high' -- like donchian with a buffer # ---------------------------------------------------------------------- range_high = max(self.wvf_values) * self.high_percentile # Logic for proper percentile value # ---------------------------------------------------------------------- # range_high = np.percentile(self.wvf_values, self.high_percentile * 100) # range_low = min(self.wvf_values) * self.low_percentile # Not used currently # extreme = 1 if (wvf >= upper_band) else 0 extreme = 1 if (wvf >= upper_band or wvf >= range_high) else 0 # Setting the output value self.metrics = { 'wvf': wvf, 'upper_band': upper_band, 'range_high': range_high, # 'range_low' : range_low, # Currently not used 'extreme' : extreme } self.Value = wvf # Trigger the dispatch of the self.OnUpdated(IndicatorDataPoint(self.lastInput.Symbol, self.lastInput.Time, wvf)) self.IsReady = True return True def IsReady(self) -> bool: # The indicator is ready if the value has been set return self.metrics is not None
# region imports from AlgorithmImports import * from GAPM import * from TrailingStopHelper import * # endregion class GapSignals(QCAlgorithm): def Initialize(self): # inspect march 13 2013 # self.Time < datetime(2013, 3, 14) and self.Time > datetime(2013, 3, 9) self.SetStartDate(2011, 1, 1) self.SetEndDate(2023, 12, 1) self.SetCash(100000) # tickerIndex = int(self.GetParameter("tickerIndex")) # self.ticker = ["QQQ","GLD","TSLA","SPY","DBB","IGE","SHY","UUP","XLU", # "AMZN","AAPL","MSFT","ADBE","V", "CME", "SPY", "GOOG", "NVDA", # "UNH", "JNJ", "DIS"][tickerIndex] self.ticker = "QQQ" self.SetBenchmark(self.ticker) self.symbol = self.AddEquity(self.ticker, Resolution.Daily).Symbol self.period = int(self.GetParameter("period")) self.signal_period = int(self.GetParameter("signalperiod")) self.rising_period = int(self.GetParameter("risingperiod")) self.atr = AverageTrueRange(14) self.ema_fast = ExponentialMovingAverage(int(self.GetParameter("emafast"))) self.ema_slow = ExponentialMovingAverage(int(self.GetParameter("emaslow"))) ## Initialize the GAPM indicator & GAPM Rolling Windows ## ----------------------------------------------------- self.gapm = GAPM(self.period, self.signal_period) self.gapmWindow = RollingWindow[float](self.rising_period) self.gapmShortWindow = RollingWindow[float](self.rising_period) self.gapm.Updated += self.OnGapmUpdated ## Init Trailing stop helper self.trailingStop = TrailingStopHelper(self,self.symbol,14,self.atr) ## Initialize the Volume ROC indicator ## ------------------------------------- # self.vrocPeriod = int(self.GetParameter("vrocPeriod")) # self.vroc = RateOfChange(self.vrocPeriod) # self.vrocWin = RollingWindow[float](3) # VROC Rolling window for tracking rise/fall # self.vroc.Updated += (lambda indicator, _ : self.vrocWin.Add(indicator.Current.Value)) ## Register Indicators ## ------------------------------ # self.RegisterIndicator(self.symbol,self.vroc, Resolution.Daily, Field.Volume) self.RegisterIndicator(self.symbol, self.atr, Resolution.Daily) self.RegisterIndicator(self.symbol, self.gapm, Resolution.Daily) self.RegisterIndicator(self.symbol, self.ema_fast, Resolution.Daily) self.RegisterIndicator(self.symbol, self.ema_slow, Resolution.Daily) ## Initialize the VixFix Indicator ## -------------------------------- # self.vixfix = WilliamsVixFix(self, self.vfxLookback,self.vfxBBLookback,self.vfxBBCoeff,self.vfxPctLookback,self.vfxPctThresh) # self.RegisterIndicator(self.symbol, self.vixfix, Resolution.Daily) def OnGapmUpdated(self, indicator, data): # self.Plot("GAPM Value", "Gapm", self.gapm.GapSMAs.UpGaps) # # self.Plot("GAPM Value", "GapmShort", self.gapm.GapSMAs.DownGaps) # self.Plot("EMAs", "ema_fast", self.ema_fast.Current.Value) # self.Plot("EMAs", "ema_slow", self.ema_slow.Current.Value) self.gapmWindow.Add(self.gapm.GapSMAs.UpGaps) # self.gapmShortWindow.Add(self.gapm.GapSMAs.DownGaps) return def OnData(self, data): if (self.ticker not in data ) or (data[self.ticker] is None): return if not (self.gapm.IsReady and self.gapmWindow.IsReady and self.ema_fast.IsReady and self.ema_slow.IsReady): return if not self.Portfolio.Invested: if self.GapmLongSignalFired() and (self.ema_fast.Current.Value >= self.ema_slow.Current.Value) : # if self.GapmLongSignalFired() and (self.ema_fast.Current.Value >= self.ema_slow.Current.Value) and (self.vrocWin[0] > 0) : # if self.GapmLongSignalFired() and (self.vrocWin[0] > self.vrocWin[1] > 0): # if self.GapmLongSignalFired() and (self.ema_fast.Current.Value >= self.ema_slow.Current.Value) \ # and (self.Time.weekday() != 4): # self.takeProfitPrice = data[self.ticker].Close + self.atr.Current.Value self.SetHoldings(self.symbol, 1) self.trailingStop.OnPositionOpened(self.CurrentSlice[self.symbol].Close) # if self.GapmShortSignalFired(): # self.takeProfitPrice = data[self.ticker].Close - self.atr.Current.Value # self.SetHoldings(self.symbol, -1) else: if( self.Portfolio[self.ticker].IsLong ): # if self.trailingStop.TrailingExitSignalFired(): if self.GapmLongExitSignalFired(): self.Liquidate(self.symbol) self.trailingStop.OnPositionClosed() # if( self.Portfolio[self.ticker].IsShort ): # # if(data[self.ticker].Close <= self.takeProfitPrice): # # self.Liquidate(self.symbol) # if self.GapmShortExitSignalFired(): # self.Liquidate(self.symbol) # Try with vixfix in multi time frame algo (on daily) ## LONG: Multiple GAPM based exit signals ## --------------------------------- def GapmLongSignalFired(self): # Positive GAPM Falling # ---------------------- upGapsReversedList = list(self.gapmWindow) upGapsFalling = all(upGapsReversedList[i] < upGapsReversedList[i+1] for i in range(len(upGapsReversedList)-1)) # Positive GAPM Rising # --------------------- upGapsOrderedList = list(self.gapmWindow)[::-1] upGapsRising = all(upGapsOrderedList[i] < upGapsOrderedList[i+1] for i in range(len(upGapsOrderedList)-1)) # Positive GAPM Turned Up (was falling now rising) # first half are falling, and last half are rising. # if odd number, put more in first half # ------------------------------------------------- # requires window of at least 4 # if len(upGapsOrderedList) < 4: # raise Exception("GAPM TurnedUp signal reqiures GAPM window to have at least 4 values") # else: # mid = len(upGapsOrderedList) // 2 # firstHalfDescending = all(upGapsOrderedList[i] > upGapsOrderedList[i+1] for i in range(mid)) # secondHalfAscending = all(upGapsOrderedList[i] < upGapsOrderedList[i+1] for i in range(mid, len(upGapsOrderedList)-1)) # upGapsTurnedUp = firstHalfDescending and secondHalfAscending # return upGapsTurnedUp # return upGapsFalling return upGapsRising def TrailExitFired(self): if self.trailingStop.TrailingExitSignalFired(): self.exitMessage = self.trailingStop.ExitMessage return True ## SHORT: Multiple GAPM based exit signals ## Note that the gapmShortWindow has values in reverse. ## ---------------------------------------------------- # def GapmShortSignalFired(self): # # Negative GAPM Falling # # ---------------------- # downGapsReversedList = list(self.gapmShortWindow) # downGapsFalling = all(downGapsReversedList[i] < downGapsReversedList[i+1] for i in range(len(downGapsReversedList)-1)) # # Negative GAPM Rising # # --------------------- # downGapsOrderedList = list(self.gapmShortWindow)[::-1] # downGapsRising = all(downGapsOrderedList[i] < downGapsOrderedList[i+1] for i in range(len(downGapsOrderedList)-1)) # # Negative GAPM Turned Up (was falling now rising) # # ------------------------------------------------- # # requires window of at least 4 # # if len(self.gapmWindow) < 4 # # raise Exception("GAPM TurnedUp signal reqiures GAPM window to have at least 4 values") # # gapmList = list(self.gapmWindow) # # # gapmTurnedUp = ... #logic to check if first half are falling, and last half are rising. if odd number, put more in first half # return downGapsRising # # return downGapsRising ## EXITS ######################## def GapmLongExitSignalFired(self): # Check if the GAPM value is rising # Positive GAPM Falling # ---------------------- upGapsReversedList = list(self.gapmWindow) upGapsFalling = all(upGapsReversedList[i] < upGapsReversedList[i+1] for i in range(len(upGapsReversedList)-1)) # Positive GAPM Rising # --------------------- upGapsOrderedList = list(self.gapmWindow)[::-1] upGapsRising = all(upGapsOrderedList[i] < upGapsOrderedList[i+1] for i in range(len(upGapsOrderedList)-1)) # gapmTurnedUp # return not upGapsFalling return upGapsFalling # return not upGapsRising # def GapmShortExitSignalFired(self): # downGapsReversedList = list(self.gapmShortWindow) # downGapsFalling = all(downGapsReversedList[i] < downGapsReversedList[i+1] for i in range(len(downGapsReversedList)-1)) # downGapsOrderedList = list(self.gapmShortWindow)[::-1] # downGapsRising = all(downGapsOrderedList[i] < downGapsOrderedList[i+1] for i in range(len(downGapsOrderedList)-1)) # return not downGapsRising