Overall Statistics |
Total Trades 521 Average Win 0.99% Average Loss -0.57% Compounding Annual Return 42.026% Drawdown 12.500% Expectancy 0.178 Net Profit 26.412% Sharpe Ratio 1.693 Probabilistic Sharpe Ratio 68.260% Loss Rate 57% Win Rate 43% Profit-Loss Ratio 1.72 Alpha 0.293 Beta -0.031 Annual Standard Deviation 0.174 Annual Variance 0.03 Information Ratio 1.332 Tracking Error 0.25 Treynor Ratio -9.438 Total Fees $38358.45 Estimated Strategy Capacity $19000.00 Lowest Capacity Asset BHTG WTJGQV6N9U91 |
#region imports
from AlgorithmImports import *
#endregion
"""
Library of indicators
@version: 0.11
"""
import pandas as pd


def filter_bars(bars, start, end):
    """Return the rows of `bars` whose "time" index level falls between
    the times of day `start` and `end` (as accepted by
    `DatetimeIndex.indexer_between_time`)."""
    time_idx = bars.index.get_level_values("time")
    return bars.iloc[time_idx.indexer_between_time(start, end)]


def rename(bars, name):
    """Rename a Series to `name`, or prefix every column of a DataFrame
    with `"{name}_"`."""
    return bars.rename(name) if isinstance(bars, pd.Series) \
        else bars.add_prefix(f"{name}_")


