Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 0 Tracking Error 0 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset |
from AlgorithmImports import *
from collections import deque


# https://github.com/QuantConnect/Lean/blob/master/Algorithm.Python/CustomIndicatorAlgorithm.py
class ATRTrailingStop():
    '''
    Manually-updated trailing stop that follows the close price at a distance
    of `multiplier * ATR`.

    This class does not inherit from PythonIndicator because Update needs a
    second argument (the ATR value) in addition to the bar.
    '''

    def __init__(self, name, multiplier):
        '''
        Args:
            name (str): display name of the indicator
            multiplier (float): factor applied to the ATR value to get the stop distance
        '''
        self.Name = name
        self.Current = IndicatorDataPoint()
        # Single-slot queues holding the previous stop value and the previous
        # close. A value of 0 means "nothing stored yet".
        self.queue = deque([0], maxlen=1)
        self.close_queue = deque([0], maxlen=1)
        self.multiplier = multiplier
        self.IsReady = False

    def __repr__(self):
        # Time and Value live on self.Current; the instance itself never
        # defines self.Time / self.Value.
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(
            self.Name, self.IsReady, self.Current.EndTime, self.Current.Value
        )

    def Update(self, bar, atr_value):
        '''
        Updates the current value of the indicator and determines if it is ready to use

        Args:
            bar (object): TradeBar object; only Close and EndTime are read
            atr_value: current ATR reading. It is multiplied directly by the
                multiplier, so presumably a plain number rather than an
                IndicatorDataPoint -- TODO confirm against the caller.
        Returns:
            (bool): is the indicator ready to use?
        '''
        current_close_price = bar.Close

        # The first bar only seeds close_queue; a stop value needs a previous close.
        if self.close_queue[0] != 0:
            prev_atr_stop_val = self.queue[0]
            prev_close_price = self.close_queue[0]
            nLoss = self.multiplier * atr_value

            if current_close_price > prev_atr_stop_val and prev_close_price > prev_atr_stop_val:
                # price stayed above the stop: the stop may only ratchet up
                new_atr_stop_val = max(prev_atr_stop_val, current_close_price - nLoss)
            elif current_close_price < prev_atr_stop_val and prev_close_price < prev_atr_stop_val:
                # price stayed below the stop: the stop may only ratchet down
                new_atr_stop_val = min(prev_atr_stop_val, current_close_price + nLoss)
            elif current_close_price > prev_atr_stop_val:
                # price crossed above the stop: restart the stop below price
                new_atr_stop_val = current_close_price - nLoss
            else:
                # price crossed below (or equals) the stop: restart it above price
                new_atr_stop_val = current_close_price + nLoss

            self.queue.appendleft(new_atr_stop_val)
            self.Current = IndicatorDataPoint(bar.EndTime, new_atr_stop_val)

        self.close_queue.appendleft(current_close_price)
        self.IsReady = self.queue[0] != 0
        return self.IsReady
from AlgorithmImports import *
from collections import deque


# https://github.com/QuantConnect/Lean/blob/master/Algorithm.Python/CustomIndicatorAlgorithm.py
class CustomEma():
    '''
    Custom EMA Indicator that uses an SMA as the first EMA value
    https://www.quantconnect.com/forum/discussion/3383/custom-indicator-in-python-algorithm/p1
    '''

    def __init__(self, name, ema_length):
        '''
        Args:
            name (str): display name of the indicator
            ema_length (int): number of closes used for the SMA seed and the EMA smoothing factor
        '''
        self.Name = name
        self.Current = IndicatorDataPoint()
        self.IsReady = False
        self.close_queue = deque(maxlen=ema_length)
        self.ema_queue = deque(maxlen=1)
        self.ema_length = ema_length
        self.ema_multiplier = 2/(ema_length+1)
        # Total number of samples seen. len(self.close_queue) cannot be used to
        # tell "seed" from "recursive" updates because the deque is capped at
        # ema_length and therefore never exceeds it.
        self.samples = 0

    def __repr__(self):
        # Time and Value live on self.Current; the instance itself never
        # defines self.Time / self.Value.
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(
            self.Name, self.IsReady, self.Current.EndTime, self.Current.Value
        )

    def sma(self, values, divider):
        '''Calculates the simple moving average of a list of values and a divider'''
        return sum(values) / divider

    def Update(self, input):
        '''
        Updates the current value of the indicator and determines if it is ready to use

        Args:
            input (object): IBaseData object; only Close and EndTime are read
        Returns:
            (bool): is the indicator ready?
        '''
        close = input.Close
        self.close_queue.appendleft(close)
        self.samples += 1

        if self.samples < self.ema_length:
            # if we don't have enough data yet to calculate an SMA
            # just push the close price into the queue
            self.ema_queue.appendleft(close)
            return self.IsReady

        if self.samples == self.ema_length:
            # exactly enough data for an SMA: this is the EMA starting value
            ema = self.sma(self.close_queue, self.ema_length)
        else:
            # https://www.investopedia.com/ask/answers/122314/what-exponential-moving-average-ema-formula-and-how-ema-calculated.asp
            # https://github.com/QuantConnect/Lean/blob/master/Indicators/ExponentialMovingAverage.cs
            ema = close * self.ema_multiplier + self.ema_queue[0] * (1 - self.ema_multiplier)

        self.ema_queue.appendleft(ema)
        self.Current = IndicatorDataPoint(input.EndTime, ema)
        self.IsReady = True
        return self.IsReady
from AlgorithmImports import *
from collections import deque
import os
import math
import numpy as np
import pickle
import gzip


