# Overall Statistics (backtest results recorded with this algorithm)
#
#   Total Trades                  4
#   Average Win                   2.78%
#   Average Loss                  -1.28%
#   Compounding Annual Return     32.867%
#   Drawdown                      1.900%
#   Expectancy                    0.584
#   Net Profit                    1.462%
#   Sharpe Ratio                  2.364
#   Probabilistic Sharpe Ratio    61.712%
#   Loss Rate                     50%
#   Win Rate                      50%
#   Profit-Loss Ratio             2.17
#   Alpha                         0.254
#   Beta                          -0.098
#   Annual Standard Deviation     0.104
#   Annual Variance               0.011
#   Information Ratio             1.102
#   Tracking Error                0.151
#   Treynor Ratio                 -2.503
#   Total Fees                    $219.62
#   Estimated Strategy Capacity   $1900000.00
#   Lowest Capacity Asset         NSA VZY31PE0HILH
# region imports
import numpy as np
from AlgorithmImports import *
from decimal import Decimal
from io import StringIO
from collections import deque
from time import gmtime, strftime

# endregion

"""
outline:
    the pair universe is read from a remote csv (see StockDataSource)
    every day compute the ratio spread of each pair and its bollinger bands
        if the intraday spread is below the lower band, buy the spread
            close the position when the spread crosses back up to the rolling mean
        if the intraday spread is above the upper band, sell the spread
            close the position when the spread crosses back down to the rolling mean
        positions held for `max_trade_duration` business days are timed out

new features:
    - don't reenter spreads from timeout until it has crossed the sma at least once
        |- how to do it? store trades, exit type, and flag as not tradeable
           until currpx > sma for long and vice versa
        |- updated logic so that spread is calculated on every interval to
           check spread crossing after trade timeout
"""


def _finite_or_zero(value):
    """Return *value* unchanged when it is finite, otherwise 0 (safe to plot/log)."""
    return value if np.isfinite(value) else 0


class StockDataSource(PythonData):
    """Custom universe data: each csv row lists the tickers of the traded pairs."""

    def GetSource(self, config, date, isLiveMode):
        """Return the remote csv to read (a different file for live and backtest)."""
        # Alternative sources kept for reference (Brian's files):
        #   live:     "https://www.dropbox.com/s/2l73mu97gcehmh7/daily-stock-picker-live.csv?dl=1"
        #   backtest: "https://docs.google.com/spreadsheets/d/e/2PACX-1vRi02Q7O9jJG6Nl04PnpXBld_HhYyRBZchvQocMHnopjCN7jIQz6i1JYRKLPpvu3D5WPOakfEeNxZ-_/pub?gid=0&single=true&output=csv"
        if isLiveMode:
            url = "https://www.dropbox.com/s/t4nafyooof485h7/pair_csv.csv?dl=1"
        else:
            url = "https://docs.google.com/spreadsheets/d/1jtMR6fAewQzDb08cg5qsCzXNliq5zpTvWCm7goVdDMc/export?format=csv"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config, line, date, isLiveMode):
        """Parse one csv line into a data point carrying a ``"Symbols"`` list.

        Lines that are blank or do not start with a digit (e.g. the header)
        are skipped by returning None.
        """
        if not (line.strip() and line[0].isdigit()):
            return None

        stocks = StockDataSource()
        stocks.Symbol = config.Symbol

        fields = line.split(",")
        if isLiveMode:
            # NOTE(review): in live mode every field, including the leading
            # digit-prefixed one, is treated as a symbol - confirm this matches
            # the layout of the live file.
            stocks.Time = date
            stocks["Symbols"] = fields
        else:
            # backtest rows are "<YYYY-MM-DD>,<ticker>,<ticker>,..."
            stocks.Time = datetime.strptime(fields[0], "%Y-%m-%d")
            stocks["Symbols"] = fields[1:]
        return stocks


def quantize(number, digits=-2):
    """
    convert messy floating point to clean num of digits

    The value is rounded to the exponent of ``10 ** digits`` (the default
    ``-2`` keeps two decimal places) and returned as a float.
    """
    num_places = Decimal(10) ** digits
    return float(Decimal(number).quantize(num_places))


