Overall Statistics |
Total Trades 13 Average Win 37.12% Average Loss -0.22% Compounding Annual Return 315.468% Drawdown 13.800% Expectancy 55.167 Net Profit 82.111% Sharpe Ratio 7.429 Probabilistic Sharpe Ratio 99.903% Loss Rate 67% Win Rate 33% Profit-Loss Ratio 167.50 Alpha 0 Beta 0 Annual Standard Deviation 0.497 Annual Variance 0.247 Information Ratio 7.429 Tracking Error 0.497 Treynor Ratio 0 Total Fees $13.00 Estimated Strategy Capacity $110000000.00 Lowest Capacity Asset GME SC72NCBXXAHX |
from datetime import timedelta
from io import StringIO

import pandas as pd


class Dropbox:
    """Universe selector driven by per-ticker mention counts stored in CSV files.

    Live mode downloads a two-column CSV (ticker, mentions) on every call.
    Backtests download one wide CSV once (first column is the timestamp, the
    remaining columns are tickers) and look up a single row per call.

    Selection rules (see ``filter_tickers``):
      * a ticker enters when its mentions exceed ``num_mentions_include``;
      * a tracked ticker starts a dropout timer when its mentions are at or
        below ``num_mentions_dropout`` or when it is missing from the CSV;
      * a ticker with a running timer is removed once the timer has expired,
        unless a trade is still open on it;
      * mentions above ``num_mentions_include`` reset a running timer.

    ``context`` is the algorithm; this class reads ``context.data`` (a dict of
    per-ticker state objects), ``context.Time``, ``context.LiveMode``,
    ``context.check_buffer`` and calls ``context.Download`` / ``context.Debug``.
    """

    def __init__(self, context):
        self.context = context
        self.current_universe = []
        self.live_url = "https://www.dropbox.com/s/yyx5dhlfurffkyw/cur_ment.csv?dl=1"
        self.backtest_url = "https://www.dropbox.com/s/015ej53p1w7xgj2/backtest_debug-Sheet1-3.csv?dl=1"
        self.backtest_df = None  # wide backtest table, cached after first download
        self.df = None  # last mentions frame that was successfully processed
        self.num_mentions_include = 500
        self.num_mentions_dropout = 100
        self.timer_threshold = timedelta(days=14)

    def get_tickers(self):
        """Return the updated universe and remember it in ``current_universe``.

        When no mentions data is available the current universe is only pruned
        by its timers; otherwise it is rebuilt by ``filter_tickers``.
        """
        df = self.get_df()
        if df is None:
            tickers = self.check_timers()
        else:
            tickers = self.filter_tickers(df)
            self.df = df
        self.current_universe = tickers
        return tickers

    def check_timers(self):
        """Prune the current universe using timers only (no mentions data).

        Tickers without a state object or without a running timer are kept.
        """
        universe = []
        for ticker in self.current_universe:
            data = self.context.data.get(ticker)
            if data is None or not data.timer or self._timer_allows(ticker, data):
                universe.append(ticker)
        return universe

    def get_df(self):
        """Return mentions indexed by ticker (column ``current_mentions``), or None."""
        if self.context.LiveMode:
            return self._download_live_df()

        if self.backtest_df is None:
            csv = self.context.Download(self.backtest_url)
            df = pd.read_csv(StringIO(csv))
            # The first column holds the timestamp whatever its header says.
            df = df.rename(columns={df.columns[0]: 'datetime'})
            self.backtest_df = df.set_index('datetime')
        return self.get_row()

    def _download_live_df(self):
        """Download and parse the live CSV; return None (and log why) on failure."""
        try:
            csv = self.context.Download(self.live_url)
            self.context.Debug(f"csv: {csv}")
            df = pd.read_csv(StringIO(csv))
            self.context.Debug(f"df: {df}")
            df.columns = ['ticker', 'current_mentions']
            df.set_index('ticker', inplace=True)
            self.context.Debug(f"Returning DF: {df}")
            return df
        except Exception as err:
            # Best effort: a failed download must not stop universe selection,
            # but the cause is logged instead of being silently discarded.
            self.context.Debug("-- CRITICAL -- Could NOT get csv in LiveMode")
            self.context.Debug(f"Live csv error: {err!r}")
            return None

    def get_row(self):
        """Return the backtest row for the previous check time, or None if absent.

        In live mode the file is read ``check_buffer`` minutes after it was
        written, so the backtest looks up the row stamped that many minutes
        before the current algorithm time.
        """
        dt = self.context.Time - timedelta(minutes=self.context.check_buffer)
        dt = dt.strftime('%Y-%m-%d %H:%M:00')
        self.context.Debug(f"Trying to locate dt: {dt}")
        try:
            row = self.backtest_df.loc[[dt]]
        except KeyError:
            self.context.Debug(f"Could not find row {dt} in df")
            return None

        # One row of ticker columns -> one column of mentions indexed by ticker.
        df = row.transpose()
        df.index.name = 'ticker'
        df.columns = ['current_mentions']
        self.context.Debug(f"transposed df: \n{df}")
        return df

    def _timer_allows(self, ticker, data):
        """Return True if a ticker whose dropout timer is running may stay.

        It stays while the timer has not expired, and also after expiry for as
        long as a trade is open on it.
        """
        self.context.Debug(f"{ticker} is being checked @ {self.context.Time}, timer_exp: {data.timer_exp}")
        if self.context.Time < data.timer_exp:
            return True
        if data.in_trade:
            self.context.Debug(f"{ticker} has expired but is in a trade, keeping in Universe")
            return True
        self.context.Debug(f"Timer has expired for {ticker}, removing from Universe")
        return False

    def filter_tickers(self, df):
        '''
        Evaluate which tickers belong in the new (updated) universe.

        Arguments:
            df: pd.DataFrame of mentions indexed by ticker, with a
                'current_mentions' column.
        Returns:
            list of tickers for the updated universe: tickers listed in df
            first (in df order), then tracked tickers missing from df.
        '''
        universe = []
        listed = list(df.index)
        tracked = set(self.current_universe)

        for ticker in listed:
            mentions = df.loc[ticker, 'current_mentions']
            data = self.context.data.get(ticker) if ticker in tracked else None

            if data is None:
                # New candidate, or a tracked ticker whose state object was
                # never created: judge it on the include threshold alone.
                if mentions > self.num_mentions_include:
                    universe.append(ticker)
                continue

            if mentions > self.num_mentions_dropout:
                if not data.timer:
                    # Never fell below the dropout threshold. Keep it.
                    universe.append(ticker)
                elif mentions > self.num_mentions_include:
                    # Recovered above the include threshold: reset the timer.
                    data.reset_timer()
                    universe.append(ticker)
                elif self._timer_allows(ticker, data):
                    # Between the thresholds with a timer still running.
                    universe.append(ticker)
            elif not data.timer:
                # First time at/below the dropout threshold: start the timer.
                data.start_timer(self.timer_threshold)
                universe.append(ticker)
            elif self._timer_allows(ticker, data):
                universe.append(ticker)

        # Tracked tickers that are missing from this csv iteration.
        listed_set = set(listed)
        for ticker in self.current_universe:
            if ticker in listed_set:
                continue
            data = self.context.data.get(ticker)
            if data is None:
                # No mentions and no state to time it out with: drop it.
                continue
            if not data.timer:
                data.start_timer(self.timer_threshold)
                universe.append(ticker)
            elif self._timer_allows(ticker, data):
                universe.append(ticker)

        return universe