class CustomHelpers:
    '''
    A class of custom utility functions.

    Every helper is a static method and is meant to be called on the class,
    e.g. CustomHelpers.Cross(...). In all "window" arguments index 0 is the
    most recent entry and index 1 the one before it.
    '''

    @staticmethod
    def _normalize_cross_operand(operand, ordinal):
        '''
        Converts one Cross operand to plain numbers.

        Args:
            operand: RollingWindow/list/deque of IndicatorDataPoint, TradeBar or
                numbers, or a single int/float
            ordinal (str): "2nd" or "3rd", used only in the error message
        Returns:
            (tuple): (normalized operand, True if the operand is a window)
        Raises:
            TypeError: if the (parsed) values are not int or float
        '''
        if isinstance(operand, (RollingWindow, list, deque)):
            if isinstance(operand[0], IndicatorDataPoint):
                operand = [x.Value for x in operand]
            if isinstance(operand[0], TradeBar):
                operand = [x.Close for x in operand]
            if not isinstance(operand[0], (int, float)):
                raise TypeError(f"parsed values for iterable {ordinal} argument must be of type float or int")
            return operand, True
        if not isinstance(operand, (int, float)):
            raise TypeError(f"simple value for {ordinal} argument must be of type float or int")
        return operand, False

    @staticmethod
    def Cross(direction, a, b, algorithm):
        '''
        Determines whether window `a` crossed `b` between the previous and the
        current entry.

        Args:
            direction (str): 'over', 'under' or 'any'
            a: window (RollingWindow, list or deque) of IndicatorDataPoint,
                TradeBar or numbers
            b: window like `a`, or a single int/float
            algorithm (QCAlgorithm): the running algorithm (only type-checked)
        Returns:
            (bool): True if the requested cross happened; False when `a` is a
                single value (a cross needs two entries of `a`)
        Raises:
            ValueError: invalid direction, or algorithm is not a QCAlgorithm
            TypeError: a or b do not hold int/float values
        '''
        if direction not in ('under', 'over', 'any'):
            raise ValueError("direction must be 'over', 'under' or 'any'")
        if not isinstance(algorithm, QCAlgorithm):
            raise ValueError("algorithm must be an instance of QCAlgorithm")

        a, a_is_iterable = CustomHelpers._normalize_cross_operand(a, "2nd")
        b, b_is_iterable = CustomHelpers._normalize_cross_operand(b, "3rd")

        # everything passes the sniff test, do the thing...
        if not a_is_iterable:
            return False

        # a static value is its own "previous" value
        previous_b, current_b = (b[1], b[0]) if b_is_iterable else (b, b)
        crossed_over = a[1] <= previous_b and a[0] > current_b
        crossed_under = a[1] >= previous_b and a[0] < current_b

        if direction == 'over':
            return crossed_over
        if direction == 'under':
            return crossed_under
        return crossed_over or crossed_under

    @staticmethod
    def IndicatorCrossover(window_1, window_2, algorithm):
        '''
        Determines whether one rolling indicator window crossed over another

        Args:
            window_1 (list): RollingWindow of IndicatorDataPoint objects
            window_2 (list): RollingWindow of IndicatorDataPoint objects
            algorithm: unused
        Returns: bool
        '''
        return window_1[1].Value <= window_2[1].Value and window_1[0].Value > window_2[0].Value

    @staticmethod
    def IndicatorCrossunder(window_1, window_2, algorithm):
        '''
        Determines whether one rolling indicator window crossed under another

        Args:
            window_1 (list): RollingWindow of IndicatorDataPoint objects
            window_2 (list): RollingWindow of IndicatorDataPoint objects
            algorithm: unused
        Returns: bool
        '''
        return window_1[1].Value >= window_2[1].Value and window_1[0].Value < window_2[0].Value

    @staticmethod
    def EquityCrossoverIndicator(equity_window, indicator_window, algorithm):
        '''
        Determines whether a rolling equity window crossed over a rolling indicator window

        Args:
            equity_window (list): RollingWindow of TradeBar objects
            indicator_window (list): RollingWindow of IndicatorDataPoint objects
            algorithm: unused
        Returns: bool
        '''
        return equity_window[1].Close <= indicator_window[1].Value and equity_window[0].Close > indicator_window[0].Value

    @staticmethod
    def EquityCrossunderIndicator(equity_window, indicator_window, algorithm):
        '''
        Determines whether a rolling equity window crossed under a rolling indicator window

        Args:
            equity_window (list): RollingWindow of TradeBar objects
            indicator_window (list): RollingWindow of IndicatorDataPoint objects
            algorithm: unused
        Returns: bool
        '''
        return equity_window[1].Close >= indicator_window[1].Value and equity_window[0].Close < indicator_window[0].Value

    @staticmethod
    def EquityCrossoverValue(equity_window, value, algorithm):
        '''
        Determines whether a rolling equity window crossed over a static value

        Args:
            equity_window (list): RollingWindow of TradeBar objects
            value (float): value to check the equity_window against
            algorithm: unused
        Returns: bool
        '''
        return equity_window[1].Close <= value and equity_window[0].Close > value

    @staticmethod
    def EquityCrossunderValue(equity_window, value, algorithm):
        '''
        Determines whether a rolling equity window crossed under a static value

        Args:
            equity_window (list): RollingWindow of TradeBar objects
            value (float): value to check the equity_window against
            algorithm: unused
        Returns: bool
        '''
        return equity_window[1].Close >= value and equity_window[0].Close < value

    @staticmethod
    def IndicatorCrossoverValue(indicator_window, value, algorithm):
        '''
        Determines whether a rolling indicator window crossed over a static value

        Args:
            indicator_window (list): RollingWindow of IndicatorDataPoint objects
            value (float): value to check the indicator window against
            algorithm: unused
        Returns: bool
        '''
        return indicator_window[1].Value <= value and indicator_window[0].Value > value

    @staticmethod
    def IndicatorCrossunderValue(indicator_window, value, algorithm):
        '''
        Determines whether a rolling indicator window crossed under a static value

        Args:
            indicator_window (list): RollingWindow of IndicatorDataPoint objects
            value (float): value to check the indicator window against
            algorithm: unused
        Returns: bool
        '''
        return indicator_window[1].Value >= value and indicator_window[0].Value < value

    @staticmethod
    def IndicatorCrossedValue(indicator_window, value, algorithm):
        '''
        Determines whether a rolling indicator window crossed either over OR under a static value

        Args:
            indicator_window (list): RollingWindow of IndicatorDataPoint objects
            value (float): value to check the indicator window against
            algorithm: forwarded to the over/under helpers, which ignore it
        Returns: bool
        '''
        return CustomHelpers.IndicatorCrossoverValue(indicator_window, value, algorithm) \
            or CustomHelpers.IndicatorCrossunderValue(indicator_window, value, algorithm)

    @staticmethod
    def IndicatorCrossoverValueWindow(indicator_window, value_window, algorithm):
        '''
        Determines whether a rolling indicator window crossed over a simple value window

        Args:
            indicator_window (list): RollingWindow of IndicatorDataPoint objects
            value_window (list): RollingWindow of int or float values
            algorithm: unused
        Returns: bool
        '''
        return indicator_window[1].Value <= value_window[1] and indicator_window[0].Value > value_window[0]

    @staticmethod
    def IndicatorCrossunderValueWindow(indicator_window, value_window, algorithm):
        '''
        Determines whether a rolling indicator window crossed under a simple value window

        Args:
            indicator_window (list): RollingWindow of IndicatorDataPoint objects
            value_window (list): RollingWindow of int or float values
            algorithm: unused
        Returns: bool
        '''
        return indicator_window[1].Value >= value_window[1] and indicator_window[0].Value < value_window[0]

    @staticmethod
    def NanToZero(input):
        '''
        Checks if the input is a valid number. Returns the number if so, otherwise returns zero

        Args:
            input (int|float): the value to check (math.isnan rejects non-numbers with TypeError)
        Returns: int|float
        '''
        return 0 if math.isnan(input) else input

    @staticmethod
    def ATRTrailingStopValue(equity_window, atr_window, multiplier):
        '''
        Description:
            Determines the current ATR Stoploss Value
            Note: this was the first ATRTrailingStop attempt. A custom indicator
            was written for this instead because the value has to be kept in a
            rolling window.
        Args:
            equity_window (list): rolling window of prices; entries are compared
                and offset arithmetically, so they are presumably plain numbers
                (closes) rather than TradeBar objects -- TODO confirm
            atr_window (list): rolling window of AverageTrueRange IndicatorDataPoint objects
            multiplier (float): factor applied to the current ATR to get the stop distance
        Returns:
            (float): the current ATR Trailing Stop value
        '''
        nLoss = atr_window[0].Value * multiplier
        atr_val = atr_window[1].Value or 0
        current, previous = equity_window[0], equity_window[1]

        if current > atr_val and previous > atr_val:
            return max(atr_val, current - nLoss)
        if current < atr_val and previous < atr_val:
            # offset the current price, not the window object itself
            return min(atr_val, current + nLoss)
        if current > atr_val:
            return current - nLoss
        return current + nLoss

    @staticmethod
    def IsMarketHours(algorithm):
        '''
        Description:
            Determines if algorithm.Time is between 09:30 and 16:00 inclusive
            (normal US market hours; no weekday or holiday check is made)
        Returns: (bool)
        '''
        now = algorithm.Time
        return (now.hour == 9 and now.minute >= 30) \
            or (9 < now.hour < 16) \
            or (now.hour == 16 and now.minute == 0)

    @staticmethod
    def Timedelta64ToMinutes(timedelta64):
        '''
        Description:
            Converts a numpy timedelta64 object to minutes (float)
        Returns: (float)
        '''
        return timedelta64.astype('timedelta64[s]').item().total_seconds() / 60

    @staticmethod
    def hurst(ts):
        '''
        Returns the Hurst Exponent of the time series vector ts.

        Lags 2..99 are used, so ts must hold more than 100 samples.
        '''
        # Create the range of lag values
        lags = range(2, 100)
        # Calculate the array of the variances of the lagged differences
        tau = [np.sqrt(np.std(np.subtract(ts[lag:], ts[:-lag]))) for lag in lags]
        # Use a linear fit to estimate the Hurst Exponent
        poly = np.polyfit(np.log(lags), np.log(tau), 1)
        # Return the Hurst exponent from the polyfit output
        return poly[0] * 2.0

    ############################# Variance Ratio Functions #####################################
    # https://github.com/letianzj/QuantResearch/blob/master/notebooks/mean_reversion.py
    @staticmethod
    def normcdf(X):
        '''Polynomial approximation of the standard normal CDF evaluated at scalar X.'''
        (a1, a2, a3, a4, a5) = (0.31938153, -0.356563782, 1.781477937, -1.821255978, 1.330274429)
        L = abs(X)
        K = 1.0 / (1.0 + 0.2316419 * L)
        w = 1.0 - 1.0 / np.sqrt(2 * np.pi) * np.exp(-L * L / 2.) * (
            a1 * K + a2 * K * K + a3 * pow(K, 3) + a4 * pow(K, 4) + a5 * pow(K, 5))
        if X < 0:
            w = 1.0 - w
        return w

    @staticmethod
    def vratio(a, lag=2, cor='hom'):
        '''
        Variance ratio test.

        Args:
            a (numpy.ndarray): 1-D series; slices of it are subtracted element-wise
            lag (int): aggregation lag
            cor (str): 'hom' for the homoscedastic variance estimate, 'het'
                for the heteroscedasticity-consistent one
        Returns:
            (tuple): (variance ratio, z-score, normcdf(z-score))
        Raises:
            ValueError: if cor is neither 'hom' nor 'het'
        '''
        if cor not in ('hom', 'het'):
            raise ValueError("cor must be 'hom' or 'het'")

        n = float(len(a))
        mu = sum(a[1:len(a)] - a[:-1]) / n
        m = (n - lag + 1) * (1 - lag / n)
        b = sum(np.square(a[1:len(a)] - a[:len(a) - 1] - mu)) / (n - 1)
        t = sum(np.square(a[lag:len(a)] - a[:len(a) - lag] - lag * mu)) / m
        ratio = t / (lag * b)

        la = float(lag)
        if cor == 'hom':
            varvrt = 2 * (2 * la - 1) * (la - 1) / (3 * la * n)
        else:
            varvrt = 0
            sum2 = sum(np.square(a[1:len(a)] - a[:len(a) - 1] - mu))
            for j in range(lag - 1):
                sum1a = np.square(a[j + 1:len(a)] - a[j:len(a) - 1] - mu)
                sum1b = np.square(a[1:len(a) - j] - a[0:len(a) - j - 1] - mu)
                sum1 = np.dot(sum1a, sum1b)
                delta = sum1 / (sum2 ** 2)
                varvrt = varvrt + ((2 * (la - j) / la) ** 2) * delta

        zscore = (ratio - 1) / np.sqrt(float(varvrt))
        pval = CustomHelpers.normcdf(zscore)
        return ratio, zscore, pval

    ############################# Object Store Functions #####################################
    # https://www.quantconnect.com/forum/discussion/9000/objectstore-deleting-all-saved-data-in-a-project/p1
    @staticmethod
    def bytesto(bytes, to, bsize=1024):
        '''
        Converts a byte count to a larger unit.

        Args:
            bytes (int|float): number of bytes
            to (str): target unit: 'kb', 'mb', 'gb', 'tb', 'pb' or 'eb'
            bsize (int): size of one unit step
        Returns: (float)
        Raises:
            KeyError: if `to` is not one of the supported units
        '''
        exponents = {'kb': 1, 'mb': 2, 'gb': 3, 'tb': 4, 'pb': 5, 'eb': 6}
        return float(bytes) / (bsize ** exponents[to])

    @staticmethod
    def ObjectStoreOverview(algorithm, storage_limit=50, file_limit=1000):
        '''
        Prints some basic ObjectStore information (remaining storage, remaining
        file count and the size of every key) and returns the list of keys.

        Args:
            algorithm: object exposing ObjectStore
            storage_limit (float): storage quota in MB
            file_limit (int): maximum number of files
        Returns:
            (list): the ObjectStore keys
        '''
        keys = [str(j).split(',')[0][1:] for j in algorithm.ObjectStore.GetEnumerator()]
        sizes = [os.path.getsize(algorithm.ObjectStore.GetFilePath(key)) for key in keys]
        remaining = storage_limit - sum(sizes)/1e6
        print(f'Remaining storage: {remaining} MB.')
        print(f'Remaining file limit: {file_limit-len(keys)}')

        # For human readability
        converted_sizes = []
        for size in sizes:
            if size < 1024:
                s = f'{size} bytes'
            elif size < 1e6:
                s = f'{size/1000} KB'
            else:
                s = f'{size/1e+6} MB'
            converted_sizes.append(s)

        print('--')
        print('Key, Size')
        for file in zip(keys, converted_sizes):
            print(' '.join(file))
        return keys

    @staticmethod
    def clear_ObjectStore(alogrithm):
        '''Deletes every key currently in the ObjectStore.'''
        keys = [str(j).split(',')[0][1:] for j in alogrithm.ObjectStore.GetEnumerator()]
        for key in keys:
            alogrithm.ObjectStore.Delete(key)

    @staticmethod
    def serialize_and_compress(data):
        '''
        Serializes and compresses data using gzip and pickle for storage in
        ObjectStore using SaveBytes
        '''
        return gzip.compress(pickle.dumps(data))

    @staticmethod
    def unserialize_and_decompress_bytes(data):
        '''
        Unserializes and decompresses data that was serialized and compressed
        using our serialize_and_compress function and stored using
        ObjectStore.SaveBytes and then refetched using ObjectStore.ReadBytes
        '''
        # SECURITY: pickle.loads can execute arbitrary code. Only feed this
        # function bytes that were produced by serialize_and_compress.
        return pickle.loads(gzip.decompress(bytes(data)))
