Overall Statistics |
Total Trades 64340 Average Win 0.15% Average Loss -0.06% Compounding Annual Return 735118.118% Drawdown 4.800% Expectancy 0.366 Net Profit 753271.388% Sharpe Ratio 16.957 Probabilistic Sharpe Ratio 100% Loss Rate 60% Win Rate 40% Profit-Loss Ratio 2.42 Alpha 7.475 Beta -0.09 Annual Standard Deviation 0.442 Annual Variance 0.196 Information Ratio 13.264 Tracking Error 0.589 Treynor Ratio -83 Total Fees $63307522.66 |
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel import pprint from datetime import timedelta from scipy.stats import spearmanr from statsmodels.tsa.stattools import coint, adfuller import numpy as np import pandas as pd from clr import AddReference AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm.Framework") AddReference("QuantConnect.Indicators") from QuantConnect import * from QuantConnect.Indicators import * from QuantConnect.Algorithm.Framework.Alphas import * from statsmodels.distributions.empirical_distribution import ECDF from statsmodels.tsa.vector_ar.vecm import coint_johansen GlobalPairsList = [] class PairsUniverseSelection(FundamentalUniverseSelectionModel): def __init__(self, filterFineData = True, universeSettings = None, securityInitializer = None): '''Initializes a new default instance of the TechnologyUniverseModule''' super().__init__(filterFineData, universeSettings, securityInitializer) self.numberOfSymbolsCoarse = 3000 self.numberOfSymbolsFine = 1000 self.dollarVolumeBySymbol = {} self.lastMonth = -1 self.pairsList = list() self.resolution = Resolution.Daily self.minCorr = 0.75 self.maxAdfP = 1e-10 #1e-15 self.maxCointP = 1e-10 #1e-15 self.lookback = 252 * 3 self.max_pairs = 6 def PairsTest(self, algorithm, history): df = self.get_returns_dataframe(history, algorithm) pairs = list() symbols = list() corr = df.corr('spearman').unstack() corr = corr.sort_values(kind='quicksort', ascending=False,)[corr < 1][corr>=self.minCorr] for i in range(0, len(corr)): pair = corr.index[i] if corr.index[i] in pairs or (corr.index[i][1], corr.index[i][0]) in pairs: continue if coint( df.loc[:,pair[0]], df.loc[:,pair[1]] )[1] <= self.maxCointP: pairs.append(pair) for symbol in pair: if not symbol in symbols: symbols.append(symbol) if len(pairs) >= self.max_pairs: return pairs, symbols return pairs, symbols def SelectCoarse(self, algorithm, coarse): ''' Performs a coarse selection: -The stock must have fundamental data -The stock must have positive previous-day close price -The stock must have positive volume on the previous trading day ''' if algorithm.Time.month == self.lastMonth: return Universe.Unchanged filtered = [x for x in coarse if x.HasFundamentalData and x.Volume > 0 and x.Price > 0] sortedByDollarVolume = sorted(filtered, key = lambda x: x.DollarVolume, reverse=True)[:self.numberOfSymbolsCoarse] symbols = [] self.dollarVolumeBySymbol.clear() for x in sortedByDollarVolume: symbols.append(x.Symbol) self.dollarVolumeBySymbol[x.Symbol] = x.DollarVolume return symbols def SelectFine(self, algorithm, fine): ''' Performs a fine selection: -The company's headquarter must in the U.S. -The stock must be traded on the NASDAQ stock exchange -At least half a year since its initial public offering -The stock must be in the Industry Template Code catagory N ''' if algorithm.Time.month == self.lastMonth: return Universe.Unchanged self.lastMonth = algorithm.Time.month # Filter stocks filteredFine = [x for x in fine if x.CompanyReference.CountryId == "USA" and ( (x.CompanyReference.PrimaryExchangeID == "NAS") or (x.CompanyReference.PrimaryExchangeID == "NYSE") ) and (algorithm.Time - x.SecurityReference.IPODate).days > 180 and x.CompanyReference.IndustryTemplateCode == "N" ] sortedByDollarVolume = [] # Sort stocks on dollar volume sortedByDollarVolume = sorted(filteredFine, key = lambda x: self.dollarVolumeBySymbol[x.Symbol], reverse=True) symbols = [x.Symbol for x in sortedByDollarVolume[:self.numberOfSymbolsFine]] #symbol_strings = list() #for symbol in symbols: # symbol_strings.append(str(symbol)) symbolsInPairs = list() GlobalPairsList.clear() history = algorithm.History(symbols, self.lookback, self.resolution) if history.empty: return symbolsInPairs Test = self.PairsTest(algorithm, history) GlobalPairsList.extend(Test[0]) ################## symbolsInPairs.extend(Test[1]) algorithm.Debug(GlobalPairsList) algorithm.Log(GlobalPairsList) algorithm.Debug(str(len(symbolsInPairs)) + " symbols in universe after") algorithm.Log(str(len(symbolsInPairs)) + " symbols in universe after on " + str(algorithm.UtcTime)) algorithm.Debug(str(len(GlobalPairsList)) + " Global Pairs List " + str(algorithm.UtcTime)) algorithm.Log(str(len(GlobalPairsList)) + " Global Pairs List " + str(algorithm.UtcTime)) return symbolsInPairs def get_returns_dataframe(self, history, algorithm): df = np.log(history.unstack(level = 0).close) df = 1 + pd.DataFrame(df).pct_change().dropna(axis=1, thresh = int(len(df.index) * .85)).dropna() return df from collections import OrderedDict from clr import AddReference AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm.Framework") AddReference("QuantConnect.Indicators") from QuantConnect import * from QuantConnect.Indicators import * from QuantConnect.Algorithm.Framework.Alphas import * from BasePairsFrameworkAlpha import BasePairsTradingAlphaModel class CorrelationAndCointegrationPairsTrading(BasePairsTradingAlphaModel): ''' This alpha model is designed to rank every pair combination by its pearson corrDictelation and trade the pair with the hightest corrDictelation This model generates alternating long ratio/short ratio insights emitted as a group''' def __init__(self, lookback = 1080, resolution = Resolution.Daily, threshold = .1, minimumcorrelation = .99): '''Initializes a new instance of the PearsoncorrDictelationPairsTradingAlphaModel class Args: lookback: lookback period of the analysis resolution: analysis resolution threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight minimumcorrDictelation: The minimum corrDictelation to consider a tradable pair''' super().__init__(lookback, resolution, threshold) self.lookback = lookback self.resolution = resolution self.minimumcorrelation = .75 self.max_p_value = 1 - minimumcorrelation self.pairsList = list() self.DataIsConsolidated = False def OnSecuritiesChanged(self, algorithm, changes): '''Event fired each time the we add/remove securities from the data feed. Args: algorithm: The algorithm instance that experienced the change in securities changes: The security additions and removals from the algorithm''' self.pairsList = GlobalPairsList if len(self.pairsList) == 0: algorithm.Debug("No Pairs; Date: " + str(algorithm.UtcTime)) algorithm.Log("No Pairs; Date: " + str(algorithm.UtcTime)) else: algorithm.Debug("Number of pairs: " + str(len(self.pairsList)) + ", Date: " + str(algorithm.UtcTime)) algorithm.Log("Number of pairs: " + str(len(self.pairsList)) + ", Date: " + str(algorithm.UtcTime)) super().OnSecuritiesChanged(algorithm, changes) def HasPassedTest(self, algorithm, asset1, asset2): '''Check whether the assets pass a pairs trading test Args: algorithm: The algorithm instance that experienced the change in securities asset1: The first asset's symbol in the pair asset2: The second asset's symbol in the pair Returns: True if the statistical test for the pair is successful''' pair = (str(asset1), str(asset2)) reverse = (str(asset2), str(asset1)) if pair in self.pairsList \ or reverse in self.pairsList: return True return False #
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. # Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from clr import AddReference AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm") AddReference("QuantConnect.Algorithm.Framework") AddReference("QuantConnect.Indicators") from QuantConnect import * from QuantConnect.Indicators import * from QuantConnect.Algorithm import * from QuantConnect.Algorithm.Framework import * from QuantConnect.Algorithm.Framework.Alphas import * from datetime import timedelta from enum import Enum import numpy as np from collections import deque class BasePairsTradingAlphaModel(AlphaModel): '''This alpha model is designed to accept every possible pair combination from securities selected by the universe selection model This model generates alternating long ratio/short ratio insights emitted as a group''' def __init__(self, lookback = 252, resolution = Resolution.Daily, threshold = 1, predictionInterval = Time.Multiply(Extensions.ToTimeSpan(Resolution.Minute), 120)): ''' Initializes a new instance of the PairsTradingAlphaModel class Args: lookback: Lookback self.period of the analysis resolution: Analysis resolution threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight''' self.lookback = lookback self.resolution = resolution self.threshold = threshold self.predictionInterval = predictionInterval self.consolidated = False self.pairs = dict() self.Securities = list() resolutionString = Extensions.GetEnumString(resolution, Resolution) self.Name = f'{self.__class__.__name__}({self.lookback},{resolutionString},{Extensions.NormalizeToStr(threshold)})' def Update(self, algorithm, data): ''' Updates this alpha model with the latest data from the algorithm. This is called each time the algorithm receives data for subscribed securities Args: algorithm: The algorithm instance data: The new data available Returns: The new insights generated''' insights = [] for key, pair in self.pairs.items(): insights.extend(pair.GetInsightGroup(algorithm)) return insights def OnSecuritiesChanged(self, algorithm, changes): '''Event fired each time the we add/remove securities from the data feed. Args: algorithm: The algorithm instance that experienced the change in securities changes: The security additions and removals from the algorithm''' self.UpdatePairs(algorithm) def UpdatePairs(self, algorithm): symbols = sorted([x.Symbol for x in algorithm.ActiveSecurities.Values], key=lambda x: str(x.ID)) self.pairs = {} for i in range(0, len(symbols)): asset_i = symbols[i] for j in range(1 + i, len(symbols)): asset_j = symbols[j] pair_symbol = (asset_i, asset_j) invert = (asset_j, asset_i) if pair_symbol in self.pairs or invert in self.pairs: continue if not self.HasPassedTest(algorithm, asset_i, asset_j): continue pair = self.Pair(algorithm, asset_i, asset_j, self.predictionInterval, self.threshold) self.pairs[pair_symbol] = pair def HasPassedTest(self, algorithm, asset1, asset2): '''Check whether the assets pass a pairs trading test Args: algorithm: The algorithm instance that experienced the change in securities asset1: The first asset's symbol in the pair asset2: The second asset's symbol in the pair Returns: True if the statistical test for the pair is successful''' return True class Pair: class State(Enum): ShortRatio = -1 FlatRatio = 0 LongRatio = 1 def __init__(self, algorithm, asset1, asset2, predictionInterval, threshold): '''Create a new pair Args: algorithm: The algorithm instance that experienced the change in securities asset1: The first asset's symbol in the pair asset2: The second asset's symbol in the pair predictionInterval: self.period over which this insight is expected to come to fruition threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight''' self.state = self.State.FlatRatio self.period = 500 self.barsize = 1 consolidatorLength = timedelta(minutes=self.barsize) self.pair = (asset1, asset2) tradeBarConsolidator1 = TradeBarConsolidator(consolidatorLength) tradeBarConsolidator2 = TradeBarConsolidator(consolidatorLength) algorithm.SubscriptionManager.AddConsolidator(asset1, tradeBarConsolidator1) algorithm.SubscriptionManager.AddConsolidator(asset2, tradeBarConsolidator2) self.asset1 = asset1 self.asset2 = asset2 self.spreadDeque = deque(maxlen=self.period) self.asset1Price = SimpleMovingAverage(1) self.asset2Price = SimpleMovingAverage(1) algorithm.RegisterIndicator(asset1, self.asset1Price, tradeBarConsolidator1) algorithm.RegisterIndicator(asset2, self.asset2Price, tradeBarConsolidator2) tradeBarConsolidator1.DataConsolidated += self.asset1Consolidated tradeBarConsolidator2.DataConsolidated += self.asset2Consolidated self.upperThreshold = 1 + (threshold / 100) self.lowerThreshold = 1 - (threshold / 100) self.count = 0 self.predictionInterval = predictionInterval self.Data1IsConsolidated = False self.Data2IsConsolidated = False hist1 = algorithm.History(asset1, self.period * self.barsize, Resolution.Minute) hist2 = algorithm.History(asset2, self.period * self.barsize, Resolution.Minute) if not 'close' in hist1: return if not 'close' in hist2: return ###bar1 = np.log(np.array(hist1['close'][0::self.barsize])) bar1 = np.array(hist1['close'][0::self.barsize]) ###bar2 = np.log(np.array(hist2['close'][0::self.barsize])) bar2 = np.array(hist2['close'][0::self.barsize]) minhist = min(len(bar1), len(bar2)) self.spread = np.divide(bar1[:minhist], bar2[:minhist]) for i in range (0, len(self.spread)): self.spreadDeque.appendleft(self.spread[i]) self.mean = np.mean(self.spreadDeque) self.std = np.std(self.spreadDeque) self.ratio = None def asset1Consolidated(self, sender, consolidated): self.Data1IsConsolidated = True def asset2Consolidated(self, sender, consolidated): self.Data2IsConsolidated = True def GetInsightGroup(self, algorithm): '''Gets the insights group for the pair Returns: Insights grouped by an unique group id''' # *** Add History Warmup if not (self.Data1IsConsolidated & self.Data2IsConsolidated): return [] ###self.ratio = np.log(self.asset1Price.Current.Value) / np.log(self.asset2Price.Current.Value) self.ratio = self.asset1Price.Current.Value / self.asset2Price.Current.Value self.spreadDeque.appendleft(self.ratio) self.mean = np.mean(self.spreadDeque) self.std = np.std(self.spreadDeque) self.zscore = (self.ratio - self.mean) / self.std self.Data1IsConsolidated = False self.Data2IsConsolidated = False if len(self.spreadDeque) != self.period: return [] if self.state is not self.State.LongRatio and self.ratio > (self.mean * self.upperThreshold): self.state = self.State.LongRatio # asset1/asset2 is more than 2 std away from mean, short asset1, long asset2 shortAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Down) longAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Up) #algorithm.SetHoldings(self.asset1, -weight) #algorithm.SetHoldings(self.asset2, weight) # creates a group id and set the GroupId property on each insight object return Insight.Group(shortAsset1, longAsset2) # don't re-emit the same direction if self.state is not self.State.ShortRatio and self.ratio < (self.mean * self.lowerThreshold): self.state = self.State.ShortRatio # asset1/asset2 is less than 2 std away from mean, long asset1, short asset2 longAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Up) shortAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Down) #algorithm.SetHoldings(self.asset1, weight) #algorithm.SetHoldings(self.asset2, -weight) # creates a group id and set the GroupId property on each insight object return Insight.Group(longAsset1, shortAsset2) # don't re-emit the same direction if self.state is self.State.ShortRatio and self.ratio > self.mean: self.state = self.State.FlatRatio # asset1/asset2 is less than 2 std away from mean, long asset1, short asset2 neutralasset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Flat) neutralasset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Flat) #algorithm.SetHoldings(self.asset1, 0) #algorithm.SetHoldings(self.asset2, 0) # creates a group id and set the GroupId property on each insight object return []#Insight.Group(neutralasset1, neutralasset2) # don't re-emit the same direction if self.state is self.State.LongRatio and self.ratio < self.mean: self.state = self.State.FlatRatio # asset1/asset2 is less than 2 std away from mean, long asset1, short asset2 neutralasset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Flat) neutralasset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Flat) #algorithm.SetHoldings(self.asset1, 0) #algorithm.SetHoldings(self.asset2, 0) # creates a group id and set the GroupId property on each insight object return []#Insight.Group(neutralasset1, neutralasset2) return [] ##
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. # Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from clr import AddReference AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm") AddReference("QuantConnect.Algorithm.Framework") AddReference("QuantConnect.Indicators") from QuantConnect import * from QuantConnect.Indicators import * from QuantConnect.Algorithm import * from QuantConnect.Algorithm.Framework import * from QuantConnect.Algorithm.Framework.Alphas import * from datetime import timedelta from enum import Enum import numpy as np from collections import deque class BasePairsTradingAlphaModel(AlphaModel): '''This alpha model is designed to accept every possible pair combination from securities selected by the universe selection model This model generates alternating long ratio/short ratio insights emitted as a group''' def __init__(self, lookback = 252, resolution = Resolution.Daily, threshold = .4, predictionInterval = Time.Multiply(Extensions.ToTimeSpan(Resolution.Minute), 1000)): ''' Initializes a new instance of the PairsTradingAlphaModel class Args: lookback: Lookback self.period of the analysis resolution: Analysis resolution threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight''' self.lookback = lookback self.resolution = resolution self.threshold = threshold self.predictionInterval = predictionInterval self.consolidated = False self.pairs = dict() self.Securities = list() resolutionString = Extensions.GetEnumString(resolution, Resolution) self.Name = f'{self.__class__.__name__}({self.lookback},{resolutionString},{Extensions.NormalizeToStr(threshold)})' def Update(self, algorithm, data): ''' Updates this alpha model with the latest data from the algorithm. This is called each time the algorithm receives data for subscribed securities Args: algorithm: The algorithm instance data: The new data available Returns: The new insights generated''' insights = [] for key, pair in self.pairs.items(): insights.extend(pair.GetInsightGroup(algorithm)) return insights def OnSecuritiesChanged(self, algorithm, changes): '''Event fired each time the we add/remove securities from the data feed. Args: algorithm: The algorithm instance that experienced the change in securities changes: The security additions and removals from the algorithm''' self.UpdatePairs(algorithm, changes) def UpdatePairs(self, algorithm, changes): symbols = sorted([x.Symbol for x in algorithm.ActiveSecurities.Values], key=lambda x: str(x.ID)) for symbol in changes.RemovedSecurities: if symbol in symbols: symbols.remove(symbol) # Remove pair from dict and its consolidators from the algorithm pair = self.pairs.pop(symbol, None) if pair is not None: algorithm.Log(f"Removing Consolidators {algorithm.UtcTime}") algorithm.Debug(f"Removing Consolidators {algorithm.UtcTime}") pair.RemoveConsolidators(algorithm) for i in range(0, len(symbols)): asset_i = symbols[i] for j in range(1 + i, len(symbols)): asset_j = symbols[j] pair_symbol = (asset_i, asset_j) invert = (asset_j, asset_i) if pair_symbol in self.pairs or invert in self.pairs: continue if not self.HasPassedTest(algorithm, asset_i, asset_j): continue pair = self.Pair(algorithm, asset_i, asset_j, self.predictionInterval, self.threshold) self.pairs[pair_symbol] = pair def HasPassedTest(self, algorithm, asset1, asset2): '''Check whether the assets pass a pairs trading test Args: algorithm: The algorithm instance that experienced the change in securities asset1: The first asset's symbol in the pair asset2: The second asset's symbol in the pair Returns: True if the statistical test for the pair is successful''' return True class Pair: class State(Enum): ShortRatio = -1 FlatRatio = 0 LongRatio = 1 def __init__(self, algorithm, asset1, asset2, predictionInterval, threshold): '''Create a new pair Args: algorithm: The algorithm instance that experienced the change in securities asset1: The first asset's symbol in the pair asset2: The second asset's symbol in the pair predictionInterval: self.period over which this insight is expected to come to fruition threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight''' self.state = self.State.FlatRatio self.period = 500 self.barsize = 1 consolidatorLength = timedelta(minutes=self.barsize) self.pair = (asset1, asset2) self.consolidator1 = TradeBarConsolidator(consolidatorLength) self.consolidator2 = TradeBarConsolidator(consolidatorLength) #algorithm.SubscriptionManager.AddConsolidator(asset1, tradeBarConsolidator1) #algorithm.SubscriptionManager.AddConsolidator(asset2, tradeBarConsolidator2) self.asset1 = asset1 self.asset2 = asset2 self.security1 = algorithm.Securities[asset1] self.security2 = algorithm.Securities[asset2] self.spreadDeque = deque(maxlen=self.period) self.asset1Price = Identity(asset1) self.asset2Price = Identity(asset2) algorithm.RegisterIndicator(self.asset1, self.asset1Price, self.consolidator1) algorithm.RegisterIndicator(self.asset2, self.asset2Price, self.consolidator2) #tradeBarConsolidator1.DataConsolidated += self.asset1Consolidated #tradeBarConsolidator2.DataConsolidated += self.asset2Consolidated self.upperThreshold = 1 + (threshold / 100) self.lowerThreshold = 1 - (threshold / 100) self.count = 0 self.predictionInterval = predictionInterval #self.Data1IsConsolidated = False #self.Data2IsConsolidated = False hist1 = algorithm.History(asset1, self.period * self.barsize, Resolution.Minute) hist2 = algorithm.History(asset2, self.period * self.barsize, Resolution.Minute) if not 'close' in hist1: return if not 'close' in hist2: return ###bar1 = np.log(np.array(hist1['close'][0::self.barsize])) bar1 = np.array(hist1['close']) ###bar2 = np.log(np.array(hist2['close'][0::self.barsize])) bar2 = np.array(hist2['close']) minhist = min(len(bar1), len(bar2)) self.spread = np.divide(bar1[:minhist], bar2[:minhist]) for i in range (0, len(self.spread)): self.spreadDeque.appendleft(self.spread[i]) self.mean = np.mean(self.spreadDeque) #self.std = np.std(self.spreadDeque) self.ratio = None #def asset1Consolidated(self, sender, consolidated): #self.Data1IsConsolidated = True #def asset2Consolidated(self, sender, consolidated): #self.Data2IsConsolidated = True def RemoveConsolidators(self, algorithm): algorithm.SubscriptionManager.RemoveConsolidator(self.asset1, self.consolidator1) algorithm.SubscriptionManager.RemoveConsolidator(self.asset2, self.consolidator2) def GetInsightGroup(self, algorithm): '''Gets the insights group for the pair Returns: Insights grouped by an unique group id''' # *** Add History Warmup if not (self.security1.IsTradable and self.security2.IsTradable): return [] ###self.ratio = np.log(self.asset1Price.Current.Value) / np.log(self.asset2Price.Current.Value) self.ratio = np.divide(self.asset1Price.Current.Value, self.asset2Price.Current.Value) self.spreadDeque.appendleft(self.ratio) self.mean = np.mean(self.spreadDeque) #self.std = np.std(self.spreadDeque) #self.zscore = (self.ratio - self.mean) / self.std #self.Data1IsConsolidated = False #self.Data2IsConsolidated = False if len(self.spreadDeque) != self.period: return [] if self.state is not self.State.LongRatio and self.ratio > (self.mean * self.upperThreshold): self.state = self.State.LongRatio # asset1/asset2 is more than 2 std away from mean, short asset1, long asset2 shortAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Down) longAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Up) #algorithm.SetHoldings(self.asset1, -weight) #algorithm.SetHoldings(self.asset2, weight) # creates a group id and set the GroupId property on each insight object return Insight.Group(shortAsset1, longAsset2) # don't re-emit the same direction if self.state is not self.State.ShortRatio and self.ratio < (self.mean * self.lowerThreshold): self.state = self.State.ShortRatio # asset1/asset2 is less than 2 std away from mean, long asset1, short asset2 longAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Up) shortAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Down) #algorithm.SetHoldings(self.asset1, weight) #algorithm.SetHoldings(self.asset2, -weight) # creates a group id and set the GroupId property on each insight object return Insight.Group(longAsset1, shortAsset2) return [] ##
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. # Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from collections import OrderedDict from clr import AddReference AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm.Framework") AddReference("QuantConnect.Indicators") from QuantConnect import * from QuantConnect.Indicators import * from QuantConnect.Algorithm.Framework.Alphas import * from BasePairsFrameworkAlpha import BasePairsTradingAlphaModel from datetime import timedelta from scipy.stats import pearsonr from statsmodels.tsa.stattools import coint, adfuller import numpy as np import pandas as pd class CorrelationAndCointegrationPairsTrading(BasePairsTradingAlphaModel): ''' This alpha model is designed to rank every pair combination by its pearson corrDictelation and trade the pair with the hightest corrDictelation This model generates alternating long ratio/short ratio insights emitted as a group''' def __init__(self, lookback = 1080, resolution = Resolution.Daily, threshold = .1, minimumcorrelation = .99): '''Initializes a new instance of the PearsoncorrDictelationPairsTradingAlphaModel class Args: lookback: lookback period of the analysis resolution: analysis resolution threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight minimumcorrDictelation: The minimum corrDictelation to consider a tradable pair''' super().__init__(lookback, resolution, threshold) self.lookback = lookback self.resolution = resolution self.minimumcorrelation = .85 self.max_p_value = 1 - minimumcorrelation self.pairsList = list() self.DataIsConsolidated = False def OnSecuritiesChanged(self, algorithm, changes): '''Event fired each time the we add/remove securities from the data feed. Args: algorithm: The algorithm instance that experienced the change in securities changes: The security additions and removals from the algorithm''' self.pairsList = [] symbols = [ x.Symbol for x in algorithm.ActiveSecurities.Values ] symbols.pop() pairsDict = dict() history = algorithm.History(symbols, self.lookback, self.resolution).close.unstack(level=0) if not history.empty: df = self.get_price_dataframe(history, algorithm) for i in range(0, len(df.columns)): for j in range(i+1, len(df.columns)): pair = (i, j) array1 = df.iloc[:,i] array2 = df.iloc[:,j] minhist = min(len(array1), len(array2)) spreadArray = np.divide(array1[:minhist], array2[:minhist]) IsFinite = np.isfinite(spreadArray) if not np.all(IsFinite): continue pearsonr_value = pearsonr(array1, array2)[0] if pearsonr_value == 1: continue if (pearsonr_value >= self.minimumcorrelation): p_value = coint(array1, array2)[1] if p_value == 0: continue if (p_value <= self.max_p_value): adfP = adfuller(spreadArray)[1] if adfP == 0: continue if adfP < 0.95: algorithm.Debug(str(p_value) + " = Coint P Value, " \ + str(adfP) + " = Adfuller P Value, " \ + str(pearsonr_value) + " = Pearson Value, " \ + str(df.columns[i]) + ", " \ + str(df.columns[j])) algorithm.Log(str(p_value) + " = Coint P Value, " \ + str(adfP) + " = Adfuller P Value, " \ + str(pearsonr_value) + " = Pearson Value, " \ + str(df.columns[i]) + ", " \ + str(df.columns[j])) self.pairsList.append( (df.columns[i], df.columns[j]) ) ################## if len(self.pairsList) == 0: algorithm.Debug("No Pairs; Date: " + str(algorithm.UtcTime)) algorithm.Log("No Pairs; Date: " + str(algorithm.UtcTime)) else: algorithm.Debug("Number of pairs: " + str(len(self.pairsList)) + ", Date: " + str(algorithm.UtcTime)) algorithm.Log("Number of pairs: " + str(len(self.pairsList)) + ", Date: " + str(algorithm.UtcTime)) super().OnSecuritiesChanged(algorithm, changes) def HasPassedTest(self, algorithm, asset1, asset2): '''Check whether the assets pass a pairs trading test Args: algorithm: The algorithm instance that experienced the change in securities asset1: The first asset's symbol in the pair asset2: The second asset's symbol in the pair Returns: True if the statistical test for the pair is successful''' pair = (str(asset1), str(asset2)) reverse = (str(asset2), str(asset1)) if pair in self.pairsList \ or reverse in self.pairsList: return True return False def get_price_dataframe(self, df, algorithm): df = np.log(df) return (df - df.shift(1)).dropna()
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. # Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from clr import AddReference AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm.Framework") from QuantConnect import Resolution, Extensions from QuantConnect.Algorithm.Framework.Alphas import * from QuantConnect.Algorithm.Framework.Portfolio import * from itertools import groupby from datetime import datetime, timedelta from pytz import utc UTCMIN = datetime.min.replace(tzinfo=utc) class PairsTradingPortfolioConstruction(PortfolioConstructionModel): '''Provides an implementation of IPortfolioConstructionModel that gives equal weighting to all securities. The target percent holdings of each security is 1/N where N is the number of securities. For insights of direction InsightDirection.Up, long targets are returned and for insights of direction InsightDirection.Down, short targets are returned.''' def __init__(self, rebalancingParam = Resolution.Daily): '''Initialize a new instance of EqualWeightingPortfolioConstructionModel Args: rebalancingParam: Rebalancing parameter. If it is a timedelta or Resolution, it will be converted into a function. The function returns the next expected rebalance time for a given algorithm UTC DateTime''' self.insightCollection = InsightCollection() self.removedSymbols = [] self.nextExpiryTime = UTCMIN self.rebalancingTime = UTCMIN self.rebalancingFunc = rebalancingParam # If the argument is an instance if Resolution or Timedelta # Redefine self.rebalancingFunc if isinstance(rebalancingParam, int): rebalancingParam = Extensions.ToTimeSpan(rebalancingParam) if isinstance(rebalancingParam, timedelta): self.rebalancingFunc = lambda dt: dt + rebalancingParam def ShouldCreateTargetForInsight(self, insight): '''Method that will determine if the portfolio construction model should create a target for this insight Args: insight: The insight to create a target for''' return True def DetermineTargetPercent(self, activeInsights): '''Will determine the target percent for each insight Args: activeInsights: The active insights to generate a target for''' result = {} # give equal weighting to each security count = sum(x.Direction != InsightDirection.Flat for x in activeInsights) percent = 0 if count == 0 else 1.0 / count for insight in activeInsights: result[insight] = insight.Direction * percent return result def CreateTargets(self, algorithm, insights): '''Create portfolio targets from the specified insights Args: algorithm: The algorithm instance insights: The insights to create portfolio targets from Returns: An enumerable of portfolio targets to be sent to the execution model''' targets = [] if (algorithm.UtcTime <= self.nextExpiryTime and algorithm.UtcTime <= self.rebalancingTime and len(insights) == 0 and self.removedSymbols is None): return targets for insight in insights: if self.ShouldCreateTargetForInsight(insight): self.insightCollection.Add(insight) # Create flatten target for each security that was removed from the universe if self.removedSymbols is not None: universeDeselectionTargets = [ PortfolioTarget(symbol, 0) for symbol in self.removedSymbols ] targets.extend(universeDeselectionTargets) self.removedSymbols = None # Get insight that haven't expired of each symbol that is still in the universe activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime) # Get the last generated active insight for each symbol lastActiveInsights = [] for symbol, g in groupby(activeInsights, lambda x: x.Symbol): lastActiveInsights.append(sorted(g, key = lambda x: x.GeneratedTimeUtc)[-1]) # Determine target percent for the given insights percents = self.DetermineTargetPercent(lastActiveInsights) errorSymbols = {} for insight in lastActiveInsights: target = PortfolioTarget.Percent(algorithm, insight.Symbol, percents[insight]) if not target is None: targets.append(target) else: errorSymbols[insight.Symbol] = insight.Symbol # Get expired insights and create flatten targets for each symbol expiredInsights = self.insightCollection.RemoveExpiredInsights(algorithm.UtcTime) expiredTargets = [] for symbol, f in groupby(expiredInsights, lambda x: x.Symbol): if not self.insightCollection.HasActiveInsights(symbol, algorithm.UtcTime) and not symbol in errorSymbols: expiredTargets.append(PortfolioTarget(symbol, 0)) continue targets.extend(expiredTargets) self.nextExpiryTime = self.insightCollection.GetNextExpiryTime() if self.nextExpiryTime is None: self.nextExpiryTime = UTCMIN self.rebalancingTime = self.rebalancingFunc(algorithm.UtcTime) return targets def OnSecuritiesChanged(self, algorithm, changes): '''Event fired each time the we add/remove securities from the data feed Args: algorithm: The algorithm instance that experienced the change in securities changes: The security additions and removals from the algorithm''' # Get removed symbol and invalidate them in the insight collection self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities] self.insightCollection.Clear(self.removedSymbols)
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. # Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from clr import AddReference AddReference("System") AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm") AddReference("QuantConnect.Algorithm.Framework") import numpy as np from System import * from QuantConnect import * from QuantConnect.Orders import * from QuantConnect.Algorithm import * from QuantConnect.Algorithm.Framework import * from QuantConnect.Algorithm.Framework.Execution import * from QuantConnect.Algorithm.Framework.Portfolio import * class PairsTradingOrderExecution(ExecutionModel): '''Provides an implementation of IExecutionModel that immediately submits market orders to achieve the desired portfolio targets''' def __init__(self): '''Initializes a new instance of the ImmediateExecutionModel class''' self.targetsCollection = PortfolioTargetCollection() def Execute(self, algorithm, targets): '''Immediately submits orders for the specified portfolio targets. Args: algorithm: The algorithm instance targets: The portfolio targets to be ordered''' # for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call self.targetsCollection.AddRange(targets) if self.targetsCollection.Count > 0: for target in self.targetsCollection.OrderByMarginImpact(algorithm): open_quantity = sum([x.Quantity - x.QuantityFilled for x in algorithm.Transactions.GetOpenOrderTickets(target.Symbol)]) existing = algorithm.Securities[target.Symbol].Holdings.Quantity + open_quantity quantity = int(target.Quantity - existing) if np.absolute(quantity) > 1: try: algorithm.MarketOrder(target.Symbol, quantity) except: pass self.targetsCollection.ClearFulfilled(algorithm) #
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel class TechnologyUniverseModule(FundamentalUniverseSelectionModel): ''' This module selects the most liquid stocks listed on the Nasdaq Stock Exchange. ''' def __init__(self, filterFineData = True, universeSettings = None, securityInitializer = None): '''Initializes a new default instance of the TechnologyUniverseModule''' super().__init__(filterFineData, universeSettings, securityInitializer) self.numberOfSymbolsCoarse = 1000 self.numberOfSymbolsFine = 500 self.dollarVolumeBySymbol = {} self.symbols = [] self.lastMonth = -1 def SelectCoarse(self, algorithm, coarse): ''' Performs a coarse selection: -The stock must have fundamental data -The stock must have positive previous-day close price -The stock must have positive volume on the previous trading day ''' if algorithm.Time.month == self.lastMonth: return symbols filtered = [x for x in coarse if x.HasFundamentalData and x.Volume > 0 and x.Price > 0] sortedByDollarVolume = sorted(filtered, key = lambda x: x.DollarVolume, reverse=True)[:self.numberOfSymbolsCoarse] self.dollarVolumeBySymbol.clear() for x in sortedByDollarVolume: symbols.append(x.Symbol) self.dollarVolumeBySymbol[x.Symbol] = x.DollarVolume return symbols def SelectFine(self, algorithm, fine): ''' Performs a fine selection: -The company's headquarter must in the U.S. -The stock must be traded on the NASDAQ stock exchange -At least half a year since its initial public offering -The stock must be in the Industry Template Code catagory N ''' if algorithm.Time.month == self.lastMonth: return self.symbols self.lastMonth = algorithm.Time.month # Filter stocks filteredFine = [x for x in fine if x.CompanyReference.CountryId == "USA" #and (x.CompanyReference.PrimaryExchangeID == "NAS") and (algorithm.Time - x.SecurityReference.IPODate).days > 180 and x.CompanyReference.IndustryTemplateCode == "N"] sortedByDollarVolume = [] # Sort stocks on dollar volume sortedByDollarVolume = sorted(filteredFine, key = lambda x: self.dollarVolumeBySymbol[x.Symbol], reverse=True) self.symbols = [x.Symbol for x in sortedByDollarVolume[:self.numberOfSymbolsFine]] return self.symbols
from Execution.ImmediateExecutionModel import ImmediateExecutionModel from Portfolio.EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel #from CorrelatedPairsTrading import CorrelationAndCointegrationPairsTrading from PairsUniverseSelection import PairsUniverseSelection, CorrelationAndCointegrationPairsTrading from PairsTradingPortfolioConstruction import PairsTradingPortfolioConstruction from PairsTradingOrderExecution import PairsTradingOrderExecution from Selection.UncorrelatedUniverseSelectionModel import UncorrelatedUniverseSelectionModel class NadionParticleGearbox(QCAlgorithm): def Initialize(self): self.SetStartDate(2008, 1, 1) # Set Start Date self.SetEndDate(2008, 12, 31) # Set Start Date self.SetCash(2e4) # Set Strategy Cash ** min cash for alpha streams is 1M/1e6 ''' Universes: LiquidETFUniverse TechnologyUniverseModule VolatilityETFUniverse QC500UniverseSelectionModel UncorrelatedUniverseSelectionModel ''' self.SetUniverseSelection(PairsUniverseSelection()) self.UniverseSettings.Resolution = Resolution.Minute # Universe Settings benchmark = self.AddEquity("SPY").Symbol # Benchmark Symbol self.SetBenchmark(benchmark) # Setting Benchmark self.AddAlpha(CorrelationAndCointegrationPairsTrading()) # Alpha Model self.SetPortfolioConstruction(PairsTradingPortfolioConstruction()) # Portfolio Construction Model self.SetExecution(PairsTradingOrderExecution()) #self.SetRiskManagement(TrailingStopRiskManagementModel(0.02)) # Add Risk Managment # AlphaStreamsBrokerageModel # InteractiveBrokersBrokerageModel self.SetBrokerageModel(InteractiveBrokersBrokerageModel ()) # Brokerage Model (Alpha Streams is for Prime Brokerage) #