Overall Statistics

Total Trades                   261
Average Win                    0.07%
Average Loss                   -0.02%
Compounding Annual Return      4.308%
Drawdown                       0.700%
Expectancy                     0.906
Net Profit                     2.857%
Sharpe Ratio                   3.141
Probabilistic Sharpe Ratio     99.837%
Loss Rate                      49%
Win Rate                       51%
Profit-Loss Ratio              2.75
Alpha                          0.03
Beta                           0.004
Annual Standard Deviation      0.009
Annual Variance                0
Information Ratio              0.427
Tracking Error                 0.173
Treynor Ratio                  7.902
Total Fees                     $1697.60
Estimated Strategy Capacity    $380000.00
Lowest Capacity Asset          ALNA 2T

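# Quick sanity check on the report above: the expectancy figure is consistent with the
# win/loss statistics (a minimal sketch; LEAN's exact rounding of the rates may differ).
win_rate, loss_rate, pl_ratio = 0.51, 0.49, 2.75
expectancy = win_rate * pl_ratio - loss_rate   # ~0.91, in line with the reported 0.906
print(f"{expectancy:.2f}")
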
""" Basic Liquidation System Strategy @version: 0.5 @creation date: 10/06/2022 - At Open, do 15min VWAP/TWAP entry for a total position of $15,000. - At 9:45 set stop at HOD - At 10:05 if P/L > 0, exit 50%. If P/L < 0, exit 100% (Adjust stop size accordingly) - At 10:30 exit all. """ import pandas as pd from io import StringIO from AlgorithmImports import * from ast import literal_eval TICKERS_CSV = "https://drive.google.com/uc?export=download&id=1did0Sk3F9Sn5Il_nUX252jOB_n0UFqat" AGG_OPS = {"open": "first", "close": "last", "high": "max", "low": "min", "volume": "sum"} class LiquidationBasic(QCAlgorithm): def Initialize(self): self.capital = literal_eval(self.GetParameter("capital")) self.entry_size = literal_eval(self.GetParameter("entry_size")) # Negative value for shorts self.wap_type = literal_eval(self.GetParameter("wap_type")) # VWAP or TWAP self.wap_res = literal_eval(self.GetParameter("wap_resolution")) # Resolution, in seconds, for WAP calculation self.wap_fract = self.wap_res/(15*60) self.SetCash(self.capital) # Set Strategy Cash self.SetStartDate(2019, 5, 1) self.SetEndDate(2022, 6, 1) csv = StringIO(self.Download(TICKERS_CSV)) self.overhang = pd.read_csv(csv, parse_dates=["Agreement Start Date"], dayfirst=True) self.overhang["Date"] = self.overhang["Agreement Start Date"].dt.date self.AddUniverse(self.coarse_filter) self.resolution = Resolution.Second self.UniverseSettings.Resolution = self.resolution self.SetSecurityInitializer(lambda x: x.SetMarketPrice(self.GetLastKnownPrice(x))) every_day = self.DateRules.EveryDay() every_second = self.TimeRules.Every(TimeSpan.FromSeconds(self.wap_res)) at = self.TimeRules.At self.Schedule.On(every_day, every_second, self.open_trade) self.Schedule.On(every_day, at(9, 45), self.set_stop) self.Schedule.On(every_day, at(10, 5), self.adjust_position) self.Schedule.On(every_day, at(10, 30), self.close_trade) def open_trade(self): if time(9, 30) < self.Time.time() < time(9, 45): symbols = list(self.ActiveSecurities.Keys) history = self.History(symbols, self.Time.date(), self.Time, resolution=self.resolution) if len(history) > 0: self.Transactions.CancelOpenOrders() for symbol in symbols: order_value = self.entry_size*self.wap_fract price = self.Securities[symbol].Price quantity = int(order_value / price) self.LimitOrder(symbol, quantity, price) def set_stop(self): symbols = self.get_owned_stocks() history = self.History(symbols, self.Time.date(), self.Time, resolution=self.resolution) if len(history) > 0: self.Debug(f"{self.Time} - Set Stop") self.Transactions.CancelOpenOrders() today_bar = history.groupby("symbol").agg(AGG_OPS) limits = today_bar.eval("high + (high - low)*0.5") # Intra range 1.05 as limit price for s in symbols: self.StopLimitOrder(s, -self.Portfolio[s].Quantity, today_bar["high"][s], limits[s]) def adjust_position(self): symbols = self.get_owned_stocks() history = self.History(symbols, self.Time.date(), self.Time, resolution=self.resolution) if len(history) > 0: self.Debug(f"{self.Time} - Adjust Position") self.Transactions.CancelOpenOrders() today_bar = history.groupby("symbol").agg(AGG_OPS) limits = today_bar.eval("high + (high - low)*0.5") # Intra range 1.05 as limit price for s in symbols: pl = self.Portfolio[s].get_Profit() \ + self.Portfolio[s].get_UnrealizedProfit() price = self.Securities[s].Price qty = self.Portfolio[s].Quantity if pl > 0: self.LimitOrder(s, -int(qty/2), price) self.StopLimitOrder(s, int(qty/2)-qty, today_bar["high"][s], limits[s]) else: self.LimitOrder(s, -int(qty), price) def get_owned_stocks(self): 
return [s for s in self.ActiveSecurities.Keys if self.Portfolio[s].Quantity != 0] def close_trade(self): if len(list(self.ActiveSecurities.Keys)) > 0: self.Debug(f"{self.Time} - Close Trade") self.Transactions.CancelOpenOrders() self.Liquidate() def coarse_filter(self, coarse): tickers = self.overhang.query("Date == @self.Time.date()") universe = [] if len(tickers) == 0 else \ [x.Symbol for x in coarse if (x.Symbol.Value == tickers["ticker"]).any()] self.Debug(f"{self.Time} - Universe {len(tickers)} tickers") return universe
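# For context on the scheduled entry above: open_trade slices the total entry into equal
# child limit orders, one every `wap_resolution` seconds over the 15-minute window.
# A quick back-of-the-envelope check with hypothetical parameter values (the actual
# `entry_size` and `wap_resolution` come from the project parameters, not shown here):
entry_size = -15_000                    # negative for a short entry, as in the docstring
wap_res = 30                            # assumed 30-second slicing interval
wap_fract = wap_res / (15 * 60)         # fraction of the target sent per slice (1/30)
slice_value = entry_size * wap_fract    # ~ -$500 of stock per child limit order
n_slices = (15 * 60) // wap_res         # 30 child orders across the 15-minute window
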
""" Basic Liquidation System Strategy @version: 0.7 @creation date: 10/06/2022 - At Open, do 15min VWAP/TWAP entry for a total position of $15,000. - At 9:45 set stop at HOD - At 10:05 if P/L > 0, exit 50%. If P/L < 0, exit 100% (Adjust stop size accordingly) - At 10:30 exit all. """ import pandas as pd from io import StringIO from AlgorithmImports import * from ast import literal_eval TICKERS_CSV = "https://drive.google.com/uc?export=download&id=1did0Sk3F9Sn5Il_nUX252jOB_n0UFqat" AGG_OPS = {"open": "first", "close": "last", "high": "max", "low": "min", "volume": "sum"} class LiquidationBasic(QCAlgorithm): def Initialize(self): self.capital = literal_eval(self.GetParameter("capital")) self.entry_size = literal_eval(self.GetParameter("entry_size")) # Negative value for shorts self.wap_type = literal_eval(self.GetParameter("wap_type")) # VWAP or TWAP self.wap_res = literal_eval(self.GetParameter("wap_resolution")) # Resolution, in seconds, for WAP calculation self.wap_fract = self.wap_res/(15*60) self.SetCash(self.capital) # Set Strategy Cash self.SetStartDate(2019, 5, 1) self.SetEndDate(2022, 6, 1) csv = StringIO(self.Download(TICKERS_CSV)) self.overhang = pd.read_csv(csv, parse_dates=["Agreement Start Date"], dayfirst=True) self.overhang["Date"] = self.overhang["Agreement Start Date"].dt.date self.AddUniverse(self.coarse_filter) self.resolution = Resolution.Second self.UniverseSettings.Resolution = self.resolution self.SetSecurityInitializer(lambda x: x.SetMarketPrice(self.GetLastKnownPrice(x))) every_day = self.DateRules.EveryDay() every_second = self.TimeRules.Every(TimeSpan.FromSeconds(self.wap_res)) at = self.TimeRules.At self.Schedule.On(every_day, every_second, self.open_trade) self.Schedule.On(every_day, at(9, 45), self.set_stop) self.Schedule.On(every_day, at(10, 5), self.adjust_position) self.Schedule.On(every_day, at(10, 30), self.close_trade) def open_trade(self): if time(9, 30) < self.Time.time() < time(9, 45): symbols = list(self.ActiveSecurities.Keys) history = self.History(symbols, self.Time.date(), self.Time, resolution=self.resolution) if len(history) > 0: self.Transactions.CancelOpenOrders() for symbol in symbols: order_value = self.entry_size*self.wap_fract price = self.Securities[symbol].Price quantity = int(order_value / price) self.LimitOrder(symbol, quantity, price) def set_stop(self): symbols = self.get_owned_stocks() history = self.History(symbols, self.Time.date(), self.Time, resolution=self.resolution) if len(history) > 0: self.Debug(f"{self.Time} - Set Stop") self.Transactions.CancelOpenOrders() today_bar = history.groupby("symbol").agg(AGG_OPS) limits = today_bar.eval("high + (high - low)*0.05") # Intra range 1.05 as limit price for s in symbols: self.StopLimitOrder(s, -self.Portfolio[s].Quantity, today_bar["high"][s], limits[s]) def adjust_position(self): symbols = self.get_owned_stocks() history = self.History(symbols, self.Time.date(), self.Time, resolution=self.resolution) if len(history) > 0: self.Debug(f"{self.Time} - Adjust Position") self.Transactions.CancelOpenOrders() today_bar = history.groupby("symbol").agg(AGG_OPS) limits = today_bar.eval("high + (high - low)*0.05") # Intra range 1.05 as limit price for s in symbols: pl = self.Portfolio[s].get_Profit() \ + self.Portfolio[s].get_UnrealizedProfit() price = self.Securities[s].Price qty = self.Portfolio[s].Quantity if pl > 0: self.LimitOrder(s, -int(qty/2), price) self.StopLimitOrder(s, int(qty/2)-qty, today_bar["high"][s], limits[s]) else: self.LimitOrder(s, -int(qty), price) def 
get_owned_stocks(self): return [s for s in self.ActiveSecurities.Keys if self.Portfolio[s].Quantity != 0] def close_trade(self): if len(list(self.ActiveSecurities.Keys)) > 0: self.Debug(f"{self.Time} - Close Trade") self.Transactions.CancelOpenOrders() self.Liquidate() def coarse_filter(self, coarse): tickers = self.overhang.query("Date == @self.Time.date()") universe = [] if len(tickers) == 0 else \ [x.Symbol for x in coarse if (x.Symbol.Value == tickers["ticker"]).any()] self.Debug(f"{self.Time} - Universe {len(tickers)} tickers") return universe
""" Basic ML Liquidation Strategy @version: 0.4 @creation date: 16/07/2022 - At Open, predict the probability of success (triple labeling, up, stop loss or null) - Enter with limit order at 9:45 price - Exit with limit order at 10:30 price - Liquidate at 15:55 if there are still positions open """ from AlgorithmImports import * import numpy as np import pandas as pd from io import StringIO from ast import literal_eval from sklearn.model_selection import cross_val_score from sklearn.ensemble import GradientBoostingClassifier import indicators as idx pd.set_option('mode.use_inf_as_na', True) TICKERS_CSV = "https://drive.google.com/uc?export=download&id=1bzOypNRbhLMRsQzS5DJUxG0OaIRi7hI8" AGG_OPS = {"open": "first", "close": "last", "high": "max", "low": "min", "volume": "sum"} class LiquidationBasicML(QCAlgorithm): def Initialize(self): self.SetStartDate(2021, 10, 1) self.SetEndDate(2022, 6, 1) self.model = GradientBoostingClassifier(n_iter_no_change=1) self.benchmark = self.GetParameter("benchmark") self.entry_size = -abs(literal_eval(self.GetParameter("entry_size"))) self.capital = literal_eval(self.GetParameter("capital")) self.SetCash(self.capital) self.atms = self.get_atms() self.AddEquity(self.benchmark, Resolution.Daily) self.SetBenchmark(self.benchmark) self.test_score = 0 self.Train(self.DateRules.MonthStart(), self.TimeRules.At(0, 0), self.train_model) every_day = self.DateRules.EveryDay(self.benchmark) at = self.TimeRules.At self.Schedule.On(every_day, at(9, 45), self.enter_trades) self.Schedule.On(every_day, at(10, 30), self.exit_trades) self.Schedule.On(every_day, at(11, 30), self.liquidate) def train_model(self): train_start = self.atms.index.get_level_values("time").min() x, y = self.get_data(train_start, self.Time) fit_params = dict(sample_weight=abs(y)) cv_scores = cross_val_score(self.model, X=x, y=(y>0).astype(float), scoring="precision", fit_params=fit_params) self.test_score = np.mean(cv_scores) self.model.fit(x, (y > 0).astype(float), **fit_params) self.Debug(f"Training Points: {len(x)} Test Score: {self.test_score:.1%}") self.Plot("ML", "Test Score", self.test_score) def enter_trades(self): start, end = self.Time-timedelta(1), self.Time today_atms = self.atms.query("(time >= @start) and (time <= @end)") if (len(today_atms) > 0) and (self.test_score > 0.5): x_pred = self.get_data(start, end, with_target=False) if len(x_pred) > 0: y_proba = pd.Series(self.model.predict_proba(x_pred)[:, 1], index=x_pred.index).groupby("symbol").last() positions = y_proba[y_proba > 0.5] for symbol, position in positions.items(): order_value = self.entry_size * position self.Debug(f"{self.Time} - Trading {symbol} Value: {order_value}") self.MarketOrder(symbol, order_value) else: self.Debug("No data available!") def exit_trades(self): for s in self.get_owned_stocks(): self.Transactions.CancelOpenOrders(s) self.LimitOrder(s, -self.Portfolio[s].Quantity, self.Portfolio[s].Price) def liquidate(self): self.Transactions.CancelOpenOrders() self.Liquidate() #for s in self.get_owned_stocks(): # self.LimitOrder(s, -self.Portfolio[s].Quantity, 2*self.Portfolio[s].Price) def get_owned_stocks(self): return [s for s in self.ActiveSecurities.Keys if self.Portfolio[s].Quantity != 0] def get_data(self, start, end, with_target=True): datapoints = self.atms.query("(time >= @start) and (time <= @end)") minute_bars = [self.History([s], t, min(t + timedelta(1), self.Time), Resolution.Minute) for s, t in datapoints.index] minute_bars = pd.concat(minute_bars) opening_bars = idx.filter_bars(minute_bars, "09:30", 
"09:45") day_grouper = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")] opening_bar = opening_bars.groupby(day_grouper).agg(AGG_OPS) features = pd.DataFrame() #atm_capacity = self.atms.loc[opening_bar.index, "Total ATM Capacity"] #features["opening_volume_atm"] = opening_bar.eval("volume*close") / atm_capacity features["opening_range"] = opening_bar.eval("(close-low)/(high-low)") if len(features.dropna()) == 0: self.Debug("No data available!") features.dropna(inplace=True) if with_target: target_bars = idx.filter_bars(minute_bars, "09:45", "10:30") target_bar = target_bars.groupby(day_grouper).agg(AGG_OPS) returns = target_bar.eval("1-close/open").apply(np.log1p).dropna() index = returns.index.intersection(features.index) return features.loc[index], returns.loc[index] else: return features def get_atms(self): csv = StringIO(self.Download(TICKERS_CSV)) atms = pd.read_csv(csv, parse_dates=["Date"], dayfirst=False) atms["Date"] = atms["Date"].dt.date atms.rename(columns={"Date": "time", "Symbol": "symbol"}, inplace=True) [self.AddEquity(s, Resolution.Minute) for s in atms["symbol"].unique()] atms["symbol"] = atms["symbol"].apply(lambda x: str(self.Symbol(x).ID)) atms.set_index(["symbol", "time"], inplace=True) return atms[~atms.index.duplicated(keep='first')]
""" Basic ML Liquidation Strategy @version: 0.8 @creation date: 16/07/2022 ! Added gap size and vwap divergence to features ! At Open, predict the probability of success (triple labeling, up, stop loss or null) ! Enter with limit order at 9:45 price ! Exit with limit order at 10:30 price ! Liquidate at 15:55 if there are still positions open ? multiple entry points """ from AlgorithmImports import * import numpy as np import pandas as pd from io import StringIO from ast import literal_eval from sklearn.model_selection import cross_val_score from sklearn.ensemble import GradientBoostingClassifier import indicators as idx from timeseriescv import TimeSeriesSplitGroups pd.set_option('mode.use_inf_as_na', True) TICKERS_CSV = "https://drive.google.com/uc?export=download&id=1bzOypNRbhLMRsQzS5DJUxG0OaIRi7hI8" AGG_OPS = {"open": "first", "close": "last", "high": "max", "low": "min", "volume": "sum"} class LiquidationBasicML(QCAlgorithm): def Initialize(self): self.SetStartDate(2021, 10, 1) self.SetEndDate(2022, 6, 1) self.model = GradientBoostingClassifier(n_iter_no_change=3) self.benchmark = self.GetParameter("benchmark") self.capital = literal_eval(self.GetParameter("capital")) self.SetCash(self.capital) self.atms = self.get_atms() self.train_start = self.atms.index.get_level_values("time").min() self.AddEquity(self.benchmark, Resolution.Minute) self.SetBenchmark(self.benchmark) self.precision = 0 self.cv = TimeSeriesSplitGroups(n_splits=10) self.Train(self.DateRules.MonthStart(), self.TimeRules.At(0, 0), self.train_model) every_day = self.DateRules.EveryDay(self.benchmark) at = self.TimeRules.At self.Schedule.On(every_day, at(9, 45), self.enter_trades) self.Schedule.On(every_day, at(10, 30), self.exit_trades) self.Schedule.On(every_day, at(11, 30), self.liquidate) def train_model(self): x, y = self.get_data(self.train_start, self.Time) fit_params = dict(sample_weight=abs(np.log1p(y))) time_groups = x.index.get_level_values("time") cv_scores = cross_val_score(self.model, X=x, y=(y>0).astype(float), cv=self.cv, groups=time_groups, scoring="precision", fit_params=fit_params) self.precision = np.mean(cv_scores) self.model.fit(x, (y > 0).astype(float), **fit_params) self.Debug(f"Training Points: {len(x)} Precision: {self.precision:.1%}") self.Plot("ML", "Precision", self.precision) def enter_trades(self): start, end = self.Time-timedelta(1), self.Time today_atms = self.atms.query("(time >= @start) and (time <= @end)") if (len(today_atms) > 0) and (self.precision > 0.5): x_pred = self.get_data(start, end, with_target=False) y_pred = pd.Series(self.model.predict(x_pred), index=x_pred.index.get_level_values("symbol")) # scaling the position by the predicted probability of going up # and model precision (not accuracy, True Positives / (TP + FP)) trades = y_pred[y_pred == 1] for symbol in trades.index: self.Debug(f"{self.Time} - Trading {symbol}") qty = self.CalculateOrderQuantity(symbol, -0.01) last_price = self.Securities[symbol].Price self.LimitOrder(symbol, qty, last_price) def exit_trades(self): for s in self.get_owned_stocks(): self.Transactions.CancelOpenOrders(s) self.LimitOrder(s, -self.Portfolio[s].Quantity, self.Portfolio[s].Price) def liquidate(self): self.Transactions.CancelOpenOrders() self.Liquidate() def get_owned_stocks(self): return [s for s in self.ActiveSecurities.Keys if self.Portfolio[s].Quantity != 0] def get_data(self, start, end, with_target=True): datapoints = self.atms.query("(time >= @start) and (time <= @end)") minute_bars = [] for symbol, date in datapoints.index: from_date 
= self.get_last_day(date) to_date = min(date + timedelta(1), self.Time) minute_bars += [self.History([symbol], from_date, to_date, Resolution.Minute)] minute_bars = pd.concat(minute_bars) grouper = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")] opening_bar = agg_bars(minute_bars, "09:31", "09:45", grouper) pm_bar = agg_bars(minute_bars, "07:01", "09:30", grouper) day_bar = agg_bars(minute_bars, "09:31", "16:00", grouper) features = pd.DataFrame() features["opening_range"] = opening_bar.eval("(close-low)/(high-low)") features["pm_volume_usd"] = pm_bar.eval("close * volume") opening_bars = idx.filter_bars(minute_bars, "09:31", "09:45") vwap = idx.intra_vwap(opening_bars).groupby(grouper).last() features["divergence"] = vwap / opening_bar["close"] - 1 # TODO: Add shares as % of float yesterday_bar = day_bar["close"].groupby("symbol").shift(1) features["gap"] = day_bar["open"]/yesterday_bar - 1 features.dropna(inplace=True) if with_target: target_bar = agg_bars(minute_bars, "09:46", "10:30", grouper) returns = -target_bar.eval("close/open - 1").dropna() # Inverted return since we are shorting index = returns.index.intersection(features.index) return features.loc[index], returns.loc[index] else: return features def get_atms(self): csv = StringIO(self.Download(TICKERS_CSV)) atms = pd.read_csv(csv, parse_dates=["Date"], dayfirst=False) atms["Date"] = atms["Date"].dt.date atms.rename(columns={"Date": "time", "Symbol": "symbol"}, inplace=True) [self.AddEquity(s, Resolution.Minute, extendedMarketHours=True) for s in atms["symbol"].unique()] atms["symbol"] = atms["symbol"].apply(lambda x: str(self.Symbol(x).ID)) atms.set_index(["symbol", "time"], inplace=True) return atms[~atms.index.duplicated(keep='first')] def get_last_day(self, date): trade_days = self.TradingCalendar.GetTradingDays(date - timedelta(7), date - timedelta(1)) return list(filter(lambda p: p.BusinessDay and not p.PublicHoliday, trade_days))[-1].Date def agg_bars(minute_bars, start_time, end_time, grouper): filtered_bars = idx.filter_bars(minute_bars, start_time, end_time) return filtered_bars.groupby(grouper).agg(AGG_OPS)
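# `TimeSeriesSplitGroups` comes from a project-local `timeseriescv` module that is not
# part of this listing. Judging from its use with cross_val_score (the groups are the
# trading dates), it behaves like an expanding-window splitter that never lets a group
# straddle the train/test boundary. A hypothetical sketch, not the actual implementation:
import numpy as np


class TimeSeriesSplitGroups:
    """Expanding-window CV over chronologically sorted groups (e.g. trading days)."""

    def __init__(self, n_splits=5):
        self.n_splits = n_splits

    def get_n_splits(self, X=None, y=None, groups=None):
        return self.n_splits

    def split(self, X, y=None, groups=None):
        unique_groups = np.unique(groups)                      # sorted chronologically
        folds = np.array_split(unique_groups, self.n_splits + 1)
        for i in range(1, self.n_splits + 1):
            train_groups = np.concatenate(folds[:i])           # everything before the test fold
            train_idx = np.flatnonzero(np.isin(groups, train_groups))
            test_idx = np.flatnonzero(np.isin(groups, folds[i]))
            yield train_idx, test_idx
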
""" Basic ML Liquidation Strategy @version: 0.9 @creation date: 16/07/2022 ! Added gap size and vwap divergence to features ! Reading the ATM file and dynamic gap filtering TODO: multiple entry points TODO: Offline storage of features to improve performance """ from AlgorithmImports import * import numpy as np import pandas as pd from io import StringIO from ast import literal_eval from sklearn.model_selection import cross_val_score from sklearn.ensemble import GradientBoostingClassifier import indicators as idx from timeseriescv import TimeSeriesSplitGroups pd.set_option('mode.use_inf_as_na', True) TICKERS_CSV = "https://drive.google.com/uc?export=download&id=1did0Sk3F9Sn5Il_nUX252jOB_n0UFqat" DATE_COLUMN = "Agreement Start Date" SYMBOL_COLUMN = "ticker" MIN_GAP = 0.15 AGG_OPS = {"open": "first", "close": "last", "high": "max", "low": "min", "volume": "sum"} class LiquidationBasicML(QCAlgorithm): def Initialize(self): self.SetStartDate(2021, 10, 1) self.SetEndDate(2022, 6, 1) self.model = GradientBoostingClassifier(n_iter_no_change=3) self.benchmark = self.GetParameter("benchmark") self.capital = literal_eval(self.GetParameter("capital")) self.SetCash(self.capital) self.atms = self.get_atms() self.train_start = self.atms.index.get_level_values("time").min() self.AddEquity(self.benchmark, Resolution.Minute) self.SetBenchmark(self.benchmark) self.precision = 0 self.cv = TimeSeriesSplitGroups(n_splits=10) self.Train(self.DateRules.MonthStart(), self.TimeRules.At(0, 0), self.train_model) every_day = self.DateRules.EveryDay(self.benchmark) at = self.TimeRules.At self.Schedule.On(every_day, at(9, 45), self.enter_trades) self.Schedule.On(every_day, at(10, 30), self.exit_trades) self.Schedule.On(every_day, at(11, 30), self.liquidate) def train_model(self): x, y = self.get_data(self.train_start, self.Time) fit_params = dict(sample_weight=abs(np.log1p(y))) time_groups = x.index.get_level_values("time") cv_scores = cross_val_score(self.model, X=x, y=(y>0).astype(float), cv=self.cv, groups=time_groups, scoring="precision", fit_params=fit_params) self.precision = np.nanmean(cv_scores) self.model.fit(x, (y > 0).astype(float), **fit_params) self.Debug(f"Training Points: {len(x)} Precision: {self.precision:.1%}") self.Plot("ML", "Precision", self.precision) def enter_trades(self): if self.precision > 0.5: start, end = self.Time-timedelta(1), self.Time x_pred = self.get_data(start, end, with_target=False) x_pred = x_pred.query("time == @self.Time.date()") if len(x_pred) > 0: y_pred = pd.Series(self.model.predict(x_pred), index=x_pred.index.get_level_values("symbol")) trades = y_pred[y_pred == 1] # Trading only the stocks predicted to go down (label=1) for symbol in trades.index: self.Debug(f"{self.Time} - Trading {symbol}") qty = self.CalculateOrderQuantity(symbol, -0.01) last_price = self.Securities[symbol].Price self.LimitOrder(symbol, qty, last_price) def exit_trades(self): for s in self.get_owned_stocks(): self.Transactions.CancelOpenOrders(s) self.LimitOrder(s, -self.Portfolio[s].Quantity, self.Portfolio[s].Price) def liquidate(self): self.Transactions.CancelOpenOrders() self.Liquidate() def get_owned_stocks(self): return [s for s in self.ActiveSecurities.Keys if self.Portfolio[s].Quantity != 0] def get_data(self, start, end, with_target=True): datapoints = self.atms.query("(time >= @start) and (time <= @end)") minute_bars = [] for symbol, date in datapoints.index: from_date = self.get_last_day(date) to_date = min(date + timedelta(1), self.Time) minute_bars += [self.History([symbol], from_date, 
to_date, Resolution.Minute)] minute_bars = pd.concat(minute_bars) grouper = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")] opening_bar = agg_bars(minute_bars, "09:31", "09:45", grouper) pm_bar = agg_bars(minute_bars, "07:01", "09:30", grouper) day_bar = agg_bars(minute_bars, "09:31", "16:00", grouper) features = pd.DataFrame() features["opening_range"] = opening_bar.eval("(close-low)/(high-low)") features["pm_volume"] = pm_bar.eval("volume") opening_bars = idx.filter_bars(minute_bars, "09:31", "09:45") vwap = idx.intra_vwap(opening_bars).groupby(grouper).last() features["divergence"] = vwap / opening_bar["close"] - 1 # TODO: Add shares as % of float yesterday_bar = day_bar["close"].groupby("symbol").shift(1) features["gap"] = day_bar["open"]/yesterday_bar - 1 features = features.query("gap >= @MIN_GAP") # Not including also negative gaps? last_atms = datapoints.groupby("symbol").last() features = features.join(last_atms[["Total ATM Capacity", "atm_date"]]) features.rename(columns={"Total ATM Capacity": "atm_shares", "atm_date": "atm_days"}, inplace=True) features.eval("pm_volume_atm = pm_volume / atm_shares", inplace=True) time_idx = features.index.get_level_values("time") features["atm_days"] = (time_idx - features["atm_days"]) / np.timedelta64(1, 'D') features = features.query("atm_days <= 365") features.dropna(inplace=True) if with_target: target_bar = agg_bars(minute_bars, "09:46", "10:30", grouper) returns = -target_bar.eval("close/open - 1").dropna() # Inverted return since we are shorting index = returns.index.intersection(features.index) return features.loc[index], returns.loc[index] else: return features def get_atms(self): csv = StringIO(self.Download(TICKERS_CSV)) atms = pd.read_csv(csv, parse_dates=[DATE_COLUMN], dayfirst=True, thousands=",") atms[DATE_COLUMN] = atms[DATE_COLUMN].dt.date atms.rename(columns={DATE_COLUMN: "time", SYMBOL_COLUMN: "symbol"}, inplace=True) [self.AddEquity(s, Resolution.Minute, extendedMarketHours=True) for s in atms["symbol"].unique()] atms["symbol"] = atms["symbol"].apply(lambda x: str(self.Symbol(x).ID)) atms.set_index(["symbol", "time"], inplace=True) atms["atm_date"] = atms.index.get_level_values("time") return atms.query("OfferingType == 'ATM'") def get_last_day(self, date): trade_days = self.TradingCalendar.GetTradingDays(date - timedelta(7), date - timedelta(1)) return list(filter(lambda p: p.BusinessDay and not p.PublicHoliday, trade_days))[-1].Date def agg_bars(minute_bars, start_time, end_time, grouper): filtered_bars = idx.filter_bars(minute_bars, start_time, end_time) return filtered_bars.groupby(grouper).agg(AGG_OPS)
""" Basic ML Liquidation Strategy @version: 0.11 @creation date: 16/07/2022 ! Added gap size and vwap divergence to features ! Reading the ATM file and dynamic gap filtering TODO: multiple entry points TODO: Offline storage of features to improve performance """ from AlgorithmImports import * import numpy as np import pandas as pd pd.set_option('mode.use_inf_as_na', True) from io import StringIO from ast import literal_eval from sklearn.model_selection import cross_val_score from sklearn.ensemble import GradientBoostingClassifier import indicators as idx from timeseriescv import TimeSeriesSplitGroups TICKERS_CSV = "https://drive.google.com/uc?export=download&id=1did0Sk3F9Sn5Il_nUX252jOB_n0UFqat" DATE_COLUMN = "Agreement Start Date" SYMBOL_COLUMN = "ticker" MIN_GAP = 0.15 AGG_OPS = {"open": "first", "close": "last", "high": "max", "low": "min", "volume": "sum"} class LiquidationBasicML(QCAlgorithm): def Initialize(self): self.SetStartDate(2021, 10, 27) self.SetEndDate(2022, 6, 1) self.model = GradientBoostingClassifier(n_iter_no_change=3) self.benchmark = self.GetParameter("benchmark") self.capital = literal_eval(self.GetParameter("capital")) self.SetCash(self.capital) self.atms = self.get_atms() self.train_start = self.atms.index.get_level_values("time").min() self.AddEquity(self.benchmark, Resolution.Minute) self.SetBenchmark(self.benchmark) self.precision = 0 self.cv = TimeSeriesSplitGroups(n_splits=10) self.Train(self.DateRules.MonthStart(), self.TimeRules.At(0, 0), self.train_model) every_day = self.DateRules.EveryDay(self.benchmark) at = self.TimeRules.At self.Schedule.On(every_day, at(9, 45), self.enter_trades) self.Schedule.On(every_day, at(10, 30), self.exit_trades) self.Schedule.On(every_day, at(11, 30), self.liquidate) def train_model(self): x, y = self.get_data(self.train_start, self.Time) fit_params = dict(sample_weight=abs(np.log1p(y))) time_groups = x.index.get_level_values("time") cv_scores = cross_val_score(self.model, X=x, y=(y>0).astype(float), cv=self.cv, groups=time_groups, scoring="precision", fit_params=fit_params) self.precision = np.mean(cv_scores) self.model.fit(x, (y > 0).astype(float), **fit_params) self.Debug(f"Training Points: {len(x)} Precision: {self.precision:.1%}") self.Plot("ML", "Precision", self.precision) def enter_trades(self): if self.precision <= 0.5: return start, end = self.get_last_day(self.Time), self.Time x_pred = self.get_data(start, end, with_target=False) if x_pred is None: return y_pred = pd.Series(self.model.predict(x_pred), index=x_pred.index.get_level_values("symbol")) trades = y_pred[y_pred == 1] # Trading only the stocks predicted to go down (label=1) for symbol in trades.index: self.Debug(f"{self.Time} - Trading {symbol}") qty = self.CalculateOrderQuantity(symbol, -0.01) last_price = self.Securities[symbol].Price self.LimitOrder(symbol, qty, last_price) def exit_trades(self): for s in self.get_owned_stocks(): self.Transactions.CancelOpenOrders(s) self.LimitOrder(s, -self.Portfolio[s].Quantity, self.Portfolio[s].Price) def liquidate(self): self.Transactions.CancelOpenOrders() self.Liquidate() def get_owned_stocks(self): return [s for s in self.ActiveSecurities.Keys if self.Portfolio[s].Quantity != 0] def get_data(self, start, end, with_target=True): gaplist = self.get_gaplist(start, end) if len(gaplist) == 0: return None minute_bars = [] for symbol, day in gaplist.index: end = min(day + timedelta(1), self.Time) minute_bars += [self.History([symbol], day, end, Resolution.Minute)] minute_bars = pd.concat(minute_bars) if len(minute_bars) 
== 0: return None # TODO: Fix gaplist function opening_bar = agg_bars(minute_bars, "09:31", "09:45") pm_bar = agg_bars(minute_bars, "07:01", "09:30") features = gaplist.to_frame("gap") features["opening_range"] = opening_bar.eval("(close-low)/(high-low)") features["pm_volume_usd"] = pm_bar.eval("close * volume") opening_bars = idx.filter_bars(minute_bars, "09:31", "09:45") grouper = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")] vwap = idx.intra_vwap(opening_bars).groupby(grouper).last() features["divergence"] = vwap / opening_bar["close"] - 1 # TODO: Add shares as % of float """ TODO: TO BE REVIEWED atm_start = start - timedelta(365) valid_atms = self.atms.query("(time >= @atm_start) and (time <= @end)") last_atms = valid_atms.groupby("symbol").last() features = features.join(last_atms[["Total ATM Capacity", "atm_date"]]) features.rename(columns={"Total ATM Capacity": "atm_shares", "atm_date": "atm_days"}, inplace=True) features.eval("pm_volume_atm = pm_volume / atm_shares", inplace=True) time_idx = features.index.get_level_values("time") features["atm_days"] = (time_idx - features["atm_days"]) / np.timedelta64(1, 'D') features = features.query("atm_days <= 365") """ features.dropna(inplace=True) if with_target: target_bar = agg_bars(minute_bars, "09:46", "10:30") returns = -target_bar.eval("close/open - 1").dropna() # Inverted return since we are shorting index = returns.index.intersection(features.index) return features.loc[index], returns.loc[index] else: return features def get_atms(self): csv = StringIO(self.Download(TICKERS_CSV)) atms = pd.read_csv(csv, parse_dates=[DATE_COLUMN], dayfirst=True, thousands=",") atms[DATE_COLUMN] = atms[DATE_COLUMN].dt.date atms.rename(columns={DATE_COLUMN: "time", SYMBOL_COLUMN: "symbol"}, inplace=True) [self.AddEquity(s, Resolution.Minute, extendedMarketHours=True) for s in atms["symbol"].unique()] atms["symbol"] = atms["symbol"].apply(lambda x: str(self.Symbol(x).ID)) atms.set_index(["symbol", "time"], inplace=True) atms["atm_date"] = atms.index.get_level_values("time") return atms.query("OfferingType == 'ATM'") def get_gaplist(self, start, end): atm_start = start - timedelta(365) valid_atms = self.atms.query("(time >= @atm_start) and (time <= @end)") symbols = valid_atms.index.get_level_values("symbol").unique().tolist() day_start = self.get_last_day(start) day_bars = self.History(symbols, day_start, end, Resolution.Daily) last_index = day_bars.index.get_level_values("time").max() if end > last_index: # adding manually the last day bar if missing day_start = end.replace(hour=9, minute=30, second=0) day_end = min(end.replace(hour=16, minute=0, second=0), self.Time) last_day_bars = self.History(symbols, day_start, day_end, Resolution.Minute) last_day_bar = agg_bars(last_day_bars, "09:31", "16:00") day_bars = pd.concat([day_bars, last_day_bar]) yesterday_bars = day_bars.groupby("symbol").shift(1) gaps = day_bars["open"]/yesterday_bars["close"]-1 return gaps[gaps > MIN_GAP] def get_last_day(self, date): trade_days = self.TradingCalendar.GetTradingDays(date - timedelta(7), date - timedelta(1)) return list(filter(lambda p: p.BusinessDay and not p.PublicHoliday, trade_days))[-1].Date def agg_bars(minute_bars, start_time, end_time): grouper = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")] filtered_bars = idx.filter_bars(minute_bars, start_time, end_time) return filtered_bars.groupby(grouper).agg(AGG_OPS)
""" Basic ML Liquidation Strategy @version: 0.12 @creation date: 16/07/2022 ! Added gap size and vwap divergence to features ! Reading the ATM file and dynamic gap filtering TODO: multiple entry points TODO: Offline storage of features to improve performance """ from AlgorithmImports import * import numpy as np import pandas as pd pd.set_option('mode.use_inf_as_na', True) from io import StringIO from ast import literal_eval from sklearn.model_selection import cross_val_score from sklearn.ensemble import GradientBoostingClassifier import indicators as idx from timeseriescv import TimeSeriesSplitGroups TICKERS_CSV = "https://drive.google.com/uc?export=download&id=1did0Sk3F9Sn5Il_nUX252jOB_n0UFqat" DATE_COLUMN = "Agreement Start Date" SYMBOL_COLUMN = "ticker" MIN_GAP = 0.15 AGG_OPS = {"open": "first", "close": "last", "high": "max", "low": "min", "volume": "sum"} class LiquidationBasicML(QCAlgorithm): def Initialize(self): self.SetStartDate(2021, 10, 1) self.SetEndDate(2022, 6, 1) self.model = GradientBoostingClassifier(n_iter_no_change=3) self.benchmark = self.GetParameter("benchmark") self.capital = literal_eval(self.GetParameter("capital")) self.SetCash(self.capital) self.atms = self.get_atms() self.train_start = self.atms.index.get_level_values("time").min() self.AddEquity(self.benchmark, Resolution.Minute) self.SetBenchmark(self.benchmark) self.confidence = 0 self.cv = TimeSeriesSplitGroups(n_splits=10) self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw every_day = self.DateRules.EveryDay(self.benchmark) at = self.TimeRules.At self.Train(self.DateRules.MonthStart(), at(0, 0), self.train_model) self.Schedule.On(every_day, at(9, 45), self.enter_trades) self.Schedule.On(every_day, at(10, 30), self.exit_trades) self.Schedule.On(every_day, at(11, 30), self.liquidate) def train_model(self): x, y = self.get_data(self.train_start, self.Time, with_target=True) fit_params = dict(sample_weight=abs(np.log1p(y))) time_groups = x.index.get_level_values("time") y_binary = (y>0).astype(float) cv_scores = cross_val_score(self.model, X=x, y=y_binary, cv=self.cv, groups=time_groups, fit_params=fit_params) self.confidence = max(np.mean(cv_scores) - 0.5, 0) * 2 self.model.fit(x, y_binary, **fit_params) self.Debug(f"Training Points: {len(x)} Confidence: {self.confidence:.1%}") self.Plot("ML", "Confidence", self.confidence) def enter_trades(self): if self.confidence == 0: return start, end = self.get_last_day(self.Time), self.Time x_pred = self.get_data(start, end, with_target=False) if x_pred is None: return y_proba = pd.Series(self.model.predict_proba(x_pred)[:, 1], index=x_pred.index.get_level_values("symbol")) positions = -y_proba.apply(lambda x: max(x-0.5, 0) * 2) * self.confidence for symbol, pos in positions[positions != 0].items(): self.Debug(f"{self.Time} - Trading {symbol}") qty = self.CalculateOrderQuantity(symbol, pos) last_price = self.Securities[symbol].Price self.LimitOrder(symbol, qty, last_price) def exit_trades(self): for s in self.ActiveSecurities.Keys: self.Transactions.CancelOpenOrders(s) qty = self.Portfolio[s].Quantity price = self.Portfolio[s].Price if qty != 0: self.LimitOrder(s, -self.Portfolio[s].Quantity, price) def liquidate(self): self.Transactions.CancelOpenOrders() self.Liquidate() def get_data(self, start, end, with_target=True): gaplist = self.get_gaplist(start, end) if len(gaplist) == 0: return None minute_bars = [] for symbol, day in gaplist.index: end = min(day + timedelta(1), self.Time) minute_bars += [self.History([symbol], day, end, 
Resolution.Minute)] minute_bars = pd.concat(minute_bars) if len(minute_bars) == 0: return None # TODO: Fix gaplist function opening_bar = agg_bars(minute_bars, "09:31", "09:45") pm_bar = agg_bars(minute_bars, "07:01", "09:30") features = gaplist.to_frame("gap") features["opening_range"] = opening_bar.eval("(close-low)/(high-low)") features["pm_volume_usd"] = pm_bar.eval("close * volume") opening_bars = idx.filter_bars(minute_bars, "09:31", "09:45") grouper = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")] vwap = idx.intra_vwap(opening_bars).groupby(grouper).last() features["divergence"] = opening_bar["close"]/vwap - 1 # TODO: Rolling MAX close/vwap # TODO: Add shares as % of float # TODO: atm usd volume """ TODO: TO BE REVIEWED atm_start = start - timedelta(365) valid_atms = self.atms.query("(time >= @atm_start) and (time <= @end)") last_atms = valid_atms.groupby("symbol").last() features = features.join(last_atms[["Total ATM Capacity", "atm_date"]]) features.rename(columns={"Total ATM Capacity": "atm_shares", "atm_date": "atm_days"}, inplace=True) features.eval("pm_volume_atm = pm_volume / atm_shares", inplace=True) time_idx = features.index.get_level_values("time") features["atm_days"] = (time_idx - features["atm_days"]) / np.timedelta64(1, 'D') features = features.query("atm_days <= 365") """ # TODO: Add gap days since last ATM features.dropna(inplace=True) if with_target: target_bar = agg_bars(minute_bars, "09:46", "10:29") targets = -target_bar.eval("close/open - 1").dropna() # Inverted return since we are shorting index = targets.index.intersection(features.index) return features.loc[index], targets.loc[index] else: return features def get_atms(self): csv = StringIO(self.Download(TICKERS_CSV)) atms = pd.read_csv(csv, parse_dates=[DATE_COLUMN], dayfirst=True, thousands=",") atms[DATE_COLUMN] = atms[DATE_COLUMN].dt.date atms.rename(columns={DATE_COLUMN: "time", SYMBOL_COLUMN: "symbol"}, inplace=True) [self.AddEquity(s, Resolution.Minute, extendedMarketHours=True) for s in atms["symbol"].unique()] atms["symbol"] = atms["symbol"].apply(lambda x: str(self.Symbol(x).ID)) atms.set_index(["symbol", "time"], inplace=True) atms["atm_date"] = atms.index.get_level_values("time") return atms.query("OfferingType == 'ATM'") def get_gaplist(self, start, end): atm_start = start - timedelta(365) valid_atms = self.atms.query("(time >= @atm_start) and (time <= @end)") symbols = valid_atms.index.get_level_values("symbol").unique().tolist() day_start = self.get_last_day(start) day_bars = self.History(symbols, day_start, end, Resolution.Daily) today_start = self.Time.replace(hour=9, minute=30, second=0) if end > today_start: # adding manually the last day bar if missing today_end = min(end, self.Time) last_day_bars = self.History(symbols, today_start, today_end, Resolution.Minute) last_day_bar = agg_bars(last_day_bars, "09:31", "15:59") day_bars = pd.concat([day_bars, last_day_bar]) yesterday_bars = day_bars.groupby("symbol").shift(1) gaps = day_bars["open"]/yesterday_bars["close"]-1 return gaps[gaps > MIN_GAP] def get_last_day(self, date): trade_days = self.TradingCalendar.GetTradingDays(date - timedelta(7), date - timedelta(1)) return list(filter(lambda p: p.BusinessDay and not p.PublicHoliday, trade_days))[-1].Date def agg_bars(minute_bars, start_time, end_time): grouper = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")] filtered_bars = idx.filter_bars(minute_bars, start_time, end_time) return filtered_bars.groupby(grouper).agg(AGG_OPS)
""" Big Bertha Strategy with Machine Learning Done - New Features (bb volume and open) - Offline data storage to avoid symbols limitation - Trade execution on high probability trades Todo - Risk management with stop loss @version: 0.11 @creation date: 05/07/2022 """ from AlgorithmImports import * import numpy as np import pandas as pd from ast import literal_eval from sklearn.model_selection import cross_val_score from sklearn.ensemble import GradientBoostingClassifier import indicators as idx pd.set_option('mode.use_inf_as_na', True) GROUPER = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")] AGG_OPS = {"open": "first", "close": "last", "high": "max", "low": "min", "volume": "sum"} class BigBerthaML(QCAlgorithm): def Initialize(self): self.min_usd_volume = literal_eval(self.GetParameter("min_usd_volume")) self.capital = literal_eval(self.GetParameter("capital")) self.benchmark = self.GetParameter("benchmark") self.SetStartDate(2021, 1, 1) self.SetEndDate(2022, 1, 1) self.SetCash(self.capital) self.UniverseSettings.Resolution = Resolution.Minute self.UniverseSettings.ExtendedMarketHours = True self.AddUniverse(self.coarse_filter) self.AddEquity(self.benchmark, Resolution.Minute) self.SetBenchmark(self.benchmark) self.accuracy = None self.features, self.targets = None, None self.model = GradientBoostingClassifier(n_iter_no_change=3) at = self.TimeRules.At every_day = self.DateRules.EveryDay(self.benchmark) self.Train(self.DateRules.WeekStart(), at(0, 0), self.train_model) self.Schedule.On(every_day, at(9, 35), self.store_data) self.Schedule.On(every_day, at(9, 35), self.trade) self.Schedule.On(every_day, at(15, 55), self.stop_trading) def coarse_filter(self, coarse): return [x.Symbol for x in coarse if x.HasFundamentalData and x.DollarVolume > self.min_usd_volume] def train_model(self): if self.features is None or self.targets is None: return self.Debug(f"{self.Time} Training") x, y = self.get_train_data() fit_params = dict(sample_weight=abs(y)) cv_scores = cross_val_score(self.model, X=x, y=(y > 0).astype(float), cv=10, fit_params=fit_params) self.accuracy = np.mean(cv_scores) self.model.fit(x, (y > 0).astype(float)) self.Debug(f"{self.Time} Points:{len(x)} Accuracy:{self.accuracy:.1%}") self.Plot("ML", "Accuracy", self.accuracy) def trade(self): if self.accuracy is None: return self.Debug(f"{self.Time} Trading") x_pred = self.get_pred_data() y_proba = pd.Series(self.model.predict_proba(x_pred)[:, 1], index=x_pred.index).groupby("symbol").last() self.Debug(f"Predictions: {len(y_proba)} - Proba {min(y_proba):.0%}-{max(y_proba):.0%}") positions = (y_proba[(y_proba <= 0.4)|(y_proba >= 0.6)] - 0.5) * self.accuracy # Model and trade confidence if sum(abs(positions)) > 1: positions /= sum(abs(positions)) # Ensuring no leverage is used [self.SetHoldings(symbol, pos) for symbol, pos in positions.items()] def stop_trading(self): self.Transactions.CancelOpenOrders() self.Liquidate() def store_data(self): trade_days = self.TradingCalendar.GetTradingDays(self.Time - timedelta(7), self.Time - timedelta(1)) last_day = list(filter(lambda p: p.BusinessDay and not p.PublicHoliday, trade_days))[-1].Date start = last_day.replace(hour=9, minute=30, second=0) end = self.Time.replace(hour=9, minute=35, second=0) tickers = list(filter(lambda x: str(x) not in self.benchmark, self.ActiveSecurities.Keys)) minute_bars = self.History(tickers, start, end, Resolution.Minute) self.add_features(minute_bars) self.add_targets(minute_bars) self.Debug(f"{self.Time} Data updated: {len(self.features)} 
datapoints") def add_features(self, minute_bars): day_bar = self.agg_bars(minute_bars, "09:31", "16:00") pm_bar = self.agg_bars(minute_bars, "00:01", "09:30") min5_bar = self.agg_bars(minute_bars, "09:31", "09:35") features = min5_bar.add_prefix("bb_") features["bb_size"] = min5_bar.eval("(high-low)/open") features["bb_close_range"] = min5_bar.eval("(close-low)/(high-low)") features["bb_open_range"] = min5_bar.eval("(open-low)/(high-low)") features["pm_volume_usd"] = pm_bar.eval("close * volume") yesterday_close = day_bar["close"].groupby("symbol").shift(1) features["gap"] = day_bar["open"] / yesterday_close-1 features.dropna(inplace=True) if self.features is not None: new_idx = features.index.difference(self.features.index) # Removing potential duplicates self.features = pd.concat([self.features, features.loc[new_idx]]) else: self.features = features def add_targets(self, minute_bars): trading_bar = self.agg_bars(minute_bars, "09:36", "15:55") targets = trading_bar.eval("close/open-1").dropna() if self.targets is not None: new_idx = targets.index.difference(self.targets.index) # Removing potential duplicates self.targets = pd.concat([self.targets, targets.loc[new_idx]]) else: self.targets = targets def get_train_data(self): train_idx = self.targets.index.intersection(self.features.index) return self.features.loc[train_idx], self.targets.loc[train_idx] def get_pred_data(self): return self.features.query("time == @self.Time.date()") def agg_bars(self, minute_bars, start_time, end_time): filtered_bars = idx.filter_bars(minute_bars, start_time, end_time) return filtered_bars.groupby(GROUPER).agg(AGG_OPS)
""" Big Bertha Strategy with Machine Learning Done - New Features (bb volume and open) - Offline data storage to avoid symbols limitation - Trade execution on high probability trades Todo - Risk management with stop loss @version: 0.12 @creation date: 05/07/2022 """ from AlgorithmImports import * import numpy as np import pandas as pd from ast import literal_eval from sklearn.model_selection import cross_val_score from sklearn.ensemble import GradientBoostingClassifier import indicators as idx pd.set_option('mode.use_inf_as_na', True) GROUPER = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")] AGG_OPS = {"open": "first", "close": "last", "high": "max", "low": "min", "volume": "sum"} RETR_PCT = 0.7 class BigBerthaML(QCAlgorithm): def Initialize(self): self.min_usd_volume = literal_eval(self.GetParameter("min_usd_volume")) self.capital = literal_eval(self.GetParameter("capital")) self.benchmark = self.GetParameter("benchmark") self.SetStartDate(2021, 1, 1) self.SetEndDate(2022, 1, 1) self.SetCash(self.capital) self.UniverseSettings.Resolution = Resolution.Minute self.UniverseSettings.ExtendedMarketHours = True self.AddUniverse(self.coarse_filter) self.AddEquity(self.benchmark, Resolution.Minute) self.SetBenchmark(self.benchmark) self.accuracy = 0 self.features, self.targets = None, None self.model = GradientBoostingClassifier(n_iter_no_change=3) at = self.TimeRules.At every_day = self.DateRules.EveryDay(self.benchmark) self.Train(self.DateRules.WeekStart(), at(0, 0), self.train_model) self.Schedule.On(every_day, at(9, 35), self.store_data) self.Schedule.On(every_day, at(9, 35), self.trade) self.Schedule.On(every_day, at(15, 55), self.stop_trading) def coarse_filter(self, coarse): return [x.Symbol for x in coarse if x.HasFundamentalData and x.DollarVolume > self.min_usd_volume] def train_model(self): if self.features is None or self.targets is None: return self.Debug(f"{self.Time} Training") x, y = self.get_train_data() cv_scores = cross_val_score(self.model, X=x, y=y, cv=10) self.accuracy = np.mean(cv_scores) self.model.fit(x, y) self.Debug(f"{self.Time} Points:{len(x)} Accuracy:{self.accuracy:.1%}") self.Plot("ML", "Accuracy", self.accuracy) def trade(self): if self.accuracy <= 0.5: return self.Debug(f"{self.Time} Trading") x_pred = self.get_pred_data() y_proba = pd.DataFrame(self.model.predict_proba(x_pred), index=x_pred.index, columns=self.model.classes_).groupby("symbol").last() actions = y_proba.idxmax(axis=1) positions = actions.apply(lambda x: 0.01 if x=="long" else -0.01 if x=="short" else 0) self.Debug(f"Predictions: {len(y_proba)} - Proba {y_proba}") for symbol, pos in positions.items(): qty = self.CalculateOrderQuantity(symbol, pos) self.MarketOrder(symbol, qty) feats = x_pred.loc[symbol].iloc[0] # TODO: Refactor window = (feats.bb_high - feats.bb_low) * RETR_PCT stop_loss = feats.bb_high - window if pos > 0 \ else feats.bb_low + window # TODO: Refactor self.StopMarketOrder(symbol, -qty, stop_loss) def stop_trading(self): self.Transactions.CancelOpenOrders() self.Liquidate() def store_data(self): trade_days = self.TradingCalendar.GetTradingDays(self.Time - timedelta(7), self.Time - timedelta(1)) last_day = list(filter(lambda p: p.BusinessDay and not p.PublicHoliday, trade_days))[-1].Date start = last_day.replace(hour=9, minute=30, second=0) end = self.Time.replace(hour=9, minute=35, second=0) tickers = list(filter(lambda x: str(x) not in self.benchmark, self.ActiveSecurities.Keys)) minute_bars = self.History(tickers, start, end, Resolution.Minute) new_features = 
self.calc_features(minute_bars).dropna() if self.features is not None: new_idx = new_features.index.difference(self.features.index) # Removing potential duplicates self.features = pd.concat([self.features, new_features.loc[new_idx]]) else: self.features = new_features new_targets = self.calc_targets(minute_bars).dropna() if self.targets is not None: new_idx = new_targets.index.difference(self.targets.index) # Removing potential duplicates self.targets = pd.concat([self.targets, new_targets.loc[new_idx]]) else: self.targets = new_targets self.Debug(f"{self.Time} Data updated: {len(self.features)} datapoints") def calc_features(self, minute_bars): day_bar = self.agg_bars(minute_bars, "09:31", "16:00") pm_bar = self.agg_bars(minute_bars, "00:01", "09:30") min5_bar = self.agg_bars(minute_bars, "09:31", "09:35") features = min5_bar.add_prefix("bb_") features["bb_size"] = min5_bar.eval("(high-low)/open") features["bb_close_range"] = min5_bar.eval("(close-low)/(high-low)") features["bb_open_range"] = min5_bar.eval("(open-low)/(high-low)") features["pm_volume_usd"] = pm_bar.eval("close * volume") yesterday_close = day_bar["close"].groupby("symbol").shift(1) features["gap"] = day_bar["open"] / yesterday_close-1 return features def calc_targets(self, minute_bars): trading_bar = self.agg_bars(minute_bars, "09:36", "15:55") min5_bar = self.agg_bars(minute_bars, "09:31", "09:35") trading_bar = trading_bar.join(min5_bar.add_prefix("bb_")) return trading_bar.apply(calc_exit_price, axis=1) def get_train_data(self): train_idx = self.targets.index.intersection(self.features.index) return self.features.loc[train_idx], self.targets.loc[train_idx] def get_pred_data(self): return self.features.query("time == @self.Time.date()") def agg_bars(self, minute_bars, start_time, end_time): filtered_bars = idx.filter_bars(minute_bars, start_time, end_time) return filtered_bars.groupby(GROUPER).agg(AGG_OPS) def get_dataset_days(self): return len(self.features.index.get_level_values("time").unique()) \ if self.features is not None else 0 def calc_exit_price(row, retr_pct=RETR_PCT): window = (row.bb_high-row.bb_low)*retr_pct if row.close > row.open: # long trade stop_loss = row.bb_high - window target = "long" if row.low > stop_loss else "pass" # 1 if profitable long and not touching the SL else: # short trade stop_loss = row.bb_low + window target = "short" if row.high < stop_loss else "pass" # -1 if profitable short and not touching the SL return target
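# To illustrate the triple-labeling rule implemented by calc_exit_price above, a small
# worked example with made-up bar values (RETR_PCT = 0.7, as defined in this version):
import pandas as pd

row = pd.Series({"open": 10.0, "close": 10.8,      # 09:36-15:55 leg closes higher -> long candidate
                 "high": 10.9, "low": 9.95,        # session low stays above the stop
                 "bb_high": 10.5, "bb_low": 9.5})  # 5-minute "Big Bertha" bar

# Long stop sits at bb_high - (bb_high - bb_low) * 0.7 = 10.5 - 0.7 = 9.8
print(calc_exit_price(row))  # -> "long": close > open and the low (9.95) never touches 9.8
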
""" Full Liquidation System Strategy @version: 0.1 @creation date: 13/7/2022 """ import pandas as pd from io import StringIO from AlgorithmImports import * from ast import literal_eval from datetime import datetime, timedelta AGG_OPS = {"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"} TICKERS_CSV = "https://drive.google.com/uc?export=download&id=1bzOypNRbhLMRsQzS5DJUxG0OaIRi7hI8" #"https://drive.google.com/uc?export=download&id=1cReDW0EPToXFmOfIWtdK9-bR5doecw0-" class LiquidationFulll(QCAlgorithm): def Initialize(self): self.capital = literal_eval(self.GetParameter("capital")) self.entry_size = literal_eval(self.GetParameter("entry_size")) # Negative value for shorts self.SetCash(self.capital) # Set Strategy Cash self.SetStartDate(2021, 10, 1) self.SetEndDate(2022, 7, 1) csv = StringIO(self.Download(TICKERS_CSV)) self.overhang = pd.read_csv(csv, parse_dates=["Date"], dayfirst=False) self.overhang["Date"] = self.overhang["Date"].dt.date self.AddUniverse(self.coarse_filter) self.resolution = Resolution.Second self.UniverseSettings.Resolution = self.resolution every_day = self.DateRules.EveryDay() every_second = self.TimeRules.Every(TimeSpan.FromSeconds(30)) every_minute = self.TimeRules.Every(TimeSpan.FromMinutes(15)) at = self.TimeRules.At self.Schedule.On(every_day, every_second, self.open_trade) self.Schedule.On(every_day, every_second, self.adding_trade) self.Schedule.On(every_day, at(7, 5), self.PM_entry) self.Schedule.On(every_day, at(7, 15), self.PM_stop) self.Schedule.On(every_day, at(8, 10), self.PM_second_entry) self.Schedule.On(every_day, at(8, 15), self.PM_second_stop) self.Schedule.On(every_day, at(9, 45), self.set_stop) self.Schedule.On(every_day, at(10, 5), self.adjust_position) self.Schedule.On(every_day, at(10, 30), self.adjust_second_position) self.Schedule.On(every_day, every_minute, self.set_second_stop) self.Schedule.On(every_day, at(15, 55), self.close_trade) self.Schedule.On(every_day, at(15, 58), self.liquidate_trade) self.SetSecurityInitializer(lambda x: x.SetMarketPrice(self.GetLastKnownPrice(x))) def PM_entry(self): symbols = list(self.ActiveSecurities.Keys) history = self.History(symbols, self.Time.date(), self.Time, resolution=self.resolution) if len(history) > 0: self.Transactions.CancelOpenOrders() self.Debug(f"{self.Time} - PM entry1") hod = history["high"].groupby("symbol").max() lod = history["low"].groupby("symbol").min() intra_range = hod - lod intra_range1 = (hod - lod) * .1 vwap_bars = history.eval("volume*(high+low+close)/3") total_vwap = vwap_bars.groupby("symbol").sum() vwap = total_vwap/history["volume"].groupby("symbol").sum() limit_price = vwap - intra_range1 for s in symbols: lasts = self.Securities[s].Price order_val = self.entry_size quantity = int(order_val/vwap[s]/8) self.LimitOrder(s, quantity, lasts) def PM_stop(self): symbols = self.get_owned_stocks() history = self.History(symbols, self.Time.replace(hour=7,minute=00,second=00), self.Time, resolution=self.resolution) if len(history) > 0: self.Transactions.CancelOpenOrders() self.Debug(f"{self.Time} - PM stop1") hod = history["high"].groupby("symbol").max() lod = history["low"].groupby("symbol").min() intra_range = hod - lod intra_range1 = (hod - lod) * .1 vwap_bars = history.eval("volume*(high+low+close)/3") total_vwap = vwap_bars.groupby("symbol").sum() vwap = total_vwap/history["volume"].groupby("symbol").sum() limit_price = hod + intra_range1 for s in symbols: lasts = self.Securities[s].Price order_val = self.entry_size qty = self.Portfolio[s].Quantity 
self.StopLimitOrder(s, -qty, hod[s],limit_price[s]) def PM_second_entry(self): symbols = self.get_owned_stocks() history = self.History(symbols, self.Time.replace(hour=7,minute=00,second=00), self.Time, resolution=self.resolution) if len(history) > 0: self.Transactions.CancelOpenOrders() self.Debug(f"{self.Time} - PM entry2") hod = history["high"].groupby("symbol").max() lod = history["low"].groupby("symbol").min() intra_range = hod - lod intra_range1 = (hod - lod) * .1 vwap_bars = history.eval("volume*(high+low+close)/3") total_vwap = vwap_bars.groupby("symbol").sum() vwap = total_vwap/history["volume"].groupby("symbol").sum() limit_price = hod + intra_range1 for s in symbols: lasts = self.Securities[s].Price order_val = self.entry_size holding = self.Portfolio[s] pos_avg = holding.Price stop_limit = pos_avg + intra_range1[s] #_current = data.Bars[self.symbol.Symbol] pl = self.Portfolio[s].get_UnrealizedProfit() if pl >= 0: qty = self.Portfolio[s].Quantity #self.MarketOrder(s, -int(qty/2)) #self.LimitOrder(s, -int(qty/2),target_price[s]) self.LimitOrder(s, qty,lasts) #self.StopLimitOrder(s, int(qty/2)-qty, hod[s],limit_price[s]) self.StopLimitOrder(s, -qty, hod[s],limit_price[s]) else: qty = self.Portfolio[s].Quantity self.LimitOrder(s, -int(qty),lasts) def PM_second_stop(self): symbols = self.get_owned_stocks() history = self.History(symbols, self.Time.replace(hour=7,minute=00,second=00), self.Time, resolution=self.resolution) if len(history) > 0: self.Transactions.CancelOpenOrders() self.Debug(f"{self.Time} - PM stop2") hod = history["high"].groupby("symbol").max() lod = history["low"].groupby("symbol").min() intra_range = hod - lod intra_range1 = (hod - lod) * .1 vwap_bars = history.eval("volume*(high+low+close)/3") total_vwap = vwap_bars.groupby("symbol").sum() vwap = total_vwap/history["volume"].groupby("symbol").sum() limit_price = hod + intra_range1 for s in symbols: lasts = self.Securities[s].Price order_val = self.entry_size qty = self.Portfolio[s].Quantity self.StopLimitOrder(s, -self.Portfolio[s].Quantity, hod[s],limit_price[s]) def open_trade(self): if time(9, 30) < self.Time.time() < time(9, 35): symbols = self.get_owned_stocks() history = self.History(symbols, self.Time.replace(hour=9,minute=30,second=00), self.Time, resolution=self.resolution) if len(history) > 0: self.Transactions.CancelOpenOrders() hod = history["high"].groupby("symbol").max() lod = history["low"].groupby("symbol").min() intra_range = hod - lod intra_range1 = (hod - lod) * .1 vwap_bars = history.eval("volume*(high+low+close)/3") total_vwap = vwap_bars.groupby("symbol").sum() vwap = total_vwap/history["volume"].groupby("symbol").sum() limit_price = vwap - intra_range1 for s in symbols: lasts = self.Securities[s].Price holding = self.Portfolio[s] order_val = self.entry_size pl = self.Portfolio[s].get_UnrealizedProfit() if pl >= 0: quantity = int(order_val/vwap[s]/20) #limit_price = vwap[symbol] - intra_range1[symbol] self.LimitOrder(s, quantity, lasts) #self.LimitOrder(symbol,quantity,self.lasts_close[symbol]) else: qty = self.Portfolio[s].Quantity self.LimitOrder(s, -int(qty), lasts) def adding_trade(self): if time(9, 35) < self.Time.time() < time(9, 45): symbols = self.get_owned_stocks() history = self.History(symbols, self.Time.replace(hour=9,minute=30,second=00), self.Time, resolution=self.resolution) if len(history) > 0: self.Transactions.CancelOpenOrders() hod = history["high"].groupby("symbol").max() lod = history["low"].groupby("symbol").min() intra_range = hod - lod intra_range1 = (hod - lod) * .1 
                vwap_bars = history.eval("volume*(high+low+close)/3")
                total_vwap = vwap_bars.groupby("symbol").sum()
                vwap = total_vwap / history["volume"].groupby("symbol").sum()
                limit_price = vwap - intra_range1
                for s in symbols:
                    lasts = self.Securities[s].Price
                    holding = self.Portfolio[s]
                    order_val = self.entry_size
                    pl = self.Portfolio[s].get_UnrealizedProfit()
                    quantity = int(order_val / vwap[s] / 80)
                    # limit_price = vwap[symbol] - intra_range1[symbol]
                    self.LimitOrder(s, quantity, lasts)
                    # self.LimitOrder(symbol, quantity, self.lasts_close[symbol])

    def set_stop(self):
        symbols = self.get_owned_stocks()
        history = self.History(symbols,
                               self.Time.replace(hour=9, minute=30, second=0),
                               self.Time, resolution=self.resolution)
        if len(history) > 0:
            self.Debug(f"{self.Time} - Set Stop")
            self.Transactions.CancelOpenOrders()
            hod = history["high"].groupby("symbol").max()
            lod = history["low"].groupby("symbol").min()
            intra_range = hod - lod
            intra_range1 = (hod - lod) * .05
            intra_range2 = (hod - lod) * .15
            stop_price = hod + intra_range1
            limit_price = hod + intra_range2
            target_price = lod + intra_range1
            for s in symbols:
                holding = self.Portfolio[s]
                pos_avg = holding.Price
                lasts = self.Securities[s].Price
                qty = self.Portfolio[s].Quantity
                # self.StopLimitOrder(s, -self.Portfolio[s].Quantity, self.Portfolio[s].Price, hod[s])
                self.StopLimitOrder(s, -self.Portfolio[s].Quantity, stop_price[s], limit_price[s])
                # self.LimitOrder(s, -int(qty/4), target_price[s])

    def adjust_position(self):
        symbols = self.get_owned_stocks()
        history = self.History(symbols,
                               self.Time.replace(hour=9, minute=30, second=0),
                               self.Time, resolution=self.resolution)
        if len(history) > 0:
            self.Debug(f"{self.Time} - Adjust Position")
            self.Transactions.CancelOpenOrders()
            hod = history["high"].groupby("symbol").max()
            lod = history["low"].groupby("symbol").min()
            intra_range = hod - lod
            intra_range1 = (hod - lod) * .05
            limit_price = hod + intra_range1
            target_price = lod + intra_range1
            for s in symbols:
                lasts = self.Securities[s].Price
                holding = self.Portfolio[s]
                pos_avg = holding.Price
                stop_limit = pos_avg + intra_range1[s]
                pl = self.Portfolio[s].get_UnrealizedProfit()
                # pl = self.Portfolio[s].get_Profit() \
                #      + self.Portfolio[s].get_UnrealizedProfit()
                if pl > 0:
                    qty = self.Portfolio[s].Quantity
                    # self.MarketOrder(s, -int(qty/2))
                    # self.LimitOrder(s, -int(qty/2), target_price[s])
                    # self.LimitOrder(s, -int(qty/4), lasts)
                    # self.StopLimitOrder(s, int(qty/2)-qty, hod[s], limit_price[s])
                    self.StopLimitOrder(s, -int(qty), pos_avg, stop_limit)
                else:
                    qty = self.Portfolio[s].Quantity
                    self.LimitOrder(s, -int(qty), lasts)
                    # self.Liquidate(s)

    def adjust_second_position(self):
        symbols = self.get_owned_stocks()
        history = self.History(symbols,
                               self.Time.replace(hour=9, minute=30, second=0),
                               self.Time, resolution=self.resolution)
        if len(history) > 0:
            self.Debug(f"{self.Time} - Adjust Position_2")
            self.Transactions.CancelOpenOrders()
            hod = history["high"].groupby("symbol").max()
            lod = history["low"].groupby("symbol").min()
            intra_range = hod - lod
            intra_range1 = (hod - lod) * .05
            intra_range2 = (hod - lod) * .15
            limit_price = hod + intra_range1
            target_price = lod + intra_range1
            for s in symbols:
                lasts = self.Securities[s].Price
                holding = self.Portfolio[s]
                pos_avg = holding.Price
                stop_limit = pos_avg + intra_range2[s]
                pl = self.Portfolio[s].get_UnrealizedProfit()
                if pl > 0:
                    qty = self.Portfolio[s].Quantity
                    # self.MarketOrder(s, -int(qty/2))
                    self.LimitOrder(s, int(qty/2), target_price[s])
                    # self.StopLimitOrder(s, int(qty/2)-qty, hod[s], limit_price[s])
                    self.StopLimitOrder(s, -int(qty), pos_avg, stop_limit)
                else:
                    qty = self.Portfolio[s].Quantity
                    self.LimitOrder(s, -int(qty), lasts)
                    # self.Liquidate(s)

    def set_second_stop(self):
        if time(10, 35) < self.Time.time() < time(15, 45):
            symbols = self.get_owned_stocks()
            history = self.History(symbols,
                                   self.Time.replace(hour=9, minute=30, second=0),
                                   self.Time, resolution=self.resolution)
            if len(history) > 0:
                self.Debug(f"{self.Time} - Set Stop2")
                # self.Transactions.CancelOpenOrders()
                hod = history["high"].groupby("symbol").max()
                lod = history["low"].groupby("symbol").min()
                intra_range = hod - lod
                intra_range1 = (hod - lod) * .05
                intra_range2 = (hod - lod) * .15
                stop_price = hod + intra_range1
                limit_price = hod + intra_range2
                target_price = lod + intra_range1
                for s in symbols:
                    holding = self.Portfolio[s]
                    pos_avg = holding.Price
                    lasts = self.Securities[s].Price
                    qty = self.Portfolio[s].Quantity
                    r_pl = self.Portfolio[s].get_Profit()
                    stop_limit = pos_avg + intra_range2[s]  # Already a per-symbol scalar
                    # self.StopLimitOrder(s, -self.Portfolio[s].Quantity, self.Portfolio[s].Price, hod[s])
                    if r_pl > 0:
                        self.Transactions.CancelOpenOrders()
                        self.StopLimitOrder(s, -int(qty), pos_avg, stop_limit)
                        # self.LimitOrder(s, -int(qty/4), target_price[s])

    def get_owned_stocks(self):
        return [s for s in self.ActiveSecurities.Keys
                if self.Portfolio[s].Quantity != 0]

    def close_trade(self):
        # if len(list(self.ActiveSecurities.Keys)) > 0:
        symbols = self.get_owned_stocks()
        history = self.History(symbols,
                               self.Time.replace(hour=9, minute=30, second=0),
                               self.Time, resolution=self.resolution)
        if len(history) > 0:
            self.Debug(f"{self.Time} - Close Trade")
            self.Transactions.CancelOpenOrders()
            hod = history["high"].groupby("symbol").max()
            lod = history["low"].groupby("symbol").min()
            intra_range = hod - lod
            intra_range1 = (hod - lod) * .05
            for s in symbols:
                lasts = self.Securities[s].Price
                qty = self.Portfolio[s].Quantity
                limit_price = lasts + intra_range1[s]
                self.LimitOrder(s, -int(qty), limit_price)
                # self.Liquidate()

    def liquidate_trade(self):
        if len(list(self.ActiveSecurities.Keys)) > 0:
            self.Debug(f"{self.Time} - Liquidate Trade")
            self.Transactions.CancelOpenOrders()
            self.Liquidate()

    def coarse_filter(self, coarse):
        tickers = self.overhang.query("Date == @self.Time.date()")
        universe = [] if len(tickers) == 0 else \
            [x.Symbol for x in coarse if (x.Symbol.Value == tickers["Symbol"]).any()]
        self.Debug(f"{self.Time} - Universe {len(tickers)} tickers")
        return universe

    # def OnData(self, data: Slice):
    #     if self.aapl_symbol in data.Bars:
    #         aapl_current_trade = data.Bars[self.aapl_symbol]
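# --- Illustrative sketch (not part of the strategy above) ---------------------
# The stop and exit methods above keep deriving order prices from the intraday
# range: a stop at or just above the high of day (HOD) and a limit a fixed
# fraction of that range higher, mirroring the `* .1` / `* .05` factors used in
# the code. This minimal, self-contained pandas example reproduces only that
# price construction on made-up symbols and prices.

import pandas as pd

history = pd.DataFrame(
    {"high": [10.0, 10.4, 10.2, 5.1, 5.3, 5.0],
     "low":  [9.5,  9.8,  9.9,  4.8, 4.9, 4.7]},
    index=pd.MultiIndex.from_product([["ABC", "XYZ"], range(3)],
                                     names=["symbol", "bar"]))

hod = history["high"].groupby("symbol").max()   # high of day per symbol
lod = history["low"].groupby("symbol").min()    # low of day per symbol
stop_buffer = (hod - lod) * 0.1                 # 10% of the intraday range
stop_price = hod                                # stop triggers at HOD
limit_price = hod + stop_buffer                 # accept fills up to HOD + buffer

print(pd.DataFrame({"stop": stop_price, "limit": limit_price}))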
""" Basic ML Liquidation Strategy @version: 0.5 @creation date: 16/07/2022 - At Open, predict the probability of success (triple labeling, up, stop loss or null) - Enter with limit order at 9:45 price - Exit with limit order at 10:30 price - Liquidate at 15:55 if there are still positions open """ from AlgorithmImports import * import numpy as np import pandas as pd from io import StringIO from ast import literal_eval from sklearn.model_selection import cross_val_score from sklearn.ensemble import GradientBoostingClassifier import indicators as idx pd.set_option('mode.use_inf_as_na', True) TICKERS_CSV = "https://drive.google.com/uc?export=download&id=1bzOypNRbhLMRsQzS5DJUxG0OaIRi7hI8" AGG_OPS = {"open": "first", "close": "last", "high": "max", "low": "min", "volume": "sum"} class LiquidationBasicML(QCAlgorithm): def Initialize(self): self.SetStartDate(2021, 10, 1) self.SetEndDate(2022, 6, 1) self.benchmark = self.GetParameter("benchmark") self.entry_size = -abs(literal_eval(self.GetParameter("entry_size"))) self.capital = literal_eval(self.GetParameter("capital")) self.SetCash(self.capital) self.UniverseSettings.Resolution = Resolution.Minute self.UniverseSettings.ExtendedMarketHours = True self.model = GradientBoostingClassifier(n_iter_no_change=1) self.atms = self.get_atms() self.AddEquity(self.benchmark) self.SetBenchmark(self.benchmark) self.test_score = 0 self.Train(self.DateRules.MonthStart(), self.TimeRules.At(0, 0), self.train_model) every_day = self.DateRules.EveryDay(self.benchmark) at = self.TimeRules.At self.Schedule.On(every_day, at(9, 45), self.enter_trades) self.Schedule.On(every_day, at(10, 30), self.exit_trades) self.Schedule.On(every_day, at(11, 30), self.liquidate) def train_model(self): train_start = self.atms.index.get_level_values("time").min() x, y = self.get_data(train_start, self.Time) fit_params = dict(sample_weight=abs(y)) cv_scores = cross_val_score(self.model, X=x, y=(y>0).astype(float), scoring="precision", fit_params=fit_params) self.test_score = np.mean(cv_scores) self.model.fit(x, (y > 0).astype(float), **fit_params) self.Debug(f"Training Points: {len(x)} Test Score: {self.test_score:.1%}") self.Plot("ML", "Test Score", self.test_score) def enter_trades(self): start, end = self.Time-timedelta(1), self.Time today_atms = self.atms.query("(time >= @start) and (time <= @end)") if (len(today_atms) > 0) and (self.test_score > 0.5): x_pred = self.get_data(start, end, with_target=False) if len(x_pred) > 0: y_proba = pd.Series(self.model.predict_proba(x_pred)[:, 1], index=x_pred.index).groupby("symbol").last() positions = y_proba[y_proba > 0.5] for symbol, position in positions.items(): order_value = self.entry_size * position self.Debug(f"{self.Time} - Trading {symbol} Value: {order_value}") self.MarketOrder(symbol, order_value) else: self.Debug("No data available!") def exit_trades(self): for s in self.get_owned_stocks(): self.Transactions.CancelOpenOrders(s) self.LimitOrder(s, -self.Portfolio[s].Quantity, self.Portfolio[s].Price) def liquidate(self): self.Transactions.CancelOpenOrders() self.Liquidate() def get_owned_stocks(self): return [s for s in self.ActiveSecurities.Keys if self.Portfolio[s].Quantity != 0] def get_data(self, start, end, with_target=True): datapoints = self.atms.query("(time >= @start) and (time <= @end)") minute_bars = [self.History([s], t, min(t + timedelta(1), self.Time), Resolution.Minute) for s, t in datapoints.index] minute_bars = pd.concat(minute_bars) day_grouper = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")] 
        opening_bars = idx.filter_bars(minute_bars, "09:31", "09:45")
        opening_bar = opening_bars.groupby(day_grouper).agg(AGG_OPS)
        pm_bars = idx.filter_bars(minute_bars, "07:01", "09:30")
        pm_bar = pm_bars.groupby(day_grouper).agg(AGG_OPS)

        features = pd.DataFrame()
        features["opening_range"] = opening_bar.eval("(close-low)/(high-low)")
        features["pm_volume_usd"] = pm_bar.eval("close*volume")
        if len(features.dropna()) == 0:
            self.Debug("No data available!")
        features.dropna(inplace=True)

        if with_target:
            target_bars = idx.filter_bars(minute_bars, "09:45", "10:30")
            target_bar = target_bars.groupby(day_grouper).agg(AGG_OPS)
            returns = target_bar.eval("1-close/open").apply(np.log1p).dropna()
            index = returns.index.intersection(features.index)
            return features.loc[index], returns.loc[index]
        else:
            return features

    def get_atms(self):
        csv = StringIO(self.Download(TICKERS_CSV))
        atms = pd.read_csv(csv, parse_dates=["Date"], dayfirst=False)
        atms["Date"] = atms["Date"].dt.date
        atms.rename(columns={"Date": "time", "Symbol": "symbol"}, inplace=True)
        [self.AddEquity(s, Resolution.Minute) for s in atms["symbol"].unique()]
        atms["symbol"] = atms["symbol"].apply(lambda x: str(self.Symbol(x).ID))
        atms.set_index(["symbol", "time"], inplace=True)
        return atms[~atms.index.duplicated(keep='first')]
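# --- Illustrative sketch (not part of the algorithm above) --------------------
# LiquidationBasicML trains a GradientBoostingClassifier on a binarized return
# target, weights samples by the absolute return, and only trades when the
# cross-validated precision exceeds 0.5. The snippet below reproduces that
# train/score/gate pattern outside of QuantConnect; the feature names and all
# values are invented purely for illustration.

import numpy as np
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score

rng = np.random.default_rng(0)
x = pd.DataFrame({"opening_range": rng.uniform(0, 1, 200),
                  "pm_volume_usd": rng.uniform(1e4, 1e6, 200)})
y = pd.Series(rng.normal(0, 0.02, 200))   # stand-in for 9:45-10:30 log returns

model = GradientBoostingClassifier(n_iter_no_change=1)
weights = abs(y)
scores = cross_val_score(model, X=x, y=(y > 0).astype(float),
                         scoring="precision",
                         fit_params=dict(sample_weight=weights))
model.fit(x, (y > 0).astype(float), sample_weight=weights)

test_score = scores.mean()
if test_score > 0.5:                       # same gate used by enter_trades()
    proba = model.predict_proba(x.tail(5))[:, 1]   # probability of a positive move
    print(test_score, proba)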
""" Full Liquidation System Strategy - Added pm volume filtering @version: 0.6 @creation date: 13/7/2022 """ import pandas as pd from io import StringIO from ast import literal_eval from AlgorithmImports import * import indicators as idx TICKERS_CSV = "https://drive.google.com/uc?export=download&id=1bzOypNRbhLMRsQzS5DJUxG0OaIRi7hI8" AGG_OPS = {"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"} class LiquidationFull(QCAlgorithm): def Initialize(self): self.capital = literal_eval(self.GetParameter("capital")) self.benchmark = self.GetParameter("benchmark") self.entry_size = -abs(literal_eval(self.GetParameter("capital"))) self.SetCash(self.capital) # Set Strategy Cash self.SetStartDate(2021, 10, 1) self.SetEndDate(2022, 7, 1) csv = StringIO(self.Download(TICKERS_CSV)) self.atms = pd.read_csv(csv, parse_dates=["Date"], dayfirst=False) self.atms["Date"] = self.atms["Date"].dt.date self.resolution = Resolution.Second self.UniverseSettings.Resolution = self.resolution self.UniverseSettings.ExtendedMarketHours = True self.SetBenchmark(self.benchmark) self.AddEquity(self.benchmark, Resolution.Minute) self.AddUniverse(self.coarse_filter) at = self.TimeRules.At every_day = self.DateRules.EveryDay(self.benchmark) every_30sec = self.TimeRules.Every(TimeSpan.FromSeconds(30)) every_15min = self.TimeRules.Every(TimeSpan.FromMinutes(15)) self.Schedule.On(every_day, every_30sec, self.open_trade) self.Schedule.On(every_day, every_30sec, self.adding_trade) self.Schedule.On(every_day, at(7, 5), self.PM_entry1) self.Schedule.On(every_day, at(7, 15), self.PM_stop1) self.Schedule.On(every_day, at(8, 10), self.PM_entry2) self.Schedule.On(every_day, at(8, 15), self.PM_stop2) self.Schedule.On(every_day, at(9, 45), self.set_stop1) self.Schedule.On(every_day, at(10, 5), self.adjust_position1) self.Schedule.On(every_day, every_15min, self.set_stop2) self.Schedule.On(every_day, at(10, 30), self.adjust_position2) self.Schedule.On(every_day, at(15, 55), self.close_trade) self.Schedule.On(every_day, at(15, 58), self.liquidate_trade) def PM_entry1(self): history = self.History(self.get_universe_stocks(), self.Time.replace(hour=7, minute=0, second=0), # TODO: Start at midnight or 7am? self.Time, resolution=self.resolution) if len(history) > 0: self.Transactions.CancelOpenOrders() self.Debug(f"{self.Time} - PM entry1") vwaps = idx.intra_vwap(history).groupby("symbol").last().dropna() for symbol, vwap in vwaps.items(): last_price = self.Securities[symbol].Price quantity = int(self.entry_size/vwap/8) # TODO: What is this 8? Should we make it parametric? self.LimitOrder(symbol, quantity, last_price) def PM_stop1(self): today = self.get_today_bar(self.Time.replace(hour=7, minute=0, second=0), self.Time) if today is not None: self.Transactions.CancelOpenOrders() self.Debug(f"{self.Time} - PM stop1") limit_prices = today["high"] + (today["high"] - today["low"]) * .1 # TODO: Should we make this .1 parametric? for symbol, limit_price in limit_prices.items(): qty = self.Portfolio[symbol].Quantity self.StopLimitOrder(symbol, -qty, today.loc[symbol, "high"], limit_price) def PM_entry2(self): today = self.get_today_bar(self.Time.replace(hour=7, minute=0, second=0), self.Time) if today is not None: self.Transactions.CancelOpenOrders() self.Debug(f"{self.Time} - PM entry2") limit_prices = today["high"] + (today["high"] - today["low"]) * .1 # TODO: Should we make this .1 parametric? 
            for symbol, limit_price in limit_prices.items():
                last_price = self.Securities[symbol].Price
                pl = self.Portfolio[symbol].get_UnrealizedProfit()
                qty = self.Portfolio[symbol].Quantity
                if pl >= 0:
                    self.LimitOrder(symbol, qty, last_price)
                    self.StopLimitOrder(symbol, -qty, today.loc[symbol, "high"],
                                        limit_price)
                else:
                    self.LimitOrder(symbol, -qty, last_price)

    def PM_stop2(self):
        today = self.get_today_bar(self.Time.replace(hour=7, minute=0, second=0),
                                   self.Time)
        if today is not None:
            self.Transactions.CancelOpenOrders()
            self.Debug(f"{self.Time} - PM stop2")
            limit_prices = today["high"] + (today["high"] - today["low"]) * .1  # TODO: Should we make this .1 parametric?
            for symbol, limit_price in limit_prices.items():
                self.StopLimitOrder(symbol, -self.Portfolio[symbol].Quantity,
                                    today.loc[symbol, "high"], limit_price)

    def open_trade(self):
        if time(9, 30) < self.Time.time() < time(9, 35):
            history = self.History(self.get_owned_stocks(),
                                   self.Time.replace(hour=9, minute=30, second=0),
                                   self.Time, resolution=self.resolution)
            if len(history) > 0:
                self.Transactions.CancelOpenOrders()
                vwaps = idx.intra_vwap(history).groupby("symbol").last().dropna()
                for symbol, vwap in vwaps.items():
                    last_price = self.Securities[symbol].Price
                    pl = self.Portfolio[symbol].get_UnrealizedProfit()
                    if pl >= 0:
                        quantity = int(self.entry_size/vwap/20)  # TODO: What is this 20? Should we make it parametric?
                        self.LimitOrder(symbol, quantity, last_price)
                    else:
                        qty = self.Portfolio[symbol].Quantity
                        self.LimitOrder(symbol, -int(qty), last_price)

    def adding_trade(self):
        if time(9, 35) < self.Time.time() < time(9, 45):
            history = self.History(self.get_owned_stocks(),
                                   self.Time.replace(hour=9, minute=30, second=0),
                                   self.Time, resolution=self.resolution)
            if len(history) > 0:
                self.Transactions.CancelOpenOrders()
                vwaps = idx.intra_vwap(history).groupby("symbol").last().dropna()
                for symbol, vwap in vwaps.items():
                    last_price = self.Securities[symbol].Price
                    quantity = int(self.entry_size/vwap/80)  # TODO: What is this 80? Should we make it parametric?
                    self.LimitOrder(symbol, quantity, last_price)

    def set_stop1(self):
        today = self.get_today_bar(self.Time.replace(hour=9, minute=30, second=0),
                                   self.Time)
        if today is not None:
            self.Debug(f"{self.Time} - Set Stop")
            self.Transactions.CancelOpenOrders()
            intra_ranges = today["high"] - today["low"]
            stop_price = today["high"] + intra_ranges * 0.5  # TODO: Should we make this parametric?
limit_prices = today["high"] + intra_ranges * 0.15 for symbol, limit_price in limit_prices.items(): self.StopLimitOrder(symbol, -self.Portfolio[symbol].Quantity, stop_price[symbol], limit_price) def adjust_position1(self): today = self.get_today_bar(self.Time.replace(hour=9, minute=30, second=0), self.Time) if today is not None: self.Debug(f"{self.Time} - Adjust Position") self.Transactions.CancelOpenOrders() intra_ranges = today["high"] - today["low"] for symbol, intra_range in intra_ranges.items(): last_price = self.Securities[symbol].Price stop_limit = last_price + intra_range * .05 # TODO: Should we make this parametric pl = self.Portfolio[symbol].get_UnrealizedProfit() if pl > 0: qty = self.Portfolio[symbol].Quantity self.StopLimitOrder(symbol, -int(qty), last_price, stop_limit) else: qty = self.Portfolio[symbol].Quantity self.LimitOrder(symbol, -int(qty), last_price) def adjust_position2(self): today = self.get_today_bar(self.Time.replace(hour=9, minute=30, second=0), self.Time) if today is not None: self.Debug(f"{self.Time} - Adjust Position_2") self.Transactions.CancelOpenOrders() intra_ranges = today["high"] - today["low"] target_prices = today["low"] + intra_ranges * .05 # TODO: Should we make this .05 parametric? for symbol, target_price in target_prices.items(): last_price = self.Securities[symbol].Price stop_limit = last_price + intra_ranges[symbol] * .15 pl = self.Portfolio[symbol].get_UnrealizedProfit() qty = self.Portfolio[symbol].Quantity if pl > 0: self.LimitOrder(symbol, int(qty/2), target_price) self.StopLimitOrder(symbol, -int(qty), last_price, stop_limit) else: self.LimitOrder(symbol, -int(qty), last_price) def set_stop2(self): if time(10, 35) < self.Time.time() < time(15, 45): today = self.get_today_bar(self.Time.replace(hour=9, minute=30, second=0), self.Time) if today is not None: self.Debug(f"{self.Time} - Set Stop2") intra_ranges = today["high"] - today["low"] for symbol, intra_range in intra_ranges.items(): last_price = self.Portfolio[symbol].Price qty = self.Portfolio[symbol].Quantity r_pl = self.Portfolio[symbol].get_Profit() stop_limit = last_price + intra_range * .15 # TODO: Should we make this parametric? if r_pl > 0: self.Transactions.CancelOpenOrders() self.StopLimitOrder(symbol, -qty, last_price, stop_limit) def get_owned_stocks(self): return [s for s in self.ActiveSecurities.Keys if self.Portfolio[s].Quantity != 0] def get_universe_stocks(self): history = self.History(list(self.ActiveSecurities.Keys), self.Time.replace(hour=7, minute=0, second=0), self.Time, resolution=self.resolution) if len(history) > 0: median_volume = history["volume"].groupby("symbol").median() universe = median_volume[median_volume>2500].index.to_list() else: universe = [] self.Debug(f"{self.Time} - Universe {len(universe)} tickers") return universe def close_trade(self): today = self.get_today_bar(self.Time.replace(hour=9, minute=30, second=0), self.Time) if today is not None: self.Debug(f"{self.Time} - Close Trade") self.Transactions.CancelOpenOrders() intra_range = today["high"] - today["low"] for symbol in intra_range.index: lasts = self.Securities[symbol].Price qty = self.Portfolio[symbol].Quantity limit_prices = lasts + intra_range[symbol] * .05 # TODO: Should we make this parametric? 
                self.LimitOrder(symbol, -int(qty), limit_prices)

    def liquidate_trade(self):
        if len(self.get_owned_stocks()) > 0:
            self.Debug(f"{self.Time} - Liquidate Trade")
            self.Transactions.CancelOpenOrders()
            self.Liquidate()

    def get_today_bar(self, start, end):
        history = self.History(self.get_owned_stocks(), start, end,
                               resolution=self.resolution)
        return history.groupby("symbol").agg(AGG_OPS) if len(history) > 0 \
            else None

    def coarse_filter(self, coarse):
        trade_days = self.TradingCalendar.GetTradingDays(self.Time,
                                                         self.Time + timedelta(7))
        next_day = list(filter(lambda p: p.BusinessDay and not p.PublicHoliday,
                               trade_days))[0].Date.date()
        tickers = self.atms.query("Date == @next_day")
        universe = [] if len(tickers) == 0 else \
            [x.Symbol for x in coarse if (x.Symbol.Value == tickers["Symbol"]).any()]
        return universe
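# --- Illustrative sketch (not part of the algorithm above) --------------------
# get_today_bar() collapses second-resolution history into a single OHLCV bar
# per symbol via the AGG_OPS mapping. This standalone pandas example shows the
# same aggregation on a tiny synthetic history frame (symbols and prices are
# made up).

import pandas as pd

AGG_OPS = {"open": "first", "high": "max", "low": "min",
           "close": "last", "volume": "sum"}

history = pd.DataFrame(
    {"open":   [10.0, 10.1, 5.0, 5.2],
     "high":   [10.2, 10.3, 5.3, 5.4],
     "low":    [9.9,  10.0, 4.9, 5.1],
     "close":  [10.1, 10.2, 5.2, 5.3],
     "volume": [1000, 800,  500, 700]},
    index=pd.MultiIndex.from_product([["ABC", "XYZ"], range(2)],
                                     names=["symbol", "bar"]))

today_bar = history.groupby("symbol").agg(AGG_OPS)   # one OHLCV row per symbol
print(today_bar)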
# region imports
from AlgorithmImports import *
# endregion

"""
Library of indicators
@version: 0.11
"""

import pandas as pd


def filter_bars(bars, start, end):
    time_idx = bars.index.get_level_values("time")
    return bars.iloc[time_idx.indexer_between_time(start, end)]


def rename(bars, name):
    return bars.rename(name) if isinstance(bars, pd.Series) \
        else bars.add_prefix(f"{name}_")


def get_daygrouper():
    return [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")]


# Daily indicators

def roll_max(bars, window, groupby="symbol"):
    groups = bars.groupby(groupby)
    output = groups.apply(lambda x: x.rolling(window, min_periods=1).max())
    return output


def roll_min(bars, window, groupby="symbol"):
    groups = bars.groupby(groupby)
    return groups.apply(lambda x: x.rolling(window).min())


def roll_average(bars, window, groupby="symbol", mean_type="arit"):
    mean_func = (lambda x: x.ewm(span=window).mean()) if mean_type == "exp" \
        else (lambda x: x.rolling(window).mean())
    return bars.groupby(groupby).apply(mean_func)


def roll_range(bars, window):
    max_high = roll_max(bars["high"], window).squeeze()
    min_low = roll_min(bars["low"], window).squeeze()
    avg_close = roll_average(bars["close"], window).squeeze()
    return (avg_close - min_low) / (max_high - min_low)


def roll_change(bars, window):
    return bars.groupby("symbol").pct_change(window)


def position_range(bars, window):
    yesterday_bars = bars.groupby("symbol").shift(1)  # Not including trading date
    max_high = roll_max(yesterday_bars["high"], window).squeeze()
    min_low = roll_min(yesterday_bars["low"], window).squeeze()
    return (bars["open"] - min_low) / (max_high - min_low)


def gap(bars):
    yesterday_bars = bars.groupby("symbol").shift(1)  # Not including trading date
    return bars["open"] / yesterday_bars["close"] - 1


def extension(bars, window):
    max_high = roll_max(bars["high"], window).squeeze()
    min_low = roll_min(bars["low"], window).squeeze()
    return (bars["high"] - max_high) / (max_high - min_low)


def retracement(bars, window):
    max_high = roll_max(bars["high"], window).squeeze()
    min_low = roll_min(bars["low"], window).squeeze()
    return (max_high - bars["low"]) / (max_high - min_low)


def gap_extension(bars):
    yesterday_bars = bars.groupby("symbol").shift(1)  # Not including trading date
    return (yesterday_bars["high"] - bars["open"]) / (bars["open"] - yesterday_bars["close"])


def day_range(bars):
    return bars.eval("(open-low)/(high-low)")


def gap_retracement(bars):
    yesterday_bars = bars.groupby("symbol").shift(1)  # Not including trading date
    return (bars["open"] - yesterday_bars["low"]) / (bars["open"] - yesterday_bars["close"])


def roll_vwap(bars, window):
    price_volume = bars[["high", "low", "close"]].mean(axis=1) * bars["volume"]
    avg_price_volume = price_volume.groupby("symbol").apply(
        lambda x: x.rolling(window, min_periods=1).sum())
    avg_volume = bars["volume"].groupby("symbol").apply(
        lambda x: x.rolling(window, min_periods=1).sum())
    return avg_price_volume / avg_volume


def shift(bars, shift):
    return bars.groupby("symbol").shift(shift)


def divergence(num_bars, den_bars):
    return num_bars / den_bars - 1


# Intra day indicators

def intra_change(bars):
    grouper = bars.groupby(get_daygrouper())
    return grouper.last() / grouper.first() - 1


def intra_vwap(bars):
    price_volume = bars.eval("(high + low + close)/3 * volume")
    price_volume = price_volume.groupby("symbol").cumsum()
    volume = bars["volume"].groupby("symbol").cumsum()
    return price_volume / volume


def intra_average(bars):
    return bars.groupby(get_daygrouper()).mean()


def intra_max(bars):
    return bars.groupby(get_daygrouper()).max()


def intra_min(bars):
    return bars.groupby(get_daygrouper()).min()


def intra_gapext(daily_bars, intra_bars):  # Gap Extension
    numerator = intra_max(intra_bars["high"]) - daily_bars["open"]
    denominator = daily_bars["open"] - daily_bars["close"].groupby("symbol").shift(1)
    return numerator.divide(denominator, axis="index")


def intra_highext(daily_bars, intra_bars):  # Total High Extension
    intra_high = intra_max(intra_bars["high"])
    intra_low = intra_min(intra_bars["low"])
    return (daily_bars["high"] - intra_high).divide(intra_high - intra_low, axis="index")


def intra_retrace(bars):  # Retrace
    grouper = bars.groupby(get_daygrouper())
    start_bars = grouper.first()
    end_bars = grouper.last()
    return (end_bars["high"] - start_bars["high"]) / (start_bars["high"] - start_bars["low"])


def intra_divup(bars):  # Divergence Up
    vwap = intra_vwap(bars)
    return (bars["high"] - vwap) / vwap


def intra_divdown(bars):  # Divergence Down
    vwap = intra_vwap(bars)
    return (vwap - bars["low"]) / vwap


def intra_position_range(bars):  # Position in Range
    grouper = bars.groupby(get_daygrouper())
    return (grouper["close"].last() - grouper["low"].min()) \
        / (grouper["high"].max() - grouper["low"].min())


def intra_relvolume(daily_bars, intra_bars, avg_days=10):
    grouper = intra_bars.groupby(get_daygrouper())
    intra_volume = grouper["volume"].sum()
    avg_volume = shift(roll_average(daily_bars["volume"], avg_days), 1)  # Shift 1 day later to match with intra-day data
    return intra_volume / avg_volume.squeeze()


def intra_volume_hod(bars):
    grouper = bars.groupby(get_daygrouper())
    index = grouper.apply(lambda x: x.idxmax()[1])
    return grouper["volume"].cumsum()[index].groupby(get_daygrouper()).last()
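# --- Usage sketch for the indicators library above ----------------------------
# Assuming the module above is saved as indicators.py (the strategies import it
# as `import indicators as idx`), this shows the expected input shape: intraday
# bars indexed by (symbol, time). All data below is synthetic.

import pandas as pd
import indicators as idx

times = pd.date_range("2022-06-01 09:31", periods=3, freq="1min")
bars = pd.DataFrame(
    {"high":   [10.2, 10.4, 10.3],
     "low":    [10.0, 10.1, 10.2],
     "close":  [10.1, 10.3, 10.25],
     "volume": [1000, 1500, 900]},
    index=pd.MultiIndex.from_product([["ABC"], times], names=["symbol", "time"]))

opening = idx.filter_bars(bars, "09:31", "09:45")   # keep only the opening window
vwap = idx.intra_vwap(opening)                      # cumulative intraday VWAP
divergence = opening["close"] / vwap - 1            # same feature the ML strategy derives
print(vwap.groupby("symbol").last(), divergence.max())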
""" Basic ML Liquidation Strategy @version: 0.13 @creation date: 16/07/2022 ! Added gap size and vwap divergence to features ! Reading the ATM file and dynamic gap filtering TODO: multiple entry points TODO: Offline storage of features to improve performance """ from AlgorithmImports import * import numpy as np import pandas as pd pd.set_option('mode.use_inf_as_na', True) from io import StringIO from ast import literal_eval from sklearn.model_selection import cross_val_score from sklearn.ensemble import GradientBoostingClassifier import indicators as idx from timeseriescv import TimeSeriesSplitGroups TICKERS_CSV = "https://drive.google.com/uc?export=download&id=1did0Sk3F9Sn5Il_nUX252jOB_n0UFqat" DATE_COL = "Agreement Start Date" SYMBOL_COL = "ticker" MIN_GAP = 0.15 AGG_OPS = {"open": "first", "close": "last", "high": "max", "low": "min", "volume": "sum"} class LiquidationBasicML(QCAlgorithm): def Initialize(self): self.SetStartDate(2021, 10, 1) self.SetEndDate(2022, 6, 1) self.model = GradientBoostingClassifier(n_iter_no_change=3) self.benchmark = self.GetParameter("benchmark") self.capital = literal_eval(self.GetParameter("capital")) self.SetCash(self.capital) self.atms = self.get_atms() self.train_start = self.atms.index.get_level_values("time").min() self.AddEquity(self.benchmark, Resolution.Minute) self.SetBenchmark(self.benchmark) self.confidence = 0 self.cv = TimeSeriesSplitGroups(n_splits=10) self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw every_day = self.DateRules.EveryDay(self.benchmark) at = self.TimeRules.At self.Train(self.DateRules.MonthStart(), at(0, 0), self.train_model) self.Schedule.On(every_day, at(9, 45), self.enter_trades) self.Schedule.On(every_day, at(10, 30), self.exit_trades) self.Schedule.On(every_day, at(11, 30), self.liquidate) def train_model(self): x, y = self.get_data(self.train_start, self.Time, with_target=True) fit_params = dict(sample_weight=abs(np.log1p(y))) time_groups = x.index.get_level_values("time") y_binary = (y>0).astype(float) cv_scores = cross_val_score(self.model, X=x, y=y_binary, cv=self.cv, groups=time_groups, fit_params=fit_params) self.confidence = max(np.mean(cv_scores) - 0.5, 0) * 2 self.model.fit(x, y_binary, **fit_params) self.Debug(f"Training Points: {len(x)} Confidence: {self.confidence:.1%}") self.Plot("ML", "Confidence", self.confidence) def enter_trades(self): if self.confidence == 0: return start, end = self.get_last_day(self.Time), self.Time x_pred = self.get_data(start, end, with_target=False) if x_pred is None: return y_proba = pd.Series(self.model.predict_proba(x_pred)[:, 1], index=x_pred.index.get_level_values("symbol")) positions = -y_proba.apply(lambda x: max(x-0.5, 0) * 2) * self.confidence for symbol, pos in positions[positions != 0].items(): self.Debug(f"{self.Time} - Trading {symbol}") qty = self.CalculateOrderQuantity(symbol, pos) last_price = self.Securities[symbol].Price self.LimitOrder(symbol, qty, last_price) def exit_trades(self): for s in self.ActiveSecurities.Keys: self.Transactions.CancelOpenOrders(s) qty = self.Portfolio[s].Quantity price = self.Portfolio[s].Price if qty != 0: self.LimitOrder(s, -self.Portfolio[s].Quantity, price) def liquidate(self): self.Transactions.CancelOpenOrders() self.Liquidate() def get_data(self, start, end, with_target=True): gaplist = self.get_gaplist(start, end) if len(gaplist) == 0: return None minute_bars = [] for symbol, day in gaplist.index: end = min(day + timedelta(1), self.Time) minute_bars += [self.History([symbol], day, end, Resolution.Minute)] 
        minute_bars = pd.concat(minute_bars)
        if len(minute_bars) == 0:
            return None  # TODO: Fix gaplist function

        opening_bar = agg_bars(minute_bars, "09:31", "09:45")
        pm_bar = agg_bars(minute_bars, "07:01", "09:30")
        features = gaplist.to_frame("gap")
        features["opening_range"] = opening_bar.eval("(close-low)/(high-low)")
        features["pm_volume_usd"] = pm_bar.eval("close * volume")

        opening_bars = idx.filter_bars(minute_bars, "09:31", "09:45")
        grouper = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")]
        divergence = opening_bars["close"] / idx.intra_vwap(opening_bars) - 1
        features["max_divergence"] = divergence.groupby(grouper).max()
        features["min_divergence"] = divergence.groupby(grouper).min()

        features["atm_days"] = self.atms.atm_days
        features["last_atm_size"] = self.atms["Total ATM Capacity"]
        features.eval("pm_volume_atm = pm_volume_usd / last_atm_size", inplace=True)
        # TODO: Add shares as % of float
        # features["last_atm_gap_days"] = self.atms.groupby(["symbol", "atm_date"]).cumcount()
        # TODO: Add gap days since last ATM
        features.dropna(inplace=True)

        if with_target:
            target_bar = agg_bars(minute_bars, "09:46", "10:29")
            targets = -target_bar.eval("close/open - 1").dropna()  # Inverted return since we are shorting
            index = targets.index.intersection(features.index)
            return features.loc[index], targets.loc[index]
        else:
            return features

    def get_atms(self):
        csv = StringIO(self.Download(TICKERS_CSV))
        atms = pd.read_csv(csv, parse_dates=[DATE_COL], dayfirst=True, thousands=",")
        atms.query("OfferingType == 'ATM'", inplace=True)
        atms[DATE_COL] = atms[DATE_COL].dt.date
        atms = atms.rename(columns={DATE_COL: "time", SYMBOL_COL: "symbol"})
        [self.AddEquity(s, Resolution.Minute, extendedMarketHours=True)
         for s in atms["symbol"].unique()]
        atms["symbol"] = atms["symbol"].apply(lambda x: str(self.Symbol(x).ID))
        atms.set_index(["symbol", "time"], inplace=True)
        atms = atms[~atms.index.duplicated(keep='first')]
        atms["atm_date"] = atms.index.get_level_values("time")

        # extend last ATM data forward in time to match with trading days
        time_idx = atms.index.get_level_values("time").unique()
        periods = (max(time_idx) - min(time_idx) + timedelta(365)).days
        new_time_idx = pd.date_range(min(time_idx), periods=periods, freq='D')
        new_symbol_idx = atms.index.get_level_values("symbol").unique()
        new_idx = pd.MultiIndex.from_product([new_symbol_idx, new_time_idx],
                                             names=["symbol", "time"])
        atms = atms.reindex(index=new_idx).groupby("symbol").ffill()
        time_idx = atms.index.get_level_values("time")
        atms["atm_days"] = (time_idx - atms["atm_date"]) / np.timedelta64(1, 'D')
        return atms.dropna(subset=["atm_date"]).query("atm_days <= 365")

    def get_gaplist(self, start, end):
        valid_atms = self.atms.query("(time >= @start) and (time <= @end)")
        symbols = valid_atms.index.get_level_values("symbol").unique().tolist()
        day_start = self.get_last_day(start)
        day_bars = self.History(symbols, day_start, end, Resolution.Daily)
        today_start = self.Time.replace(hour=9, minute=30, second=0)
        if end > today_start:  # adding manually the last day bar if missing
            today_end = min(end, self.Time)
            last_day_bars = self.History(symbols, today_start, today_end,
                                         Resolution.Minute)
            last_day_bar = agg_bars(last_day_bars, "09:31", "15:59")
            day_bars = pd.concat([day_bars, last_day_bar])
        yesterday_bars = day_bars.groupby("symbol").shift(1)
        gaps = day_bars["open"] / yesterday_bars["close"] - 1
        return gaps[gaps > MIN_GAP]

    def get_last_day(self, date):
        trade_days = self.TradingCalendar.GetTradingDays(date - timedelta(7),
                                                         date - timedelta(1))
        return list(filter(lambda p: p.BusinessDay and not p.PublicHoliday,
                           trade_days))[-1].Date


def agg_bars(minute_bars, start_time, end_time):
    grouper = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")]
    filtered_bars = idx.filter_bars(minute_bars, start_time, end_time)
    return filtered_bars.groupby(grouper).agg(AGG_OPS)
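# --- Illustrative sketch (not part of the algorithm above) --------------------
# get_gaplist() keeps only symbols whose open gaps up more than MIN_GAP (15%)
# over the previous session's close. The pandas pattern is shown here on
# made-up daily bars.

import pandas as pd

MIN_GAP = 0.15
day_bars = pd.DataFrame(
    {"open":  [10.0, 12.0, 5.0, 5.1],
     "close": [10.1, 11.8, 5.0, 5.2]},
    index=pd.MultiIndex.from_product(
        [["ABC", "XYZ"], pd.to_datetime(["2022-06-01", "2022-06-02"])],
        names=["symbol", "time"]))

yesterday = day_bars.groupby("symbol").shift(1)    # previous session per symbol
gaps = day_bars["open"] / yesterday["close"] - 1   # overnight gap
print(gaps[gaps > MIN_GAP])                        # only ABC on 2022-06-02 (~18.8%) survives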
# region imports
from AlgorithmImports import *
# endregion

import math
import numpy as np
from math import factorial
from itertools import combinations
from sklearn.model_selection._split import _BaseKFold, indexable


class TimeSeriesSplitGroups(_BaseKFold):

    def __init__(self, n_splits=5, purge_groups=0):
        super().__init__(n_splits, shuffle=False, random_state=None)
        self.purge_groups = purge_groups

    def split(self, X, y=None, groups=None):
        X, y, groups = indexable(X, y, groups)
        n_folds = self.n_splits + 1
        group_list = np.unique(groups)
        n_groups = len(group_list)
        if n_folds + self.purge_groups > n_groups:
            raise ValueError((f"Cannot have number of folds plus purged groups "
                              f"={n_folds + self.purge_groups} greater than the "
                              f"number of groups: {n_groups}."))
        test_size = (n_groups - self.purge_groups) // n_folds
        test_starts = [n_groups - test_size * c for c in range(1, n_folds)]
        for tstart in test_starts:
            train_idx = np.isin(groups, group_list[:tstart - self.purge_groups])
            test_idx = np.isin(groups, group_list[tstart:tstart + test_size])
            yield (np.nonzero(train_idx)[0], np.nonzero(test_idx)[0])


class CombinatorialPurgedCV(_BaseKFold):

    def __init__(self, n=4, k=2, purge=0, embargo=0):
        self.n = n
        self.k = k
        self.purge = purge
        self.embargo = embargo
        n_splits = int(factorial(n) / (factorial(k) * factorial(n - k)))
        super().__init__(n_splits, shuffle=False, random_state=None)

    def split(self, X, y=None, groups=None):
        X, y, groups = indexable(X, y, groups)
        unique_groups = list(np.unique(groups))
        required_folds = self.n_splits
        if required_folds > len(unique_groups):
            raise ValueError((f"Required folds ={required_folds} greater than "
                              f"the number of groups: {len(unique_groups)}."))
        fold_size = int(math.ceil(len(unique_groups) / self.n))
        test_folds = combinations(range(self.n), self.k)
        for test_fold in test_folds:
            train_groups, test_groups = [], []
            for c in range(self.n):
                start = c * fold_size
                stop = min((c + 1) * fold_size, len(unique_groups))  # To avoid going out of bound
                if c in test_fold:
                    test_groups += unique_groups[start:stop]
                else:
                    # Naive fold sizing, should be distributed before train/test split
                    if c - 1 in test_fold:
                        start += self.embargo
                    if c + 1 in test_fold:
                        stop -= (self.purge + self.embargo)
                    train_groups += unique_groups[start:stop]
            train_idx = np.nonzero(np.isin(groups, train_groups))[0]
            test_idx = np.nonzero(np.isin(groups, test_groups))[0]
            yield train_idx, test_idx
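# --- Usage sketch for TimeSeriesSplitGroups ------------------------------------
# Assuming the module above is saved as timeseriescv.py (the ML strategy imports
# it as `from timeseriescv import TimeSeriesSplitGroups`), this shows the
# walk-forward splitting over date groups on synthetic data: the training window
# grows fold by fold while the test window stays a fixed number of dates, with
# `purge_groups` dates dropped from the end of each training set.

import numpy as np
import pandas as pd
from timeseriescv import TimeSeriesSplitGroups

dates = np.repeat(pd.date_range("2022-01-01", periods=12, freq="D"), 3)  # 3 samples per day
X = np.arange(len(dates)).reshape(-1, 1)
y = np.random.default_rng(0).integers(0, 2, len(dates))

cv = TimeSeriesSplitGroups(n_splits=3, purge_groups=1)
for train_idx, test_idx in cv.split(X, y, groups=dates):
    print(len(train_idx), len(test_idx))   # e.g. 15/6, 21/6, 27/6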