from AlgorithmImports import *
from collections import deque
from math import sqrt


# https://github.com/QuantConnect/Lean/blob/master/Algorithm.Python/CustomIndicatorAlgorithm.py
class EmaSampleStdDev():
    '''
    Custom indicator that calculates the Z-Score of a Sample Standard Deviation of an
    Exponential Weighted Moving Average of prices for a given ema sample period and
    deviation length
    https://www.quantconnect.com/forum/discussion/3383/custom-indicator-in-python-algorithm/p1
    '''

    def __init__(self, name, ema_length, deviation_length):
        '''
        Args:
            name (str): display name of the indicator
            ema_length (int): number of closes used for the SMA seed and the EMA smoothing factor
            deviation_length (int): number of (close, ema) pairs in the sample standard deviation
        Raises:
            ValueError: if deviation_length < 2 (the sample variance divides by deviation_length - 1)
        '''
        if deviation_length < 2:
            raise ValueError("deviation_length must be at least 2")
        self.Name = name
        self.Current = IndicatorDataPoint()
        self.IsReady = False
        self.close_queue = deque(maxlen=deviation_length)
        self.ema_queue = deque(maxlen=deviation_length+ema_length)
        self.ema_length = ema_length
        self.ema_multiplier = 2/(ema_length+1)
        self.deviation_length = deviation_length
        # Sample counter and running sum for the SMA seed. len(self.close_queue)
        # is capped at deviation_length, so it cannot tell when ema_length
        # samples have been seen if deviation_length <= ema_length.
        self.samples = 0
        self.seed_sum = 0

    def __repr__(self):
        # Time and Value live on self.Current; the instance itself never
        # defines self.Time / self.Value.
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(
            self.Name, self.IsReady, self.Current.EndTime, self.Current.Value
        )

    def sma(self, values, divider):
        '''Calculates the simple moving average of a list of values and a divider'''
        return sum(values) / divider

    def Update(self, input):
        '''
        Updates the current value of the indicator and determines if it is ready to use

        Args:
            input (object): IBaseData object; only Close and EndTime are read
        Returns:
            (bool): is the indicator ready?
        '''
        close = input.Close
        self.close_queue.appendleft(close)
        self.samples += 1
        ema = 0

        if self.samples < self.ema_length:
            # if we don't have enough data yet to calculate an SMA
            # just push the close price into the queue
            self.seed_sum += close
            self.ema_queue.appendleft(close)
        else:
            if self.samples == self.ema_length:
                # exactly enough data for an SMA: this is the EMA starting value
                self.seed_sum += close
                ema = self.seed_sum / self.ema_length
            else:
                # https://www.investopedia.com/ask/answers/122314/what-exponential-moving-average-ema-formula-and-how-ema-calculated.asp
                # https://github.com/QuantConnect/Lean/blob/master/Indicators/ExponentialMovingAverage.cs
                ema = close * self.ema_multiplier + self.ema_queue[0] * (1 - self.ema_multiplier)
            self.ema_queue.appendleft(ema)

        # A full ema_queue guarantees that its newest deviation_length entries
        # are real EMA values (not warm-up closes) and that close_queue is full.
        self.IsReady = len(self.ema_queue) == self.deviation_length + self.ema_length
        if self.IsReady:
            squared_deviations = [
                (close_price - self.ema_queue[idx])**2
                for idx, close_price in enumerate(self.close_queue)
            ]
            StdSampDev = sqrt(self.sma(squared_deviations, self.deviation_length - 1))
            # A zero deviation means every close in the window equals its EMA,
            # including the current one, so the z-score is 0 rather than 0/0.
            z_score = (close - ema) / StdSampDev if StdSampDev else 0.0
            self.Current = IndicatorDataPoint(input.EndTime, z_score)
        return self.IsReady
