Overall Statistics |
Total Orders 26 Average Win 0.67% Average Loss -0.32% Compounding Annual Return 27.939% Drawdown 2.100% Expectancy 0.547 Start Equity 50000 End Equity 51045.81 Net Profit 2.092% Sharpe Ratio 2.888 Sortino Ratio 4.977 Probabilistic Sharpe Ratio 74.043% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 2.09 Alpha 0.095 Beta 0.232 Annual Standard Deviation 0.064 Annual Variance 0.004 Information Ratio -2.445 Tracking Error 0.083 Treynor Ratio 0.796 Total Fees $104.00 Estimated Strategy Capacity $560000.00 Lowest Capacity Asset PBCP RH8GLI8VB8TH Portfolio Turnover 29.03% |
#region imports
from AlgorithmImports import *
from itertools import combinations
import statsmodels.tsa.stattools as ts
from pair import Pairs
from symbol_data import SymbolData
from trading_pair import TradingPair
#endregion


class PairsTrading(QCAlgorithm):
    """Pairs-trading algorithm over a fixed list of tickers.

    Candidate pairs are filtered by correlation and cointegration, then each
    selected pair is traded on the deviation of its regression residual
    (``price_a - (intercept + slope * price_b)``) from the residual mean,
    measured in residual standard deviations.
    """

    _symbols = [
        'ING', 'TBC', 'BMA', 'PB', 'FBC', 'STL', 'FCF', 'PFS', 'BOH', 'SCNB',
        'BK', 'CMA', 'AF', 'PNC', 'KB', 'SHG', 'BSAC', 'CIB', 'BBD', 'BSBR'
    ]
    # Minute-bar lookback handed to SymbolData
    # (presumably 390 minutes/session x 21 sessions x 3 months -- confirm).
    _num_bar = 390*21*3
    # Consolidation interval in minutes (SymbolData builds a timedelta from it).
    _interval = 10
    # Maximum number of pairs kept after selection; also sizes each position.
    _pair_num = 10
    _leverage = 1
    # Pairs with a correlation below this value are discarded.
    _min_corr_threshold = 0.9
    # Entry / exit / stop-loss thresholds, in residual standard deviations.
    _open_size = 2.32
    _close_size = 0.5
    _stop_loss_size = 6

    def initialize(self):
        """Configure the backtest, subscribe to every ticker and build all candidate pairs."""
        self.set_start_date(2013, 9, 1)
        self.set_end_date(2013, 10, 1)
        self.set_cash(50000)

        self._symbol_data = {}
        self._pair_list = []
        self._selected_pair = []
        # Maps an open Pairs object to the TradingPair snapshot taken at entry.
        self._trading_pairs = {}
        # datetime.min forces a pair regeneration on the first on_data call.
        self._regenerate_time = datetime.min

        for ticker in self._symbols:
            symbol = self.add_equity(ticker, Resolution.MINUTE).symbol
            self._symbol_data[symbol] = SymbolData(self, symbol, self._num_bar, self._interval)

        # Only symbols whose rolling window is already full can form a pair.
        for data_a, data_b in combinations(self._symbol_data.values(), 2):
            if data_a.is_ready and data_b.is_ready:
                self._pair_list.append(Pairs(data_a, data_b))

    def _generate_pairs(self):
        """Return the pairs that pass the correlation and cointegration filters.

        At most ``_pair_num`` pairs are kept (the most correlated ones) and
        the result is ordered by ascending ``stationary_p``. An empty list is
        returned, and a debug message emitted, when nothing qualifies.
        """
        selected_pair = []
        for pair in self._pair_list:
            # correlation selection
            if pair.correlation() < self._min_corr_threshold:
                continue
            # cointegration selection; stationary_p is only read after a
            # successful cointegration test thanks to short-circuiting
            if pair.cointegration_test() and pair.stationary_p < 0.05:
                selected_pair.append(pair)

        if not selected_pair:
            self.debug('No selected pair')
            return []

        selected_pair.sort(key=lambda x: x.correlation(), reverse=True)
        selected_pair = selected_pair[:self._pair_num]
        selected_pair.sort(key=lambda x: x.stationary_p)
        return selected_pair

    @staticmethod
    def _spread_error(pair, intercept, slope):
        """Return the current regression residual of ``pair`` for the given model coefficients."""
        # prices[0] is read as the latest consolidated bar of each leg.
        price_a = pair.a.prices[0].close
        price_b = pair.b.prices[0].close
        return price_a - (intercept + slope * price_b)

    def _close_pair(self, pair, trading_pair):
        """Flatten both legs of ``pair`` and stop tracking it."""
        self.market_order(pair.a.symbol, -trading_pair.ticket_a.quantity)
        self.market_order(pair.b.symbol, -trading_pair.ticket_b.quantity)
        self._trading_pairs.pop(pair)
        self.debug(f'Close {pair.name}')

    def _open_pair(self, pair, long_a):
        """Open a position in ``pair``: long a / short b when ``long_a``, otherwise the reverse."""
        # Each leg targets half of the per-pair share of the portfolio.
        leg_weight = self._leverage / self._pair_num / 2
        weight_a = leg_weight if long_a else -leg_weight

        # Size each leg from its OWN symbol. The previous code passed a stale
        # loop variable (`symbol`), so both legs were sized from an unrelated
        # security.
        # NOTE(review): calculate_order_quantity presumably targets a total
        # portfolio weight per symbol; a symbol shared by several open pairs
        # may therefore be sized against its existing holdings -- confirm.
        qty_a = self.calculate_order_quantity(pair.a.symbol, weight_a)
        qty_b = self.calculate_order_quantity(pair.b.symbol, -weight_a)
        ticket_a = self.market_order(pair.a.symbol, qty_a)
        ticket_b = self.market_order(pair.b.symbol, qty_b)

        # Freeze the model used at entry so exits are judged against it even
        # if the pair is re-fitted later.
        self._trading_pairs[pair] = TradingPair(
            ticket_a, ticket_b,
            pair.model.params.iloc[0], pair.model.params.iloc[1],
            pair.mean_error, pair.epsilon)

        if long_a:
            self.debug(f'Long {qty_a} {pair.a.symbol.value} and short {qty_b} {pair.b.symbol.value}')
        else:
            self.debug(f'Long {qty_b} {pair.b.symbol.value} and short {qty_a} {pair.a.symbol.value}')

    def on_data(self, data):
        """Feed new bars to the consolidators, refresh the pair selection, then manage exits and entries."""
        for symbol, symbol_data in self._symbol_data.items():
            if data.bars.contains_key(symbol):
                symbol_data.update(data.bars[symbol])

        # generate pairs with correlation and cointegration selection
        if self._regenerate_time < self.time:
            self._selected_pair = self._generate_pairs()
            self._regenerate_time = self.time + timedelta(days=5)

        # closing existing position (iterate over a snapshot: entries are removed in the loop)
        for pair, trading_pair in list(self._trading_pairs.items()):
            # close: if not correlated nor cointegrated anymore
            if pair not in self._selected_pair:
                self._close_pair(pair, trading_pair)
                continue

            # get current cointegrated series deviation from mean, using the model frozen at entry
            error = self._spread_error(pair, trading_pair.model_intercept, trading_pair.model_slope)
            mean = trading_pair.mean_error
            take_profit = self._close_size * trading_pair.epsilon
            stop_loss = self._stop_loss_size * trading_pair.epsilon
            qty_a = trading_pair.ticket_a.quantity

            # close: when the residual has reverted to within _close_size SD of
            # its mean, or has run away beyond _stop_loss_size SD
            if qty_a > 0:
                should_close = error > mean - take_profit or error < mean - stop_loss
            elif qty_a < 0:
                should_close = error < mean + take_profit or error > mean + stop_loss
            else:
                should_close = False

            if should_close:
                self._close_pair(pair, trading_pair)

        # entry: when the cointegrated series is deviated by more than _open_size SD from its mean
        for pair in self._selected_pair:
            if pair in self._trading_pairs:
                continue

            # get current cointegrated series deviation from mean
            # (.iloc: the fitted params are label-indexed, so read them by position explicitly)
            error = self._spread_error(pair, pair.model.params.iloc[0], pair.model.params.iloc[1])
            band = self._open_size * pair.epsilon

            if error < pair.mean_error - band:
                self._open_pair(pair, long_a=True)
            elif error > pair.mean_error + band:
                self._open_pair(pair, long_a=False)
