Overall Statistics
Total Trades
22141
Average Win
0.04%
Average Loss
-0.04%
Compounding Annual Return
384.250%
Drawdown
0.300%
Expectancy
0.221
Net Profit
501.056%
Sharpe Ratio
21.534
Probabilistic Sharpe Ratio
100%
Loss Rate
43%
Win Rate
57%
Profit-Loss Ratio
1.13
Alpha
1.309
Beta
-0.037
Annual Standard Deviation
0.06
Annual Variance
0.004
Information Ratio
8.18
Tracking Error
0.13
Treynor Ratio
-35.251
Total Fees
$29092.61
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
import pprint
from datetime import timedelta
from scipy.stats import spearmanr
from statsmodels.tsa.stattools import coint, adfuller
import numpy as np
import pandas as pd
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")
AddReference("QuantConnect.Indicators")
from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm.Framework.Alphas import *
from statsmodels.distributions.empirical_distribution import ECDF
from statsmodels.tsa.vector_ar.vecm import coint_johansen

# Module-level hand-off between universe selection and the alpha model:
# PairsUniverseSelection.SelectFine clears this list and refills it in place
# with the pairs returned by PairsTest, and the alpha model's
# OnSecuritiesChanged reads it to decide which pairs may trade.
GlobalPairsList = []


class PairsUniverseSelection(FundamentalUniverseSelectionModel):
    '''Universe selection model that, once per calendar month, keeps only the
    symbols that belong to a correlated and cointegrated pair.
    The accepted pairs are published through the module-level GlobalPairsList.'''

    def __init__(self, filterFineData = True, universeSettings = None, securityInitializer = None):
        '''Initializes a new instance of PairsUniverseSelection
        Args:
            filterFineData: Forwarded unchanged to FundamentalUniverseSelectionModel
            universeSettings: Forwarded unchanged to FundamentalUniverseSelectionModel
            securityInitializer: Forwarded unchanged to FundamentalUniverseSelectionModel'''
        super().__init__(filterFineData, universeSettings, securityInitializer)
        # Caps applied to the dollar-volume rankings in SelectCoarse / SelectFine
        self.numberOfSymbolsCoarse = 3000
        self.numberOfSymbolsFine = 1000
        # Filled by SelectCoarse, read by SelectFine to rank the fine candidates
        self.dollarVolumeBySymbol = {}
        # Month of the last completed selection; -1 forces the first run
        self.lastMonth = -1
        self.pairsList = list()
        self.resolution = Resolution.Daily
        # Minimum Spearman correlation for a pair to be tested for cointegration
        self.minCorr = 0.75
        self.maxAdfP = 1e-10     #1e-15
        # Maximum cointegration p-value for a pair to be accepted
        self.maxCointP = 1e-10   #1e-15
        # Number of bars of history requested for the pair tests
        self.lookback = 252 * 3
        # PairsTest stops as soon as this many pairs have been accepted
        self.max_pairs = 6


    def PairsTest(self, algorithm, history):
        '''Screens the candidates for tradable pairs
        Pairs are visited from the highest Spearman correlation downwards and
        accepted when their cointegration p-value is at most self.maxCointP.
        Args:
            algorithm: The algorithm instance
            history: History data frame for the candidate symbols
        Returns:
            (pairs, symbols): the accepted pairs as tuples of column labels and
            the distinct labels that appear in them, in order of first appearance'''
        df = self.get_returns_dataframe(history, algorithm)
        pairs = list()
        symbols = list()

        # One entry per ordered pair of columns, best correlation first
        corr = df.corr('spearman').unstack()
        corr = corr.sort_values(kind='quicksort', ascending=False)
        corr = corr[(corr < 1) & (corr >= self.minCorr)]

        for pair in corr.index:
            first, second = pair
            # The "< 1" filter above is meant to drop the diagonal, but rounding can
            # leave a self-correlation just below 1; never pair a symbol with itself.
            if first == second:
                continue
            # (A, B) and (B, A) are the same pair
            if pair in pairs or (second, first) in pairs:
                continue

            p_value = coint(df.loc[:, first], df.loc[:, second])[1]
            if p_value <= self.maxCointP:
                pairs.append(pair)

                for symbol in pair:
                    if symbol not in symbols:
                        symbols.append(symbol)

                if len(pairs) >= self.max_pairs:
                    break

        return pairs, symbols

    def SelectCoarse(self, algorithm, coarse):
        '''
        Performs a coarse selection, at most once per calendar month:

        -The stock must have fundamental data
        -The stock must have positive previous-day close price
        -The stock must have positive volume on the previous trading day

        The survivors are ranked by dollar volume and the top
        self.numberOfSymbolsCoarse are returned; their dollar volumes are
        remembered for SelectFine.
        '''
        if algorithm.Time.month == self.lastMonth:
            return Universe.Unchanged

        filtered = [x for x in coarse if x.HasFundamentalData and x.Volume > 0 and x.Price > 0]

        sortedByDollarVolume = sorted(filtered, key = lambda x: x.DollarVolume, reverse=True)[:self.numberOfSymbolsCoarse]

        # Refill the same dict object in place
        self.dollarVolumeBySymbol.clear()
        self.dollarVolumeBySymbol.update((x.Symbol, x.DollarVolume) for x in sortedByDollarVolume)

        return [x.Symbol for x in sortedByDollarVolume]

    def SelectFine(self, algorithm, fine):
        '''
        Performs a fine selection, at most once per calendar month:

        -The company's headquarter must in the U.S.
        -The stock must be traded on the NASDAQ or the NYSE
        -At least half a year since its initial public offering
        -The stock must be in the Industry Template Code catagory N

        The top self.numberOfSymbolsFine candidates by dollar volume are then
        passed through PairsTest. GlobalPairsList is refilled with the accepted
        pairs and only the symbols that belong to one of them are returned.
        '''
        if algorithm.Time.month == self.lastMonth:
            return Universe.Unchanged
        self.lastMonth = algorithm.Time.month

        # Morningstar identifies the New York Stock Exchange as "NYS"; "NYSE" is
        # still accepted so that nothing which matched before is lost.
        exchanges = ("NAS", "NYS", "NYSE")

        # Filter stocks
        filteredFine = [x for x in fine if x.CompanyReference.CountryId == "USA"
                                        and x.CompanyReference.PrimaryExchangeID in exchanges
                                        and (algorithm.Time - x.SecurityReference.IPODate).days > 180
                                        and x.CompanyReference.IndustryTemplateCode == "N"
                                        ]

        # Sort stocks on dollar volume
        sortedByDollarVolume = sorted(filteredFine, key = lambda x: self.dollarVolumeBySymbol[x.Symbol], reverse=True)
        symbols = [x.Symbol for x in sortedByDollarVolume[:self.numberOfSymbolsFine]]

        symbolsInPairs = list()
        # Cleared in place: other objects hold a reference to this same list
        GlobalPairsList.clear()

        history = algorithm.History(symbols, self.lookback, self.resolution)
        if history.empty:
            return symbolsInPairs

        pairs, pairSymbols = self.PairsTest(algorithm, history)

        GlobalPairsList.extend(pairs)
        symbolsInPairs.extend(pairSymbols)

        algorithm.Debug(GlobalPairsList)
        algorithm.Log(GlobalPairsList)
        algorithm.Debug(str(len(symbolsInPairs)) + " symbols in universe after")
        algorithm.Log(str(len(symbolsInPairs)) + " symbols in universe after on " + str(algorithm.UtcTime))
        algorithm.Debug(str(len(GlobalPairsList)) + " Global Pairs List " + str(algorithm.UtcTime))
        algorithm.Log(str(len(GlobalPairsList)) + " Global Pairs List " + str(algorithm.UtcTime))

        return symbolsInPairs


    def get_returns_dataframe(self, history, algorithm):
        '''Builds the data frame used by PairsTest, one column per symbol
        Args:
            history: History data frame indexed by (symbol, time) with a close column
            algorithm: The algorithm instance (not used)
        Returns:
            1 + the bar-to-bar percentage change of the log close, after dropping
            columns with fewer than 85% non-missing values and then every row that
            still contains a missing value'''
        # NOTE(review): this is the percentage change of log prices, not a log
        # return, and the cointegration test is then run on these change series
        # rather than on price levels - confirm this is intended.
        df = np.log(history.unstack(level = 0).close)
        df = 1 + pd.DataFrame(df).pct_change().dropna(axis=1, thresh = int(len(df.index) * .85)).dropna()
        return df
          
        