# Mean reversion is a concept in finance which holds that, in the long term, the price will typically revert back to a mean.
# The Reverend is a compilation of mean reversion strategies that you can select and test on specific instruments to determine the strategy and parameters you may want to execute.
# Each of the strategies you can select involves some variation of a standard deviation Z score applied to the dataset compared to a statistical mean or baseline.
# The baselines coded below include a linear regression baseline, a price average baseline (SMA), an EMA baseline, and a volume weighted average price baseline. Computation options include Sample or Population Standard Deviation formulas.
# In general, the further the Z score is away from the mean/baseline (0 on the Z score scale), the more statistically improbable it is that it will stay there. Under a normal distribution, about 95% of the dataset sits between -2 and +2.
# The Reverend buys an instrument (long) when it is far away from the mean, and sells when the price reverts closer to the mean.
# It operationalizes a buy using a trailing function on the z-score after the z-score has dipped below a preset value, and buys when price reverses over the trailing value (also a % setting).
# It operationalizes a sell using an ATR TSL (trailing stop loss) when the z-score reverts to 0.
# When the z-score reverts to 0, the code checks to see if an ATR TSL would have sold already, and if so, sells (this is the ATR Bail function).
# If the ATR TSL would not have sold already, it "turns on" an ATR TSL when the z-score crosses 0 and follows the rest of the trend until the ATR TSL signals to sell.
from AlgorithmImports import *
from custom.CustomHelpers import CustomHelpers
from custom.EmaSampleStdDev import EmaSampleStdDev
from datetime import datetime,timedelta
from QuantConnect.Data import Custom
from UniverseSelectionData import UniverseSelectionData
from SymbolData import SymbolData
import time


