Overall Statistics
Total Trades
4824
Average Win
0.02%
Average Loss
-0.01%
Compounding Annual Return
-12.729%
Drawdown
12.700%
Expectancy
-0.601
Net Profit
-12.718%
Sharpe Ratio
-10.248
Loss Rate
87%
Win Rate
13%
Profit-Loss Ratio
2.08
Alpha
-0.144
Beta
0.429
Annual Standard Deviation
0.013
Annual Variance
0
Information Ratio
-11.748
Tracking Error
0.013
Treynor Ratio
-0.316
Total Fees
$6352.60
import warnings

import numpy as np

# Silence warnings raised while the heavy third-party modules are imported,
# then clear the filter list again. The stdlib ``warnings`` module is used
# directly: ``np.warnings`` was only an incidental alias of it and is not
# available in newer NumPy releases.
warnings.filterwarnings("ignore")
import pandas as pd
import datetime as datetime
import statsmodels.formula.api as sm
import statsmodels.tsa.stattools as ts
warnings.resetwarnings()

# Generates all possible pairs from the symbol list.
# Recalculates correlation and cointegration for every pair on each data update.
# On each data update, tries to enter trades for pairs with good correlation and cointegration whose deviation is too big/too small.
# On each data update, tries to exit trades for pairs whose deviation has become too small/too big.
class PairsTrading(QCAlgorithm):

    def __init__(self):
        """Set the ticker universe, the pair-selection thresholds and the trade bands."""
        # Pairs for which a position has been opened and not yet closed.
        self.trading_pairs = []

        # Tickers from which every two-symbol combination is built.
        self.symbols = [
            'CORN', 'SOYB', 'PALL', 'BAC', 'BXS', 'T',
            'F', 'GM', 'MSFT', 'AAPL', 'IBM',
        ]

        # Length of the rolling price window kept per pair, and the cap on
        # how many selected pairs are considered at once.
        self.numberOfBars = 40
        self.maximalNumberOfPairs = 100000

        # Selection thresholds: correlation must exceed the first value, the
        # co-integration statistic must be below the second.
        self.minimalCorrelation = 0.9
        self.maximalBIC = -3.5

        # Bands expressed in standard deviations: distance at which a trade is
        # opened, distance at which it is closed, and the stop-loss distance.
        self.open_size, self.close_size, self.stop_loss = 2, 1, 6


    def Initialize(self):
        """Configure the backtest period and cash, subscribe to data and build the pairs."""
        self.SetCash(50000)
        self.SetStartDate(2014, 1, 1)
        self.SetEndDate(2015, 1, 1)

        # Replace each ticker string, in place, with the Symbol of its
        # minute-resolution equity subscription.
        for position, ticker in enumerate(self.symbols):
            self.symbols[position] = self.AddEquity(ticker, Resolution.Minute).Symbol

        self.generatedPairs = self.generatePairs(self.symbols, self.numberOfBars)

    def OnData(self, data):
        """Update every pair with the new data, then open and close trades.

        Entries are only considered for pairs that currently pass both the
        correlation and the co-integration filter. Exits are evaluated on
        every call for all pairs in ``self.trading_pairs``, so an open
        position can still be closed or stopped out on a bar where no pair
        passes the filters.

        Args:
            data: The data slice handed to the algorithm; it is forwarded
                unchanged to ``Pair.update``.
        """
        # update indicators in pairs
        for pair in self.generatedPairs:
            pair.update(data)

        selectedPairs = self.selectPairsByCorrelation(self.generatedPairs, self.minimalCorrelation, self.maximalNumberOfPairs)
        selectedPairs = self.selectPairsByCointegration(selectedPairs, self.maximalBIC, self.maximalNumberOfPairs)

        if selectedPairs:
            self.Log('selectPairsByCointegration pairs= %s' % str(len(selectedPairs)))

            # select top maximalNumberOfPairs pairs
            self._enterTrades(selectedPairs[:self.maximalNumberOfPairs])

        # Exits must not depend on the selection result above.
        self._exitTrades()

    def _enterTrades(self, selectedPairs):
        """Advance the entry state of each selected pair and open trades that trigger."""
        if not selectedPairs:
            return

        # Every selected pair gets an equal share of 20% of the portfolio per leg.
        weight = 0.2 / len(selectedPairs)

        for pair in selectedPairs:
            lowerBand = pair.mean_error - self.open_size * pair.standardDeviation
            upperBand = pair.mean_error + self.open_size * pair.standardDeviation

            if pair.touch == 0:
                # First step: the error leaves the band; remember on which side.
                if pair.error < lowerBand and pair.last_error > lowerBand:
                    pair.touch = -1
                elif pair.error > upperBand and pair.last_error < upperBand:
                    pair.touch = 1
            elif pair.touch == -1:
                # Second step: the error re-enters from below -> long a, short b.
                if pair.error > lowerBand and pair.last_error < lowerBand:
                    self._openPair(pair, pair.a, pair.b, weight)
            elif pair.touch == 1:
                # Second step: the error re-enters from above -> long b, short a.
                if pair.error < upperBand and pair.last_error > upperBand:
                    self._openPair(pair, pair.b, pair.a, weight)

    def _openPair(self, pair, longSymbol, shortSymbol, weight):
        """Record the current model statistics on the pair and place both legs."""
        self.Log('long %s and short %s' % (str(longSymbol), str(shortSymbol)))

        # Freeze the statistics used later by the exit rules.
        pair.record_model = pair.model
        pair.record_mean_error = pair.mean_error
        pair.record_sd = pair.standardDeviation

        # A pair that re-triggers while still open must not be listed twice,
        # otherwise one copy would stay behind after the position is closed.
        if pair not in self.trading_pairs:
            self.trading_pairs.append(pair)

        self.SetHoldings(longSymbol, weight)
        self.SetHoldings(shortSymbol, -weight)

        pair.touch = 0

    def _exitTrades(self):
        """Liquidate open pairs whose error crossed a close band or hit the stop loss."""
        stillOpen = []

        for pair in self.trading_pairs:
            mean = pair.record_mean_error
            sd = pair.record_sd
            upperClose = mean + self.close_size * sd
            lowerClose = mean - self.close_size * sd

            crossedCloseBand = (
                (pair.error < upperClose and pair.last_error > upperClose)
                or (pair.error > lowerClose and pair.last_error < lowerClose)
            )
            hitStopLoss = (
                pair.error < mean - self.stop_loss * sd
                or pair.error > mean + self.stop_loss * sd
            )

            if crossedCloseBand:
                self.Log('close %s' % str(pair.name))
            elif hitStopLoss:
                self.Log('close %s to stop loss' % str(pair.name))
            else:
                stillOpen.append(pair)
                continue

            self.Liquidate(pair.a)
            self.Liquidate(pair.b)

        # Rebuild the list instead of removing entries while iterating over
        # it, which would skip the element following each removed pair.
        self.trading_pairs = stillOpen

    def generatePairs(self, equities, numberOfBars):
        """Build one ``Pair`` for every unordered combination of two equities.

        Args:
            equities: Sequence of instruments to combine.
            numberOfBars: Window length passed on to each ``Pair``.

        Returns:
            List of the created ``Pair`` objects, ordered by the position of
            the first and then the second instrument in ``equities``.
        """
        generatedPairs = [
            Pair(self, first, second, numberOfBars)
            for position, first in enumerate(equities)
            for second in equities[position + 1:]
        ]

        self.Log('generated pairs= %s' % str(len(generatedPairs)))
        return generatedPairs


    def selectPairsByCorrelation(self, pairs, minimalCorrelation, maximalNumberOfPairs):
        """Return the ready pairs whose correlation exceeds ``minimalCorrelation``.

        The result is a new list ordered from the highest to the lowest
        correlation. ``maximalNumberOfPairs`` is accepted but not used here.
        """
        def correlationOf(candidate):
            return candidate.correlation

        qualifying = (
            candidate for candidate in pairs
            if candidate.isReady() and candidate.correlation > minimalCorrelation
        )
        return sorted(qualifying, key = correlationOf, reverse = True)


    def selectPairsByCointegration(self, pairs, maximalBIC, maximalNumberOfPairs):
        """Return the ready pairs whose ``adf`` value is below ``maximalBIC``.

        The result is a new list ordered by ascending ``adf``.
        ``maximalNumberOfPairs`` is accepted but not used here.
        """
        def adfOf(candidate):
            return candidate.adf

        qualifying = (
            candidate for candidate in pairs
            if candidate.isReady() and candidate.adf < maximalBIC
        )
        return sorted(qualifying, key = adfOf)