#region imports
from AlgorithmImports import *
import statsmodels.formula.api as sm
from statsmodels.tsa.stattools import coint, adfuller
#endregion


class Pairs(object):
    """A candidate pair of symbols together with its fitted regression model."""

    def __init__(self, a, b):
        """Store the two per-symbol data holders and reset the model state.

        ``a`` and ``b`` must expose ``symbol``, ``series`` and ``data_frame``.
        """
        self.a = a
        self.b = b
        self.name = f'{a.symbol.value}:{b.symbol.value}'
        self.model = None
        self.mean_error = 0
        self.epsilon = 0
        # Initialized so the attribute exists before the first successful
        # cointegration test; 1.0 never passes a "p < threshold" check.
        self.stationary_p = 1.0

    def _data_frame(self):
        """Return both close series side by side, one column per ticker, without incomplete rows."""
        # Level 0 of each index is dropped so the two frames align on the
        # remaining level(s) -- presumably symbol then time; confirm.
        df = pd.concat(
            [self.a.data_frame.droplevel([0]), self.b.data_frame.droplevel([0])],
            axis=1,
        ).dropna()
        df.columns = [self.a.symbol.value, self.b.symbol.value]
        return df

    def correlation(self):
        """Return the correlation between the two close series."""
        # Off-diagonal element of the 2x2 correlation matrix. A single
        # positional lookup replaces the chained `.iloc[0][1]`, which indexed
        # a label-indexed row by integer position.
        return self._data_frame().corr().iloc[0, 1]

    def cointegration_test(self):
        """Test the pair for cointegration and, on success, fit the regression.

        Returns ``False`` without touching the model when the test p-value is
        0.05 or higher. Otherwise fits ``a ~ b`` by OLS, stores the model, the
        ADF p-value of its residuals (``stationary_p``) and the residual mean
        and standard deviation (``mean_error``, ``epsilon``), and returns
        ``True``.
        """
        coint_test = coint(self.a.series.values.flatten(), self.b.series.values.flatten(), trend="n", maxlag=0)

        # Return if not cointegrated
        if coint_test[1] >= 0.05:
            return False

        self.model = sm.ols(
            formula=f'{self.a.symbol.value} ~ {self.b.symbol.value}',
            data=self._data_frame(),
        ).fit()
        residuals = self.model.resid
        self.stationary_p = adfuller(residuals, autolag='BIC')[1]
        self.mean_error = np.mean(residuals)
        self.epsilon = np.std(residuals)
        return True