class TheReverendAlgorithm(QCAlgorithm):
    '''
    Universe-selecting algorithm that builds UniverseSelectionData for every
    added security and SymbolData for every security that is ready to trade.

    NOTE(review): OnData currently returns immediately, so no orders are placed.
    '''

    def Initialize(self):
        '''Sets the backtest window, reads the algorithm parameters and registers the universe and the daily schedule.'''
        # setup back testing parameters
        # self.SetStartDate(2015, 7, 1)
        # self.SetEndDate(2020, 7, 1)
        self.benchmark_symbol = "SPY"
        self.AddEquity(self.benchmark_symbol, Resolution.Minute)
        self.SetBenchmark(self.benchmark_symbol)  # set statistics reference
        self.SetStartDate(2021, 7, 12)
        self.SetEndDate(2021, 7, 13)
        self.SetCash(10000000)
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage)

        # import algorithm parameters
        self.max_drawdown_perc = float(self.GetParameter("max-drawdown-percent"))
        self.bar_length_in_minutes = int(self.GetParameter("bar-length-in-minutes"))
        self.ema_length = int(self.GetParameter("ema-length"))
        self.deviation_length = int(self.GetParameter("deviation-length"))
        self.min_dollar_volume = int(self.GetParameter("min-dollar-volume"))
        self.min_price = int(self.GetParameter("min-price"))
        self.min_market_cap = int(self.GetParameter("min-market-cap"))
        self.vix_length = int(self.GetParameter("vix-length"))
        self.vix_filter_max = int(self.GetParameter("vix-filter-max"))
        self.ztp = float(self.GetParameter("ztp"))
        self.short_ztp = float(self.GetParameter("short-ztp"))

        self.resolution = Resolution.Minute
        self.UniverseSettings.Resolution = self.resolution
        self.AddUniverse(self.CoarseSelectionFilter, self.FineSelectionFilter)

        # tracks our maximum portfolio value
        self.max_portfolio_value = self.Portfolio.TotalPortfolioValue
        # tracks the highest vix ema z-score from the last {self.vix_length} bars
        self.vix_zscore_max = 0
        # stores a UniverseSelectionData instance for each symbol we are
        # tracking for universe selection, keyed by symbol
        self.selection_data = {}
        # stores a SymbolData instance for each symbol we are tracking for
        # buy/sell signals, keyed by symbol
        self.symbol_data = {}
        # stores symbols that should no longer be in our universe but we are
        # currently still invested in
        self.universe_whitelist = []
        # determines whether to try to pull initial history from object store
        self.is_initial_setup_period = True
        self.is_training_selection_data = False
        self.is_training_symbol_data = False
        self.selection_data_symbols = []
        self.symbol_data_symbols = []
        self.added_symbols = []
        self.removed_symbols = []

        # Add CBOE:VIX data
        # self.vix = self.AddIndex("VIX", self.resolution).Symbol
        # create a rolling windows so we can access past indicator data
        # self.vix_zscore_window = RollingWindow[IndicatorDataPoint](self.vix_length)
        # setup the vix zscore indicator
        # self.vix_zscore = EmaSampleStdDev(
        #     '{}.EMA_SAMPLE_STD_DEV({}-{})'.format(self.vix, self.ema_length, self.deviation_length),
        #     self.ema_length,
        #     self.deviation_length
        # )

        self.universe_symbols = set()

        # define our trade bar consolidator. we can
        # access the bar from the DataConsolidated events
        # self.bar_consolidator = TradeBarConsolidator(timedelta(minutes=self.bar_length_in_minutes))
        # attach our event handler. the event handler is a function that will
        # be called each time we produce a new consolidated piece of data.
        # self.bar_consolidator.DataConsolidated += self.ConsolidatedBarHandler
        # this call adds our consolidator to
        # the manager to receive updates from the engine
        # self.SubscriptionManager.AddConsolidator(self.vix, self.bar_consolidator)
        # register the consolidated bar data to automatically
        # update the indicators
        # self.RegisterIndicator(self.vix, self.vix_zscore, self.bar_consolidator)
        # vix_history = self.History(self.vix, (self.deviation_length*self.bar_length_in_minutes)*2, self.resolution)
        # for time, row in vix_history.loc[self.vix].iterrows():
        #     tradeBar = TradeBar()
        #     tradeBar.Close = row['close']
        #     tradeBar.Open = row['open']
        #     tradeBar.High = row['high']
        #     tradeBar.Low = row['low']
        #     # tradeBar.Volume = row['volume']
        #     tradeBar.Time = time
        #     # tradeBar.Time = index[1]
        #     tradeBar.Symbol = self.vix
        #     self.bar_consolidator.Update(tradeBar)

        self.Schedule.On(
            self.DateRules.EveryDay(self.benchmark_symbol),
            self.TimeRules.At(5, 0),
            self.BeforeMarketOpen
        )

    def CoarseSelectionFilter(self, universe):
        '''
        Keeps symbols that have fundamental data and exceed the min-price and
        min-dollar-volume parameters, ordered by descending dollar volume.
        '''
        self.symbol_data_symbols.clear()
        self.selection_data_symbols.clear()
        by_dollar_volume = sorted(universe, key=lambda c: c.DollarVolume, reverse=True)
        selected = [
            x.Symbol for x in by_dollar_volume
            if x.HasFundamentalData
            and x.AdjustedPrice > self.min_price
            and x.DollarVolume > self.min_dollar_volume
        ]
        self.Debug("Coarse universe size: " + str(len(selected)))
        return selected

    def FineSelectionFilter(self, universe):
        '''
        Keeps stocks that IPO'd more than 4 years ago and have a market cap
        greater than the min-market-cap parameter. Selected symbols accumulate
        in self.universe_symbols, which is what gets returned.
        '''
        # Use algorithm time, not wall-clock time: datetime.now() would make a
        # backtest depend on the day it is run and leak future information.
        four_years_ago = self.Time - timedelta(days=4*365)
        selected = [
            x.Symbol for x in universe
            if x.SecurityReference.IPODate < four_years_ago
            and x.MarketCap > self.min_market_cap
        ]
        self.universe_symbols.update(selected)
        self.Debug("Fine universe size: " + str(len(self.universe_symbols)))
        return list(self.universe_symbols)

    @staticmethod
    def _history_symbols(history):
        '''Returns the symbols present in a History data frame; empty when the frame has no rows.'''
        # An empty frame has no MultiIndex, so .index.levels would raise.
        if history.empty:
            return ()
        return history.index.levels[0]

    def OnSecuritiesChanged(self, changes):
        '''
        Event fired each time the we add/remove securities from the data feed

        Args:
            changes: The security additions and removals from the algorithm
        '''
        added_symbols = [security.Symbol for security in changes.AddedSecurities]
        self.Debug(f"Number of Symbols: {len(added_symbols)}")

        # nothing to request when the change only contains removals
        if added_symbols:
            t = time.process_time()
            daily_history = self.History(added_symbols, 200, Resolution.Daily)
            t2 = time.process_time()
            self.Debug(f"Daily History took {str(t2-t)}")
            self.Debug(f"Hourly History Started..")
            hour_history = self.History(added_symbols, 2200, Resolution.Hour)
            t3 = time.process_time()
            self.Debug(f"Hour History took {str(t3-t2)}")

            hour_symbols = self._history_symbols(hour_history)
            daily_symbols = self._history_symbols(daily_history)

            for symbol in added_symbols:
                if symbol in self.selection_data:
                    continue
                if symbol in hour_symbols and symbol in daily_symbols:
                    hour_df = hour_history.loc[symbol]
                    daily_df = daily_history.loc[symbol]
                    self.selection_data[symbol] = UniverseSelectionData(
                        self,
                        symbol,
                        hour_df,
                        daily_df,
                    )
                else:
                    self.Log(f"No data for found {symbol}")

        if self.is_initial_setup_period:
            self.Train(self.DateRules.Today, self.TimeRules.At(4, 0), self.BeforeMarketOpen)

    def BeforeMarketOpen(self):
        '''
        Creates, registers and warms up a SymbolData for every symbol whose
        selection data is ready and tradable and that is not tracked yet.
        '''
        self.is_initial_setup_period = False
        t = time.process_time()
        history_request_amount = (self.ema_length*2 + self.deviation_length)*self.bar_length_in_minutes
        symbols_needed = [
            symbol for symbol, sel_data in self.selection_data.items()
            if sel_data.IsReady()
            and symbol not in self.symbol_data
            and sel_data.CanTrade()
        ]
        symbols_selected = symbols_needed
        self.Debug(f"{history_request_amount} minute bars on {len(symbols_selected)} symbols out of {len(symbols_needed)}")
        if not symbols_selected:
            return

        minute_history = self.History(symbols_selected, history_request_amount, self.resolution)
        self.Debug(f"{history_request_amount} minute bars on {len(symbols_selected)} symbols took {str(time.process_time()-t)}")

        minute_symbols = self._history_symbols(minute_history)
        for symbol in symbols_needed:
            if symbol in minute_symbols:
                symbol_data = SymbolData(symbol, self)
                symbol_data.RegisterIndicators(self)
                symbol_data.WarmUpIndicators(minute_history.loc[symbol], self)
                self.symbol_data[symbol] = symbol_data

    def OnFinishedTraining(self):
        '''Clears the added/removed lists and resets the setup/training flags.'''
        # clean up added/removed lists
        self.added_symbols.clear()
        self.removed_symbols.clear()
        self.is_initial_setup_period = False
        self.is_training_symbol_data = False

    def OnData(self, data):
        '''
        OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.

        Arguments:
            data (object): Slice object keyed by symbol containing the stock data
        '''
        # NOTE(review): this early return disables all of the trading logic
        # below (nothing after it ever runs) -- presumably intentional while
        # universe selection is being tested; confirm before removing.
        return

        # if not (self.vix_zscore.IsReady or not self.vix_zscore_window.IsReady):
        #     return
        # if not CustomHelpers.IsMarketHours(self): return
        t = time.process_time()
        self.UpdateMaxPortfolioValue()

        if self.IsInMaxDrawdownRange():
            self.Notify.Email(
                "reggie.ofarrell@gmail.com",
                "Reverend - Max Drawdown Hit",
                "Reverend script hit max drawdown at {}".format(self.Time)
            )
            self.Liquidate()
            self.Log("Max drawdown hit, exiting algorithm")
            self.Quit()
            return

        if self.is_training_selection_data or self.is_training_symbol_data:
            self.Log("OnData called while still Training at " + str(self.Time))
            return

        if not data.HasData:
            self.Log("slice object empty in OnData at " + str(self.Time))
            return

        invested = [symbol for symbol, portfolio_item in self.Portfolio.items() if portfolio_item.Invested]
        # create a list to hold new symbols we will invest in
        new_long_investments = []
        new_short_investments = []
        log_time = False

        # iterate over a copy of the keys: ExitPosition may pop from self.symbol_data
        for symbol in list(self.symbol_data):
            symbol_data = self.symbol_data[symbol]

            if not (self.Securities.ContainsKey(symbol) and self.ActiveSecurities[symbol].IsTradable):
                self.Log(f"{str(symbol.Value)} is missing from self.Securities or is not tradable, skipping in OnData")
                continue

            if not data.Bars.ContainsKey(symbol):
                self.Log("missing bars " + str(symbol) + " at " + str(self.Time))
                continue

            if symbol_data.CanEmit(data[symbol].Close):
                log_time = True
                # self.Log(f'symbol {symbol_data.symbol.Value} can emit')

                # we are not invested, see if it's time to buy
                if not self.Portfolio[symbol].Invested:
                    # buy long logic
                    if symbol_data.long_signal['signal'] == 'buy':
                        new_long_investments.append(symbol)
                        self.SetHoldings(symbol, 0.005, False, 'Buy')
                    # buy short logic
                    if symbol_data.short_signal['signal'] == 'buy':
                        # NOTE: temporarily disable shorting
                        # new_short_investments.append(symbol)
                        # self.SetHoldings(symbol, -0.01)
                        pass
                # we are invested, update data and sell if it's time
                elif self.Portfolio[symbol].Invested:
                    if (
                        self.Portfolio[symbol].IsLong and symbol_data.long_signal['signal'] == 'sell'
                        or self.Portfolio[symbol].IsShort and symbol_data.short_signal['signal'] == 'sell'
                    ):
                        self.ExitPosition(symbol)
                        invested.remove(symbol)

        # Need to figure out portfolio management next
        if log_time:
            log_time = False
            elapsed_time = time.process_time() - t
            self.Log(f"OnData - Completed for {str(len(self.symbol_data))} symbols in {str(elapsed_time)} seconds")

    def ConsolidatedBarHandler(self, sender, consolidated):
        '''
        This is our event handler for our consolidated trade bar defined above in Initialize(). So each time the
        consolidator produces a new bar, this function will be called automatically.
        Currently a no-op: the VIX z-score tracking below is commented out.

        Args:
            sender (object): IDataConsolidator that invoked the event
            consolidated (object): consolidated TradeBar
        '''
        pass
        # if not CustomHelpers.IsMarketHours(self.algorithm):
        #     return
        # self.vix_zscore.Update(consolidated)
        # if self.vix_zscore.IsReady:
        #     self.vix_zscore_window.Add(self.vix_zscore.Current)
        # if self.vix_zscore_window.IsReady:
        #     self.vix_zscore_max = max([x.Value for x in self.vix_zscore_window])

    def CheckVixFilter(self):
        '''Determines if our stored vix_zscore_max is in the range set by the vix_filter_max parameter'''
        # NOTE(review): the filter is short-circuited to always pass; the real
        # comparison below is unreachable -- presumably because the VIX feed is
        # disabled in Initialize; confirm.
        return True
        return self.vix_zscore_max < self.vix_filter_max

    def ExitPosition(self, symbol):
        '''
        Liquidates our holdings in a stock and does data cleanup if the stock
        was only being kept because we were invested in it
        '''
        self.Liquidate(symbol, "Sell")

        # some cleanup we need to do if this stock was only still in
        # our universe because we were invested in it
        if symbol in self.universe_whitelist:
            symbol_data = self.symbol_data.pop(symbol, None)
            if symbol_data is not None:
                symbol_data.RemoveConsolidators()
            self.selection_data.pop(symbol, None)
            self.universe_whitelist.remove(symbol)

    def UpdateMaxPortfolioValue(self):
        '''
        Update our stored max_portfolio_value if the current self.Portfolio.TotalPortfolioValue
        is higher than our currently stored value
        '''
        if self.Portfolio.TotalPortfolioValue > self.max_portfolio_value:
            self.max_portfolio_value = self.Portfolio.TotalPortfolioValue

    def IsInMaxDrawdownRange(self):
        '''Check if it's time to liquidate because we've dipped below our max drawdown'''
        return self.Portfolio.TotalPortfolioValue < (1 - self.max_drawdown_perc) * self.max_portfolio_value