# Custom slippage implementation
class CustomSlippageModel:
    """Slippage that grows with the log of the order size."""

    def __init__(self, algorithm):
        self.algorithm = algorithm

    def GetSlippageApproximation(self, asset, order):
        """Return 1bp of the asset price scaled by log10(2 * |quantity|)."""
        # custom slippage math
        slippage = asset.Price * 0.0001 * np.log10(2 * float(order.AbsoluteQuantity))
        self.algorithm.Debug(f"CustomSlippageModel: {slippage:.2f}")
        return slippage


class symbolData:
    """Per-pair state: spread series, bollinger bands and open-trade bookkeeping."""

    def __init__(
        self,
        pair,
        bb_multiple,
        window,
        algorithm,
    ):
        self.pair = pair
        self.symbol1 = self.pair[0]
        self.symbol2 = self.pair[1]
        self.bb_multiple = bb_multiple
        self.window = window
        self.algo = algorithm

        # spread statistics, filled in by compute_spread()
        self.spread = None
        self.intraday_spread = None
        self.roll_mean = None
        self.roll_std = None
        self.bb_upper = None
        self.bb_lower = None

        # trade bookkeeping
        self.long_spread = False
        self.short_spread = False
        self.in_trade = False
        self.trade_entry_timestamp = None
        self.trade_data = None
        self.trade_timeout = False

    def reset_trade_flags(self):
        """Forget the current trade so the pair can be traded again."""
        self.long_spread = False
        self.short_spread = False
        self.in_trade = False
        self.trade_entry_timestamp = None
        self.trade_data = None
        self.trade_timeout = False

    def spread_crossed_since_trade(self):
        """this should be used only if trade is timed out

        Checks whether the spread has been on the far side of its rolling mean
        at any point since the trade was opened (above it for a long spread,
        below it for a short spread). When it has, the trade flags are reset.

        Requires ``trade_data``, ``spread`` and ``roll_mean`` to be populated.

        Returns:
            bool: True when the crossover happened (flags were reset), False
            while the pair still has to wait.
        """
        # only look at the spread from the entry timestamp onwards
        ts = self.trade_data["timestamp"].iloc[0]
        tmp_spread = self.spread.loc[ts:]
        tmp_roll_mean = self.roll_mean.loc[ts:]

        if self.long_spread:
            crossed = bool((tmp_spread > tmp_roll_mean).any())
        elif self.short_spread:
            crossed = bool((tmp_spread < tmp_roll_mean).any())
        else:
            crossed = False

        if crossed:
            self.reset_trade_flags()
        return crossed

    def check_trade_timeout(self):
        """Liquidate the pair once it has been held for too many business days.

        Both legs are closed with market-on-close orders for exactly the
        quantity opened by this pair trade. Afterwards the pair is flagged as
        timed out, which blocks re-entry until the spread has crossed its
        rolling mean.
        """
        if not self.in_trade or self.trade_entry_timestamp is None:
            return

        # numpy.busday_count(start, end)
        trade_duration = np.busday_count(
            self.trade_entry_timestamp.strftime("%Y-%m-%d"),
            self.algo.Time.strftime("%Y-%m-%d"),
        )
        if trade_duration < self.algo.max_trade_duration:
            return

        if self.long_spread:
            side = "LONG"
        elif self.short_spread:
            side = "SHORT"
        else:
            return

        tag = f"TRADE TIMEOUT {side} SPREAD::{self.pair[0]}-{self.pair[1]}"
        for symbol in self.pair:
            # how many shares do we hold of the symbol in this pair trade
            # liquidate only the amount for that pair trade
            held_quantity = self.trade_data.loc[symbol, "quantity"]
            self.algo.MarketOnCloseOrder(symbol, -1 * held_quantity, tag=tag)

        # need to track over time to confirm that spread crosses sma before
        # any more positions; cannot reset all flags before this happens.
        # This runs only after BOTH legs are closed: the crossover check may
        # reset the flags and drop trade_data, which previously happened
        # between the two legs and left the second one open.
        self.trade_timeout = True
        self.spread_crossed_since_trade()

    def compute_spread(self, prices, intraday_prices):
        """Refresh the ratio spread, its rolling mean/std and bollinger bands.

        Args:
            prices: daily closes, one column per symbol.
            intraday_prices: recent minute closes, one column per symbol; only
                the last row is used.
        """
        # use ratio spread
        self.spread = prices[self.symbol1] / prices[self.symbol2]

        # intraday spread for entering positions
        self.intraday_spread = (
            intraday_prices[self.symbol1].iloc[-1]
            / intraday_prices[self.symbol2].iloc[-1]
        )

        rolling = self.spread.rolling(self.window)
        self.roll_mean = rolling.mean()
        self.roll_std = rolling.std()

        band_width = self.bb_multiple * self.roll_std
        self.bb_upper = self.roll_mean + band_width
        self.bb_lower = self.roll_mean - band_width

    def is_short_trigger(self):
        """True when the intraday spread is above the latest upper band."""
        return bool(self.intraday_spread > self.bb_upper.iloc[-1])

    def is_long_trigger(self):
        """True when the intraday spread is below the latest lower band."""
        return bool(self.intraday_spread < self.bb_lower.iloc[-1])