#region imports
from AlgorithmImports import *
#endregion


class SymbolData(object):
    """Rolling window of consolidated trade bars for one symbol, warmed up from history."""

    def __init__(self, algorithm, symbol, lookback, interval):
        """Create the window and consolidator, then replay history through them.

        ``lookback`` is a number of minute bars and ``interval`` the
        consolidation period in minutes; the window therefore holds
        ``lookback // interval`` consolidated bars.
        """
        lookback = int(lookback)
        self.symbol = symbol
        self.prices = RollingWindow[TradeBar](lookback // interval)
        # Both stay None until the window is full (see _on_data_consolidated).
        self.series = None
        self.data_frame = None

        self._algorithm = algorithm
        self._consolidator = TradeBarConsolidator(timedelta(minutes=interval))
        self._consolidator.data_consolidated += self._on_data_consolidated

        # Resolution.MINUTE matches the enum spelling used by the algorithm
        # class that subscribes these symbols.
        history = algorithm.history(symbol, lookback, Resolution.MINUTE)
        for bar in history.itertuples():
            # Rows without a volume are skipped.
            if np.isnan(bar.volume):
                continue
            # Index[1] is taken as the bar time
            # (NOTE(review): presumably the bar's end time -- confirm).
            trade_bar = TradeBar(bar.Index[1], symbol, bar.open, bar.high, bar.low, bar.close, bar.volume)
            self.update(trade_bar)

    @property
    def is_ready(self):
        """Whether the rolling window has been completely filled."""
        return self.prices.is_ready

    def update(self, trade_bar):
        """Push one bar into the consolidator."""
        self._consolidator.update(trade_bar)

    def _on_data_consolidated(self, sender, consolidated):
        """Store a newly consolidated bar and refresh the close-price series once the window is full."""
        self.prices.add(consolidated)
        if self.is_ready:
            self.series = self._algorithm.pandas_converter.get_data_frame[TradeBar](self.prices)['close']
            self.data_frame = self.series.to_frame()
#region imports
from AlgorithmImports import *
#endregion


class TradingPair:
    """Snapshot of an open pair position: both order tickets plus the model values captured at entry."""

    def __init__(self, ticket_a, ticket_b, intercept, slope, mean_error, epsilon):
        """Record the tickets of both legs and the regression parameters in force when they were placed."""
        # Order tickets of the two legs.
        self.ticket_a, self.ticket_b = ticket_a, ticket_b
        # Regression coefficients of leg a on leg b.
        self.model_intercept, self.model_slope = intercept, slope
        # Residual mean and dispersion used for exit thresholds.
        self.mean_error, self.epsilon = mean_error, epsilon