from AlgorithmImports import *
from custom.CustomHelpers import CustomHelpers
from custom.EmaSampleStdDev import EmaSampleStdDev
from custom.ATRTrailingStop import ATRTrailingStop
from CustomEma import CustomEma
from datetime import timedelta
from collections import deque


class SymbolData:
    '''Contains data specific to a symbol required by the EMMA algorithm'''

    def __init__(self, symbol, algorithm):
        '''
        Reads the strategy parameters, builds the indicators and rolling
        windows, and subscribes a trade bar consolidator for the symbol

        Args:
            symbol (object): Symbol tracked by this instance
            algorithm (object): the running algorithm; used here for
                GetParameter and SubscriptionManager, and later for
                Portfolio, Log, Time, CheckVixFilter and
                is_training_symbol_data
        '''
        self.symbol = symbol
        self.algorithm = algorithm

        # strategy parameters
        self.consolidation_period = int(algorithm.GetParameter("bar-length-in-minutes"))
        self.ema_length = int(algorithm.GetParameter("ema-length"))
        self.deviation_length = int(algorithm.GetParameter("deviation-length"))
        self.atr_length = int(algorithm.GetParameter("atr-length"))
        self.atr_multiplier = float(algorithm.GetParameter("atr-multiplier"))
        self.ztp = float(algorithm.GetParameter("ztp"))
        self.short_ztp = float(algorithm.GetParameter("short-ztp"))
        self.bottom_zx = float(algorithm.GetParameter("bottom-zx"))
        self.top_zx = float(algorithm.GetParameter("top-zx"))
        self.crash_line = float(algorithm.GetParameter("crash-line"))
        self.z_trail_perc = float(algorithm.GetParameter("z-trail-percent"))
        self.short_z_trail_perc = float(algorithm.GetParameter("short-z-trail-percent"))

        # most recent consolidated bar; None until the consolidator fires
        self.bar = None

        # crossing counters, see _update_cross_counters
        self.zcrossedzero = 0.0
        self.zcrossatbuy = 0.0
        self.dippedbelow4 = 0.0
        self.szcrossedSZTP = 0.0
        self.szcrossatbuy = 0.0

        # index 0 is the newest trail value, index 1 the one before it
        self.z_trail = deque([9999, 9999], maxlen=2)
        self.short_z_trail = deque([-9999, -9999], maxlen=2)

        # Options for long_signal and short_signal: None, buy, sell
        self.long_signal = {
            "signal": None,
            "zscore": 0
        }
        self.short_signal = {
            "signal": None,
            "zscore": 0
        }

        # create rolling windows so we can access past trade bars and
        # past indicator values
        self.window = RollingWindow[TradeBar](2)
        self.ema_window = RollingWindow[IndicatorDataPoint](2)
        self.ema_std_dev_window = RollingWindow[IndicatorDataPoint](2)
        self.atr_tsl_window = RollingWindow[IndicatorDataPoint](2)
        self.position_window = RollingWindow[IndicatorDataPoint](2)

        # setup the indicators
        self.ema_std_dev = EmaSampleStdDev(
            '{}.EMA_SAMPLE_STD_DEV({}-{})'.format(symbol, self.ema_length, self.deviation_length),
            self.ema_length,
            self.deviation_length
        )
        self.ema = CustomEma(
            '{}.EMA({})'.format(symbol, self.ema_length),
            self.ema_length,
        )
        self.atr = AverageTrueRange(
            '{}.ATR({})'.format(symbol, self.atr_length),
            self.atr_length,
            MovingAverageType.Wilders
        )
        self.atr_trailing_stop = ATRTrailingStop(
            '{}.ATRTrailingStop'.format(symbol),
            self.atr_multiplier
        )

        # define our trade bar consolidator. we can
        # access the bar from the DataConsolidated events
        self.bar_consolidator = TradeBarConsolidator(timedelta(minutes=self.consolidation_period))

        # attach our event handler. the event handler is a function that will
        # be called each time we produce a new consolidated piece of data.
        self.bar_consolidator.DataConsolidated += self.ConsolidatedBarHandler

        # this call adds our consolidator to
        # the manager to receive updates from the engine
        algorithm.SubscriptionManager.AddConsolidator(symbol, self.bar_consolidator)

    def RegisterIndicators(self, algorithm):
        '''
        Registers the built-in ATR indicator so it is updated automatically
        from the consolidated bars

        Args:
            algorithm (object): the running algorithm
        '''
        algorithm.RegisterIndicator(self.symbol, self.atr, self.bar_consolidator)
        # the custom indicators (ema, ema_std_dev, atr_trailing_stop) are not
        # registered here because they are updated manually in
        # ConsolidatedBarHandler

    def RemoveConsolidators(self):
        '''Unsubscribes this symbol's consolidator from the engine'''
        if self.bar_consolidator is not None:
            self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.bar_consolidator)

    def WarmUpIndicators(self, history, algorithm):
        '''
        Replays historical rows through the consolidator so the indicators
        and windows are populated before live data arrives

        Args:
            history (object): dataframe whose iterrows() yields (time, row)
                with 'open', 'high', 'low', 'close' and 'volume' columns
            algorithm (object): unused; kept for interface compatibility
        '''
        for time, row in history.iterrows():
            tradeBar = TradeBar()
            tradeBar.Close = row['close']
            tradeBar.Open = row['open']
            tradeBar.High = row['high']
            tradeBar.Low = row['low']
            tradeBar.Volume = row['volume']
            tradeBar.Time = time
            tradeBar.Symbol = self.symbol
            self.bar_consolidator.Update(tradeBar)

    def ConsolidatedBarHandler(self, sender, consolidated):
        '''
        This is our event handler for our consolidated trade bar defined in
        __init__. Each time the consolidator produces a new bar, this
        function is called automatically. It updates the indicators and
        windows and then refreshes long_signal and short_signal.

        Args:
            sender (object): IDataConsolidator that invoked the event (unused)
            consolidated (object): consolidated TradeBar
        '''
        self.bar = consolidated

        # manually update our custom z-score and ema indicators
        self.ema.Update(self.bar)
        self.ema_std_dev.Update(self.bar)

        # add the consolidated bar data to our TradeBar window
        self.window.Add(consolidated)

        if self.atr.IsReady:
            # We need the AverageTrueRange indicator to be ready first because
            # its output is fed into the ATRTrailingStop indicator, which
            # takes custom parameters for Update
            self.atr_trailing_stop.Update(consolidated, self.atr.Current.Value)

        if not (
            self.ema.IsReady
            and self.ema_std_dev.IsReady
            and self.atr.IsReady
            and self.atr_trailing_stop.IsReady
        ):
            # don't start populating indicator windows (or doing anything
            # else) until the indicators are ready
            return

        holding = self.algorithm.Portfolio[self.symbol]
        is_invested = holding.Invested
        is_long = is_invested and holding.IsLong
        is_short = is_invested and holding.IsShort
        current_position = 1 if is_long else -1 if is_short else 0

        # add consolidated indicator data to our indicator windows
        self.ema_window.Add(self.ema.Current)
        self.ema_std_dev_window.Add(self.ema_std_dev.Current)
        self.atr_tsl_window.Add(self.atr_trailing_stop.Current)
        self.position_window.Add(IndicatorDataPoint(self.algorithm.Time, current_position))

        if not (
            self.ema_std_dev_window.IsReady
            and self.atr_tsl_window.IsReady
            and self.position_window.IsReady
        ):
            # don't do anything else until the rolling windows are ready
            return

        if self.algorithm.is_training_symbol_data:
            return

        # prevent signals from being generated outside of market hours
        if not CustomHelpers.IsMarketHours(self.algorithm):
            return

        self._update_cross_counters()
        self._update_z_trails(is_invested)

        #################
        # Trade Signals #
        #################
        buy_long_signal = False
        buy_short_signal = False

        ### BUY SIGNALS ###
        if (
            CustomHelpers.IndicatorCrossoverValueWindow(self.ema_std_dev_window, self.z_trail, self.algorithm)
            and self.CheckCrashFilter()
            and self.algorithm.CheckVixFilter()
        ):
            buy_long_signal = True
            if not is_invested:
                self.algorithm.Log(f"Buy Signal - {self.symbol.Value} - Long")

        if CustomHelpers.IndicatorCrossunderValueWindow(self.ema_std_dev_window, self.short_z_trail, self.algorithm):
            buy_short_signal = True
            if not is_invested:
                self.algorithm.Log(f"Buy Signal - {self.symbol.Value} - Short")

        ### SELL SIGNALS ###
        long_atr_ts = False
        long_atr_bail = False
        long_mean_bail = False
        short_atr_ts = False
        short_atr_bail = False
        short_mean_bail = False

        # Long ATR Trailing Stop
        if (
            CustomHelpers.EquityCrossunderIndicator(self.window, self.atr_tsl_window, self.algorithm)
            and self.zcrossedzero > self.zcrossatbuy
        ):
            long_atr_ts = True
            if is_invested:
                self.algorithm.Log(f"Sell Signal - {self.symbol.Value} - Long ATR Trailing Stop")

        # Long ATR Bail
        # NOTE(review): the second condition compares a z-score with a close
        # price; confirm this is the intended comparison.
        if (
            CustomHelpers.IndicatorCrossoverValue(self.ema_std_dev_window, self.ztp, self.algorithm)
            and self.ema_std_dev_window[0].Value > self.window[0].Close
        ):
            long_atr_bail = True
            if is_invested:
                self.algorithm.Log(f"Sell Signal - {self.symbol.Value} - Long ATR Bail")

        # Long Mean Bail (Ruh Roh)
        if is_invested and CustomHelpers.IndicatorCrossunderValue(
            self.ema_window,
            self.algorithm.Portfolio[self.symbol].AveragePrice,
            self.algorithm
        ):
            long_mean_bail = True
            self.algorithm.Log(f"Sell Signal - {self.symbol.Value} - Long Mean Bail")

        # Combine signals
        sell_long_signal = long_atr_ts or long_atr_bail or long_mean_bail

        # Short ATR Trailing Stop
        if (
            CustomHelpers.EquityCrossoverIndicator(self.window, self.atr_tsl_window, self.algorithm)
            and self.szcrossedSZTP > self.szcrossatbuy
        ):
            short_atr_ts = True
            if is_invested:
                self.algorithm.Log(f"Sell Signal - {self.symbol.Value} - Short ATR Trailing Stop")

        # Short ATR Bail
        # NOTE(review): this checks atr_tsl_window against short_ztp while the
        # long counterpart checks ema_std_dev_window against ztp; possibly a
        # copy/paste slip. Left unchanged, confirm the intended window.
        if (
            CustomHelpers.IndicatorCrossunderValue(self.atr_tsl_window, self.short_ztp, self.algorithm)
            and self.ema_std_dev_window[0].Value < self.window[0].Close
        ):
            short_atr_bail = True
            if is_invested:
                self.algorithm.Log(f"Sell Signal - {self.symbol.Value} - Short ATR Bail")

        # Short Mean Bail (Ruh Roh)
        if is_invested and CustomHelpers.IndicatorCrossoverValue(
            self.ema_window,
            self.algorithm.Portfolio[self.symbol].AveragePrice,
            self.algorithm
        ):
            short_mean_bail = True
            self.algorithm.Log(f"Sell Signal - {self.symbol.Value} - Short Mean Bail")

        # Combine signals
        sell_short_signal = short_atr_ts or short_atr_bail or short_mean_bail

        ### WHAT TO DO WITH MULTIPLE SIGNALS ###
        long_signal = self._resolve_signal(
            is_invested, buy_long_signal, sell_long_signal, self.long_signal["signal"]
        )
        if long_signal != self.long_signal["signal"]:
            self.long_signal = {
                "signal": long_signal,
                "zscore": self.ema_std_dev.Current.Value
            }

        short_signal = self._resolve_signal(
            is_invested, buy_short_signal, sell_short_signal, self.short_signal["signal"]
        )
        if short_signal != self.short_signal["signal"]:
            self.short_signal = {
                "signal": short_signal,
                "zscore": self.ema_std_dev.Current.Value
            }

    def _update_cross_counters(self):
        '''Advances the z-score crossing counters used by the crash filter and the ATR trailing stops'''
        if CustomHelpers.IndicatorCrossedValue(self.ema_std_dev_window, self.ztp, self.algorithm):
            self.zcrossedzero = self.zcrossedzero + 1

        # remember the counter value at the moment a long position appears
        if CustomHelpers.IndicatorCrossoverValue(self.position_window, 0.5, self.algorithm):
            self.zcrossatbuy = self.zcrossedzero

        # remember the counter value at the last dip below the crash line
        if CustomHelpers.IndicatorCrossunderValue(self.ema_std_dev_window, self.crash_line, self.algorithm):
            self.dippedbelow4 = self.zcrossedzero

        if CustomHelpers.IndicatorCrossedValue(self.ema_std_dev_window, self.short_ztp, self.algorithm):
            self.szcrossedSZTP = self.szcrossedSZTP + 1

        # remember the counter value at the moment a short position appears
        if CustomHelpers.IndicatorCrossunderValue(self.position_window, -0.5, self.algorithm):
            self.szcrossatbuy = self.szcrossedSZTP

    def _update_z_trails(self, is_invested):
        '''Pushes the newest long and short z-score trail values onto their deques'''
        previous_z = self.ema_std_dev_window[1].Value
        current_z = self.ema_std_dev_window[0].Value

        # NOTE(review): before appendleft, index 1 holds the value from two
        # bars ago and index 0 the previous bar's; confirm [1] is the
        # intended look-back. Behavior is left unchanged.
        if previous_z < self.bottom_zx and not is_invested:
            trail_value = current_z * (1 - self.z_trail_perc)
            self.z_trail.appendleft(min(trail_value, self.z_trail[1]))
        else:
            self.z_trail.appendleft(99999)

        if previous_z > self.top_zx and not is_invested:
            short_trail_value = current_z * (1 - self.short_z_trail_perc)
            self.short_z_trail.appendleft(max(short_trail_value, self.short_z_trail[1]))
        else:
            self.short_z_trail.appendleft(-99999)

    @staticmethod
    def _resolve_signal(is_invested, buy, sell, previous):
        '''
        Arbitrates between simultaneous buy and sell signals

        Returns:
            'sell' or None when both fire (sell if invested, None if not),
            'buy' or 'sell' when only one fires, otherwise the previous signal
        '''
        if buy and sell:
            return 'sell' if is_invested else None
        if buy:
            return 'buy'
        if sell:
            return 'sell'
        return previous

    def CanEmit(self, update_close):
        '''
        Determines if this symbol can be traded on the current bar in OnData

        Args:
            update_close (float): current close price, typically passed from OnData
        Returns:
            bool
        '''
        # self.bar stays None until the first consolidated bar arrives
        return self.bar is not None \
            and self.bar.Close == update_close \
            and self.ema.IsReady \
            and self.ema_std_dev.IsReady \
            and self.atr.IsReady \
            and self.atr_trailing_stop.IsReady \
            and self.window.IsReady \
            and self.ema_window.IsReady \
            and self.ema_std_dev_window.IsReady \
            and self.atr_tsl_window.IsReady \
            and self.position_window.IsReady

    def CheckCrashFilter(self):
        '''Returns True if we are NOT in a severe z-score dip'''
        return self.dippedbelow4 != self.zcrossedzero