import math
import pickle
from io import StringIO

import pandas as pd

from Dropbox import Dropbox
from MyModels import MyKMeansModel
from SymbolData import SymbolData


class OptimizedParticleEngine(QCAlgorithm):
    """Long-only channel-breakout strategy on a small momentum universe.

    Entry: a 15-minute close above the previous upper channel value.
    Exit: the latest hourly close below the hourly SuperTrend, a protective
    stop placed at the 15-minute SuperTrend, or a large opening gap.

    The K-means anomaly gate (MyKMeansModel) is currently disabled;
    ``self.models`` and the retrain flags are kept for it.
    """

    def Initialize(self):
        """Configure dates, cash, brokerage, universe selection and parameters."""
        self.SetStartDate(2021, 1, 12)  # Set Start Date
        # self.SetEndDate(2021, 6, 15)  # Set End Date
        self.SetCash(10000)
        self.SetBrokerageModel(InteractiveBrokersBrokerageModel())
        self.SetSecurityInitializer(lambda x: x.SetDataNormalizationMode(DataNormalizationMode.Raw))

        self.base_res = Resolution.Minute
        self.data = {}    # ticker -> SymbolData
        self.models = {}  # ticker -> model (unused while the gate is disabled)
        # get_universe returns this between dropbox checks, so it must exist
        # before the first selection call.
        self.current_universe = []

        self.use_dropbox = False
        self.dropbox = Dropbox(self)

        # The dropbox file is written on the quarter hour and read
        # `check_buffer` minutes later.
        self.dropbox_minutes = [0, 15, 30, 45]
        self.check_buffer = 1
        self.dropbox_check_minutes = [minute + self.check_buffer for minute in self.dropbox_minutes]

        self.UniverseSettings.Resolution = self.base_res
        self.UniverseSettings.MinimumTimeInUniverse = TimeSpan.Zero
        self.AddUniverse("dropbox", Resolution.Minute, self.get_universe)

        ### PARAMS
        self.risk_percent = 0.05
        self.check_gap_opens = True
        self.gap_close_percent = 100
        self.previous = None
        self.price_cutoff = 200
        self.marketcap_cutoff = 1e10  # $10b=largecaps
        self.num_largest = 25
        self.threshold_mult = 3
        self.can_trade_again = True
        self.check_for_retraining = True

    def reload_universe(self):
        """Restore the universe persisted by ``get_universe`` from the ObjectStore."""
        self.Debug(f"Reloading Universe from StoredObjects")
        deserialized = bytes(self.ObjectStore.ReadBytes("current_universe"))
        # NOTE(review): pickle.loads executes arbitrary code from its input.
        # This is only safe while the ObjectStore entry is written exclusively
        # by this algorithm.
        universe_dict = pickle.loads(deserialized)
        self.reload_dict = universe_dict
        self.current_universe = list(universe_dict.keys())
        self.Debug(f"Previous Current Universe Reloaded: {self.reload_dict}, self.current_universe: {self.current_universe}")

    def get_universe(self, date):
        """Universe selector: return the list of tickers to subscribe to.

        With ``use_dropbox`` off a fixed list is returned. Otherwise Dropbox is
        consulted on the check minutes and the previous universe is returned
        in between.
        """
        if self.use_dropbox is False:
            all_tickers = ['GME', 'AMC', 'BB', 'RKT', 'TSLA', 'SPCE', 'PLTR', 'SNDL',
                           'NOK', 'MVIS', 'BABA', 'NVDA', 'AMD', 'CLOV']
            one = ['GME']
            tickers = one  # switch to all_tickers to trade the full basket
            return tickers

        if date.minute not in self.dropbox_check_minutes:
            return self.current_universe

        self.Debug(f"Checking Dropbox for new Universe @ {self.Time}")
        tickers = self.dropbox.get_tickers()
        self.current_universe = tickers
        self.Debug(f"Returning Universe: {self.current_universe}")

        # Persist each ticker's timer expiry (None when it has no state yet).
        current_data = {ticker: getattr(self.data.get(ticker), 'timer_exp', None) for ticker in tickers}
        serialized = pickle.dumps(current_data)
        self.ObjectStore.SaveBytes("current_universe", serialized)
        return tickers

    def evaluate(self, symbol, bar):
        """Run entry/exit logic for ``symbol`` on a freshly consolidated 15-minute bar."""
        ticker = symbol.Value
        self.Debug(f"Evaluating Data for {ticker}")

        data = self.data.get(ticker)
        if data is None:
            return
        if not data.IsReady():
            self.Debug(f"{self.Time}: {ticker} data is not ready!")
            return

        good_price = bar.Close < self.price_cutoff
        if not good_price:
            return

        # get indicator values
        upper_channel_prev = data.channel.upper_history[1]
        # Fixed: this used to read upper_history, so the lower band was never used.
        lower_channel_prev = data.channel.lower_history[1]
        st_fifteen = data.supertrend_fifteen.Value
        st_fifteen_prev = data.supertrend_fifteen.val_window[1]
        st_hour = data.supertrend_hour.Value
        st_hour_prev = data.supertrend_hour.val_window[1]

        # get bar values
        hour_bar = data.hour_bars[0]

        # define conditions
        long_break = bar.Close > upper_channel_prev
        short_break = bar.Close < lower_channel_prev  # not traded: strategy is long-only
        go_long = long_break and data.can_trade
        exit_long = hour_bar.Close < st_hour

        if self.Portfolio[symbol].Invested:
            if data.no_exit > 0:
                # Exits are suppressed for the first evaluations after an entry.
                data.no_exit += -1
                self.Debug(f"{data.no_exit}")
                if data.no_exit > 0:
                    return

            if exit_long:
                self.Debug(f"-- {ticker} LONG EXIT @ {hour_bar.Close} --")
                self.Debug(f"Open: {hour_bar.Open}, High: {hour_bar.High}, Low: {hour_bar.Low}, Close: {hour_bar.Close}, Volume: {hour_bar.Volume}, BarTime: {hour_bar.Time}")
                self.Debug(f"supertrend_hour: {st_hour}, supertrend_hour_prev: {st_hour_prev}")
                self.Liquidate(symbol, 'Liquidate Position (Exit Long)')
                data.in_trade = False
                if self.can_trade_again:
                    data.can_trade = True

        # Deliberately not an `else`: the position may just have been closed above.
        # NOTE(review): data.in_trade is not cleared when the stop order fills;
        # confirm whether that should be handled in an order-event callback.
        if not self.Portfolio[symbol].Invested:
            if go_long:
                self.Debug(f"-- {ticker} LONG ENTRY @ {bar.Close}")
                self.Debug(f"Open: {bar.Open}, High: {bar.High}, Low: {bar.Low}, Close: {bar.Close}, Volume: {bar.Volume}, BarTime: {bar.Time}")
                self.Debug(f"supertrend_fifteen: {st_fifteen}, supertrend_fifteen_prev: {st_fifteen_prev}")

                sl_price = round(st_fifteen, 2)
                quantity = self.calculate_quantity(symbol, bar.Close, sl_price)
                if quantity <= 0:
                    # Nothing affordable: do not submit empty orders or mark a trade.
                    self.Debug(f"Skipping {ticker} entry, quantity: {quantity}")
                    return

                self.MarketOrder(symbol, quantity, False, 'Long Entry')
                self.StopMarketOrder(symbol, -quantity, sl_price, 'Long SL')
                data.in_trade = True
                data.no_exit = 3

    def OnData(self, base_data):
        '''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.

        Only the first slice of each day is inspected: open positions whose
        opening gap versus the last stored 15-minute close reaches
        ``gap_close_percent`` are liquidated.

        Arguments:
            base_data: Slice object keyed by symbol containing the stock data at base_res
        '''
        if self.check_gap_opens:
            if self.previous is not None and self.Time.date() == self.previous.date():
                return

            for ticker, data in self.data.items():
                symbol = data.symbol
                if not self.Portfolio[symbol].Invested:
                    continue
                if not base_data.Bars.ContainsKey(symbol):
                    self.Debug(f"{ticker} not in OnData slice. Time: {self.Time}")
                    continue

                current_open = base_data[symbol].Open
                previous_close = data.bars[0].Close
                distance = (current_open - previous_close) / previous_close * 100

                if distance >= self.gap_close_percent:
                    self.Liquidate(symbol, "Liquidate Position (Gap Open)")
                    self.Debug(f"--!! Gap open of: {round(distance, 2)}%, closing position !!--")
                    self.Debug(f"Last BarTime: {data.bars[0].EndTime}, Close: {previous_close}")
                    self.Debug(f"Current BarTime: {base_data[symbol].EndTime}, Open: {current_open}")
                    data.in_trade = False
                    if self.can_trade_again:
                        data.can_trade = True
                    if self.check_for_retraining:
                        data.retrain_model = True

        # by setting self.previous we only check gaps
        self.previous = self.Time

    def calculate_quantity(self, symbol, close, sl_value):
        """Return the whole number of shares bought with ``risk_percent`` of equity.

        Arguments:
            symbol: security being sized
            close: latest price in the security's quote currency
            sl_value: stop price; currently unused, kept for callers
        Returns:
            int share count, 0 when the price or conversion rate is not positive.
        """
        equity = self.Portfolio.TotalPortfolioValue
        risk = self.risk_percent
        conversion = self.Securities[symbol].QuoteCurrency.ConversionRate

        # Price per share expressed in the account currency.
        unit_cost = close * conversion
        if unit_cost <= 0:
            self.Debug(f"Calculated Quantity as: 0")
            return 0

        # Fixed precedence: the budget is divided by the converted price
        # (previously `/ close * conversion` multiplied by the rate).
        quantity = math.floor((equity * risk) / unit_cost)
        self.Debug(f"Calculated Quantity as: {quantity}")
        return quantity

    def _log_universe_state(self):
        """Log the active and known securities after a universe change."""
        self.Debug(f"-- Finished making changes to the Universe")
        self.Debug(f"Active Securities: {[symbol.Value for symbol in self.ActiveSecurities.Keys]}")
        self.Debug(f"Securities: {[symbol.Value for symbol in self.Securities.Keys]}")

    def OnSecuritiesChanged(self, changes):
        """Drop state for removed securities; create and warm up state for added ones."""
        self.Debug(f"-- OnSecuritiesChanged triggered, added: {len(changes.AddedSecurities)}, removed: {len(changes.RemovedSecurities)}")

        for removed in changes.RemovedSecurities:
            ticker = removed.Symbol.Value
            data = self.data.pop(ticker, None)
            if data is not None:
                data.remove_consolidators()
            self.models.pop(ticker, None)
            self.Debug(f"{ticker} Removed from Universe in OnSecuritiesChanged")

        symbols = [x.Symbol for x in changes.AddedSecurities]
        if not symbols:
            self._log_universe_state()
            return

        history = self.History(symbols, 10000, Resolution.Minute)
        if history.empty:
            self._log_universe_state()
            return

        for added in changes.AddedSecurities:
            symbol = added.Symbol
            ticker = symbol.Value
            if self.data.get(ticker) is None:
                self.data[ticker] = SymbolData(self, symbol)

            try:
                ticker_history = history.loc[symbol]
            except KeyError:
                self.Debug(f"Cannot add {ticker} to Universe because it is not in History!")
                continue

            if len(ticker_history) < 5000:
                self.Debug(f"Cannot add {ticker} to Universe because it does not have enough History! Len: {len(ticker_history)}")
                continue

            self.data[ticker].warm_up_history(ticker_history)
            self.Debug(f"{ticker} Added to Universe in OnSecuritiesChanged")

        self._log_universe_state()
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.preprocessing import MinMaxScaler


