Overall Statistics

Total Trades: 152
Average Win: 0.08%
Average Loss: -0.14%
Compounding Annual Return: -2.442%
Drawdown: 5.200%
Expectancy: -0.473
Net Profit: -4.823%
Sharpe Ratio: -1.687
Probabilistic Sharpe Ratio: 0.000%
Loss Rate: 67%
Win Rate: 33%
Profit-Loss Ratio: 0.60
Alpha: -0.017
Beta: 0.002
Annual Standard Deviation: 0.01
Annual Variance: 0
Information Ratio: -0.853
Tracking Error: 0.124
Treynor Ratio: -9.115
Total Fees: $498.73
Estimated Strategy Capacity: $2,100,000.00
Lowest Capacity Asset: GOOCV VP83T1ZUHROL
Portfolio Turnover: 10.38%
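
For context, the reported expectancy is roughly consistent with the trade statistics above, assuming the usual per-trade definition: expectancy ≈ win rate × profit-loss ratio − loss rate ≈ 0.33 × 0.60 − 0.67 ≈ −0.47.
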
# region imports
from AlgorithmImports import *
from QuantConnect.Data.UniverseSelection import *

from enum import Enum

import numpy as np
import pandas as pd
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
# endregion
'''
The alpha model derives its insights from ADF test statistics. It is designed to rank every pair combination
by its ADF test and trade the pair with the strongest cointegration. The base alpha model is QuantConnect's
"Base Pairs Trading Model", which computes a ratio between the two securities by dividing their historical
prices over a lookback window. It then estimates the mean of this ratio as a 500-period EMA of the quotient.
When the ratio diverges far enough from the mean ratio, the model emits alternating long ratio/short ratio
insights as a group to capture the reversion of the ratio.

Finally, we use mean-variance optimization for the construction of the portfolio, in
combination with a trailing stop for the risk management model.
'''
class DeterminedTanKitten(QCAlgorithm):

    lookback = 6*60  # roughly one trading day of minute bars
    entry_th = 2
    my_resolution = Resolution.Minute

    def Initialize(self):
        self.SetStartDate(2018, 1, 1)  # Set Start Date
        self.SetEndDate(2020, 1, 1)  # Set End Date
        self.SetCash(100000)  # Set Strategy Cash

        # Broker
        self.SetBrokerageModel(BrokerageName.QuantConnectBrokerage, AccountType.Margin) 

        # Init Universe, use only GOOG and MSFT
        self.UniverseSettings.Resolution = self.my_resolution
        tickers = ['GOOG', 'MSFT']
        tickers = [Symbol.Create(ticker, SecurityType.Equity, Market.USA) for ticker in tickers]
        self.AddUniverseSelection(ManualUniverseSelectionModel(tickers))
        
        # Alpha model base: https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/alpha/supported-models#08-Base-Pairs-Trading-Model
        self.AddAlpha(ADFPairsTradingAlphaModel(resolution=self.my_resolution,
                                                minimumZScore=self.entry_th))
        # Portfolio construction model: https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/portfolio-construction/supported-models#08-Mean-Variance-Optimization-Model
        self.SetPortfolioConstruction(MeanVarianceOptimizationPortfolioConstructionModel(resolution=self.my_resolution,
                                                                                        lookback=self.lookback))
        # Risk Model :https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/risk-management/supported-models#07-Trailing-Stop-Model
        self.AddRiskManagement(TrailingStopRiskManagementModel())

        # Track securities currently in the universe
        self.securityTracker = set()

## ------------------------------------- ALPHA MODEL ------------------------------------- ##
class MyBasePairsTradingAlphaModel(AlphaModel):
    '''This alpha model is designed to accept every possible pair combination
    from securities selected by the universe selection model.
    This model generates alternating long ratio/short ratio insights emitted as a group.'''

    def __init__(self, lookback = 1,
            resolution = Resolution.Daily,
            threshold = 1):
        ''' Initializes a new instance of the MyBasePairsTradingAlphaModel class
        Args:
            lookback: Lookback period of the analysis
            resolution: Analysis resolution
            threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
        self.lookback = lookback
        self.resolution = resolution
        self.threshold = threshold
        self.predictionInterval = Time.Multiply(Extensions.ToTimeSpan(self.resolution), self.lookback)

        self.pairs = dict()
        self.Securities = list()

        resolutionString = Extensions.GetEnumString(resolution, Resolution)
        self.Name = f'{self.__class__.__name__}({self.lookback},{resolutionString},{Extensions.NormalizeToStr(threshold)})'


    def Update(self, algorithm, data):
        ''' Updates this alpha model with the latest data from the algorithm.
        This is called each time the algorithm receives data for subscribed securities
        Args:
            algorithm: The algorithm instance
            data: The new data available
        Returns:
            The new insights generated'''
        insights = []

        for key, pair in self.pairs.items():
            insights.extend(pair.GetInsightGroup())

        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Event fired each time we add or remove securities from the data feed.
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''

        for security in changes.AddedSecurities:
            self.Securities.append(security)

        for security in changes.RemovedSecurities:
            if security in self.Securities:
                self.Securities.remove(security)

        self.UpdatePairs(algorithm)

        for security in changes.RemovedSecurities:
            keys = [k for k in self.pairs.keys() if security.Symbol in k]
            for key in keys:
                self.pairs.pop(key).dispose()

    def UpdatePairs(self, algorithm):

        symbols = sorted([x.Symbol for x in self.Securities], key=lambda x: str(x.ID))

        for i in range(0, len(symbols)):
            asset_i = symbols[i]

            for j in range(1 + i, len(symbols)):
                asset_j = symbols[j]

                pair_symbol = (asset_i, asset_j)
                invert = (asset_j, asset_i)

                if pair_symbol in self.pairs or invert in self.pairs:
                    continue

                if not self.HasPassedTest(algorithm, asset_i, asset_j):
                    continue

                pair = self.Pair(algorithm, asset_i, asset_j, self.predictionInterval, self.threshold)
                self.pairs[pair_symbol] = pair

    def HasPassedTest(self, algorithm, asset1, asset2):
        '''Check whether the assets pass a pairs trading test
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            asset1: The first asset's symbol in the pair
            asset2: The second asset's symbol in the pair
        Returns:
            True if the statistical test for the pair is successful'''
        return True

    class Pair:

        class State(Enum):
            ShortRatio = -1
            FlatRatio = 0
            LongRatio = 1

        def __init__(self, algorithm, asset1, asset2, predictionInterval, threshold):
            '''Create a new pair
            Args:
                algorithm: The algorithm instance that experienced the change in securities
                asset1: The first asset's symbol in the pair
                asset2: The second asset's symbol in the pair
                predictionInterval: Period over which this insight is expected to come to fruition
                threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
            self.state = self.State.FlatRatio

            self.algorithm = algorithm
            self.asset1 = asset1
            self.asset2 = asset2

            # Creates the Identity indicator for a given Symbol and
            # the consolidator it is registered to. The consolidator reference
            # will be used to remove it from the SubscriptionManager
            def CreateIdentityIndicator(symbol: Symbol):
                resolution = min([x.Resolution for x in algorithm.SubscriptionManager.SubscriptionDataConfigService.GetSubscriptionDataConfigs(symbol)])

                name = algorithm.CreateIndicatorName(symbol, "close", resolution)
                identity = Identity(name)

                consolidator = algorithm.ResolveConsolidator(symbol, resolution)
                algorithm.RegisterIndicator(symbol, identity, consolidator)

                return identity, consolidator

            self.asset1Price, self.identityConsolidator1 = CreateIdentityIndicator(asset1)
            self.asset2Price, self.identityConsolidator2 = CreateIdentityIndicator(asset2)

            self.ratio = IndicatorExtensions.Over(self.asset1Price, self.asset2Price)
            self.mean = IndicatorExtensions.Of(ExponentialMovingAverage(500), self.ratio)

            upper = ConstantIndicator[IndicatorDataPoint]("ct", 1 + threshold / 100)
            self.upperThreshold = IndicatorExtensions.Times(self.mean, upper)

            lower = ConstantIndicator[IndicatorDataPoint]("ct", 1 - threshold / 100)
            self.lowerThreshold = IndicatorExtensions.Times(self.mean, lower)
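            # With the default threshold of 1 (i.e. 1 percent), the bands are 1.01 * mean and
            # 0.99 * mean; the pair emits insight groups when the ratio crosses outside them.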

            self.predictionInterval = predictionInterval

        def dispose(self):
            '''
            On disposal, remove the consolidators from the subscription manager
            '''
            self.algorithm.SubscriptionManager.RemoveConsolidator(self.asset1, self.identityConsolidator1)
            self.algorithm.SubscriptionManager.RemoveConsolidator(self.asset2, self.identityConsolidator2)

        def GetInsightGroup(self):
            '''Gets the insights group for the pair
            Returns:
                Insights grouped by a unique group id'''

            if not self.mean.IsReady:
                return []

            # don't re-emit the same direction
            if self.state is not self.State.LongRatio and self.ratio > self.upperThreshold:
                self.state = self.State.LongRatio

                # asset1/asset2 has risen above the upper threshold: short asset1, long asset2
                shortAsset1 = Insight.Price(self.asset1, self.predictionInterval, direction=InsightDirection.Down, magnitude=self.ratio.Current.Value)
                longAsset2 = Insight.Price(self.asset2, self.predictionInterval, direction=InsightDirection.Up, magnitude=self.ratio.Current.Value)

                # create a group id and set the GroupId property on each insight object
                return Insight.Group(shortAsset1, longAsset2)

            # don't re-emit the same direction
            if self.state is not self.State.ShortRatio and self.ratio < self.lowerThreshold:
                self.state = self.State.ShortRatio

                # asset1/asset2 has fallen below the lower threshold: long asset1, short asset2
                longAsset1 = Insight.Price(self.asset1, self.predictionInterval, direction=InsightDirection.Up, magnitude=self.ratio.Current.Value)
                shortAsset2 = Insight.Price(self.asset2, self.predictionInterval, direction=InsightDirection.Down, magnitude=self.ratio.Current.Value)

                # create a group id and set the GroupId property on each insight object
                return Insight.Group(longAsset1, shortAsset2)

            return []

class ADFPairsTradingAlphaModel(MyBasePairsTradingAlphaModel):
    '''
    This model is an adaptation of the "PearsonCorrelationPairsTradingAlphaModel" implemented
    by QuantConnect. It derives its insights from ADF test statistics.

    This alpha model is designed to rank every pair combination by its ADF test
    and trade the pair with the strongest cointegration (most negative ADF statistic).
    This model generates alternating long ratio/short ratio insights emitted as a group.
    '''

    def __init__(self, lookback:int = 15,
            resolution = Resolution.Minute,
            threshold = 1,
            minimumZScore = 1,
            min_statistical_significance = 0.05):
        '''Initializes a new instance of the ADFPairsTradingAlphaModel class
        Args:
            lookback: lookback period of the analysis
            resolution: analysis resolution
            threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight
            minimumZScore: The minimum absolute z-score of the residual before a pair is considered tradable
            min_statistical_significance: The maximum ADF p-value for a pair to be considered cointegrated'''
        super().__init__(lookback, resolution, threshold)
        self.lookback = lookback
        self.resolution = resolution
        self.min_statistical_significance = min_statistical_significance
        self.minimumZScore = minimumZScore
        self.best_pair = ()

    def Update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
        # Update every hour
        if int(algorithm.Time.minute) != 0:
            return []
        symbols = []
        for x in self.Securities:
            symbols.append(x.Symbol)
            if not x.Exchange.Hours.IsOpen(algorithm.Time, extendedMarket=False):
                return []
        
        history = algorithm.History(symbols, self.lookback, self.resolution).close.unstack(level=0)
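        # history: one close-price column per Symbol, rows indexed by bar end time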

        if not history.empty:
            
            df = self.get_price_dataframe(history)
            stop = len(df.columns)

            adf = dict()

            for i in range(0, stop):
                for j in range(i+1, stop):
                    if (j, i) not in adf:
                        adf[(i, j)] = self.ComputeADF(df.iloc[:,i], df.iloc[:,j])
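            # adf maps a (column_i, column_j) pair of column indices to the
            # [p-value, ADF statistic, latest residual z-score] list returned by ComputeADF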
            # Keep only pairs whose ADF p-value indicates a statistically significant (stationary) residual
            corr = list(filter(lambda kv: kv[1][0]<self.min_statistical_significance, adf.items()))
            if len(corr)==0:
                return []
            else:
                # Most negative ADF statistic -> stronger cointegration (sorted descending, so the best pair ends up last)
                corr = sorted(corr, key = lambda kv: kv[-1][1],reverse=True)
                # ZScore filter
                if abs(corr[-1][-1][2]) >= self.minimumZScore:
                    algorithm.Log('The best pair was ({} - {}) with a p-value of {}, ADF-stat of {} and Z-Score of {}'.\
                                format(symbols[corr[-1][0][0]], symbols[corr[-1][0][1]],
                                        corr[-1][-1][0],corr[-1][-1][1], corr[-1][-1][2]))
                    self.best_pair = (symbols[corr[-1][0][0]], symbols[corr[-1][0][1]])
                    self.UpdatePairs(algorithm)
        insights = super().Update(algorithm, data)
        return insights
    
    def ComputeADF(self, Y, X):
        # Add an intercept term to the regressor
        X = sm.add_constant(X)
        # Regression model
        model = sm.OLS(Y,X)
        results = model.fit()
        # Standard deviation of the residual
        sigma = np.sqrt(results.mse_resid) 
        slope = results.params[1]
        intercept = results.params[0]
        # The regression residual has zero mean by construction, so res / sigma is its z-score
        res = results.resid 
        zscore = res/sigma
        adf = adfuller(res)
        adf = pd.Series(adf[0:4], index=['Test Statistic','p-value','#Lags Used','Number of Observations Used'])
        return [adf['p-value'], adf['Test Statistic'], zscore.values[-1]]

    def HasPassedTest(self, algorithm, asset1, asset2):
        '''Check whether the assets pass a pairs trading test
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            asset1: The first asset's symbol in the pair
            asset2: The second asset's symbol in the pair
        Returns:
            True if the statistical test for the pair is successful'''
        return self.best_pair is not None and self.best_pair == (asset1, asset2)

    def get_price_dataframe(self, df):
        timezones = { x.Symbol.Value: x.Exchange.TimeZone for x in self.Securities }

        # Use log prices
        df = np.log(df)

        is_single_timeZone = len(set(timezones.values())) == 1

        if not is_single_timeZone:
            series_dict = dict()

            for column in df:
                # Change the dataframe index from data time to UTC time
                to_utc = lambda x: Extensions.ConvertToUtc(x, timezones[column])
                if self.resolution == Resolution.Daily:
                    to_utc = lambda x: Extensions.ConvertToUtc(x, timezones[column]).date()

                data = df[[column]]
                data.index = data.index.map(to_utc)
                series_dict[column] = data[column]

            df = pd.DataFrame(series_dict).dropna()

        return (df - df.shift(1)).dropna()
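
# ---------------------------------------------------------------------------
# Illustrative sketch (kept commented out so it never runs inside the
# algorithm): the residual ADF check performed in ComputeADF, applied to
# synthetic data. Paste into a research notebook with numpy and statsmodels
# available to try it.
#
#   import numpy as np
#   import statsmodels.api as sm
#   from statsmodels.tsa.stattools import adfuller
#
#   rng = np.random.default_rng(0)
#   x = np.cumsum(rng.normal(size=500))              # random-walk series
#   y = 0.8 * x + rng.normal(scale=0.1, size=500)    # noisy linear function of x
#
#   X = sm.add_constant(x)                           # regressor with intercept
#   results = sm.OLS(y, X).fit()
#   stat, pvalue, *_ = adfuller(results.resid)       # ADF test on the residual
#   print(stat, pvalue)                              # a small p-value indicates a stationary residual
# ---------------------------------------------------------------------------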