from collections import OrderedDict
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")
AddReference("QuantConnect.Indicators")

from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm.Framework.Alphas import *
from BasePairsFrameworkAlpha import BasePairsTradingAlphaModel

class CorrelationAndCointegrationPairsTrading(BasePairsTradingAlphaModel):
    ''' This alpha model trades only the pairs listed in the module-level
    GlobalPairsList; every other combination of securities is rejected.
    This model generates alternating long ratio/short ratio insights emitted as a group'''

    def __init__(self, 
            lookback = 1080,
            resolution = Resolution.Daily,
            threshold = .1,
            minimumcorrelation = .99):
        '''Initializes a new instance of the CorrelationAndCointegrationPairsTrading class
        Args:
            lookback: lookback period of the analysis
            resolution: analysis resolution
            threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight
            minimumcorrelation: Only used to derive max_p_value (1 - minimumcorrelation)'''
        super().__init__(lookback, resolution, threshold)
        self.pairsList = list()
        self.DataIsConsolidated = False
        self.max_p_value = 1 - minimumcorrelation
        # NOTE(review): fixed at .75 whatever minimumcorrelation is passed in
        self.minimumcorrelation = .75
        self.resolution = resolution
        self.lookback = lookback

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Event fired each time the we add/remove securities from the data feed.
        Picks up the current GlobalPairsList, reports how many pairs it holds and
        then lets the base model rebuild its pairs.
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''
        # Same list object as the global, not a copy
        self.pairsList = GlobalPairsList

        timestamp = str(algorithm.UtcTime)
        if self.pairsList:
            message = "Number of pairs: " + str(len(self.pairsList)) + ", Date: " + timestamp
        else:
            message = "No Pairs; Date: " + timestamp
        algorithm.Debug(message)
        algorithm.Log(message)

        super().OnSecuritiesChanged(algorithm, changes)


    def HasPassedTest(self, algorithm, asset1, asset2):
        '''Check whether the assets form one of the listed pairs, in either order
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            asset1: The first asset's symbol in the pair
            asset2: The second asset's symbol in the pair
        Returns:
            True if (asset1, asset2) or (asset2, asset1), compared by their string
            form, is present in self.pairsList'''
        forward = (str(asset1), str(asset2))
        backward = forward[::-1]
        return forward in self.pairsList or backward in self.pairsList
    
    
        
        
        
        
        
        
        
        #
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
AddReference("QuantConnect.Indicators")

from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import *
from datetime import timedelta
from enum import Enum
import numpy as np
from collections import deque