class MyKMeansModel:
    """K-means model whose distance to the nearest centroid is used as an anomaly score.

    Typical use: ``create_model()``, ``get_features(history, training=True)``,
    ``fit_model(features)``, then ``get_features(df, live=True)`` followed by
    ``model_transform(features)``.
    """

    def __init__(self, context, symbol):
        self.context = context
        self.symbol = symbol
        self.model = None   # set by create_model
        self.scaler = None  # fitted by scale_df(training=True)
        self.should_drop_ohlcv = False
        self.should_winsorize = False
        self.winsorize_by = 'close_shift'
        self.winsorize_percent = 5

    def create_model(self):
        """Create an unfitted 5-cluster KMeans and store it in ``self.model``."""
        # random_state is intentionally left unset, so fits are not reproducible.
        self.model = KMeans(
            n_clusters=5,
            init='k-means++',
            n_init=10,
            tol=1e-04,
        )

    def fit_model(self, scaled_df):
        """Fit the stored model on already scaled features."""
        self.model.fit(scaled_df)

    def model_transform(self, scaled_df):
        """Return a pd.Series with each sample's distance to its nearest centroid."""
        nearest = self.model.transform(scaled_df).min(axis=1)
        return pd.Series(nearest)

    def get_features(self, history, training=False, live=False):
        """Build the feature matrix from an OHLCV frame.

        Arguments:
            history: DataFrame with 'open', 'high', 'low', 'close', 'volume' columns
            training: fit the scaler on the features and return them scaled
            live: scale the features with the previously fitted scaler
        Returns:
            The scaled feature array when ``training`` or ``live`` is set,
            otherwise the unscaled feature DataFrame.
        """
        df = history.copy()
        df.dropna(axis=0, inplace=True)

        closes = df['close']
        fast_len = 20
        slow_len = 50
        # Simple rolling means; the price is expressed relative to each of them.
        mean_fast = closes.rolling(window=fast_len).mean()
        mean_slow = closes.rolling(window=slow_len).mean()

        df['trend_fast'] = closes / mean_fast
        df['trend_slow'] = closes / mean_slow
        df['close_shift'] = closes - closes.shift(1)
        df['volume_shift'] = df['volume'] - df['volume'].shift(1)
        # close_diff / volume_diff duplicate the *_shift columns; they are kept
        # so the feature matrix a model is fitted on stays unchanged.
        df['close_diff'] = closes.diff(1)
        df['volume_diff'] = df['volume'].diff(1)
        df['HL'] = df['high'] - df['low']
        df['HL_vol'] = df['HL'].rolling(window=50).std()
        # Drop the leading rows the rolling windows could not fill.
        df.dropna(axis=0, inplace=True)

        if self.should_drop_ohlcv:
            df.drop(['open', 'high', 'low', 'close', 'volume'], axis=1, inplace=True)

        if training:
            if self.should_winsorize:
                df = self.winsorize_df(df)
            return self.scale_df(df, training=True)
        if live:
            return self.scale_df(df, live=True)
        return df

    def scale_df(self, df, training=False, live=False):
        """Min-max scale ``df``.

        ``training=True`` fits and stores a new scaler first; ``live=True``
        reuses the stored one.

        Raises:
            RuntimeError: ``live`` is requested before a scaler was fitted.
            ValueError: neither ``training`` nor ``live`` is set.
        """
        if training:
            self.scaler = MinMaxScaler().fit(df)
            return self.scaler.transform(df)
        if live:
            if self.scaler is None:
                raise RuntimeError("scale_df(live=True) called before the scaler was fitted")
            return self.scaler.transform(df)
        # Previously this fell through to an UnboundLocalError.
        raise ValueError("scale_df requires training=True or live=True")

    def winsorize_df(self, df):
        """Return ``df`` without the rows whose ``winsorize_by`` value is in the tails.

        ``winsorize_percent`` is split evenly between both tails. Rows are
        removed, not clipped.
        """
        values = df[self.winsorize_by]
        tail = self.winsorize_percent / 2 / 100
        in_range = values.between(values.quantile(tail), values.quantile(1 - tail))
        return df[in_range]
