Overall Statistics |
Total Trades 11 Average Win 0% Average Loss -0.14% Compounding Annual Return -42.649% Drawdown 1.000% Expectancy -1 Net Profit -0.607% Sharpe Ratio -6.864 Probabilistic Sharpe Ratio 0% Loss Rate 100% Win Rate 0% Profit-Loss Ratio 0 Alpha 0.168 Beta -13.112 Annual Standard Deviation 0.067 Annual Variance 0.004 Information Ratio -7.042 Tracking Error 0.072 Treynor Ratio 0.035 Total Fees $0.00 Estimated Strategy Capacity $260000.00 Lowest Capacity Asset TSLA UNU3P8Y3WFAD |
# region imports
from AlgorithmImports import *
# endregion
import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.decomposition import PCA
from sklearn.preprocessing import *
from statsmodels.tsa.stattools import adfuller, coint, grangercausalitytests
import statsmodels.api as sm
import datetime


# https://medium.com/@financialnoob/granger-causality-test-in-pairs-trading-bf2fd939e575
class PairTradingGrangerAlgorithm(QCAlgorithm):
    """Pairs-trading algorithm that combines cointegration and Granger causality.

    Once a day, shortly before the market close, the algorithm:

    1. pulls ``lookback`` daily closes (plus the latest minute close) for the
       current universe,
    2. min-max scales the prices,
    3. (re)selects pairs that pass an Engle-Granger cointegration test and a
       Granger-causality test, and
    4. trades each pair when the latest regression residual (spread) is more
       than one standard deviation away from its historical mean.
    """

    def Initialize(self):
        """Configure the backtest window, universe, schedule and warm-up."""
        self.SetStartDate(2019, 2, 8)
        self.SetEndDate(2019, 2, 11)
        self.SetCash(100000)

        self.lookback = 500       # number of daily bars requested per symbol
        self.num_equities = 20    # maximum universe size
        self.selected_pairs = None
        # Filled in by CoarseSelection; initialised here so Trade never hits
        # an AttributeError if it runs before the first universe selection.
        self.symbols = []

        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelection)
        # self.SetSecurityInitializer(lambda x: x.SetDataNormalizationMode(DataNormalizationMode.Raw))

        self.AddEquity("SPY")
        # self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", 20), self.Liquidate)
        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                         self.TimeRules.BeforeMarketClose("SPY", 10),
                         self.EveryDayBeforeMarketClose)
        # self.Schedule.On(self.DateRules.On(self.EndDate.year, self.EndDate.month, self.EndDate.day),
        #                  self.TimeRules.At(0, 0),
        #                  self.Liquidate)

        self.SetWarmup(500)
        self.day = 0  # counts Trade invocations; drives the re-selection cadence

    def CoarseSelection(self, coarse):
        """Pick the most liquid symbols priced above 50.

        Keeps symbols with ``Price > 50`` and ``DollarVolume > 1e9``, ordered
        by descending dollar volume, truncated to ``self.num_equities``.
        The selection is stored on ``self.symbols`` and a copy is returned.
        """
        by_dollar_volume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
        liquid = [
            x.Symbol
            for x in by_dollar_volume
            if x.Price > 50 and x.DollarVolume > 1000000000
        ]
        self.symbols = liquid[:self.num_equities]
        return list(self.symbols)

    def Normalize(self, df_history):
        """Return a DataFrame with every column min-max scaled.

        The result keeps the column labels of ``df_history`` but carries a
        fresh default integer index (the original index is not preserved).
        """
        scaler = MinMaxScaler()
        scaler.fit(df_history)
        scaled = scaler.transform(df_history)
        return pd.DataFrame(scaled, columns=df_history.columns.values.tolist())

    def CreateCointegratedPairs(self, stocks, df_log_prices):
        """Return a list of disjoint ``(s1, s2)`` pairs that look cointegrated.

        Stocks are scanned in order; ``s1`` is matched with the first not yet
        used ``s2`` whose ``coint`` p-value is below 0.1. Each stock appears
        in at most one pair.
        """
        selected_pairs = []
        used = set()
        for s1 in stocks:
            if s1 in used:
                continue
            for s2 in stocks:
                if s1 == s2 or s2 in used:
                    continue
                if coint(df_log_prices[s1], df_log_prices[s2])[1] < 0.1:
                    used.update((s1, s2))
                    selected_pairs.append((s1, s2))
                    # s1 is taken now, so no later s2 could be paired with it.
                    break
        # self.Debug("selected_pairs=" + str(selected_pairs))
        return selected_pairs

    def SelectGrangerPairs(self, selected_pairs, df_log_prices):
        """Return the pairs that pass a Granger-causality test in some order.

        For each pair the test is run on columns ``[s1, s2]``; if no lag up
        to ``maxlag`` has an ``ssr_ftest`` p-value below ``limit``, it is
        re-run with the columns swapped. A passing pair is stored in the
        column order that passed. Per the statsmodels documentation the test
        checks whether the second column Granger-causes the first.
        Pairs whose symbols are missing from ``df_log_prices`` are skipped.
        """
        maxlag = 1
        limit = 0.1

        def passes(first, second):
            # True when any tested lag yields a significant ssr F-test.
            results = grangercausalitytests(df_log_prices[[first, second]], maxlag=maxlag)
            pvals = np.array([results[lag][0]['ssr_ftest'][1] for lag in range(1, maxlag + 1)])
            return bool((pvals < limit).any())

        selected_pairs_gc = []
        for s1, s2 in selected_pairs:
            if s1 not in df_log_prices.columns or s2 not in df_log_prices.columns:
                continue
            if passes(s1, s2):
                selected_pairs_gc.append((s1, s2))
            elif passes(s2, s1):
                # switch Granger-leader and Granger-follower
                selected_pairs_gc.append((s2, s1))
        # self.Debug("selected_pairs_gc=" + str(selected_pairs_gc))
        return selected_pairs_gc

    def CalculateWeights(self, selected_pairs, df_log_prices):
        """Return a Series of portfolio weights indexed by symbol.

        For each pair, ``s1`` is regressed on ``s2`` (OLS with intercept).
        If the latest spread is above ``mu + r * sigma`` the pair is traded
        short ``s1`` / long ``s2``; below ``mu - r * sigma`` the opposite;
        otherwise both legs get 0. Non-zero positions are scaled so the
        absolute weights sum to 0.95. Every symbol of every pair is passed to
        ``AddEquity``, even if it is missing from ``df_log_prices``.
        """
        r = 1  # standard deviation threshold
        positions = {}
        for s1, s2 in selected_pairs:
            self.AddEquity(s1)
            self.AddEquity(s2)
            if s1 not in df_log_prices.columns or s2 not in df_log_prices.columns:
                continue

            exog = sm.add_constant(df_log_prices[s2])
            res = sm.OLS(df_log_prices[s1], exog).fit()
            mu = res.resid.mean()    # spread historical mean
            sigma = res.resid.std()  # spread historical sd

            # Latest value of the spread (actual minus fitted).
            spread = (df_log_prices[s1] - res.predict(exog)).iloc[-1]

            if spread > mu + r * sigma:
                positions[s1], positions[s2] = -1, 1
            elif spread < mu - r * sigma:
                positions[s1], positions[s2] = 1, -1
            else:
                positions[s1], positions[s2] = 0, 0

        # Explicit dtype avoids the pandas empty-Series dtype warning/ambiguity.
        positions = pd.Series(positions, dtype=float)

        # Return the weights for each selected stock
        gross = positions.abs().sum()
        weights = positions if gross == 0 else positions * (0.95 / gross)
        # self.Debug("weights=" + str(weights))
        return weights

    def EveryDayBeforeMarketClose(self):
        """Scheduled handler: trade once the warm-up period is over."""
        if not self.IsWarmingUp:
            self.Trade()

    def Trade(self):
        """Refresh pairs when due, compute target weights and rebalance."""
        if not self.symbols:
            # Universe selection has not produced anything yet.
            return

        daily = self.History(self.symbols, self.lookback, Resolution.Daily)
        if daily.empty:
            return
        df_history = daily.close.unstack(level=0)

        # Append the most recent minute close so the spread uses a current price.
        latest = self.History(self.symbols, 1, Resolution.Minute)
        if not latest.empty:
            df_history = pd.concat([df_history, latest.close.unstack(level=0)])

        # Drop symbols with any missing observation.
        df_history = df_history.dropna(axis="columns")

        # NOTE: despite the name these are min-max scaled prices, not logs.
        log_prices = self.Normalize(df_history)

        self.day += 1
        # Re-select pairs when none are held yet, or on every 10th call.
        if not self.selected_pairs or self.day % 10 == 0:
            stocks = df_history.columns
            pairs = self.CreateCointegratedPairs(stocks, log_prices)
            self.selected_pairs = self.SelectGrangerPairs(pairs, log_prices)

        weights = self.CalculateWeights(self.selected_pairs, log_prices)
        self.Debug("weights=" + str(weights))

        portfolioTargets = []
        for symbol, weight in weights.items():
            if weight != 0:
                self.Securities[symbol].FeeModel = ConstantFeeModel(0)
                portfolioTargets.append(PortfolioTarget(symbol, weight))
        # Second argument True: liquidate holdings that are not in the targets.
        self.SetHoldings(portfolioTargets, True)

    def OnEndOfAlgorithm(self):
        """Close all positions when the backtest finishes."""
        self.Liquidate()