class BasePairsTradingAlphaModel(AlphaModel):
    '''This alpha model is designed to accept every possible pair combination
    from securities selected by the universe selection model
    This model generates alternating long ratio/short ratio insights emitted as a group'''

    def __init__(self, lookback = 252,
            resolution = Resolution.Daily,
            threshold = 1, 
            predictionInterval = Time.Multiply(Extensions.ToTimeSpan(Resolution.Minute), 120)):

        ''' Initializes a new instance of the PairsTradingAlphaModel class
        Args:
            lookback: Lookback period of the analysis
            resolution: Analysis resolution
            threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight
            predictionInterval: Period handed to every insight this model emits'''

        self.lookback = lookback
        self.resolution = resolution
        self.threshold = threshold
        self.predictionInterval = predictionInterval
        self.consolidated = False
        # Maps (asset1, asset2) to the Pair object tracking that combination
        self.pairs = dict()
        self.Securities = list()
        resolutionString = Extensions.GetEnumString(resolution, Resolution)
        self.Name = f'{self.__class__.__name__}({self.lookback},{resolutionString},{Extensions.NormalizeToStr(threshold)})'

    def Update(self, algorithm, data):
        ''' Updates this alpha model with the latest data from the algorithm.
        This is called each time the algorithm receives data for subscribed securities
        Args:
            algorithm: The algorithm instance
            data: The new data available
        Returns:
            The new insights generated'''
        insights = []
        for pair in self.pairs.values():
            insights.extend(pair.GetInsightGroup(algorithm))

        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Event fired each time the we add/remove securities from the data feed.
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''
        self.UpdatePairs(algorithm)

    def UpdatePairs(self, algorithm):
        '''Rebuilds self.pairs from scratch out of the currently active securities
        Every combination accepted by HasPassedTest gets a fresh Pair.
        Args:
            algorithm: The algorithm instance'''
        # The pairs about to be discarded each registered two consolidators;
        # unregister them so they do not pile up on every universe change.
        for stale in self.pairs.values():
            stale.RemoveConsolidators(algorithm)

        symbols = sorted([x.Symbol for x in algorithm.ActiveSecurities.Values], key=lambda x: str(x.ID))
        self.pairs = {}
        for i, asset_i in enumerate(symbols):
            for asset_j in symbols[i + 1:]:
                if not self.HasPassedTest(algorithm, asset_i, asset_j):
                    continue

                pair = self.Pair(algorithm, asset_i, asset_j, self.predictionInterval, self.threshold)
                self.pairs[(asset_i, asset_j)] = pair


    def HasPassedTest(self, algorithm, asset1, asset2):
        '''Check whether the assets pass a pairs trading test
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            asset1: The first asset's symbol in the pair
            asset2: The second asset's symbol in the pair
        Returns:
            True if the statistical test for the pair is successful'''
        return True

    class Pair:
        '''Tracks the price ratio asset1/asset2 over a rolling window of
        one-minute bars and turns threshold breaches into grouped insights'''

        class State(Enum):
            ShortRatio = -1
            FlatRatio = 0
            LongRatio = 1

        def __init__(self, algorithm, asset1, asset2, predictionInterval, threshold):
            '''Create a new pair
            Args:
                algorithm: The algorithm instance that experienced the change in securities
                asset1: The first asset's symbol in the pair
                asset2: The second asset's symbol in the pair
                predictionInterval: Period over which this insight is expected to come to fruition
                threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
            self.state = self.State.FlatRatio
            # Length of the rolling ratio window, in bars
            self.period = 500
            # Bar size in minutes
            self.barsize = 1
            consolidatorLength = timedelta(minutes=self.barsize)
            self.pair = (asset1, asset2)
            self.asset1 = asset1
            self.asset2 = asset2

            # Kept on the instance so RemoveConsolidators can unregister them later
            self.consolidator1 = TradeBarConsolidator(consolidatorLength)
            self.consolidator2 = TradeBarConsolidator(consolidatorLength)

            algorithm.SubscriptionManager.AddConsolidator(asset1, self.consolidator1)
            algorithm.SubscriptionManager.AddConsolidator(asset2, self.consolidator2)

            # Newest value sits at the left; the oldest falls off the right
            self.spreadDeque = deque(maxlen=self.period)

            # One-bar averages, i.e. the latest consolidated value of each asset
            self.asset1Price = SimpleMovingAverage(1)
            self.asset2Price = SimpleMovingAverage(1)

            # Indicators are registered before the flag handlers are attached
            algorithm.RegisterIndicator(asset1, self.asset1Price, self.consolidator1)
            algorithm.RegisterIndicator(asset2, self.asset2Price, self.consolidator2)

            self.consolidator1.DataConsolidated += self.asset1Consolidated
            self.consolidator2.DataConsolidated += self.asset2Consolidated

            # Entry band expressed as a multiple of the rolling mean
            self.upperThreshold = 1 + (threshold / 100)
            self.lowerThreshold = 1 - (threshold / 100)

            self.count = 0

            self.predictionInterval = predictionInterval

            # Set by the consolidator handlers, cleared once a ratio has been taken
            self.Data1IsConsolidated = False
            self.Data2IsConsolidated = False

            # Defined up front so the attributes exist even when no history is available
            self.spread = np.array([])
            self.mean = None
            self.std = None
            self.ratio = None
            self.zscore = None

            hist1 = algorithm.History(asset1, self.period * self.barsize, Resolution.Minute)
            hist2 = algorithm.History(asset2, self.period * self.barsize, Resolution.Minute)
            if 'close' not in hist1 or 'close' not in hist2:
                return

            bar1 = np.array(hist1['close'][0::self.barsize])
            bar2 = np.array(hist2['close'][0::self.barsize])
            # NOTE(review): when the two histories differ in length the bars are
            # matched by position from the start, not by timestamp - confirm.
            minhist = min(len(bar1), len(bar2))

            # Warm the window up with the historical ratios
            self.spread = np.divide(bar1[:minhist], bar2[:minhist])
            for value in self.spread:
                self.spreadDeque.appendleft(value)
            self.mean = np.mean(self.spreadDeque)
            self.std = np.std(self.spreadDeque)

        def asset1Consolidated(self, sender, consolidated):
            '''Flags that asset1 has produced a new consolidated bar'''
            self.Data1IsConsolidated = True

        def asset2Consolidated(self, sender, consolidated):
            '''Flags that asset2 has produced a new consolidated bar'''
            self.Data2IsConsolidated = True

        def RemoveConsolidators(self, algorithm):
            '''Unregisters this pair's two consolidators from the algorithm
            Args:
                algorithm: The algorithm instance the consolidators were added to'''
            algorithm.SubscriptionManager.RemoveConsolidator(self.asset1, self.consolidator1)
            algorithm.SubscriptionManager.RemoveConsolidator(self.asset2, self.consolidator2)

        def GetInsightGroup(self, algorithm):
            '''Gets the insights group for the pair
            Args:
                algorithm: The algorithm instance
            Returns:
                Insights grouped by an unique group id, or an empty list when
                there is nothing new to emit'''
            # Wait until both assets have delivered a new bar
            if not (self.Data1IsConsolidated and self.Data2IsConsolidated):
                return []

            denominator = self.asset2Price.Current.Value
            if denominator == 0:
                # No usable price for asset2 yet; try again on the next update
                return []

            self.ratio = self.asset1Price.Current.Value / denominator
            self.spreadDeque.appendleft(self.ratio)
            self.mean = np.mean(self.spreadDeque)
            self.std = np.std(self.spreadDeque)
            # A constant window has zero deviation; report 0 instead of dividing by it
            self.zscore = (self.ratio - self.mean) / self.std if self.std > 0 else 0.0

            self.Data1IsConsolidated = False
            self.Data2IsConsolidated = False

            # Stay silent until the rolling window is full
            if len(self.spreadDeque) != self.period:
                return []

            if self.state is not self.State.LongRatio and self.ratio > (self.mean * self.upperThreshold):
                self.state = self.State.LongRatio
                # asset1/asset2 is above the upper band: short asset1, long asset2
                shortAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Down)
                longAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Up)

                # creates a group id and set the GroupId property on each insight object
                return Insight.Group(shortAsset1, longAsset2)

            # don't re-emit the same direction
            if self.state is not self.State.ShortRatio and self.ratio < (self.mean * self.lowerThreshold):
                self.state = self.State.ShortRatio
                # asset1/asset2 is below the lower band: long asset1, short asset2
                longAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Up)
                shortAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Down)

                # creates a group id and set the GroupId property on each insight object
                return Insight.Group(longAsset1, shortAsset2)

            # The ratio has crossed back over its mean: only the state is reset so
            # that the next band breach can emit again; no Flat insights are emitted.
            if self.state is self.State.ShortRatio and self.ratio > self.mean:
                self.state = self.State.FlatRatio
                return []

            if self.state is self.State.LongRatio and self.ratio < self.mean:
                self.state = self.State.FlatRatio
                return []

            return []
            
            
            
            
            
##
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
AddReference("QuantConnect.Indicators")

from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import *
from datetime import timedelta
from enum import Enum
import numpy as np
from collections import deque


class BasePairsTradingAlphaModel(AlphaModel):
    '''This alpha model is designed to accept every possible pair combination
    from securities selected by the universe selection model
    This model generates alternating long ratio/short ratio insights emitted as a group'''

    def __init__(self, lookback = 252,
            resolution = Resolution.Daily,
            threshold = .4, 
            predictionInterval = Time.Multiply(Extensions.ToTimeSpan(Resolution.Minute), 1000)):

        ''' Initializes a new instance of the PairsTradingAlphaModel class
        Args:
            lookback: Lookback period of the analysis
            resolution: Analysis resolution
            threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight
            predictionInterval: Period handed to every insight this model emits'''

        self.lookback = lookback
        self.resolution = resolution
        self.threshold = threshold
        self.predictionInterval = predictionInterval
        self.consolidated = False
        # Maps (asset1, asset2) to the Pair object tracking that combination
        self.pairs = dict()
        self.Securities = list()
        resolutionString = Extensions.GetEnumString(resolution, Resolution)
        self.Name = f'{self.__class__.__name__}({self.lookback},{resolutionString},{Extensions.NormalizeToStr(threshold)})'

    def Update(self, algorithm, data):
        ''' Updates this alpha model with the latest data from the algorithm.
        This is called each time the algorithm receives data for subscribed securities
        Args:
            algorithm: The algorithm instance
            data: The new data available
        Returns:
            The new insights generated'''
        insights = []
        for pair in self.pairs.values():
            insights.extend(pair.GetInsightGroup(algorithm))

        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Event fired each time the we add/remove securities from the data feed.
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''
        self.UpdatePairs(algorithm, changes)

    def UpdatePairs(self, algorithm, changes):
        '''Drops the pairs that lost a security and adds the new combinations
        Existing pairs whose two securities are still present are kept as they
        are, including their state and rolling window.
        Args:
            algorithm: The algorithm instance
            changes: The security additions and removals from the algorithm'''
        # RemovedSecurities holds Security objects; pairs are keyed by Symbol
        removed = {security.Symbol for security in changes.RemovedSecurities}

        symbols = sorted([x.Symbol for x in algorithm.ActiveSecurities.Values if x.Symbol not in removed],
                         key=lambda x: str(x.ID))

        # Remove every pair with a removed leg from the dict and its
        # consolidators from the algorithm. Keys are collected first because the
        # dict is modified while they are processed.
        stale_keys = [key for key in self.pairs if key[0] in removed or key[1] in removed]
        for key in stale_keys:
            pair = self.pairs.pop(key)
            algorithm.Log(f"Removing Consolidators {algorithm.UtcTime}")
            algorithm.Debug(f"Removing Consolidators {algorithm.UtcTime}")
            pair.RemoveConsolidators(algorithm)

        for i, asset_i in enumerate(symbols):
            for asset_j in symbols[i + 1:]:
                pair_symbol = (asset_i, asset_j)
                invert = (asset_j, asset_i)

                # Already tracked: keep the existing Pair
                if pair_symbol in self.pairs or invert in self.pairs:
                    continue

                if not self.HasPassedTest(algorithm, asset_i, asset_j):
                    continue

                pair = self.Pair(algorithm, asset_i, asset_j, self.predictionInterval, self.threshold)
                self.pairs[pair_symbol] = pair


    def HasPassedTest(self, algorithm, asset1, asset2):
        '''Check whether the assets pass a pairs trading test
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            asset1: The first asset's symbol in the pair
            asset2: The second asset's symbol in the pair
        Returns:
            True if the statistical test for the pair is successful'''
        return True

    class Pair:
        '''Tracks the price ratio asset1/asset2 over a rolling window and turns
        threshold breaches into grouped insights'''

        class State(Enum):
            ShortRatio = -1
            FlatRatio = 0
            LongRatio = 1

        def __init__(self, algorithm, asset1, asset2, predictionInterval, threshold):
            '''Create a new pair
            Args:
                algorithm: The algorithm instance that experienced the change in securities
                asset1: The first asset's symbol in the pair
                asset2: The second asset's symbol in the pair
                predictionInterval: Period over which this insight is expected to come to fruition
                threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
            self.state = self.State.FlatRatio
            # Length of the rolling ratio window, in bars
            self.period = 500
            # Bar size in minutes
            self.barsize = 1
            consolidatorLength = timedelta(minutes=self.barsize)
            self.pair = (asset1, asset2)
            # Kept on the instance so RemoveConsolidators can unregister them later
            self.consolidator1 = TradeBarConsolidator(consolidatorLength)
            self.consolidator2 = TradeBarConsolidator(consolidatorLength)

            self.asset1 = asset1
            self.asset2 = asset2
            self.security1 = algorithm.Securities[asset1]
            self.security2 = algorithm.Securities[asset2]
            # Newest value sits at the left; the oldest falls off the right
            self.spreadDeque = deque(maxlen=self.period)

            self.asset1Price = Identity(asset1)
            self.asset2Price = Identity(asset2)

            algorithm.RegisterIndicator(self.asset1, self.asset1Price, self.consolidator1)
            algorithm.RegisterIndicator(self.asset2, self.asset2Price, self.consolidator2)

            # Entry band expressed as a multiple of the rolling mean
            self.upperThreshold = 1 + (threshold / 100)
            self.lowerThreshold = 1 - (threshold / 100)

            self.count = 0

            self.predictionInterval = predictionInterval

            # Defined up front so the attributes exist even when no history is available
            self.spread = np.array([])
            self.mean = None
            self.ratio = None

            hist1 = algorithm.History(asset1, self.period * self.barsize, Resolution.Minute)
            hist2 = algorithm.History(asset2, self.period * self.barsize, Resolution.Minute)
            if 'close' not in hist1 or 'close' not in hist2:
                return

            bar1 = np.array(hist1['close'])
            bar2 = np.array(hist2['close'])
            # NOTE(review): when the two histories differ in length the bars are
            # matched by position from the start, not by timestamp - confirm.
            minhist = min(len(bar1), len(bar2))

            # Warm the window up with the historical ratios
            self.spread = np.divide(bar1[:minhist], bar2[:minhist])
            for value in self.spread:
                self.spreadDeque.appendleft(value)
            self.mean = np.mean(self.spreadDeque)

        def RemoveConsolidators(self, algorithm):
            '''Unregisters this pair's two consolidators from the algorithm
            Args:
                algorithm: The algorithm instance the consolidators were registered with'''
            algorithm.SubscriptionManager.RemoveConsolidator(self.asset1, self.consolidator1)
            algorithm.SubscriptionManager.RemoveConsolidator(self.asset2, self.consolidator2)


        def GetInsightGroup(self, algorithm):
            '''Gets the insights group for the pair
            Args:
                algorithm: The algorithm instance
            Returns:
                Insights grouped by an unique group id, or an empty list when
                there is nothing new to emit'''
            if not (self.security1.IsTradable
                    and self.security2.IsTradable
                    and self.security1.Price > 0
                    and self.security2.Price > 0):

                return []

            # Until each indicator has received a bar it holds no real price, and
            # dividing the two would push a meaningless ratio into the window.
            if not (self.asset1Price.IsReady and self.asset2Price.IsReady):
                return []

            self.ratio = np.divide(self.asset1Price.Current.Value, self.asset2Price.Current.Value)
            self.spreadDeque.appendleft(self.ratio)
            self.mean = np.mean(self.spreadDeque)

            # Stay silent until the rolling window is full
            if len(self.spreadDeque) != self.period:
                return []

            if self.state is not self.State.LongRatio and self.ratio > (self.mean * self.upperThreshold):
                self.state = self.State.LongRatio
                # asset1/asset2 is above the upper band: short asset1, long asset2
                shortAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Down)
                longAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Up)

                # creates a group id and set the GroupId property on each insight object
                return Insight.Group(shortAsset1, longAsset2)

            # don't re-emit the same direction
            if self.state is not self.State.ShortRatio and self.ratio < (self.mean * self.lowerThreshold):
                self.state = self.State.ShortRatio
                # asset1/asset2 is below the lower band: long asset1, short asset2
                longAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Up)
                shortAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Down)

                # creates a group id and set the GroupId property on each insight object
                return Insight.Group(longAsset1, shortAsset2)

            return []
            
            
            
                
                
    ##
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")
AddReference("QuantConnect.Indicators")