from CustomIndicators import SuperTrend, Channel
import pandas as pd


class SymbolData:
    """Per-symbol state: bar windows, indicators, trade flags and the dropout timer.

    A 15-minute consolidator feeds ``bars`` and the registered indicators.
    Hourly bars are assembled by hand from the 15-minute bars so that they end
    on the half hour (and at 16:00), and feed ``supertrend_hour``.
    """

    def __init__(self, context, symbol):
        self.context = context
        self.symbol = symbol
        self.ticker = symbol.Value

        self.window_period = 500
        self.bars = RollingWindow[TradeBar](self.window_period)
        self.hour_bars = RollingWindow[TradeBar](24)

        self.channel = Channel(120)
        self.supertrend_fifteen = SuperTrend(7, 3)
        self.supertrend_hour = SuperTrend(7, 3)
        self.ema_fast = ExponentialMovingAverage(20)
        self.ema_slow = ExponentialMovingAverage(50)

        self.fifteen_consolidator = TradeBarConsolidator(timedelta(minutes=15))
        self.fifteen_consolidator.DataConsolidated += self.fifteen_consolidated
        context.SubscriptionManager.AddConsolidator(symbol, self.fifteen_consolidator)
        context.RegisterIndicator(symbol, self.channel, self.fifteen_consolidator)
        context.RegisterIndicator(symbol, self.supertrend_fifteen, self.fifteen_consolidator)
        context.RegisterIndicator(symbol, self.ema_fast, self.fifteen_consolidator)
        context.RegisterIndicator(symbol, self.ema_slow, self.fifteen_consolidator)

        self.threshold = None
        self.can_trade = True
        self.in_trade = False
        self.retrain_model = False
        self.timer = False      # True while the universe dropout timer runs
        self.timer_exp = None   # expiry time of the running timer
        self.no_exit = 0        # evaluations left during which exits are suppressed

    def fifteen_consolidated(self, sender, bar):
        """Store a new 15-minute bar, emit an hourly bar when due, then evaluate."""
        self.bars.Add(bar)

        # TODO(review): consider building the hourly bars from context.Time
        # with a dedicated consolidation routine instead.
        t = bar.EndTime
        if t.minute == 30:
            # Full hour ending on the half hour: the newest four bars.
            self.hour_consolidated(self.make_tradebar(list(self.bars)[:4]))
        elif t.hour == 16:
            # Bars ending in the 16:00 hour close a shorter last bar: newest two.
            self.hour_consolidated(self.make_tradebar(list(self.bars)[:2]))

        self.context.evaluate(self.symbol, bar)

    def make_tradebar(self, bars):
        """Merge ``bars`` (newest first) into one TradeBar."""
        oldest, newest = bars[-1], bars[0]
        tb = TradeBar()
        # Time is assigned before EndTime, as in the original.
        tb.Time = oldest.Time
        tb.Open = oldest.Open
        tb.High = max(b.High for b in bars)
        tb.Low = min(b.Low for b in bars)
        tb.Close = newest.Close
        tb.Volume = sum(b.Volume for b in bars)
        tb.EndTime = newest.EndTime
        return tb

    def hour_consolidated(self, bar):
        """Store an hourly bar and update the hourly SuperTrend."""
        self.hour_bars.Add(bar)
        self.supertrend_hour.Update(bar)

    def remove_consolidators(self):
        """Unregister the 15-minute consolidator from the subscription manager."""
        self.context.SubscriptionManager.RemoveConsolidator(self.symbol, self.fifteen_consolidator)
        self.context.Debug(f"Consolidators for {self.ticker} have been removed")

    def warm_up_history(self, history, consolidators=(15, 60)):
        """Replay minute history through the bar windows and indicators.

        Arguments:
            history: DataFrame indexed by time with open/high/low/close/volume columns
            consolidators: bar lengths in minutes to rebuild; 15 and 60 are handled
        """
        hist = history[['open', 'high', 'low', 'close', 'volume']].copy()

        for period in consolidators:
            resampled = self.resample(hist, period)
            for idx, row in resampled.iterrows():
                tb = TradeBar()
                tb.Open = row.open
                tb.High = row.high
                tb.Low = row.low
                tb.Close = row.close
                tb.Volume = row.volume
                # Fixed: the start time now uses the bar's own period
                # (hour bars used to be stamped as 15 minutes long).
                tb.Time = idx - timedelta(minutes=period)
                tb.EndTime = idx

                if period == 15:
                    self.bars.Add(tb)
                    self.channel.Update(tb)
                    self.supertrend_fifteen.Update(tb)
                    self.ema_fast.Update(tb.EndTime, tb.Close)
                    self.ema_slow.Update(tb.EndTime, tb.Close)
                elif period == 60:
                    self.hour_bars.Add(tb)
                    self.supertrend_hour.Update(tb)

        self.context.Debug(f"Finished warming up data for {self.ticker}, self.bars len: {self.bars.Count}")

    def resample(self, history, mins):
        """Aggregate OHLCV rows into right-closed, right-labelled ``mins``-minute bars.

        Bin edges are shifted by 30 minutes so hourly bars end on the half hour.
        """
        logic = {'open': 'first',
                 'high': 'max',
                 'low': 'min',
                 'close': 'last',
                 'volume': 'sum'}
        rule = f'{mins}min'
        try:
            # `offset` replaced `base` in pandas 1.1; `base` was removed in 2.0.
            resampler = history.resample(rule, closed='right', label='right', offset='30min')
        except TypeError:
            # Older pandas without the `offset` keyword.
            resampler = history.resample(rule, closed='right', label='right', base=30)
        resampled = resampler.agg(logic)
        # Empty bins (nights, weekends) come out as NaN rows.
        resampled.dropna(axis=0, inplace=True)
        return resampled

    def IsReady(self):
        """Return True once the bar window and the traded indicators are ready."""
        indicators = (self.supertrend_fifteen.IsReady
                      and self.supertrend_hour.IsReady
                      and self.channel.IsReady)
        return self.bars.IsReady and indicators

    def build_df(self, training=False, live=False):
        """Return the stored 15-minute bars as an OHLCV DataFrame sorted by end time.

        Returns None until the window is full. ``training`` keeps the last 450
        rows, ``live`` the last 160.
        """
        if self.bars.Count != self.window_period:
            self.context.Debug(f"self.bars not full so can't build_df, len: {self.bars.Count}")
            return None

        window = list(self.bars)
        bars_dict = {'open': [b.Open for b in window],
                     'high': [b.High for b in window],
                     'low': [b.Low for b in window],
                     'close': [b.Close for b in window],
                     'volume': [b.Volume for b in window]}
        df = pd.DataFrame(bars_dict, index=[b.EndTime for b in window])
        df.sort_index(axis=0, ascending=True, inplace=True)

        if training:
            df = df.tail(450)
        elif live:
            df = df.tail(160)
        return df

    def start_timer(self, td):
        """Start the dropout timer; it expires ``td`` after the current algorithm time."""
        self.timer = True
        self.timer_exp = self.context.Time + td
        self.context.Debug(f"Timer has been STARTED for {self.ticker} with expiration of {self.timer_exp}")

    def reset_timer(self):
        """Stop the dropout timer and clear its expiry."""
        self.timer = False
        self.timer_exp = None
        self.context.Debug(f"Timer has been RESET for {self.ticker}")
