Overall Statistics
Total Trades
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Net Profit
0%
Sharpe Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
-0.531
Tracking Error
0.143
Treynor Ratio
0
Total Fees
$0.00
import pandas as pd
from statsmodels.regression.linear_model import OLS
from statsmodels.tsa.stattools import adfuller

class JamesTExampleMeanReversionAlgo(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2015, 1, 1)
        self.SetEndDate(2017, 1, 1)
        self.SetCash(1000)
        
        # Add tradable universe
        self.tickers = ['MSFT', 'AAPL', 'IBM', 'TSLA',                      # Arbitrary universe; Test some assets manually in Research OR take a large universe, and take only significant betas in OLS
                        'GOOG', 'FB', 'NFLX', 'SPY']
        for ticker in self.tickers:
            self.AddEquity(ticker, Resolution.Minute)
        
        self.significanceLevel = 0.05
        self.ols_betas = None
        self.spread_moments = (None, None)                                  # Historical (mean, std) of spread
        
        self.RunModel()                                                     ## Doing this means we estimate cointegration dynamics once at backtest start
                                                                            # Unlikely the dynamics stay constant over time, so play around with re-estimating
                                                                            # the coefficients in a ScheduledFunction (i.e. weekly, monthly, etc)
        
    def OnData(self, data):
        """
        This function is called whenever new data is received from the engine.
        """
        
        # Data Integrity check
        for ticker in self.tickers:
            if not data.ContainsKey(ticker):
                return
        
        # Ensure model converged
        if (self.ols_betas is None) or (self.spread_moments[0] is None) or (self.spread_moments[1] is None):
            return
            
        # Form our residual series
        tickerCloses = {}                                                   # Dict of { Security ID: Price }
        for ticker in self.tickers:
            close = data[ticker].Close
            tickerCloses[str(self.Symbol(ticker).ID)] = close               # In RunModel(), the columns of historical data are QC's Security IDs so we map: Ticker -> Security ID
        spread = self.ols_betas.multiply(tickerCloses)
        
        # Transform into z-scores for interpretability
        spread_standardized = ( spread - self.spread_moments[0] ) / self.spread_moments[1]
        
        # Trading Logic: Short the spread if 1 standard deviation above mean
        if spread_standardized > 1:
            
            self.Debug('Shorting spread')
            for n, beta in enumerate(self.ols_betas):
                ticker = str(self.Symbol(self.ols_betas.index[n]).Value)    # Map Security ID -> Ticker
                # Short positive coefficients
                if (beta > 0):
                    self.SetHoldings(ticker, -beta)
                    continue
                # Long negative coefficients
                if (beta < 0):
                    self.SetHoldings(ticker, beta)
                    continue
                
        # Trading Logic: Long the spread if 1 standard deviation below mean
        if spread_standardized < -1:
            
            self.Debug('Long spread')
            for n, beta in enumerate(self.ols_betas):
                ticker = str(self.Symbol(self.ols_betas.index[n]).Value)
                # Long positive coefficients
                if (beta > 0):
                    self.SetHoldings(ticker, beta)
                    continue
                # Short negative coefficients
                if (beta < 0):
                    self.SetHoldings(ticker, -beta)
                    continue
                
        return
        
    def RunModel(self):
        
        self.ols_betas = None
        
        # Get some historical data
        history = self.History(self.Securities.Keys, timedelta(weeks=52), Resolution.Minute)
        close = history['close'].unstack(level=0).dropna()
        
        ## Engle-Granger ##
        
        # 1. Pre-test all time series are I(1)
        unitRootProcesses = []
        for col in close.columns:
            res = adfuller(close[col], regression='c', autolag='aic')
            if res[1] > self.significanceLevel:                             # Accept null hypothesis that X's are unit root I(1) processes
                unitRootProcesses.append(col)
            
        # 2. Run OLS of Y on X
        filteredData = close.loc[:, unitRootProcesses]
        if filteredData.empty:
            return
        
        Y = filteredData.iloc[:, 0]
        X = filteredData.iloc[:, 1:]
        X['Intercept'] = 1
        ols_results = OLS(Y, X, hasconst=True).fit()                        # Use OLS to estimate coefficients of: Y = alpha + beta_1*X_1 + beta_2*X_2 + ...  (*)
        ols_betas = ols_results.params                                      # These are the estimates in a pandas.Series: [ Intercept=alpha, X_1=beta_1, X_2=beta_2, ...]
        ols_betas = dict(-ols_betas)                                        # Take negative betas and add coefficient of 1 for Y
        ols_betas[filteredData.columns[0]] = 1                              # Why? We are rearranging (*) into: Y - beta_1*X_1 - beta_2*X_2 - ... = alpha + epsilon
        ols_betas.pop('Intercept', None)                                    # Drop the intercept term (shift spread along y-axis so it mean-reverts around 0)
        
        # Normalize ols_betas so absolute norm is 1
        # We can now interpret betas as portfolio weights that make the spread stationary
        ols_betas = pd.Series(ols_betas)
        norm = ols_betas.abs().sum()
        ols_betas = ols_betas / norm
        self.ols_betas = ols_betas
        
        # 3. Form residual series (spread). Test if I(0)
        spread = filteredData.multiply(ols_betas, axis=1).sum(axis=1)       # Given X, reconstruct: epsilon = Y - beta_1*X_1 - beta_2*X_2 - ...
        res = adfuller(spread, regression='nc', autolag='aic')
        if res[1] > self.significanceLevel:                                 # Reject null hypothesis => Spread is stationary
            self.Debug('Model did not converge.')
            return 
    
        # Compute/Store historical mean/std of spread
        self.spread_moments = (spread.mean(axis=0), spread.std(axis=0))     ## If the spread is stationary, then mean/variance are constant over time
                                                                            # In other words, if cointegration persists in future we expect spread to revert around mu with variance sigma^2
        self.Debug('Model converged.')