class Pair(object):

    def __init__(self, algorithm, a, b, numberOfBars):
        # pair: stock a, stock b
        # stock will contain DataFrame with prices and dates

        self.algorithm = algorithm
        self.a = a
        self.b = b

        # keep num_bar of data points
        self.numberOfBars = numberOfBars

        # name of pair
        self.name = str(a) + ':' + str(b)
        
        self.error = 0
        self.last_error = 0
        self.df = None
        self.touch = 0

    def calculateCorrelation(self):
        # calculate correlation
        self.correlation = self.df.corr().ix[0][1]

    def calculateCointegration(self):
        """Fit an OLS regression of ``a`` on ``b`` and summarise its residuals.

        Sets ``self.model`` to the fitted result, ``self.adf`` to the first
        element returned by ``adfuller`` on the residuals (lag selection by
        BIC), and ``self.mean_error`` / ``self.standardDeviation`` to the
        mean and standard deviation of the residuals.
        """
        formula = '%s ~ %s' % (str(self.a), str(self.b))
        self.model = sm.ols(formula = formula, data = self.df).fit()

        residuals = self.model.resid
        adfResult = ts.adfuller(residuals, autolag = 'BIC')

        self.adf = adfResult[0]
        self.mean_error = np.mean(residuals)
        self.standardDeviation = np.std(residuals)

    def isReady(self):
        return not self.df is None and len(self.df) == self.numberOfBars 

    def update(self, data):
        if not data.ContainsKey(self.a):
            self.df = None
            return

        if not data.ContainsKey(self.b):
            self.df = None
            return

        data_a = data[self.a]
        data_b = data[self.b]
        
        try:
            priceOfStockA = float(data_a.Close)
            priceOfStockB = float(data_b.Close)
        except:
            self.df = None
            return

        a_price = []
        a_date = []
        b_price = []
        b_date = []

        a_price.append(priceOfStockA)
        a_date.append(data_a.EndTime)
        b_price.append(priceOfStockB)
        b_date.append(data_b.EndTime)
        
        new_df = pd.DataFrame({str(self.a):a_price, str(self.b):b_price}, index = [a_date]).dropna()
        
        if self.df is None:
            self.df = new_df
        else:
            # concatenate existing DataFrame with new data
            self.df = pd.concat([self.df, new_df])
        
        # keep numberOfBars of data points
        self.df = self.df.tail(self.numberOfBars)

        if self.isReady():
            self.calculateCorrelation()
            self.calculateCointegration()

            self.last_error = self.error
            self.error = priceOfStockA - (self.model.params[0] + self.model.params[1] * priceOfStockB)