Overall Statistics |
Total Trades 318 Average Win 0.77% Average Loss -0.83% Compounding Annual Return 48.848% Drawdown 11.500% Expectancy 0.395 Net Profit 64.724% Sharpe Ratio 2.619 Probabilistic Sharpe Ratio 93.832% Loss Rate 28% Win Rate 72% Profit-Loss Ratio 0.93 Alpha 0.324 Beta 0.027 Annual Standard Deviation 0.124 Annual Variance 0.015 Information Ratio 1.671 Tracking Error 0.199 Treynor Ratio 11.788 Total Fees $867.33 Estimated Strategy Capacity $5200000.00 Lowest Capacity Asset BWV XW3T2OYN5MJP |
"""
Big Bertha Strategy with Machine Learning

Last changes:
v0.27: Minimum probability parameter
v0.26: Added both TP and SL capabilities (naive triple barrier targets)
v0.25: Individual probability-based sizing (in addition to general Kelly sizing)
v0.24: Offline model storage
v0.23: Lookback parameter

@version: 0.27
@creation date: 05/07/2022
"""

from AlgorithmImports import *

import pickle
from functools import wraps

import numpy as np
import pandas as pd
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import GradientBoostingClassifier

import indicators as idx
from timeseriescv import TimeSeriesSplitGroups

pd.set_option('mode.use_inf_as_na', True)

# Groups minute bars per symbol and per calendar day.
GROUPER = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")]
# How each OHLCV column is collapsed when several minute bars become one bar.
AGG_OPS = {"open": "first", "close": "last", "high": "max", "low": "min", "volume": "sum"}