def get_daygrouper():
    """Return groupers that bucket rows by the "symbol" level and by
    1-day bins of the "time" level."""
    return [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")]


# Daily indicators
def roll_max(bars, window, groupby="symbol"):
    """Rolling maximum over `window` rows within each `groupby` group.

    Uses `min_periods=1`, so partial windows still produce a value.
    """
    groups = bars.groupby(groupby)
    output = groups.apply(lambda x: x.rolling(window, min_periods=1).max())
    return output


def roll_min(bars, window, groupby="symbol"):
    """Rolling minimum over `window` rows within each `groupby` group.

    Unlike `roll_max`, no `min_periods` is passed, so the pandas default
    for `rolling` applies.
    """
    groups = bars.groupby(groupby)
    return groups.apply(lambda x: x.rolling(window).min())


def roll_average(bars, window, groupby="symbol", mean_type="arit"):
    """Rolling mean within each `groupby` group.

    `mean_type="exp"` selects an exponentially weighted mean with
    `span=window`; any other value selects a simple rolling mean.
    """
    mean_func = (lambda x: x.ewm(span=window).mean()) if mean_type == "exp" \
        else (lambda x: x.rolling(window).mean())
    return bars.groupby(groupby).apply(mean_func)


def roll_range(bars, window):
    """Position of the rolling average close inside the rolling
    high/low range: (avg_close - min_low) / (max_high - min_low)."""
    max_high = roll_max(bars["high"], window).squeeze()
    min_low = roll_min(bars["low"], window).squeeze()
    avg_close = roll_average(bars["close"], window).squeeze()
    return (avg_close-min_low)/(max_high-min_low)


def roll_change(bars, window):
    """Percentage change over `window` rows, computed per symbol."""
    return bars.groupby("symbol").pct_change(window)


def position_range(bars, window):
    """Position of the current open inside the rolling high/low range of
    the preceding rows (the current row is excluded via a 1-row shift)."""
    yesterday_bars = bars.groupby("symbol").shift(1)  # Not including trading date
    max_high = roll_max(yesterday_bars["high"], window).squeeze()
    min_low = roll_min(yesterday_bars["low"], window).squeeze()
    return (bars["open"]-min_low)/(max_high-min_low)


def gap(bars):
    """Return open / previous close - 1, per symbol."""
    yesterday_bars = bars.groupby("symbol").shift(1)  # Not including trading date
    return bars["open"]/yesterday_bars["close"]-1


def extension(bars, window):
    """Return (high - max_high) / (max_high - min_low) over a rolling
    window of highs and lows."""
    max_high = roll_max(bars["high"], window).squeeze()
    # The rolling *minimum* of the lows defines the bottom of the range,
    # matching roll_range / position_range (previously used roll_max).
    min_low = roll_min(bars["low"], window).squeeze()
    return (bars["high"]-max_high)/(max_high-min_low)


def retracement(bars, window):
    """Return (max_high - low) / (max_high - min_low) over a rolling
    window of highs and lows."""
    max_high = roll_max(bars["high"], window).squeeze()
    # The rolling *minimum* of the lows defines the bottom of the range,
    # matching roll_range / position_range (previously used roll_max).
    min_low = roll_min(bars["low"], window).squeeze()
    return (max_high-bars["low"])/(max_high-min_low)


def gap_extension(bars):
    """Return (previous high - open) / (open - previous close), per symbol."""
    yesterday_bars = bars.groupby("symbol").shift(1)  # Not including trading date
    return (yesterday_bars["high"]-bars["open"])/(bars["open"]-yesterday_bars["close"])


def day_range(bars):
    """Return (open - low) / (high - low) for each bar."""
    return bars.eval("(open-low)/(high-low)")


def gap_retracement(bars):
    """Return (open - previous low) / (open - previous close), per symbol."""
    yesterday_bars = bars.groupby("symbol").shift(1)  # Not including trading date
    return (bars["open"]-yesterday_bars["low"])/(bars["open"]-yesterday_bars["close"])


def roll_vwap(bars, window):
    """Rolling volume-weighted average price per symbol.

    The price of each bar is the mean of its high, low and close; both
    rolling sums use `min_periods=1`.
    """
    price_volume = bars[["high", "low", "close"]].mean(axis=1)*bars["volume"]
    avg_price_volume = price_volume.groupby("symbol").apply(
        lambda x: x.rolling(window, min_periods=1).sum())
    avg_volume = bars["volume"].groupby("symbol").apply(
        lambda x: x.rolling(window, min_periods=1).sum())
    return avg_price_volume/avg_volume


def shift(bars, shift):
    """Shift `bars` by `shift` rows within each symbol."""
    return bars.groupby("symbol").shift(shift)


def divergence(num_bars, den_bars):
    """Return num_bars / den_bars - 1."""
    return num_bars/den_bars-1


# Intra day indicators
def intra_change(bars):
    """Return last / first - 1 for each (symbol, day) group."""
    grouper = bars.groupby(get_daygrouper())
    return grouper.last()/grouper.first()-1


def intra_vwap(bars):
    """Cumulative volume-weighted average price using (high+low+close)/3.

    NOTE(review): both cumulative sums are grouped by "symbol" only, so
    when `bars` spans several days the accumulation does not restart at
    each day - confirm this is intended.
    """
    price_volume = bars.eval("(high + low + close)/3 * volume")
    price_volume = price_volume.groupby("symbol").cumsum()
    volume = bars["volume"].groupby("symbol").cumsum()
    return price_volume/volume


def intra_average(bars):
    """Mean of `bars` for each (symbol, day) group."""
    # pandas GroupBy objects expose `mean`, not `average`; the previous
    # `.average()` call raised AttributeError.
    return bars.groupby(get_daygrouper()).mean()


def intra_max(bars):
    """Maximum of `bars` for each (symbol, day) group."""
    return bars.groupby(get_daygrouper()).max()


def intra_min(bars):
    """Minimum of `bars` for each (symbol, day) group."""
    return bars.groupby(get_daygrouper()).min()


def intra_gapext(daily_bars, intra_bars):  # Gap Extension
    """Return (intraday max high - daily open) / (daily open - previous
    daily close)."""
    numerator = intra_max(intra_bars["high"])-daily_bars["open"]
    denominator = daily_bars["open"] - daily_bars["close"].groupby("symbol").shift(1)
    return numerator.divide(denominator, axis="index")


def intra_highext(daily_bars, intra_bars):  # Total High Extension
    """Return (daily high - intraday high) / (intraday high - intraday low)."""
    intra_high = intra_max(intra_bars["high"])
    intra_low = intra_min(intra_bars["low"])
    return (daily_bars["high"]-intra_high).divide(intra_high-intra_low, axis="index")


def intra_retrace(bars):  # Retrace
    """Return (last high - first high) / (first high - first low) for
    each (symbol, day) group."""
    grouper = bars.groupby(get_daygrouper())
    start_bars = grouper.first()
    end_bars = grouper.last()
    return (end_bars["high"]-start_bars["high"])/(start_bars["high"]-start_bars["low"])


def intra_divup(bars):  # Divergence Up
    """Return (high - vwap) / vwap, with vwap from `intra_vwap`."""
    vwap = intra_vwap(bars)
    return (bars["high"] - vwap) / vwap


def intra_divdown(bars):  # Divergence Down
    """Return (vwap - low) / vwap, with vwap from `intra_vwap`."""
    vwap = intra_vwap(bars)
    return (vwap - bars["low"]) / vwap


def intra_position_range(bars):  # Position Range
    """Return (last close - min low) / (max high - min low) for each
    (symbol, day) group."""
    grouper = bars.groupby(get_daygrouper())
    return (grouper["close"].last()-grouper["low"].min())/(grouper["high"].max()-grouper["low"].min())


def intra_relvolume(daily_bars, intra_bars, avg_days=10):
    """Intraday volume summed per (symbol, day), divided by the rolling
    average daily volume over `avg_days`, shifted by one row."""
    grouper = intra_bars.groupby(get_daygrouper())
    intra_volume = grouper["volume"].sum()
    avg_volume = shift(roll_average(daily_bars["volume"], avg_days), 1)  # Shift 1 day later to match with intra-day data
    return intra_volume/avg_volume.squeeze()


def intra_volume_hod(bars):
    """Cumulative volume selected at a per-group `idxmax` label.

    NOTE(review): presumably meant as "volume traded up to the high of
    the day"; `x.idxmax()[1]` picks an entry of the per-column idxmax
    result by position/label 1, which depends on the column layout of
    `bars` - confirm against callers.
    """
    grouper = bars.groupby(get_daygrouper())
    index = grouper.apply(lambda x: x.idxmax()[1])
    return grouper["volume"].cumsum()[index].groupby(get_daygrouper()).last()
""" Multi-Entry ML Liquidation Strategy @version: 0.2 @creation date: 8/9/2022 """ from AlgorithmImports import * import numpy as np import pandas as pd pd.set_option('mode.use_inf_as_na', True) from io import StringIO from ast import literal_eval from sklearn.model_selection import cross_val_score from sklearn.ensemble import GradientBoostingClassifier import indicators as idx from timeseriescv import TimeSeriesSplitGroups TICKERS_CSV = "https://drive.google.com/uc?export=download&id=1did0Sk3F9Sn5Il_nUX252jOB_n0UFqat" DATE_COL = "Agreement Start Date" SYMBOL_COL = "ticker" AGG_OPS = {"open": "first", "close": "last", "high": "max", "low": "min", "volume": "sum"} SECS_PER_DAY = 24 * 60 * 60 class LiquidationBasicML(QCAlgorithm): def Initialize(self): self.SetStartDate(2021, 10, 1) self.SetEndDate(2022, 6, 1) self.benchmark = self.GetParameter("benchmark") self.capital = literal_eval(self.GetParameter("capital")) self.min_gap = literal_eval(self.GetParameter("min_gap")) self.kelly_frac = literal_eval(self.GetParameter("kelly_frac")) self.SetCash(self.capital) self.atm = self.get_atm() self.atm_start = self.atm.index.get_level_values("time").min() self.AddEquity(self.benchmark, Resolution.Minute) self.SetBenchmark(self.benchmark) self.last_update = datetime(2000, 1, 1) self.last_training = datetime(2000, 1, 1) self.gaplist, self.features, self.targets = None, None, None self.model = GradientBoostingClassifier(n_iter_no_change=3) self.cv = TimeSeriesSplitGroups(n_splits=10) self.edge = 0 self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw every_day = self.DateRules.EveryDay(self.benchmark) at = self.TimeRules.At self.Train(every_day, at(0, 0), self.train_model) self.Schedule.On(every_day, at(9, 31), self.update_gaplist) self.Schedule.On(every_day, at(9, 35), self.enter_trades) self.Schedule.On(every_day, at(9, 45), self.enter_trades) self.Schedule.On(every_day, at(10, 30), self.exit_trades) self.Schedule.On(every_day, at(15, 55), self.liquidate) 
def train_model(self): days_since_training = (self.Time - self.last_training).days if self.features is None \ or (days_since_training <= 30 and self.edge > 0): return clean_idx = self.features.index.intersection(self.targets.index) # Removing features without matching targets self.features = self.features.loc[clean_idx] self.targets = self.targets.loc[clean_idx] fit_params = dict(sample_weight=abs(self.targets)) # Weighting each sample by the log return abs value date_groups = self.features.index.get_level_values("time") y_binary = (self.targets>0).astype(float) # Converting the target return to up down direction cv_scores = cross_val_score(self.model, X=self.features, y=y_binary, cv=self.cv, groups=date_groups, fit_params=fit_params) self.edge = np.mean(cv_scores - (1 - cv_scores)) self.model.fit(self.features, y_binary, **fit_params) self.print(f"Training Points: {len(self.features)} Edge: {self.edge:.1%}") self.Plot("ML", "Edge", self.edge) self.last_training = self.Time def enter_trades(self): self.update_features() x_pred = self.features.query("time == @self.Time.date()") # TODO: Fix indexing x_pred = x_pred.groupby("symbol").head(1) if self.edge <= 0 or len(x_pred) == 0: return x_pred.index = x_pred.index.droplevel("time") y_pred = pd.Series(self.model.predict(x_pred), index=x_pred.index) positions = y_pred * self.edge * self.kelly_frac # Kelly positions for symbol, pos in positions.items(): self.SetHoldings(symbol, -pos) self.print(f"Trading {pos:.1%} of {symbol}") def exit_trades(self): for s in self.ActiveSecurities.Keys: self.Transactions.CancelOpenOrders(s) qty = self.Portfolio[s].Quantity if qty != 0: self.LimitOrder(s, -qty, self.Portfolio[s].Price) self.update_targets() def liquidate(self): self.Transactions.CancelOpenOrders() self.Liquidate() def update_features(self): new_features = self.gaplist.query("time > @self.last_update") now = self.Time entry_hr, entry_mn = now.hour, now.minute minute_bars = [self.History([symbol], day.replace(hour=7, 
minute=1), day.replace(hour=entry_hr, minute=entry_mn), Resolution.Minute) for symbol, day in new_features.index] try: minute_bars = pd.concat(minute_bars) pm_bar = agg_bars(minute_bars, "07:01", "09:30") opening_bar = agg_bars(minute_bars, "09:31", f"{entry_hr}:{entry_mn}") except (KeyError, ValueError) as e: self.print(e) return new_features = new_features.join(opening_bar.add_prefix("opening_")) new_features["opening_range"] = opening_bar.eval("(close-low)/(high-low)") new_features["pm_volume_usd"] = pm_bar.eval("close * volume") last_atm = self.atm.query("time <= @self.Time").groupby("symbol").last() new_features = new_features.join(last_atm[["atm_date", "atm_size", "atm_offer"]]) seconds_since_atm = (self.Time.timestamp() - new_features["atm_date"]) new_features["atm_days"] = seconds_since_atm / SECS_PER_DAY opening_bars = idx.filter_bars(minute_bars, "09:31", f"{entry_hr}:{entry_mn}") divergence = opening_bars["close"] / idx.intra_vwap(opening_bars) - 1 grouper = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")] new_features["max_divergence"] = divergence.groupby(grouper).max() new_features["min_divergence"] = divergence.groupby(grouper).min() new_features["seconds_to_exit"] = (now.replace(hour=10, minute=30) - now).seconds new_features.eval("pm_volume_atm = pm_volume_usd / atm_size", inplace=True) # TODO: Add shares as % of float self.features = pd.concat([new_features.dropna(), self.features]) self.features["gap_days"] = self.features.groupby(["symbol", "atm_date"]).cumcount() self.print(f"Stored {len(new_features)} new features, total: {len(self.features)}") self.Log(new_features.to_string()) def update_targets(self): new_features = self.features.query("time > @self.last_update") exit_hr, exit_mn = self.Time.hour, self.Time.minute minute_bars = [self.History([symbol], day.replace(hour=exit_hr, minute=exit_mn)-timedelta(minutes=1), day.replace(hour=exit_hr, minute=exit_mn), Resolution.Minute) for symbol, day in new_features.index] try: 
minute_bars = pd.concat(minute_bars) target_bar = agg_bars(minute_bars, "09:30", f"{exit_hr}:{exit_mn}") except (KeyError, ValueError) as e: self.print(e) return new_targets = -(target_bar["close"] / new_features["opening_close"] - 1) # Inverted return since we are shorting self.targets = pd.concat([new_targets.dropna(), self.targets]) self.last_update = self.Time self.print(f"Stored {len(new_targets)} new targets, total: {len(self.targets)}") def update_gaplist(self): last_update = self.atm_start if self.gaplist is None \ else self.gaplist.index.get_level_values("time").max() last_valid_atm = last_update - timedelta(365) valid_atm = self.atm.query("(time >= @last_valid_atm) and (time <= @self.Time)") tickers = valid_atm.index.get_level_values("symbol").unique().tolist() day_bars = self.History(tickers, last_update, self.Time, Resolution.Daily) shifted_time_idx = day_bars.index.levels[1].shift(-1, freq="D") day_bars.index = day_bars.index.set_levels(shifted_time_idx, level=1) today_start = self.Time.replace(hour=9, minute=30) if self.Time > today_start: # adding manually the last day bar if missing last_day_bars = self.History(tickers, today_start, self.Time, Resolution.Minute) last_day_bar = agg_bars(last_day_bars, "09:31", "16:00") day_bars = pd.concat([day_bars, last_day_bar]) yesterday_close = day_bars["close"].groupby("symbol").shift(1) gaps = day_bars["open"] / yesterday_close - 1 new_gaplist = gaps[gaps >= self.min_gap].to_frame("gap") self.gaplist = pd.concat([new_gaplist, self.gaplist]) def get_atm(self): csv = StringIO(self.Download(TICKERS_CSV)) atms = pd.read_csv(csv, parse_dates=[DATE_COL], dayfirst=True, thousands=",") atms.eval("atm_offer = OfferingType == 'ATM'", inplace=True) atms = atms.rename(columns={DATE_COL: "time", SYMBOL_COL: "symbol", "Total ATM Capacity": "atm_size"}) [self.AddEquity(s, Resolution.Minute, extendedMarketHours=True) for s in atms["symbol"].unique()] atms["symbol"] = atms["symbol"].apply(lambda x: str(self.Symbol(x).ID)) 
atms["atm_date"] = atms["time"].apply(lambda x: x.timestamp()) return atms.set_index(["symbol", "time"]) def print(self, msg): self.Debug(f"{self.Time} {msg}") def agg_bars(minute_bars, start_time, end_time): grouper = [pd.Grouper(level="symbol"), pd.Grouper(level="time", freq="1D")] filtered_bars = idx.filter_bars(minute_bars, start_time, end_time) return filtered_bars.groupby(grouper).agg(AGG_OPS)
#region imports
from AlgorithmImports import *
#endregion
import math
import numpy as np
from math import factorial
from itertools import combinations
from sklearn.model_selection._split import _BaseKFold, indexable