class SuperTrend:
    """SuperTrend indicator: an ATR band around the bar midpoint that trails price.

    ``Value`` holds the latest SuperTrend level and ``val_window`` the last five
    levels (index 0 is the newest). ``IsReady`` turns True once ``period`` bars
    have been seen.
    """

    def __init__(self, period, mult):
        self.Name = "SuperTrend ({}, {})".format(period, mult)
        self.Value = 0
        self.IsReady = False
        self.bars = RollingWindow[IBaseData](period)
        self.atr = AverageTrueRange(period)
        self.period = period
        self.mult = mult
        self.upper_prev = 0
        self.lower_prev = 0
        self.trend_prev = 0
        self.st_prev = 0
        self.val_window = RollingWindow[float](5)

    def Update(self, bar):
        """Feed one bar and recompute the bands and the SuperTrend level."""
        self.bars.Add(bar)
        self.atr.Update(bar)

        close = bar.Close
        prev_close = self.bars[1].Close if self.bars.Count > 1 else 0

        mid_price = (bar.High + bar.Low) / 2
        band = self.mult * self.atr.Current.Value
        upper_basic = mid_price + band
        lower_basic = mid_price - band

        # The upper band may only move down, and the lower band only up,
        # unless the previous close broke through the band.
        if upper_basic < self.upper_prev or prev_close > self.upper_prev:
            upper_final = upper_basic
        else:
            upper_final = self.upper_prev

        if lower_basic > self.lower_prev or prev_close < self.lower_prev:
            lower_final = lower_basic
        else:
            lower_final = self.lower_prev

        st = 0
        # Two independent checks on purpose: when both previous bands are equal
        # (as on the first bar) the second result overrides the first.
        if self.st_prev == self.upper_prev:
            st = upper_final if close <= upper_final else lower_final
        if self.st_prev == self.lower_prev:
            st = lower_final if close >= lower_final else upper_final

        self.upper_prev = upper_final
        self.lower_prev = lower_final
        self.st_prev = st

        self.Value = st
        self.val_window.Add(st)
        self.IsReady = self.bars.IsReady


