Overall Statistics |
Total Trades 4824 Average Win 0.02% Average Loss -0.01% Compounding Annual Return -12.729% Drawdown 12.700% Expectancy -0.601 Net Profit -12.718% Sharpe Ratio -10.248 Loss Rate 87% Win Rate 13% Profit-Loss Ratio 2.08 Alpha -0.144 Beta 0.429 Annual Standard Deviation 0.013 Annual Variance 0 Information Ratio -11.748 Tracking Error 0.013 Treynor Ratio -0.316 Total Fees $6352.60 |
import datetime
import itertools
import warnings

import numpy as np

# Silence warnings raised while pandas / statsmodels are imported, then
# restore the filters.  The stdlib ``warnings`` module is used directly
# because the ``np.warnings`` alias is not available in recent NumPy releases.
warnings.filterwarnings("ignore")
import pandas as pd
import statsmodels.formula.api as sm
import statsmodels.tsa.stattools as ts
warnings.resetwarnings()


class PairsTrading(QCAlgorithm):
    """Statistical pairs-trading algorithm.

    On every data slice the algorithm:

    * updates the rolling price window of every generated pair,
    * keeps the pairs whose correlation and ADF statistic pass the
      configured thresholds,
    * tries to enter a trade on those pairs when the regression error
      re-enters the ``open_size`` band after having left it,
    * tries to exit every open trade when the error crosses the
      ``close_size`` band or breaches the ``stop_loss`` band.

    NOTE(review): the original header comment says "every day", but the
    securities are subscribed at ``Resolution.Minute``, so ``OnData`` is
    presumably invoked per minute bar - confirm the intended cadence.
    """

    def __init__(self):
        self.symbols = ['CORN', 'SOYB', 'PALL', 'BAC', 'BXS', 'T', 'F', 'GM',
                        'MSFT', 'AAPL', 'IBM']
        self.maximalNumberOfPairs = 100000
        self.numberOfBars = 40
        # correlation selection criterion
        self.minimalCorrelation = 0.9
        # co-integration selection criterion (compared against the ADF statistic)
        self.maximalBIC = -3.5
        # number of standard deviations to open
        self.open_size = 2
        # number of standard deviations to close
        self.close_size = 1
        # number of standard deviations at which an open trade is stopped out
        self.stop_loss = 6
        # pairs that currently have an open trade
        self.trading_pairs = []

    def Initialize(self):
        """Configure the backtest and subscribe to every ticker."""
        self.SetStartDate(2014, 1, 1)
        self.SetEndDate(2015, 1, 1)
        self.SetCash(50000)
        # Replace the ticker strings with the Symbol objects of the subscriptions.
        self.symbols = [self.AddEquity(ticker, Resolution.Minute).Symbol
                        for ticker in self.symbols]
        self.generatedPairs = self.generatePairs(self.symbols, self.numberOfBars)

    def OnData(self, data):
        """Update all pairs, then run the entry pass and the exit pass.

        The exit pass runs on every call, including calls on which no pair
        qualifies for entry, so open trades are always checked against their
        close and stop-loss bands.
        """
        # update indicators in pairs
        for pair in self.generatedPairs:
            pair.update(data)

        selectedPairs = self._select_candidates()
        if selectedPairs:
            self.Log('selectPairsByCointegration pairs= %s' % str(len(selectedPairs)))
            self._process_entries(selectedPairs)

        self._process_exits()

    def _select_candidates(self):
        """Return the pairs passing both filters, capped at maximalNumberOfPairs."""
        selected = self.selectPairsByCorrelation(
            self.generatedPairs, self.minimalCorrelation, self.maximalNumberOfPairs)
        if not selected:
            return []
        selected = self.selectPairsByCointegration(
            selected, self.maximalBIC, self.maximalNumberOfPairs)
        # select top maximalNumberOfPairs pairs
        return selected[:self.maximalNumberOfPairs]

    def _process_entries(self, selectedPairs):
        """Advance each candidate's ``touch`` state and open trades on re-entry.

        ``touch`` is 0 while the error is inside the open band, -1 after it
        dropped below the lower band and +1 after it rose above the upper
        band.  A trade is opened when the error crosses back inside the band.
        """
        # NOTE(review): SetHoldings is called per symbol, so pairs that share
        # a symbol set that symbol's target more than once - confirm intended.
        weight = 0.2 / len(selectedPairs)
        for pair in selectedPairs:
            band = self.open_size * pair.standardDeviation
            lower = pair.mean_error - band
            upper = pair.mean_error + band

            if pair.touch == 0:
                if pair.error < lower and pair.last_error > lower:
                    pair.touch = -1
                elif pair.error > upper and pair.last_error < upper:
                    pair.touch = 1
            elif pair.touch == -1:
                if pair.error > lower and pair.last_error < lower:
                    self._open_position(pair, pair.a, pair.b, weight)
                    pair.touch = 0
            elif pair.touch == 1:
                if pair.error < upper and pair.last_error > upper:
                    self._open_position(pair, pair.b, pair.a, weight)
                    pair.touch = 0

    def _open_position(self, pair, long_symbol, short_symbol, weight):
        """Record the pair's model statistics and open the long/short legs."""
        self.Log('long %s and short %s' % (str(long_symbol), str(short_symbol)))
        # Freeze the statistics used later by the exit rules.
        pair.record_model = pair.model
        pair.record_mean_error = pair.mean_error
        pair.record_sd = pair.standardDeviation
        # Avoid duplicate entries when a pair re-enters before being closed.
        if pair not in self.trading_pairs:
            self.trading_pairs.append(pair)
        self.SetHoldings(long_symbol, weight)
        self.SetHoldings(short_symbol, -weight)

    def _process_exits(self):
        """Close open trades that crossed the close band or hit the stop loss."""
        # Iterate over a copy: pairs are removed from the list while looping.
        for pair in list(self.trading_pairs):
            mean = pair.record_mean_error
            close_band = self.close_size * pair.record_sd
            stop_band = self.stop_loss * pair.record_sd

            crossed_down = (pair.error < mean + close_band
                            and pair.last_error > mean + close_band)
            crossed_up = (pair.error > mean - close_band
                          and pair.last_error < mean - close_band)

            if crossed_down or crossed_up:
                self._close_position(pair, 'close %s' % str(pair.name))
            elif pair.error < mean - stop_band or pair.error > mean + stop_band:
                self._close_position(pair, 'close %s to stop loss' % str(pair.name))

    def _close_position(self, pair, message):
        """Log *message*, liquidate both legs and stop tracking the pair."""
        self.Log(message)
        self.Liquidate(pair.a)
        self.Liquidate(pair.b)
        self.trading_pairs.remove(pair)

    def generatePairs(self, equities, numberOfBars):
        """Return a Pair for every unordered combination of two equities."""
        generatedPairs = [Pair(self, first, second, numberOfBars)
                          for first, second in itertools.combinations(equities, 2)]
        self.Log('generated pairs= %s' % str(len(generatedPairs)))
        return generatedPairs

    def selectPairsByCorrelation(self, pairs, minimalCorrelation, maximalNumberOfPairs):
        """Return ready pairs with correlation > minimalCorrelation, best first.

        ``maximalNumberOfPairs`` is accepted for interface compatibility; it
        is not applied here (the caller truncates the final selection).
        """
        selectedPairs = [pair for pair in pairs
                         if pair.isReady() and pair.correlation > minimalCorrelation]
        # sort pairs by correlation, descending
        selectedPairs.sort(key=lambda pair: pair.correlation, reverse=True)
        return selectedPairs

    def selectPairsByCointegration(self, pairs, maximalBIC, maximalNumberOfPairs):
        """Return ready pairs with ADF statistic < maximalBIC, lowest first.

        ``maximalNumberOfPairs`` is accepted for interface compatibility; it
        is not applied here (the caller truncates the final selection).
        """
        selectedPairs = [pair for pair in pairs
                         if pair.isReady() and pair.adf < maximalBIC]
        # sort pairs by co-integration ADF statistic, ascending
        selectedPairs.sort(key=lambda pair: pair.adf)
        return selectedPairs