class TimeSeriesSplitGroups(_BaseKFold):
    """Expanding-window time-series splitter that works on whole groups.

    Each split tests on a block of consecutive groups (in `np.unique`
    order) and trains on all groups before it, leaving out
    `purge_groups` groups right before the test block.
    """

    def __init__(self, n_splits=5, purge_groups=0):
        super().__init__(n_splits, shuffle=False, random_state=None)
        self.purge_groups = purge_groups

    def split(self, X, y=None, groups=None):
        """Yield `(train_indices, test_indices)` pairs, latest test block
        first.

        Raises:
            ValueError: if `groups` is None, or if there are fewer
                distinct groups than `n_splits + 1 + purge_groups`.
        """
        if groups is None:
            raise ValueError("The 'groups' parameter should not be None.")
        X, y, groups = indexable(X, y, groups)
        n_folds = self.n_splits + 1
        group_list = np.unique(groups)
        n_groups = len(group_list)
        if n_folds + self.purge_groups > n_groups:
            raise ValueError((f"Cannot have number of folds plus purged groups "
                              f"={n_folds+self.purge_groups} greater than the "
                              f"number of groups: {n_groups}."))

        test_size = (n_groups-self.purge_groups) // n_folds
        test_starts = [n_groups-test_size*c for c in range(1, n_folds)]
        for tstart in test_starts:
            train_idx = np.isin(groups, group_list[:tstart - self.purge_groups])
            test_idx = np.isin(groups, group_list[tstart:tstart + test_size])
            yield (np.nonzero(train_idx)[0], np.nonzero(test_idx)[0])