class pairsTrader(QCAlgorithm):
    """Bollinger-band mean-reversion pairs trading on a csv-driven universe."""

    def Initialize(self):
        self.cash = 1_000_000
        self.SetStartDate(2021, 1, 1)  # Set Start Date
        self.SetEndDate(2022, 1, 1)
        self.SetCash(self.cash)  # Set Strategy Cash

        self.pairs = None
        self.symbols = None
        # OnData reads this before OnSecuritiesChanged may ever have fired
        self._changes = None

        self.AddUniverse(StockDataSource, "my-stock-data-source", self.stockDataSource)

        self.splotName = "Spread"
        sPlot = Chart(self.splotName)
        for series_name in ("spread", "rollmean", "bb upper", "bb lower"):
            sPlot.AddSeries(Series(series_name, SeriesType.Line, 0))
        self.AddChart(sPlot)
        self.PLOT = False

        self.pPlotName = "Concurrent Positions"
        pPlot = Chart(self.pPlotName)
        pPlot.AddSeries(Series("num positions", SeriesType.Line, 0))
        self.AddChart(pPlot)

        #####################################
        # algo parameters

        self.allocation = int(0.5 * self.cash / 2)  # len(self.pairs))
        self.Debug(f"default allocation to each pair trade: {self.allocation}")

        MarketOnCloseOrder.SubmissionTimeBuffer = timedelta(minutes=10)

        self.bb_multiple = 2.0  # for spread bollinger bands
        self.window = 20  # for spread bollinger bands
        self.lookback = int(60)  # for price data
        self.portfolio_equity_history = list()  # for anl vol of port
        self.portfolio_anl_vol_lookback = 60  # for anl vol of port
        self.history_resolution = Resolution.Daily
        self.max_trade_duration = 7  # business days
        self.target_vol = 0.10

        self.spy = self.AddEquity("SPY").Symbol
        self.SetBenchmark(self.spy)

        self.symbol_data = dict()

        #####################################
        # schedule functions

        self.Schedule.On(
            self.DateRules.EveryDay(self.spy),
            self.TimeRules.At(9, 30),
            self.plot_num_positions,
        )

        self.Schedule.On(
            self.DateRules.EveryDay(self.spy), self.TimeRules.At(9, 25), self.get_pairs
        )

        self.Schedule.On(
            self.DateRules.EveryDay(self.spy), self.TimeRules.At(15, 45), self.trade
        )

        self.Schedule.On(
            self.DateRules.EveryDay(self.spy),
            self.TimeRules.At(15, 30),
            self.check_trade_exit,
        )

    #####################################
    # algo functions

    def stockDataSource(self, data):
        """Universe selector: subscribe to every listed ticker and build the pairs.

        Consecutive tickers form a pair (1st with 2nd, 3rd with 4th, ...); a
        trailing unpaired ticker is subscribed to but not paired.

        Returns:
            list: every ticker found in *data*.
        """
        symbol_list = []

        for item in data:
            for symbol in item["Symbols"]:
                symbol_list.append(symbol)
                tmp = self.AddEquity(
                    symbol, Resolution.Minute, Market.USA, True, 0, True
                )
                tmp_symbol = tmp.Symbol
                # NOTE(review): this assigns the margin model to the Symbol, not
                # to the Security returned by AddEquity - it probably has no
                # effect. Left unchanged because moving it to the security
                # would change the buying power used by existing backtests.
                tmp_symbol.MarginModel = PatternDayTradingMarginModel()
                tmp.SetDataNormalizationMode(DataNormalizationMode.Adjusted)

        pairs = list(zip(symbol_list[0::2], symbol_list[1::2]))

        self.symbols = symbol_list
        self.pairs = pairs
        self.Debug(f"{self.Time} pairs: {self.pairs}")
        return symbol_list

    def _get_close_prices(self, lookback, resolution):
        """Return closes as a frame with one column per symbol, incomplete rows dropped."""
        prices = self.get_history(
            self.symbols, lookback, resolution, "close", "close prices"
        )
        if prices.empty:
            return prices
        return prices["close"].unstack(level=0).dropna()

    def get_daily_prices(self):
        """
        get price history and make sure each stock has minimum amount of data

        Returns daily closes over ``self.lookback`` bars (empty when the
        history request failed).
        """
        return self._get_close_prices(self.lookback, Resolution.Daily)

    def get_intraday_prices(self):
        """
        get price history and make sure each stock has minimum amount of data

        Returns the last two minute closes (empty when the history request
        failed).
        """
        return self._get_close_prices(2, Resolution.Minute)

    ##################

    def get_history(self, symbol, lookback, resolution, column, data_label):
        """
        - function to download security history and check that the column we need
            exists in the dataframe. if it is missing it retries twice.
        - returns empty series if column not found
        """
        max_attempts = 3
        for attempt in range(1, max_attempts + 1):
            hist = self.History(symbol, lookback, resolution)
            if column in hist.columns:
                return hist
            suffix = " no trades today" if attempt == max_attempts else ""
            self.Debug(f"{self.Time} {data_label} {column} data missing{suffix}")
        # explicit dtype avoids the pandas warning for a dtype-less empty Series
        return pd.Series(dtype=float)

    ##################

    def plot_num_positions(self):
        """function to plot the daily number of concurrent positions"""
        if not self.symbol_data:
            return

        num_concurrent_positions = sum(
            1 for sd in self.symbol_data.values() if sd.in_trade
        )
        self.Plot(self.pPlotName, "num positions", num_concurrent_positions)

    ##################

    def manage_pair_universe(self):
        """
        remove pairs no longer in universe and liquidate if in any trades
        """
        # iterate over a snapshot because entries are deleted along the way
        for pair in list(self.symbol_data):
            if pair in self.pairs:
                continue
            sd = self.symbol_data[pair]
            if sd.in_trade:
                for symbol in sd.pair:
                    qty = sd.trade_data.loc[symbol, "quantity"]
                    self.MarketOnCloseOrder(
                        symbol, -1 * qty, tag="symbol no longer in universe"
                    )
            del self.symbol_data[pair]

    ##################

    def get_pairs(self):
        """
        get all pairs for trading
        """
        if self.symbols is None:
            self.Debug(f"[{self.Time}] missing symbols inside get_pairs")
            return

        # manage change in pair universe
        self.manage_pair_universe()

        # only register pairs once price history is actually available
        prices = self.get_daily_prices()
        if prices.empty:
            return

        for pair in self.pairs:
            if pair not in self.symbol_data:
                self.symbol_data[pair] = symbolData(
                    pair,
                    self.bb_multiple,
                    self.window,
                    self,
                )

    ##################

    def pair_can_trade(self, pair):
        """
        make sure the pair is tradeable:
            1) by ensuring the security price is populated
            2) that trade has not been timed out waiting for another crossover
        """
        sd = self.symbol_data[pair]

        for symbol in pair:
            if not self.Securities[symbol].Close > 0:
                self.Debug(f"{self.Time} {symbol} is missing price data")
                return False

        # a timed-out pair stays blocked until the crossover check succeeds
        # (a successful check also resets the pair's trade flags)
        if sd.trade_timeout and not sd.spread_crossed_since_trade():
            self.Log(f"{self.Time} {pair} has trade timed out, waiting for crossover")
            return False
        return True

    ##################

    def _open_spread(self, sd, pair, allocation, is_long):
        """Send both legs of a spread as market-on-close orders and record the trade."""
        if allocation is None:
            allocation = self.allocation

        direction = 1 if is_long else -1
        wt1 = allocation * direction  # dollar value of the first leg
        wt2 = allocation * -direction  # opposite dollar value for the second leg

        pair0_shares = int(wt1 / self.Securities[pair[0]].Close)
        pair1_shares = int(wt2 / self.Securities[pair[1]].Close)

        label = "LONG" if is_long else "SHORT"
        self.Debug(
            f"{label} spread : {sd.pair[0]} dv={wt1:.2f} shares={pair0_shares} vs {sd.pair[1]} dv={wt2:.2f} shares={pair1_shares}"
        )

        # send market on close orders instead
        mkto0 = self.MarketOnCloseOrder(pair[0], pair0_shares)
        mkto1 = self.MarketOnCloseOrder(pair[1], pair1_shares)

        # save trade data
        sd.trade_data = pd.DataFrame(
            index=[pair[0], pair[1]], columns=["timestamp", "market_order", "quantity"]
        )
        sd.trade_data.loc[pair[0], :] = (mkto0.Time, mkto0, mkto0.Quantity)
        sd.trade_data.loc[pair[1], :] = (mkto1.Time, mkto1, mkto1.Quantity)

        # set trade flags
        sd.in_trade = True
        if is_long:
            sd.long_spread = True
        else:
            sd.short_spread = True
        sd.trade_entry_timestamp = self.Time

    def buy_spread(self, sd, pair, allocation=None):
        """
        function to buy spread

        Buys ``pair[0]`` and sells ``pair[1]`` for *allocation* dollars each
        (defaults to ``self.allocation``).
        """
        self._open_spread(sd, pair, allocation, is_long=True)

    ##################

    def short_spread(self, sd, pair, allocation=None):
        """
        function to short spread

        Sells ``pair[0]`` and buys ``pair[1]`` for *allocation* dollars each
        (defaults to ``self.allocation``).
        """
        self._open_spread(sd, pair, allocation, is_long=False)

    ##################

    def calc_and_log_portfolio_anl_vol(self):
        """Record the current equity and log the annualized vol of its changes."""
        # accumulate portfolio equity to log rolling annualized vol of portfolio
        self.portfolio_equity_history.append(
            (self.Time, self.Portfolio.TotalPortfolioValue)
        )

        timestamps, equity = zip(*self.portfolio_equity_history)
        eq_s = pd.Series(equity, index=timestamps)  # .drop_duplicates()

        if len(eq_s) >= self.portfolio_anl_vol_lookback:
            anl_vol = eq_s.pct_change().dropna().std() * np.sqrt(252)
            self.Log(f"{self.Time} PORTFOLIO rolling annualized std: {anl_vol:.2%}")

    ##################

    def calc_and_log_pair_anl_vol(self, pair, symbol_data):
        """Log and return the annualized vol of the pair's rolling-mean spread.

        Returns:
            float: the annualized standard deviation, or NaN when the rolling
            mean has 3 or fewer valid points (previously this case failed
            because the result was never assigned).
        """
        if len(symbol_data.roll_mean.dropna()) <= 3:
            return float("nan")

        anl_vol = symbol_data.roll_mean.pct_change().dropna().std() * np.sqrt(252)
        self.Log(f"{self.Time} {pair} rolling annualized std: {anl_vol:.2%}")
        self.Log(f"spread {_finite_or_zero(symbol_data.spread.iloc[-1])}")
        self.Log(f"rollmean {_finite_or_zero(symbol_data.roll_mean.iloc[-1])}")
        self.Log(f"bb upper {_finite_or_zero(symbol_data.bb_upper.iloc[-1])}")
        self.Log(f"bb lower {_finite_or_zero(symbol_data.bb_lower.iloc[-1])}")
        return anl_vol

    ##################

    def _plot_spread(self, sd):
        """Plot the latest spread, rolling mean and bands of one pair."""
        latest = (
            ("spread", sd.spread),
            ("rollmean", sd.roll_mean),
            ("bb upper", sd.bb_upper),
            ("bb lower", sd.bb_lower),
        )
        for series_name, series in latest:
            self.Plot(self.splotName, series_name, _finite_or_zero(series.iloc[-1]))

    def trade(self):
        """
        function to implement trades
        """
        # accumulate portfolio equity to log rolling annualized vol of portfolio
        self.calc_and_log_portfolio_anl_vol()

        if self.symbols is None:
            self.Debug(f"[{self.Time}] missing symbols inside trade")
            return

        # get prices
        prices = self.get_daily_prices()
        if prices.empty:
            return

        # get intraday prices for trade entry and exit
        intraday_prices = self.get_intraday_prices()
        if intraday_prices.empty:
            return

        for pair in list(self.symbol_data):
            sd = self.symbol_data[pair]

            # if already in trade check to see if our maximum trade duration is breached
            if sd.in_trade and not sd.trade_timeout:
                sd.check_trade_timeout()

            # compute spread and boundaries
            sd.compute_spread(prices, intraday_prices)

            # log annualized vol for pair
            pair_anl_vol = self.calc_and_log_pair_anl_vol(pair, sd)

            # need a minimum number of vol
            if pair_anl_vol < 0.1:
                pair_anl_vol = 0.1

            if not self.pair_can_trade(pair):
                continue

            # plot spread data for debugging individual pairs
            # NOTE: only works for a single pair set
            if self.PLOT:
                self._plot_spread(sd)

            if sd.in_trade:
                continue

            # not in trade for this pair: size the position by target vol
            if not np.isfinite(pair_anl_vol):
                # too little history to estimate the pair vol, so the
                # allocation cannot be sized - do not open a position
                self.Debug(f"{self.Time}::{pair} pair vol unavailable, no entry")
                continue

            allocation = (
                self.Portfolio.TotalPortfolioValue
                * self.target_vol
                / pair_anl_vol
                / len(self.symbols)
            )
            allocation = quantize(allocation)

            # is buy trigger?
            if sd.is_long_trigger():
                self.Debug(f"{self.Time}::{pair} {pair_anl_vol:.2%} ${allocation:,}")
                self.buy_spread(sd, pair, allocation)

            # or short triggered?
            elif sd.is_short_trigger():
                self.Debug(f"{self.Time}::{pair} {pair_anl_vol:.2%} ${allocation:,}")
                self.short_spread(sd, pair, allocation)

    def _take_profit(self, sd, side):
        """Close both legs of a spread that reverted to its mean and reset its flags."""
        levels = (
            f"sprd: {sd.intraday_spread:.2f} rollmean: {sd.roll_mean.iloc[-1]:.2f}"
        )
        tag = f"TP {side} SPREAD::{sd.pair[0]}-{sd.pair[1]} | {levels}"
        for symbol in sd.pair:
            # exit only the quantity of shares involved in that trade
            qty = sd.trade_data.loc[symbol, "quantity"]
            self.MarketOnCloseOrder(symbol, -1 * qty, tag=tag)
        sd.reset_trade_flags()
        self.Debug(
            f"tp liquidating {side.lower()} spread: {sd.pair[0]} vs {sd.pair[1]} | {levels}"
        )

    def check_trade_exit(self):
        """Time out stale trades and take profit on spreads back at their mean."""
        if self.symbols is None:
            self.Debug(f"[{self.Time}] missing symbols inside check_trade_exit")
            return

        # no trades after 4pm RTH (checked first to skip pointless history calls)
        if self.Time.hour >= 16:
            return

        # get prices
        prices = self.get_daily_prices()
        if prices.empty:
            return

        # get intraday prices for trade entry and exit
        intraday_prices = self.get_intraday_prices()
        if intraday_prices.empty:
            return

        for pair in list(self.symbol_data):
            sd = self.symbol_data[pair]

            # if already in trade check to see if our maximum trade duration is breached
            if sd.in_trade and not sd.trade_timeout:
                sd.check_trade_timeout()

            # compute spread and boundaries
            sd.compute_spread(prices, intraday_prices)

            if not self.pair_can_trade(pair):
                continue

            if not sd.in_trade:
                continue

            latest_mean = sd.roll_mean.iloc[-1]
            if sd.long_spread:
                # long spread exits once the spread is >= roll mean
                if sd.intraday_spread >= latest_mean:
                    self._take_profit(sd, "LONG")
            elif sd.short_spread:
                # short spread exits once the spread is <= roll mean
                if sd.intraday_spread <= latest_mean:
                    self._take_profit(sd, "SHORT")

    ##################

    def OnSecuritiesChanged(self, changes):
        """Remember the latest universe changes."""
        self._changes = changes
        self.Debug(self.Time)

    def OnData(self, data):
        """OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
        Arguments:
            data: Slice object keyed by symbol containing the stock data
        """
        # all trading is driven by the scheduled functions; nothing to do here
        if self._changes is None:
            return