from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm.Framework.Alphas import *
from BasePairsFrameworkAlpha import BasePairsTradingAlphaModel
from datetime import timedelta
from scipy.stats import pearsonr
from statsmodels.tsa.stattools import coint, adfuller
import numpy as np
import pandas as pd

class CorrelationAndCointegrationPairsTrading(BasePairsTradingAlphaModel):
    ''' This alpha model screens every pair combination of the active securities with a
    Pearson correlation test, a cointegration test and an ADF test, and records the pairs
    that pass all three so that HasPassedTest can report them as tradable.
    Insight generation itself is inherited from BasePairsTradingAlphaModel.'''

    def __init__(self, 
            lookback = 1080,
            resolution = Resolution.Daily,
            threshold = .1,
            minimumcorrelation = .99):
        '''Initializes a new instance of the CorrelationAndCointegrationPairsTrading class
        Args:
            lookback: lookback period of the analysis
            resolution: analysis resolution
            threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight
            minimumcorrelation: Used to derive the maximum accepted cointegration p-value
                                (1 - minimumcorrelation). See the note on self.minimumcorrelation below.'''
        super().__init__(lookback, resolution, threshold)
        self.lookback = lookback
        self.resolution = resolution
        # NOTE(review): the correlation floor is hard-coded and does NOT use the
        # minimumcorrelation argument. Kept as-is because changing it would alter
        # which pairs are traded -- confirm whether this is intentional.
        self.minimumcorrelation = .85
        self.max_p_value = 1 - minimumcorrelation
        self.pairsList = list()
        self.DataIsConsolidated = False

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Event fired each time the we add/remove securities from the data feed.
        Rebuilds self.pairsList from scratch and then forwards the event to the base model.
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''
        self.pairsList = []
        symbols = [ x.Symbol for x in algorithm.ActiveSecurities.Values ]

        # The last active security is excluded from the screen. Guarded so that an
        # empty universe does not raise IndexError.
        if symbols:
            symbols.pop()

        if symbols:
            self._screen_pairs(algorithm, symbols)

        super().OnSecuritiesChanged(algorithm, changes)

    def _screen_pairs(self, algorithm, symbols):
        '''Runs the correlation / cointegration / ADF screen over every pair of symbols
        and appends the pairs that pass to self.pairsList.
        Args:
            algorithm: The algorithm instance used to request history and to log
            symbols: The symbols to screen'''
        history = algorithm.History(symbols, self.lookback, self.resolution)

        # An empty history frame has no 'close' column, so emptiness must be
        # tested before the column is selected.
        if history.empty:
            return
        closes = history.close.unstack(level=0)
        if closes.empty:
            return

        df = self.get_price_dataframe(closes, algorithm)
        columns = df.columns
        for i in range(len(columns)):
            for j in range(i + 1, len(columns)):
                array1 = df.iloc[:, i]
                array2 = df.iloc[:, j]

                # Element-wise ratio of the two series; any inf/NaN disqualifies the pair
                spreadArray = np.divide(array1, array2)
                if not np.all(np.isfinite(spreadArray)):
                    continue

                # Each threshold is written as "not (value passes)" so that a NaN
                # statistic rejects the pair. Exact 1 / 0 results are also skipped.
                pearsonr_value = pearsonr(array1, array2)[0]
                if pearsonr_value == 1 or not (pearsonr_value >= self.minimumcorrelation):
                    continue

                p_value = coint(array1, array2)[1]
                if p_value == 0 or not (p_value <= self.max_p_value):
                    continue

                adfP = adfuller(spreadArray)[1]
                if adfP == 0 or not (adfP < 0.95):
                    continue

                message = str(p_value) + " = Coint P Value, " \
                    + str(adfP) + " = Adfuller P Value, " \
                    + str(pearsonr_value) + " = Pearson Value, " \
                    + str(columns[i]) + ", " \
                    + str(columns[j])
                algorithm.Debug(message)
                algorithm.Log(message)
                self.pairsList.append( (columns[i], columns[j]) )

        if len(self.pairsList) == 0:
            summary = "No Pairs; Date: " + str(algorithm.UtcTime)
        else:
            summary = "Number of pairs: " + str(len(self.pairsList)) + ", Date: " + str(algorithm.UtcTime)
        algorithm.Debug(summary)
        algorithm.Log(summary)

    def HasPassedTest(self, algorithm, asset1, asset2):
        '''Check whether the assets pass a pairs trading test
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            asset1: The first asset's symbol in the pair
            asset2: The second asset's symbol in the pair
        Returns:
            True if the pair, in either order, was recorded by the last screen'''
        # NOTE(review): the lookup uses str(symbol) while pairsList stores the history
        # frame's column labels -- this assumes those labels are equal to str(symbol); confirm.
        pair = (str(asset1), str(asset2))
        reverse = (str(asset2), str(asset1))
        return pair in self.pairsList or reverse in self.pairsList

    def get_price_dataframe(self, df, algorithm):
        '''Converts a frame of prices into first differences of log prices
        Args:
            df: Price frame, one column per symbol
            algorithm: Unused
        Returns:
            The differenced log frame with every row containing a NaN removed'''
        df = np.log(df)
        return (df - df.shift(1)).dropna()
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")

