Overall Statistics |
Total Trades 26 Average Win 0.66% Average Loss -0.32% Compounding Annual Return 27.702% Drawdown 2.100% Expectancy 0.545 Net Profit 2.076% Sharpe Ratio 2.884 Sortino Ratio 4.969 Probabilistic Sharpe Ratio 74.061% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 2.09 Alpha 0.093 Beta 0.232 Annual Standard Deviation 0.064 Annual Variance 0.004 Information Ratio -2.475 Tracking Error 0.083 Treynor Ratio 0.789 Total Fees $102.62 Estimated Strategy Capacity $560000.00 Lowest Capacity Asset PBCP RH8GLI8VB8TH Portfolio Turnover 28.94% |
#region imports
from AlgorithmImports import *
import statsmodels.formula.api as sm
from statsmodels.tsa.stattools import coint, adfuller
#endregion


class Pairs(object):
    """A candidate pair of two symbol-data objects and its fitted spread model.

    ``a`` and ``b`` are expected to expose ``Symbol``, ``Series`` and
    ``DataFrame`` (close prices). After a successful
    :meth:`cointegration_test` the instance carries the OLS fit of ``a`` on
    ``b`` together with summary statistics of its residuals.
    """

    def __init__(self, a, b):
        self.a = a
        self.b = b
        self.Name = f'{a.Symbol.Value}:{b.Symbol.Value}'
        # OLS results of `a ~ b`; None until a cointegration test succeeds.
        self.Model = None
        # Mean and standard deviation of the OLS residuals.
        self.MeanError = 0
        self.StandardDeviation = 0
        self.Epsilon = 0
        # ADF p-value of the OLS residuals. Initialised to 1.0 (no evidence
        # of stationarity) so that reading it before a successful test does
        # not raise AttributeError.
        self.StationaryP = 1.0

    @property
    def DataFrame(self):
        """Return both legs' close prices side by side, rows with gaps removed.

        The first index level of each leg's frame is dropped before joining
        (presumably the symbol level, leaving the timestamp -- confirm
        against the converter output). The frame is rebuilt on every access.
        """
        legs = [
            self.a.DataFrame.droplevel([0]),
            self.b.DataFrame.droplevel([0]),
        ]
        df = pd.concat(legs, axis=1).dropna()
        df.columns = [self.a.Symbol.Value, self.b.Symbol.Value]
        return df

    @property
    def Correlation(self):
        """Return the correlation between the two aligned close series."""
        # Single positional lookup; chained `.iloc[0][1]` relies on the
        # deprecated integer-as-position fallback of Series.__getitem__.
        return self.DataFrame.corr().iloc[0, 1]

    def cointegration_test(self):
        """Test the pair for cointegration and, on success, fit the spread model.

        Returns:
            bool: False when the aligned sample is empty or the
            cointegration p-value is >= 0.05 (previous model state is left
            untouched). True otherwise, after updating ``Model``,
            ``StationaryP``, ``MeanError`` and ``Epsilon``.
        """
        # Build the aligned frame once and use it for BOTH the cointegration
        # test and the regression, so they are evaluated on the same
        # observations even if the two legs miss different bars.
        df = self.DataFrame
        if df.empty:
            return False

        coint_test = coint(df.iloc[:, 0].values, df.iloc[:, 1].values,
                           trend="n", maxlag=0)

        # Return if not cointegrated
        if coint_test[1] >= 0.05:
            return False

        # NOTE(review): the formula is built from raw ticker strings, so it
        # only parses for tickers that are plain identifiers -- confirm if
        # the universe ever includes symbols such as 'BRK.B'.
        self.Model = sm.ols(
            formula=f'{self.a.Symbol.Value} ~ {self.b.Symbol.Value}',
            data=df,
        ).fit()
        residuals = self.Model.resid
        self.StationaryP = adfuller(residuals, autolag='BIC')[1]
        self.MeanError = np.mean(residuals)
        self.Epsilon = np.std(residuals)
        return True
#region imports
from AlgorithmImports import *
#endregion