class Pair(object):
    """Rolling two-asset window with correlation and regression statistics."""

    def __init__(self, algorithm, a, b, numberOfBars):
        # pair: stock a, stock b
        self.algorithm = algorithm
        self.a = a
        self.b = b
        # number of data points kept in the rolling window
        self.numberOfBars = numberOfBars
        # name of pair
        self.name = str(a) + ':' + str(b)
        # current and previous regression error of a against b
        self.error = 0
        self.last_error = 0
        # DataFrame of prices, one column per stock; None until data arrives
        self.df = None
        # entry state: 0 inside the band, -1 / +1 after leaving it below / above
        self.touch = 0

    def calculateCorrelation(self):
        """Store the correlation between the two price columns."""
        # Off-diagonal element of the 2x2 correlation matrix.
        self.correlation = self.df.corr().iloc[0, 1]

    def calculateCointegration(self):
        """Fit ``a ~ b`` by OLS and store the ADF statistic of the residuals."""
        # NOTE(review): the formula is built from str(symbol); this assumes
        # each string is usable as a name in a formula - confirm.
        self.model = sm.ols(formula='%s ~ %s' % (str(self.a), str(self.b)),
                            data=self.df).fit()
        residuals = self.model.resid
        self.adf = ts.adfuller(residuals, autolag='BIC')[0]
        self.mean_error = np.mean(residuals)
        self.standardDeviation = np.std(residuals)

    def isReady(self):
        """Return True once the window holds exactly numberOfBars rows."""
        return self.df is not None and len(self.df) == self.numberOfBars

    def update(self, data):
        """Append the latest prices from *data* and refresh the statistics.

        The window is discarded (``df`` set to None) whenever either symbol
        is missing from *data* or its close price cannot be read as a float.
        """
        if not data.ContainsKey(self.a) or not data.ContainsKey(self.b):
            self.df = None
            return

        data_a = data[self.a]
        data_b = data[self.b]
        try:
            priceOfStockA = float(data_a.Close)
            priceOfStockB = float(data_b.Close)
        except Exception:
            # Best effort: an unreadable bar resets the window instead of
            # stopping the algorithm.
            self.df = None
            return

        new_df = pd.DataFrame(
            {str(self.a): [priceOfStockA], str(self.b): [priceOfStockB]},
            index=[data_a.EndTime],
        ).dropna()

        if self.df is None:
            self.df = new_df
        else:
            # concatenate existing DataFrame with new data
            self.df = pd.concat([self.df, new_df])
        # keep numberOfBars of data points
        self.df = self.df.tail(self.numberOfBars)

        if self.isReady():
            self.calculateCorrelation()
            self.calculateCointegration()
            self.last_error = self.error
            # params holds the intercept first, then the slope on stock b.
            intercept = self.model.params.iloc[0]
            slope = self.model.params.iloc[1]
            self.error = priceOfStockA - (intercept + slope * priceOfStockB)