from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
UTCMIN = datetime.min.replace(tzinfo=utc)

class PairsTradingPortfolioConstruction(PortfolioConstructionModel):
    '''Provides an implementation of IPortfolioConstructionModel that gives equal weighting to all securities.
    The target percent holdings of each security is 0.95/N where N is the number of non-flat active insights.
    For insights of direction InsightDirection.Up, long targets are returned and
    for insights of direction InsightDirection.Down, short targets are returned.'''

    def __init__(self, rebalancingParam = Resolution.Daily):
        '''Initialize a new instance of PairsTradingPortfolioConstruction
        Args:
            rebalancingParam: Rebalancing parameter. If it is a timedelta or Resolution, it will be converted into a function.
                              The function returns the next expected rebalance time for a given algorithm UTC DateTime'''
        self.insightCollection = InsightCollection()
        self.removedSymbols = []
        self.nextExpiryTime = UTCMIN
        self.rebalancingTime = UTCMIN
        self.rebalancingFunc = rebalancingParam

        # An int (which is how a Resolution value is tested for here) is converted to a
        # time span, and a timedelta becomes "now + delta". Anything else is used as the
        # rebalancing function directly.
        if isinstance(rebalancingParam, int):
            rebalancingParam = Extensions.ToTimeSpan(rebalancingParam)
        if isinstance(rebalancingParam, timedelta):
            self.rebalancingFunc = lambda dt: dt + rebalancingParam

    def ShouldCreateTargetForInsight(self, insight):
        '''Method that will determine if the portfolio construction model should create a
        target for this insight
        Args:
            insight: The insight to create a target for
        Returns:
            Always True'''
        return True

    def DetermineTargetPercent(self, activeInsights):
        '''Will determine the target percent for each insight
        Args:
            activeInsights: The active insights to generate a target for
        Returns:
            A dictionary mapping each insight to its signed target percent'''
        result = {}

        # give equal weighting to each security, keeping 5% of the portfolio unallocated
        count = sum(x.Direction != InsightDirection.Flat for x in activeInsights)
        percent = 0 if count == 0 else 0.95 / count
        for insight in activeInsights:
            result[insight] = insight.Direction * percent
        return result

    def CreateTargets(self, algorithm, insights):
        '''Create portfolio targets from the specified insights
        Args:
            algorithm: The algorithm instance
            insights: The insights to create portfolio targets from
        Returns:
            An enumerable of portfolio targets to be sent to the execution model'''

        targets = []

        # Nothing to do: nothing expired, no rebalance due, no new insights, no removals pending
        if (algorithm.UtcTime <= self.nextExpiryTime and
            algorithm.UtcTime <= self.rebalancingTime and
            len(insights) == 0 and
            self.removedSymbols is None):
            return targets

        for insight in insights:
            if self.ShouldCreateTargetForInsight(insight):
                self.insightCollection.Add(insight)

        # Create flatten target for each security that was removed from the universe
        if self.removedSymbols is not None:
            targets.extend(PortfolioTarget(symbol, 0) for symbol in self.removedSymbols)
            self.removedSymbols = None

        # Get insight that haven't expired of each symbol that is still in the universe
        activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)

        # Keep only the most recently generated active insight per symbol. A dict is
        # used instead of itertools.groupby because groupby only merges ADJACENT equal
        # keys: on unsorted input a symbol could yield several groups, inflating the
        # equal-weight count and emitting duplicate targets. '>=' keeps the later
        # element on ties, matching a stable sort followed by [-1].
        latestBySymbol = {}
        for insight in activeInsights:
            current = latestBySymbol.get(insight.Symbol)
            if current is None or insight.GeneratedTimeUtc >= current.GeneratedTimeUtc:
                latestBySymbol[insight.Symbol] = insight
        lastActiveInsights = list(latestBySymbol.values())

        # Determine target percent for the given insights
        percents = self.DetermineTargetPercent(lastActiveInsights)

        # Symbols for which no percent target could be built; they are not flattened below
        errorSymbols = {}
        for insight in lastActiveInsights:
            target = PortfolioTarget.Percent(algorithm, insight.Symbol, percents[insight])
            if target is not None:
                targets.append(target)
            else:
                errorSymbols[insight.Symbol] = insight.Symbol

        # Get expired insights and create one flatten target per symbol that has no
        # remaining active insight
        expiredInsights = self.insightCollection.RemoveExpiredInsights(algorithm.UtcTime)

        flattenedSymbols = set()
        for insight in expiredInsights:
            symbol = insight.Symbol
            if symbol in flattenedSymbols or symbol in errorSymbols:
                continue
            if not self.insightCollection.HasActiveInsights(symbol, algorithm.UtcTime):
                flattenedSymbols.add(symbol)
                targets.append(PortfolioTarget(symbol, 0))

        self.nextExpiryTime = self.insightCollection.GetNextExpiryTime()
        if self.nextExpiryTime is None:
            self.nextExpiryTime = UTCMIN

        self.rebalancingTime = self.rebalancingFunc(algorithm.UtcTime)

        return targets

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Event fired each time the we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''

        # Get removed symbol and invalidate them in the insight collection
        self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities]
        self.insightCollection.Clear(self.removedSymbols)
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
import numpy as np
from System import *
from QuantConnect import *
from QuantConnect.Orders import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Execution import *
from QuantConnect.Algorithm.Framework.Portfolio import *