class SymbolData(object):
    """Rolling window of consolidated trade bars for a single symbol.

    Minute bars pushed through :meth:`Update` are consolidated into
    ``interval``-minute bars and stored in ``Prices``. The window is
    pre-filled from history at construction time.

    Args:
        algorithm: Algorithm instance; used for ``History``, ``Log`` and
            ``PandasConverter``.
        symbol: Symbol this object tracks.
        lookback: Number of minute bars of history to request; also
            determines the window length (``lookback // interval``).
        interval: Consolidation period in minutes.
    """

    def __init__(self, algorithm, symbol, lookback, interval):
        lookback = int(lookback)
        self.Symbol = symbol
        self.Prices = RollingWindow[TradeBar](lookback // interval)
        # Lazily built pandas views of `Prices`; reset whenever a bar is added.
        self._series = None
        self._frame = None
        self._algorithm = algorithm
        self._consolidator = TradeBarConsolidator(timedelta(minutes=interval))
        self._consolidator.DataConsolidated += self.OnDataConsolidated

        history = algorithm.History(symbol, lookback, Resolution.Minute)
        algorithm.Log(f"Hit SymbolData for {symbol}")
        for bar in history.itertuples():
            # NaN is the only value that is not equal to itself.
            if bar.volume != bar.volume:  # NAN volume handler
                algorithm.Log("NAN Volume handler triggered")
                continue
            # bar.Index is the row's index tuple; element 1 is used as the
            # bar time (presumably (symbol, time) -- confirm).
            trade_bar = TradeBar(bar.Index[1], symbol, bar.open, bar.high,
                                 bar.low, bar.close, bar.volume)
            self.Update(trade_bar)

    @property
    def IsReady(self):
        """True once the rolling window is full."""
        return self.Prices.IsReady

    @property
    def Series(self):
        """Close prices of the current window, or None until the window is full.

        Converting the window to pandas is comparatively expensive, so it is
        done on first access after a new bar instead of on every bar.
        """
        if self._series is None and self.IsReady:
            frame = self._algorithm.PandasConverter.GetDataFrame[TradeBar](self.Prices)
            self._series = frame['close']
        return self._series

    @property
    def DataFrame(self):
        """``Series`` as a one-column frame, or None until the window is full."""
        series = self.Series
        if series is None:
            return None
        if self._frame is None:
            self._frame = series.to_frame()
        return self._frame

    def Update(self, trade_bar):
        """Feed one trade bar into the consolidator."""
        self._consolidator.Update(trade_bar)

    def OnDataConsolidated(self, sender, consolidated):
        """Store a newly consolidated bar and invalidate the cached pandas views."""
        self.Prices.Add(consolidated)
        self._series = None
        self._frame = None
#region imports
from AlgorithmImports import *
#endregion


class TradingPair(object):
    """Snapshot of an open pair position.

    Holds the two order tickets together with the regression parameters and
    residual statistics that were passed in when the position was opened.
    """

    def __init__(self, ticket_a, ticket_b, intercept, slope, mean_error, epsilon):
        # Order tickets of the two legs.
        self.ticket_a, self.ticket_b = ticket_a, ticket_b
        # Linear model relating leg a's price to leg b's price.
        self.model_intercept, self.model_slope = intercept, slope
        # Mean and spread of the model error.
        self.mean_error, self.epsilon = mean_error, epsilon
#region imports
from AlgorithmImports import *
from Pair import *
from SymbolData import *
from TradingPair import *
from itertools import combinations
import statsmodels.tsa.stattools as ts
#endregion


class PairsTrading(QCAlgorithm):
    """Intraday pairs-trading algorithm over a fixed list of bank tickers.

    Every five days the candidate pairs are re-screened by correlation and
    cointegration. Selected pairs are entered when the regression error
    moves more than ``open_size`` residual standard deviations from its
    mean, and closed when it reverts to within ``close_size`` deviations,
    blows through ``stop_loss_size`` deviations, or the pair drops out of
    the selection.
    """

    symbols = [
        'ING', 'TBC', 'BMA', 'PB', 'FBC', 'STL', 'FCF', 'PFS', 'BOH', 'BK',
        'CMA', 'AF', 'PNC', 'KB', 'SHG', 'BSAC', 'CIB', 'BBD', 'BSBR',
    ]
    # SCNB -> delisted

    # Minute bars of history per symbol (presumably 390 minutes/day x 21
    # days x 3 months -- confirm).
    num_bar = 390*21*3
    interval = 10               # consolidation period, minutes
    pair_num = 10               # maximum number of pairs kept after screening
    leverage = 1
    min_corr_threshold = 0.9
    open_size = 2.32            # entry threshold, residual standard deviations
    close_size = 0.5            # take-profit threshold
    stop_loss_size = 6          # stop-loss threshold

    def Initialize(self):
        """Set up the backtest, subscribe to every ticker and build all pairs."""
        self.SetStartDate(2013, 9, 1)
        self.SetEndDate(2013, 10, 1)
        self.SetCash(50000)

        self.symbol_data = {}
        self.pair_list = []
        self.selected_pair = []
        self.trading_pairs = {}
        self.regenerate_time = datetime.min

        for ticker in self.symbols:
            symbol = self.AddEquity(ticker, Resolution.Minute).Symbol
            self.symbol_data[symbol] = SymbolData(self, symbol, self.num_bar, self.interval)

        # Only symbols whose window was filled from history can form a pair.
        for first, second in combinations(self.symbol_data.values(), 2):
            if first.IsReady and second.IsReady:
                self.pair_list.append(Pairs(first, second))

    def GeneratePairs(self):
        """Screen all candidate pairs and return the ones to trade.

        Returns:
            list: At most ``pair_num`` pairs that pass the correlation and
            cointegration filters, ordered by ascending residual ADF
            p-value. Empty when nothing qualifies.
        """
        candidates = []
        for pair in self.pair_list:
            # correlation selection
            # Computed once per pair: the property rebuilds the joined frame
            # on every access. Written as `not >=` so that a NaN correlation
            # is rejected as well.
            correlation = pair.Correlation
            if not correlation >= self.min_corr_threshold:
                continue

            # cointegration selection
            if pair.cointegration_test() and pair.StationaryP < 0.05:
                candidates.append((correlation, pair))

        if not candidates:
            self.Debug('No selected pair')
            return []

        # Keep the most correlated pairs, then order them by stationarity.
        candidates.sort(key=lambda item: item[0], reverse=True)
        selected_pair = [pair for _, pair in candidates[:self.pair_num]]
        selected_pair.sort(key=lambda x: x.StationaryP)
        return selected_pair

    def OnData(self, data):
        """Update price windows, refresh the selection, then manage positions."""
        for symbol, symbolData in self.symbol_data.items():
            if data.Bars.ContainsKey(symbol):
                symbolData.Update(data.Bars[symbol])

        # generate pairs with correlation and cointegration selection
        if self.regenerate_time < self.Time:
            self.selected_pair = self.GeneratePairs()
            self.regenerate_time = self.Time + timedelta(days=5)

        # NOTE(review): a pair closed by the stop-loss below can be re-entered
        # by the entry pass in the same call, because its error is still
        # beyond the entry threshold -- confirm whether a cool-down is wanted.
        self._close_positions()
        self._open_positions()

    def _close_pair(self, pair, trading_pair):
        """Flatten both legs of an open pair and forget it."""
        self.MarketOrder(pair.a.Symbol, -trading_pair.ticket_a.Quantity)
        self.MarketOrder(pair.b.Symbol, -trading_pair.ticket_b.Quantity)
        self.trading_pairs.pop(pair)
        self.Debug(f'Close {pair.Name}')

    def _close_positions(self):
        """Close open pairs that were deselected, reverted, or hit the stop."""
        # Iterate over a snapshot: _close_pair mutates self.trading_pairs.
        for pair, trading_pair in list(self.trading_pairs.items()):
            # close: if not correlated nor cointegrated anymore
            if pair not in self.selected_pair:
                self._close_pair(pair, trading_pair)
                continue

            # get current cointegrated series deviation from mean, using the
            # model that was in force when the position was opened
            fitted = trading_pair.model_intercept + trading_pair.model_slope * pair.b.Prices[0].Close
            error = pair.a.Prices[0].Close - fitted

            mean = trading_pair.mean_error
            take_profit = self.close_size * trading_pair.epsilon
            stop_loss = self.stop_loss_size * trading_pair.epsilon
            quantity_a = trading_pair.ticket_a.Quantity

            # close: when the cointegrated series is deviated less than
            # close_size SD from its mean, or more than stop_loss_size SD
            if quantity_a > 0:
                should_close = error > mean - take_profit or error < mean - stop_loss
            elif quantity_a < 0:
                should_close = error < mean + take_profit or error > mean + stop_loss
            else:
                should_close = False

            if should_close:
                self._close_pair(pair, trading_pair)

    def _open_positions(self):
        """Enter selected pairs whose error is beyond the entry threshold."""
        # Each leg targets half of the pair's share of the portfolio.
        target = self.leverage / self.pair_num / 2

        for pair in self.selected_pair:
            if pair in self.trading_pairs:
                continue

            # Positional access via .iloc: params is label-indexed and
            # integer keys in plain [] are deprecated as positions.
            intercept = pair.Model.params.iloc[0]
            slope = pair.Model.params.iloc[1]

            # get current cointegrated series deviation from mean
            price_a = pair.a.Prices[0].Close
            price_b = pair.b.Prices[0].Close
            error = price_a - (intercept + slope * price_b)

            # entry: when the cointegrated series is deviated by more than
            # open_size SD from its mean
            threshold = self.open_size * pair.Epsilon
            if error < pair.MeanError - threshold:
                direction = 1       # long a, short b
            elif error > pair.MeanError + threshold:
                direction = -1      # short a, long b
            else:
                continue

            # Size each leg against its OWN symbol (previously both legs were
            # sized with a stale loop variable from the bar-update loop).
            qty_a = self.CalculateOrderQuantity(pair.a.Symbol, direction * target)
            qty_b = self.CalculateOrderQuantity(pair.b.Symbol, -direction * target)

            # A zero-sized leg would leave a position the threshold logic can
            # never close, so skip the entry instead.
            if qty_a == 0 or qty_b == 0:
                continue

            ticket_a = self.MarketOrder(pair.a.Symbol, qty_a)
            ticket_b = self.MarketOrder(pair.b.Symbol, qty_b)
            self.trading_pairs[pair] = TradingPair(ticket_a, ticket_b, intercept, slope,
                                                   pair.MeanError, pair.Epsilon)

            if direction > 0:
                self.Debug(f'Long {qty_a} {pair.a.Symbol.Value} and short {qty_b} {pair.b.Symbol.Value}')
            else:
                self.Debug(f'Long {qty_b} {pair.b.Symbol.Value} and short {qty_a} {pair.a.Symbol.Value}')