class Channel:
    """Highest/lowest channel over the last ``lookback`` bars.

    ``channel_type`` is 'close' (max/min of closes) or 'high_low' (max of
    highs, min of lows). The two latest values of each band are kept in
    ``upper_history`` / ``lower_history`` / ``mid_history`` (index 0 is the
    newest). ``IsReady`` turns True once those histories are full.
    """

    CHANNEL_TYPES = ('close', 'high_low')

    def __init__(self, lookback, channel_type='close'):
        # Validate first: an unknown type used to be accepted and only failed
        # later, when None was pushed into the float histories.
        if channel_type not in self.CHANNEL_TYPES:
            raise ValueError(
                f"channel_type must be one of {self.CHANNEL_TYPES}, got {channel_type!r}")
        self.Name = f"Channel ({lookback})"
        self.Value = 0
        self.IsReady = False
        self.channel_type = channel_type
        self.data = RollingWindow[IBaseData](lookback)
        self.upper_history = RollingWindow[float](2)
        self.lower_history = RollingWindow[float](2)
        self.mid_history = RollingWindow[float](2)
        self.upper = None
        self.lower = None
        self.mid = None

    def Update(self, bar):
        """Feed one bar; recompute the bands once the lookback window is full."""
        self.data.Add(bar)
        if not self.data.IsReady:
            return

        if self.channel_type == 'close':
            closes = [b.Close for b in self.data]
            self.upper = max(closes)
            self.lower = min(closes)
        elif self.channel_type == 'high_low':
            self.upper = max(b.High for b in self.data)
            self.lower = min(b.Low for b in self.data)
        self.mid = (self.upper + self.lower) / 2

        self.upper_history.Add(self.upper)
        self.lower_history.Add(self.lower)
        self.mid_history.Add(self.mid)

        if self.IsReady is False and self.upper_history.IsReady:
            self.IsReady = True