def catch_errors(func):
    """Decorate a method so KeyError/ValueError are logged instead of raised.

    The wrapped method returns None when one of those exceptions occurs; the
    error is reported through the instance's ``print`` method. Any other
    exception type still propagates.
    """
    @wraps(func)  # keep the wrapped method's name/docstring for debugging
    def wrap(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except (KeyError, ValueError) as e:
            self.print(e)
            return None
    return wrap


class BigBerthaML(QCAlgorithm):
    """Intraday strategy that sizes trades from a gradient boosting classifier.

    Every trading day features are built at 09:35 from the pre-market and the
    opening ("Big Bertha") bars, positions are opened from the model output,
    and at 15:55 everything is closed and the realised outcome is stored as
    the training target. The model is re-trained at the start of each week.
    """

    def Initialize(self):
        """Read parameters, configure the universe/model and schedule the jobs."""
        self.min_usd_volume = self.GetParameter("min_usd_volume", 1e9)  # Minimum trading volume in previous trading day
        self.target_gain = self.GetParameter("target_gain", 0.05)  # Minimum target gain to enter the trade
        self.capital = self.GetParameter("capital", 80000)  # Starting capital
        self.lookback = self.GetParameter("lookback", 365)  # Trading days used for model training
        self.strategy = self.GetParameter("strategy", 0)  # -1 short only, +1 long only, 0 long/short
        self.benchmark = self.GetParameter("benchmark", "SPY")  # Performance benchmark
        self.cv_splits = self.GetParameter("cv_splits", 10)  # Number of splits for model cross validation
        self.store_model = self.GetParameter("store_model", None)  # Model name if it needs to be stored
        self.sl_retr = self.GetParameter("retracement_sl", 0)  # Retracement percentage to use for the Stop Loss, disabled if 0
        self.tp_ext = self.GetParameter("extension_tp", 0)  # Extension percentage to use for the Take Profit, disabled if 0

        self.SetStartDate(2021, 6, 1)
        self.SetEndDate(2022, 9, 1)
        self.SetCash(self.capital)
        self.UniverseSettings.Resolution = Resolution.Minute
        self.UniverseSettings.ExtendedMarketHours = True
        self.AddUniverse(self.coarse_filter)
        self.AddEquity(self.benchmark, Resolution.Minute)
        self.SetBenchmark(self.benchmark)

        #self.ObjectStore.Delete(self.store_model)  # Deleting existing data
        if self.store_model is not None and self.ObjectStore.ContainsKey(self.store_model):
            # NOTE(review): pickle.loads executes arbitrary code from the
            # payload; only safe while the ObjectStore content is written
            # exclusively by this algorithm.
            self.model = pickle.loads(bytes(self.ObjectStore.ReadBytes(self.store_model)))
        else:
            self.model = GradientBoostingClassifier(n_iter_no_change=10)
        if not hasattr(self.model, "edge"):
            # A new model (or a stored one pickled without an edge) must not
            # trade until a training run has measured its edge.
            self.model.edge = 0

        self.cv = TimeSeriesSplitGroups(n_splits=self.cv_splits)
        self.features, self.targets = None, None

        at = self.TimeRules.At
        every_day = self.DateRules.EveryDay(self.benchmark)
        self.Train(self.DateRules.WeekStart(), at(0, 0), self.train_model)
        self.Schedule.On(every_day, at(9, 35), self.enter_trades)
        self.Schedule.On(every_day, at(15, 55), self.exit_trades)

    def coarse_filter(self, coarse):
        """Return the symbols with fundamental data and enough dollar volume."""
        return [x.Symbol for x in coarse if
                x.HasFundamentalData and
                x.DollarVolume > self.min_usd_volume]

    def train_model(self):
        """Cross-validate and refit the model, then update its Kelly edge.

        Does nothing until both features and targets exist, more than 21
        distinct training days are available and at least two target classes
        are present. Features/targets outside the lookback window, or without
        a counterpart, are discarded as a side effect.
        """
        if self.features is None or self.targets is None:
            return

        # Removing features without matching targets
        common_idx = self.features.index.intersection(self.targets.index)
        recent = common_idx.get_level_values("time") > self.Time - timedelta(self.lookback)
        common_idx = common_idx[recent]
        self.features = self.features.loc[common_idx]
        self.targets = self.targets.loc[common_idx]

        training_days = common_idx.get_level_values("time")
        if len(training_days.unique()) <= 21:
            return  # Require more than one month of training data
        if len(np.unique(self.targets)) < 2:
            # The classifier cannot be fitted on a single class and the edge
            # formula below would divide by zero.
            self.print("Training skipped: targets contain a single class")
            return

        cv_scores = cross_val_score(self.model, X=self.features, y=self.targets,
                                    cv=self.cv, groups=training_days,
                                    scoring="balanced_accuracy")
        self.model.fit(self.features, self.targets)

        score = np.mean(cv_scores)
        n_classes = len(self.model.classes_)
        # Kelly edge calculation with multiple classes
        edge = (n_classes * score - 1) / (n_classes - 1)
        # A failed CV fold yields NaN; NaN would slip past the `edge <= 0`
        # check in enter_trades and produce NaN order sizes.
        self.model.edge = edge if np.isfinite(edge) else 0

        # Persist only after the edge is updated so that a reloaded model
        # carries the edge measured for this fit, not the previous one.
        if self.store_model is not None:
            self.ObjectStore.SaveBytes(self.store_model, pickle.dumps(self.model))

        self.print(f"Training: {self.targets.value_counts()} Edge:{self.model.edge:.1%}")
        self.Plot("ML", "Edge", self.model.edge)

    def enter_trades(self):
        """Store today's features and open positions sized by the model.

        Each position is the predicted class (-1/0/+1) scaled by how far the
        winning probability exceeds 50% and by the model's Kelly edge; the
        total absolute exposure is capped at 100%. Optional stop-loss and
        take-profit orders are placed relative to the opening bar range.
        """
        self.store_features()
        if self.model.edge <= 0:
            return
        if self.features is None:
            return  # Feature storage has never succeeded, nothing to predict on

        x_pred = self.features.query("time == @self.Time.date()")
        if x_pred.empty:
            # predict_proba raises on an empty input
            self.print("Trading skipped: no features stored for today")
            return
        x_pred.index = x_pred.index.droplevel("time")

        y_proba = pd.DataFrame(self.model.predict_proba(x_pred),
                               index=x_pred.index,
                               columns=self.model.classes_)
        y_pred = y_proba.idxmax(axis=1)
        sizes = (y_proba.max(axis=1) - 0.5).clip(0, 1) * 2  # Selecting only prob > 50% and scaling to 100%
        positions = y_pred * sizes * self.model.edge  # Sizing based on Kelly and individual probabilty
        gross_exposure = positions.abs().sum()
        if gross_exposure > 1:
            positions /= gross_exposure  # Ensuring no leverage is used

        self.print(f"Trading: {y_pred.value_counts()}")
        for symbol, pos in positions[positions != 0].items():
            qty = self.CalculateOrderQuantity(symbol, pos)
            if qty == 0:
                continue  # Target too small to buy a single share
            self.MarketOrder(symbol, qty)

            features = x_pred.loc[symbol]
            window = features.bb_high - features.bb_low
            # NOTE(review): the two exit orders below are independent; if one
            # fills, the other stays open until exit_trades cancels it.
            if self.sl_retr > 0:
                if pos > 0:
                    stop_loss = features.bb_high - window * self.sl_retr
                else:
                    stop_loss = features.bb_low + window * self.sl_retr
                self.StopLimitOrder(symbol, -qty, stop_loss, stop_loss)
            if self.tp_ext > 0:
                if pos > 0:
                    take_profit = features.bb_low + window * self.tp_ext
                else:
                    take_profit = features.bb_high - window * self.tp_ext
                self.LimitOrder(symbol, -qty, take_profit)

    def exit_trades(self):
        """Cancel pending orders, close every position and store the targets."""
        self.Transactions.CancelOpenOrders()
        self.Liquidate()
        self.store_targets()

    @catch_errors
    def store_features(self):
        """Build today's feature rows and prepend them to ``self.features``.

        Features come from the aggregated 07:01-09:30 bar (pre-market dollar
        volume), the aggregated bar from 09:31 to the current minute (prefixed
        ``bb_``) and the gap versus the previous daily close. Rows with
        missing values are dropped.
        """
        start = self.Time.replace(hour=7, minute=1, second=0, microsecond=0)
        tickers = list(self.ActiveSecurities.Keys)
        last_minute = self.Time.replace(second=0, microsecond=0)
        minute_bars = self.History(tickers, start, last_minute, Resolution.Minute)

        pm_bar = agg_bars(minute_bars, "07:01", "09:30")
        entry_hr, entry_mn = last_minute.hour, last_minute.minute
        bertha_bar = agg_bars(minute_bars, "09:31", f"{entry_hr}:{entry_mn}")

        new_features = bertha_bar.add_prefix("bb_")
        new_features.eval("bb_size = (bb_high-bb_low)/bb_open", inplace=True)
        new_features.eval("bb_close_range = (bb_close-bb_low)/(bb_high-bb_low)", inplace=True)
        new_features.eval("bb_open_range = (bb_open-bb_low)/(bb_high-bb_low)", inplace=True)
        new_features["pm_volume_usd"] = pm_bar.eval("close * volume")

        yesterday_bar = self.History(tickers, 1, Resolution.Daily)
        yesterday_close = yesterday_bar["close"].droplevel("time")
        new_features["gap"] = bertha_bar["open"] / yesterday_close - 1

        self.features = pd.concat([new_features.dropna(), self.features])
        self.print(f"Stored new features, total: {len(self.features)}")

    @catch_errors
    def store_targets(self):
        """Label today's feature rows with the trade outcome at the exit time.

        The last minute bar of each symbol is joined with today's features and
        classified by ``calc_target``; the labels are prepended to
        ``self.targets``.
        """
        if self.features is None:
            return  # No features were ever stored, so there is nothing to label

        last_features = self.features.query("time == @self.Time.date()")
        self.Log(last_features)
        if last_features.empty:
            return  # No features for today, so no targets can be computed

        tickers = list(last_features.index.get_level_values("symbol"))
        last_minute = self.Time.replace(second=0, microsecond=0)
        self.Log(f"Target time: {last_minute}")
        minute_bars = self.History(tickers, last_minute - timedelta(minutes=1),
                                   last_minute, Resolution.Minute)
        self.Log(minute_bars)

        trading_bar = minute_bars.droplevel("time").join(last_features)
        new_targets = trading_bar.apply(self.calc_target, axis=1)
        self.targets = pd.concat([new_targets.dropna(), self.targets])
        self.print(f"Stored new targets, total: {len(self.targets)}")

    def calc_target(self, price_bar):
        """Classify one joined price/feature row as +1, -1 or 0.

        Returns +1 when the exit close is at least ``target_gain`` above the
        entry (``bb_close``) and longs are allowed, -1 for the mirrored short
        case, and 0 otherwise. With a stop loss configured, a winning trade is
        downgraded to 0 when the row's low/high reaches the stop level.
        """
        entry_price, exit_price = price_bar.bb_close, price_bar.close
        window = price_bar.bb_high - price_bar.bb_low

        long_win = exit_price >= entry_price * (1 + self.target_gain) and self.strategy >= 0
        if long_win:
            if self.sl_retr <= 0:
                return +1
            stop_loss = price_bar.bb_high - window * self.sl_retr
            return +1 if price_bar.low > stop_loss else 0  # 1 if profitable long and not touching the SL

        short_win = exit_price <= entry_price * (1 - self.target_gain) and self.strategy <= 0
        if short_win:
            if self.sl_retr <= 0:
                return -1
            stop_loss = price_bar.bb_low + window * self.sl_retr
            return -1 if price_bar.high < stop_loss else 0  # -1 if profitable short and not touching the SL

        return 0

    def print(self, msg):
        """Send ``msg`` to the debug log prefixed with the algorithm time."""
        self.Debug(f"{self.Time} {msg}")


def agg_bars(minute_bars, start_time, end_time):
    """Aggregate the minute bars between two times into one bar per symbol/day.

    Filtering is delegated to ``indicators.filter_bars``; the surviving bars
    are grouped with GROUPER and collapsed with AGG_OPS.
    """
    filtered_bars = idx.filter_bars(minute_bars, start_time, end_time)
    return filtered_bars.groupby(GROUPER).agg(AGG_OPS)