class PairsTradingOrderExecution(ExecutionModel):
    '''Provides an implementation of IExecutionModel that immediately submits market orders to achieve the desired portfolio targets'''

    def __init__(self):
        '''Initializes a new instance of the PairsTradingOrderExecution class'''
        self.targetsCollection = PortfolioTargetCollection()

    def Execute(self, algorithm, targets):
        '''Immediately submits orders for the specified portfolio targets.
        Order submission is best-effort: a failed order is reported through
        algorithm.Debug and the remaining targets are still processed.
        Args:
            algorithm: The algorithm instance
            targets: The portfolio targets to be ordered'''
        # The module does `from System import *`, which may rebind the name
        # `Exception` to the CLR type; go through builtins so Python-side errors
        # are caught as well.
        import builtins

        # for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
        self.targetsCollection.AddRange(targets)
        if self.targetsCollection.Count > 0:
            for target in self.targetsCollection.OrderByMarginImpact(algorithm):
                # Unfilled remainder of every open order for this symbol
                open_quantity = sum(x.Quantity - x.QuantityFilled for x in algorithm.Transactions.GetOpenOrderTickets(target.Symbol))
                existing = algorithm.Securities[target.Symbol].Holdings.Quantity + open_quantity
                quantity = int(target.Quantity - existing)
                # Differences of a single share (or none) are ignored
                if abs(quantity) > 1:
                    try:
                        algorithm.MarketOrder(target.Symbol, quantity)
                    except builtins.Exception as err:
                        algorithm.Debug("PairsTradingOrderExecution: market order for "
                                        + str(target.Symbol) + " failed: " + str(err))
            self.targetsCollection.ClearFulfilled(algorithm)
            
            
            
            
            
            
            
            #
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel

class TechnologyUniverseModule(FundamentalUniverseSelectionModel):
    '''
    This module selects the most liquid U.S. stocks (by dollar volume) whose
    Industry Template Code is "N", refreshing the selection once per calendar month.
    '''

    def __init__(self, filterFineData = True, universeSettings = None, securityInitializer = None):
        '''Initializes a new default instance of the TechnologyUniverseModule'''
        super().__init__(filterFineData, universeSettings, securityInitializer)
        self.numberOfSymbolsCoarse = 1000
        self.numberOfSymbolsFine = 500
        self.dollarVolumeBySymbol = {}
        self.symbols = []
        self.lastMonth = -1

    def SelectCoarse(self, algorithm, coarse):
        '''
        Performs a coarse selection:
        
        -The stock must have fundamental data
        -The stock must have positive previous-day close price
        -The stock must have positive volume on the previous trading day

        Returns the previously selected symbols unchanged when the month has not
        rolled over since the last fine selection.
        '''
        if algorithm.Time.month == self.lastMonth: 
            return self.symbols

        filtered = [x for x in coarse if x.HasFundamentalData and x.Volume > 0 and x.Price > 0]
        sortedByDollarVolume = sorted(filtered, key = lambda x: x.DollarVolume, reverse=True)[:self.numberOfSymbolsCoarse]

        # Remember each candidate's dollar volume so SelectFine can rank by it
        self.dollarVolumeBySymbol.clear()
        for x in sortedByDollarVolume:
            self.dollarVolumeBySymbol[x.Symbol] = x.DollarVolume

        return [x.Symbol for x in sortedByDollarVolume]

    def SelectFine(self, algorithm, fine):
        '''
        Performs a fine selection:
        
        -The company's headquarter must in the U.S.
        -At least half a year since its initial public offering
        -The stock must be in the Industry Template Code catagory N

        (A NASDAQ primary-exchange filter exists below but is commented out.)
        '''
        if algorithm.Time.month == self.lastMonth: 
            return self.symbols
        self.lastMonth = algorithm.Time.month

        # Filter stocks
        filteredFine = [x for x in fine if x.CompanyReference.CountryId == "USA"
                                        #and (x.CompanyReference.PrimaryExchangeID == "NAS")
                                        and (algorithm.Time - x.SecurityReference.IPODate).days > 180
                                        and x.CompanyReference.IndustryTemplateCode == "N"]

        # Sort stocks on the dollar volume recorded during the coarse selection
        sortedByDollarVolume = sorted(filteredFine, key = lambda x: self.dollarVolumeBySymbol[x.Symbol], reverse=True)
        
        self.symbols = [x.Symbol for x in sortedByDollarVolume[:self.numberOfSymbolsFine]]
        
        return self.symbols