from AlgorithmImports import *
from custom.CustomEma import CustomEma
from CustomHelpers import CustomHelpers
from QuantConnect.Data import Custom
from statsmodels.tsa.stattools import adfuller, kpss
import numpy as np
from datetime import timedelta


class UniverseSelectionData():
    '''Contains data specific to a fine universe selection required by The Reverend algorithm'''

    def __init__(
        self,
        algorithm,
        symbol,
        hour_history,
        daily_history
    ):
        '''
        Reads the selection parameters, subscribes daily and hourly
        consolidators and seeds the rolling windows from history

        Args:
            algorithm (object): the running algorithm; used for GetParameter
                and SubscriptionManager
            symbol (object): Symbol being evaluated
            hour_history (object): dataframe whose iterrows() yields
                (time, row) with 'open', 'high', 'low', 'close', 'volume'
            daily_history (object): dataframe whose iterrows() yields
                (time, row) with at least 'close' and 'volume'
        '''
        self.ema_length = int(algorithm.GetParameter("ema-length"))
        self.bar_length_in_minutes = int(algorithm.GetParameter("bar-length-in-minutes"))
        self.min_avg_daily_dollar_volume = int(algorithm.GetParameter("min-avg-daily-dollar-volume"))
        self.min_avg_daily_stock_price = int(algorithm.GetParameter("min-avg-daily-stock-price"))
        self.universe_selection_sma_length = int(algorithm.GetParameter("selection-sma-length"))
        self.universe_selection_hour_bar_window_size = int(algorithm.GetParameter("selection-hour-bar-window-size"))
        self.adf_p_value = float(algorithm.GetParameter("adf-p-value"))
        self.adf_use_ema_diffs = int(algorithm.GetParameter("enable-adf-use-ema-diffs"))
        self.kpss_p_value = float(algorithm.GetParameter("kpss-p-value"))
        self.variance_ratio_p_value = float(algorithm.GetParameter("variance-ratio-p-value"))
        self.hurst_value = float(algorithm.GetParameter("hurst-value"))
        self.enable_sma_filter = int(algorithm.GetParameter("enable-sma-filter"))
        self.enable_volume_liquidity_filter = int(algorithm.GetParameter("enable-volume-liquidity-filter"))
        self.enable_price_qualifier_filter = int(algorithm.GetParameter("enable-price-qualifier-filter"))
        self.enable_adf_filter = int(algorithm.GetParameter("enable-adf-filter"))
        self.enable_kpss_filter = int(algorithm.GetParameter("enable-kpss-filter"))
        self.enable_hurst_variance_filter = int(algorithm.GetParameter("enable-hurst-variance-filter"))

        self.symbol = symbol
        self.algorithm = algorithm

        # ema-length is scaled down by 12 for the hourly EMA; clamp to 1 so a
        # small ema-length cannot round down to a zero-length EMA
        hour_ema_length = max(1, round(self.ema_length/12))
        self.ema = CustomEma(
            '{}.UniverseEMA({})'.format(symbol, hour_ema_length),
            hour_ema_length,
        )

        self.daily_bar_window = RollingWindow[TradeBar](self.universe_selection_sma_length)
        self.hour_ema_window = RollingWindow[IndicatorDataPoint](self.universe_selection_hour_bar_window_size)
        self.hour_bar_window = RollingWindow[TradeBar](self.universe_selection_hour_bar_window_size)

        # NOTE(review): an earlier comment here stated that a subscription
        # cannot be added before the symbol is in the universe; confirm that
        # AddConsolidator is valid at the point this object is constructed.
        self.daily_consolidator = TradeBarConsolidator(timedelta(days=1))
        self.daily_consolidator.DataConsolidated += self.DailyBarHandler
        algorithm.SubscriptionManager.AddConsolidator(symbol, self.daily_consolidator)

        self.hours_consolidator = TradeBarConsolidator(timedelta(hours=1))
        self.hours_consolidator.DataConsolidated += self.HourlyBarHandler
        algorithm.SubscriptionManager.AddConsolidator(symbol, self.hours_consolidator)

        # seed the daily window from history
        for time, row in daily_history.iterrows():
            tradeBar = TradeBar()
            tradeBar.Close = row['close']
            tradeBar.Volume = row['volume']
            tradeBar.Time = time
            tradeBar.Symbol = self.symbol
            self.daily_bar_window.Add(tradeBar)

        # seed the hourly bar window, the EMA and the EMA window from history
        for time, row in hour_history.iterrows():
            tradeBar = TradeBar()
            tradeBar.Close = row['close']
            tradeBar.Open = row['open']
            tradeBar.High = row['high']
            tradeBar.Low = row['low']
            tradeBar.Volume = row['volume']
            tradeBar.Time = time
            tradeBar.Symbol = self.symbol
            self.hour_bar_window.Add(tradeBar)
            self.ema.Update(tradeBar)
            if self.ema.IsReady:
                self.hour_ema_window.Add(self.ema.Current)

    def IsReady(self):
        '''
        Returns:
            (bool): True when the EMA and all rolling windows are ready
        '''
        is_ready = self.daily_bar_window.IsReady \
            and self.ema.IsReady \
            and self.hour_ema_window.IsReady \
            and self.hour_bar_window.IsReady
        return is_ready

    def CanTrade(self):
        '''
        Determines if this symbol meets our trading criteria

        Each filter only rejects the symbol when its enable-* parameter is
        set, and the statistical tests are only run for enabled filters.

        Returns:
            (bool): False as soon as an enabled filter rejects the symbol,
                True otherwise
        '''
        trade_days_in_a_month = 21
        daily_close_prices = [x.Close for x in self.daily_bar_window]
        daily_trade_volumes = [x.Volume for x in self.daily_bar_window]

        #################################
        # Above Moving Average Criteria #
        #################################
        if self.enable_sma_filter:
            sma_length = self.universe_selection_sma_length
            sma = sum(daily_close_prices[0:sma_length])/sma_length
            if not daily_close_prices[0] > sma:
                return False

        ######################################
        # Trade Volume & Liquidity Qualifier #
        ######################################
        # Is the average daily trade volume over a certain number the past
        # month (21 trade days)? Volume is a share count, so it is converted
        # to dollars to make it comparable across symbols: the summed share
        # volume for the month is multiplied by the month's average price and
        # divided by the number of days in the month.
        average_price = sum(daily_close_prices[0:trade_days_in_a_month])/trade_days_in_a_month
        if self.enable_volume_liquidity_filter:
            combined_daily_share_volume = sum(daily_trade_volumes[0:trade_days_in_a_month])
            average_daily_dollar_volume = (combined_daily_share_volume * average_price) / trade_days_in_a_month
            if not average_daily_dollar_volume > self.min_avg_daily_dollar_volume:
                return False

        ###################
        # Price Qualifier #
        ###################
        # Only trade things with an average price over X in the past month.
        if self.enable_price_qualifier_filter:
            if not average_price > self.min_avg_daily_stock_price:
                return False

        if not (
            self.enable_adf_filter
            or self.enable_kpss_filter
            or self.enable_hurst_variance_filter
        ):
            # no statistical filter is enabled, nothing left that can reject
            return True

        close_ar = np.array([x.Close for x in self.hour_bar_window])

        ############
        # ADF Test #
        ############
        if self.enable_adf_filter:
            if self.adf_use_ema_diffs:
                # difference between the EMA and close price for every bar
                # in the rolling window
                adf_input = np.array([
                    self.hour_ema_window[idx].Value - val.Close
                    for idx, val in enumerate(self.hour_bar_window)
                ])
            else:
                adf_input = close_ar
            adf_p = adfuller(adf_input, maxlag=None, regression='ctt')[1]
            if adf_p >= self.adf_p_value:
                return False

        #############
        # KPSS Test #
        #############
        if self.enable_kpss_filter:
            kpss_p = kpss(close_ar, regression='ct')[1]
            if kpss_p <= self.kpss_p_value:
                return False

        ###################################
        # Hurst Exponent & Variance Ratio #
        ###################################
        if self.enable_hurst_variance_filter:
            hurst = CustomHelpers.hurst(close_ar)
            vratio_p = CustomHelpers.vratio(close_ar)[2]
            if hurst > self.hurst_value and vratio_p > self.variance_ratio_p_value:
                return False

        ##############
        # Good to go #
        ##############
        return True

    def DailyBarHandler(self, daily_bar, consolidated=None):
        '''
        Update the self.daily_bar_window with a new trade bar

        The DataConsolidated event invokes its handlers with
        (sender, consolidated_bar); a direct call may still pass only the bar.

        Args:
            daily_bar (object): the bar when called directly, or the event
                sender when invoked by the consolidator
            consolidated (object): consolidated TradeBar when invoked by the
                consolidator, otherwise None
        '''
        bar = daily_bar if consolidated is None else consolidated
        self.daily_bar_window.Add(bar)

    def HourlyBarHandler(self, hourly_bar, consolidated=None):
        '''
        Update the hourly bar window, the EMA and the EMA window with a new
        trade bar

        The DataConsolidated event invokes its handlers with
        (sender, consolidated_bar); a direct call may still pass only the bar.

        Args:
            hourly_bar (object): the bar when called directly, or the event
                sender when invoked by the consolidator
            consolidated (object): consolidated TradeBar when invoked by the
                consolidator, otherwise None
        '''
        bar = hourly_bar if consolidated is None else consolidated
        self.hour_bar_window.Add(bar)
        self.ema.Update(bar)
        if self.ema.IsReady:
            self.hour_ema_window.Add(self.ema.Current)

    def RemoveConsolidators(self):
        '''
        Unsubscribes the daily and hourly consolidators from the engine.
        Intended to be called before this object is discarded.
        '''
        for consolidator in (self.daily_consolidator, self.hours_consolidator):
            if consolidator is not None:
                self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, consolidator)