class CombinatorialPurgedCV(_BaseKFold):
    """Combinatorial splitter: groups are cut into `n` consecutive folds
    and every combination of `k` folds is used once as the test set."""

    def __init__(self, n=4, k=2, purge=0, embargo=0):
        self.n = n
        self.k = k
        self.purge = purge
        self.embargo = embargo
        # Exact integer "n choose k"; float division can lose precision
        # once the factorials exceed 2**53.
        n_splits = factorial(n) // (factorial(k) * factorial(n - k))
        super().__init__(n_splits, shuffle=False, random_state=None)

    def split(self, X, y=None, groups=None):
        """Yield `(train_indices, test_indices)` for each combination of
        `k` test folds.

        Training folds adjacent to a test fold are trimmed: `embargo`
        groups are dropped from the start of a fold that follows a test
        fold, and `purge + embargo` groups from the end of a fold that
        precedes one.

        Raises:
            ValueError: if `groups` is None, or if there are fewer
                distinct groups than `n_splits`.
        """
        if groups is None:
            raise ValueError("The 'groups' parameter should not be None.")
        X, y, groups = indexable(X, y, groups)
        unique_groups = list(np.unique(groups))
        required_folds = self.n_splits
        if required_folds > len(unique_groups):
            raise ValueError((f"Required folds ={required_folds} greater than "
                              f"the number of groups: {len(unique_groups)}."))

        fold_size = int(math.ceil(len(unique_groups) / self.n))
        test_folds = combinations(range(self.n), self.k)
        for test_fold in test_folds:
            train_groups, test_groups = [], []
            for c in range(self.n):
                start = c * fold_size
                stop = min((c + 1) * fold_size, len(unique_groups))  # To avoid going out of bound
                if c in test_fold:
                    test_groups += unique_groups[start:stop]
                else:
                    # Naive fold sizing, should be distributed before train/test split
                    if c-1 in test_fold:
                        start += self.embargo
                    if c+1 in test_fold:
                        # Clamp at `start`: a negative stop would wrap
                        # around in the slice below and pull later
                        # (test) groups into the training set.
                        stop = max(start, stop - (self.purge+self.embargo))
                    train_groups += unique_groups[start:stop]

            train_idx = np.nonzero(np.isin(groups, train_groups))[0]
            test_idx = np.nonzero(np.isin(groups, test_groups))[0]
            yield train_idx, test_idx