from Execution.ImmediateExecutionModel import ImmediateExecutionModel
from Portfolio.EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel
#from CorrelatedPairsTrading import CorrelationAndCointegrationPairsTrading
from PairsUniverseSelection import PairsUniverseSelection, CorrelationAndCointegrationPairsTrading
from PairsTradingPortfolioConstruction import PairsTradingPortfolioConstruction
from PairsTradingOrderExecution import PairsTradingOrderExecution
from Selection.UncorrelatedUniverseSelectionModel import UncorrelatedUniverseSelectionModel

class NadionParticleGearbox(QCAlgorithm):
    '''Algorithm entry point: plugs the pairs universe selection, alpha,
    portfolio construction and execution models into the framework.'''

    def Initialize(self):
        '''Configures dates, cash, universe, benchmark, framework models and brokerage.'''
        # Backtest window (open-ended) and starting capital
        self.SetStartDate(2019, 1, 1)
        #self.SetEndDate(2018, 12, 31)
        self.SetCash(2e4)  # ** min cash for alpha streams is 1M/1e6
        
        '''
        Universes:                                                
        LiquidETFUniverse
        TechnologyUniverseModule
        VolatilityETFUniverse
        QC500UniverseSelectionModel
        UncorrelatedUniverseSelectionModel
        '''
        # Universe selection model and the resolution applied to universe settings
        self.SetUniverseSelection(PairsUniverseSelection())
        self.UniverseSettings.Resolution = Resolution.Minute

        # SPY is subscribed and used as the benchmark
        spySymbol = self.AddEquity("SPY").Symbol
        self.SetBenchmark(spySymbol)
        
        # Framework models: alpha -> portfolio construction -> execution
        self.AddAlpha(CorrelationAndCointegrationPairsTrading())
        self.SetPortfolioConstruction(PairsTradingPortfolioConstruction())
        self.SetExecution(PairsTradingOrderExecution())
        #self.SetRiskManagement(TrailingStopRiskManagementModel(0.02))

        # Brokerage model. Alternative noted by the author: AlphaStreamsBrokerageModel
        # (Alpha Streams is for Prime Brokerage)
        self.SetBrokerageModel(InteractiveBrokersBrokerageModel())
        

        
        
        
        
        
        
        #