Overall Statistics |
Total Trades 152 Average Win 0.08% Average Loss -0.14% Compounding Annual Return -2.442% Drawdown 5.200% Expectancy -0.473 Net Profit -4.823% Sharpe Ratio -1.687 Probabilistic Sharpe Ratio 0.000% Loss Rate 67% Win Rate 33% Profit-Loss Ratio 0.60 Alpha -0.017 Beta 0.002 Annual Standard Deviation 0.01 Annual Variance 0 Information Ratio -0.853 Tracking Error 0.124 Treynor Ratio -9.115 Total Fees $498.73 Estimated Strategy Capacity $2100000.00 Lowest Capacity Asset GOOCV VP83T1ZUHROL Portfolio Turnover 10.38% |
# region imports
from AlgorithmImports import *
from QuantConnect.Data.UniverseSelection import *
from AlgorithmImports import *
from enum import Enum
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
# endregion

'''
The alpha model uses Augmented Dickey-Fuller (ADF) test statistics to decide
which pair may be traded. Every pair combination is tested; among the pairs
whose p-value is below the significance threshold, the one with the lowest
(most negative) ADF statistic is selected, provided the latest residual
z-score is large enough.

The base model is an adaptation of the "Base Pairs Trading Model". It
calculates a ratio between the two securities by dividing their prices and
takes the 500-period EMA of that quotient as the mean ratio. When the ratio
diverges far enough from the mean ratio, the model generates alternating
long ratio/short ratio insights, emitted as a group, to capture the reversion
of the ratio.

Finally, mean-variance optimization is used for the construction of the
portfolio, in combination with a trailing stop as the risk management model.
'''


class DeterminedTanKitten(QCAlgorithm):
    '''Framework algorithm trading the GOOG/MSFT pair with the ADF alpha model.'''

    # Number of bars handed to the portfolio construction model as its lookback
    lookback = 6*60
    # Passed to the alpha model as its minimum absolute z-score
    entry_th = 2
    # Resolution shared by the universe, alpha and portfolio construction models
    my_resolution = Resolution.Minute

    def Initialize(self):
        '''Configures the backtest period, cash, brokerage and framework models.'''
        self.SetStartDate(2018, 1, 1)   # Set Start Date
        self.SetEndDate(2020, 1, 1)     # Set End Date
        self.SetCash(100000)            # Set Strategy Cash

        # Broker
        self.SetBrokerageModel(BrokerageName.QuantConnectBrokerage, AccountType.Margin)

        # Init Universe, use only GOOG and MSFT
        self.UniverseSettings.Resolution = self.my_resolution
        tickers = ['GOOG', 'MSFT']
        tickers = [Symbol.Create(ticker, SecurityType.Equity, Market.USA) for ticker in tickers]
        self.AddUniverseSelection(ManualUniverseSelectionModel(tickers))

        # Alpha model base: https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/alpha/supported-models#08-Base-Pairs-Trading-Model
        self.AddAlpha(ADFPairsTradingAlphaModel(resolution=self.my_resolution,
                                                minimumZScore=self.entry_th))

        # Portfolio construction model: https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/portfolio-construction/supported-models#08-Mean-Variance-Optimization-Model
        self.SetPortfolioConstruction(MeanVarianceOptimizationPortfolioConstructionModel(resolution=self.my_resolution,
                                                                                        lookback=self.lookback))

        # Risk Model :https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/risk-management/supported-models#07-Trailing-Stop-Model
        self.AddRiskManagement(TrailingStopRiskManagementModel())

        # Track of securities in the universe
        self.securityTracker = set()


## ------------------------------------- ALPHA MODEL ------------------------------------- ##
class MyBasePairsTradingAlphaModel(AlphaModel):
    '''This alpha model is designed to accept every possible pair combination
    from securities selected by the universe selection model.
    This model generates alternating long ratio/short ratio insights emitted as a group'''

    def __init__(self, lookback = 1, resolution = Resolution.Daily, threshold = 1):
        ''' Initializes a new instance of the PairsTradingAlphaModel class
        Args:
            lookback: Lookback period of the analysis
            resolution: Analysis resolution
            threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
        self.lookback = lookback
        self.resolution = resolution
        self.threshold = threshold
        # Insights are expected to play out over `lookback` bars of `resolution`
        self.predictionInterval = Time.Multiply(Extensions.ToTimeSpan(self.resolution), self.lookback)

        # Maps (symbol, symbol) tuples to their Pair instance
        self.pairs = dict()
        self.Securities = list()

        resolutionString = Extensions.GetEnumString(resolution, Resolution)
        self.Name = f'{self.__class__.__name__}({self.lookback},{resolutionString},{Extensions.NormalizeToStr(threshold)})'

    def Update(self, algorithm, data):
        ''' Updates this alpha model with the latest data from the algorithm.
        This is called each time the algorithm receives data for subscribed securities
        Args:
            algorithm: The algorithm instance
            data: The new data available
        Returns:
            The new insights generated'''
        insights = []
        for pair in self.pairs.values():
            insights.extend(pair.GetInsightGroup())
        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Event fired each time the we add/remove securities from the data feed.
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''
        for security in changes.AddedSecurities:
            self.Securities.append(security)

        for security in changes.RemovedSecurities:
            if security in self.Securities:
                self.Securities.remove(security)

        self.UpdatePairs(algorithm)

        # Drop (and release the consolidators of) every pair that involves a removed security
        for security in changes.RemovedSecurities:
            keys = [k for k in self.pairs.keys() if security.Symbol in k]
            for key in keys:
                self.pairs.pop(key).dispose()

    def UpdatePairs(self, algorithm):
        '''Creates a Pair for every symbol combination that is not tracked yet
        and for which HasPassedTest returns True.
        Args:
            algorithm: The algorithm instance'''
        symbols = sorted([x.Symbol for x in self.Securities], key=lambda x: str(x.ID))

        for i, asset_i in enumerate(symbols):
            for asset_j in symbols[i + 1:]:
                pair_symbol = (asset_i, asset_j)
                invert = (asset_j, asset_i)

                # Already tracked, in either orientation
                if pair_symbol in self.pairs or invert in self.pairs:
                    continue

                if not self.HasPassedTest(algorithm, asset_i, asset_j):
                    continue

                pair = self.Pair(algorithm, asset_i, asset_j, self.predictionInterval, self.threshold)
                self.pairs[pair_symbol] = pair

    def HasPassedTest(self, algorithm, asset1, asset2):
        '''Check whether the assets pass a pairs trading test
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            asset1: The first asset's symbol in the pair
            asset2: The second asset's symbol in the pair
        Returns:
            True if the statistical test for the pair is successful'''
        return True

    class Pair:
        '''Tracks the price ratio of two assets against the EMA of that ratio.'''

        class State(Enum):
            ShortRatio = -1
            FlatRatio = 0
            LongRatio = 1

        def __init__(self, algorithm, asset1, asset2, predictionInterval, threshold):
            '''Create a new pair
            Args:
                algorithm: The algorithm instance that experienced the change in securities
                asset1: The first asset's symbol in the pair
                asset2: The second asset's symbol in the pair
                predictionInterval: Period over which this insight is expected to come to fruition
                threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
            self.state = self.State.FlatRatio

            self.algorithm = algorithm
            self.asset1 = asset1
            self.asset2 = asset2

            # Created the Identity indicator for a given Symbol and
            # the consolidator it is registered to. The consolidator reference
            # will be used to remove it from SubscriptionManager
            def CreateIdentityIndicator(symbol: Symbol):
                resolution = min([x.Resolution for x in algorithm.SubscriptionManager.SubscriptionDataConfigService.GetSubscriptionDataConfigs(symbol)])

                name = algorithm.CreateIndicatorName(symbol, "close", resolution)
                identity = Identity(name)

                consolidator = algorithm.ResolveConsolidator(symbol, resolution)
                algorithm.RegisterIndicator(symbol, identity, consolidator)

                return identity, consolidator

            self.asset1Price, self.identityConsolidator1 = CreateIdentityIndicator(asset1)
            self.asset2Price, self.identityConsolidator2 = CreateIdentityIndicator(asset2)

            # ratio = asset1 / asset2, mean = 500-period EMA of the ratio
            self.ratio = IndicatorExtensions.Over(self.asset1Price, self.asset2Price)
            self.mean = IndicatorExtensions.Of(ExponentialMovingAverage(500), self.ratio)

            # Bands at mean * (1 +/- threshold%)
            upper = ConstantIndicator[IndicatorDataPoint]("ct", 1 + threshold / 100)
            self.upperThreshold = IndicatorExtensions.Times(self.mean, upper)

            lower = ConstantIndicator[IndicatorDataPoint]("ct", 1 - threshold / 100)
            self.lowerThreshold = IndicatorExtensions.Times(self.mean, lower)

            self.predictionInterval = predictionInterval

        def dispose(self):
            '''
            On disposal, remove the consolidators from the subscription manager
            '''
            self.algorithm.SubscriptionManager.RemoveConsolidator(self.asset1, self.identityConsolidator1)
            self.algorithm.SubscriptionManager.RemoveConsolidator(self.asset2, self.identityConsolidator2)

        def GetInsightGroup(self):
            '''Gets the insights group for the pair
            Returns:
                Insights grouped by an unique group id, or an empty list when
                the mean is not ready or the ratio has not crossed a band in a
                new direction'''
            if not self.mean.IsReady:
                return []

            # don't re-emit the same direction
            if self.state is not self.State.LongRatio and self.ratio > self.upperThreshold:
                self.state = self.State.LongRatio

                # asset1/asset2 is above the upper band: short asset1, long asset2
                shortAsset1 = Insight.Price(self.asset1, self.predictionInterval,
                                            direction=InsightDirection.Down,
                                            magnitude=self.ratio.Current.Value)
                longAsset2 = Insight.Price(self.asset2, self.predictionInterval,
                                           direction=InsightDirection.Up,
                                           magnitude=self.ratio.Current.Value)

                # creates a group id and set the GroupId property on each insight object
                return Insight.Group(shortAsset1, longAsset2)

            # don't re-emit the same direction
            if self.state is not self.State.ShortRatio and self.ratio < self.lowerThreshold:
                self.state = self.State.ShortRatio

                # asset1/asset2 is below the lower band: long asset1, short asset2
                longAsset1 = Insight.Price(self.asset1, self.predictionInterval,
                                           direction=InsightDirection.Up,
                                           magnitude=self.ratio.Current.Value)
                shortAsset2 = Insight.Price(self.asset2, self.predictionInterval,
                                            direction=InsightDirection.Down,
                                            magnitude=self.ratio.Current.Value)

                # creates a group id and set the GroupId property on each insight object
                return Insight.Group(longAsset1, shortAsset2)

            return []


class ADFPairsTradingAlphaModel(MyBasePairsTradingAlphaModel):
    '''
    This model is an adaptation of the "PearsonCorrelationPairsTradingAlphaModel"
    implemented by QuantConnect. Instead of the Pearson correlation it ranks every
    pair combination by the ADF test on the residual of an OLS regression between
    the two assets, and allows trading only the pair with the lowest ADF statistic.
    This model generates alternating long ratio/short ratio insights emitted as a group.
    '''

    def __init__(self, lookback:int = 15, resolution = Resolution.Minute, threshold = 1,
                 minimumZScore = 1, min_statistical_significance = 0.05):
        '''Initializes a new instance of the ADFPairsTradingAlphaModel class
        Args:
            lookback: lookback period of the analysis
            resolution: analysis resolution
            threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight
            minimumZScore: The minimum absolute z-score of the latest regression residual
                           required to select a pair
            min_statistical_significance: Pairs whose ADF p-value is not below this value are discarded'''
        # The base constructor stores lookback, resolution and threshold
        super().__init__(lookback, resolution, threshold)
        self.min_statistical_significance = min_statistical_significance
        self.minimumZScore = minimumZScore
        # Empty until a pair has been selected in Update
        self.best_pair = ()

    def Update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
        '''Re-ranks the pairs once per hour and emits the insights of the tracked pairs.
        Args:
            algorithm: The algorithm instance
            data: The new data available
        Returns:
            The new insights generated; an empty list when it is not the first
            minute of an hour, when any exchange is closed, or when no pair
            passes the significance filter'''
        # Update every hour
        if int(algorithm.Time.minute) != 0:
            return []

        symbols = []
        for security in self.Securities:
            symbols.append(security.Symbol)
            if not security.Exchange.Hours.IsOpen(algorithm.Time, extendedMarket=False):
                return []

        history = algorithm.History(symbols, self.lookback, self.resolution)

        # An empty history frame has no 'close' column, so emptiness must be
        # checked before the column is accessed.
        if history.empty:
            df = pd.DataFrame()
        else:
            df = self.get_price_dataframe(history.close.unstack(level=0))

        # Differencing can leave no rows; the regression cannot run on that.
        if not df.empty:
            # NOTE(review): the integer positions below index both the frame
            # columns and `symbols`; this assumes the unstacked column order
            # matches the order of `symbols` -- confirm for more than two assets.
            stop = len(df.columns)
            adf = {(i, j): self.ComputeADF(df.iloc[:, i], df.iloc[:, j])
                   for i in range(stop) for j in range(i + 1, stop)}

            # Statistical significance: keep the pairs whose p-value is low enough
            significant = [kv for kv in adf.items() if kv[1][0] < self.min_statistical_significance]
            if not significant:
                return []

            # Lowest ADF statistic -> better cointegration
            (first, second), (p_value, statistic, zscore) = min(significant, key=lambda kv: kv[1][1])

            # ZScore filter
            if abs(zscore) >= self.minimumZScore:
                algorithm.Log('The best pair was ({} - {}) with a p-value of {}, ADF-stat of {} and Z-Score of {}'.\
                    format(symbols[first], symbols[second], p_value, statistic, zscore))
                self.best_pair = (symbols[first], symbols[second])
                # HasPassedTest depends on best_pair, so new pairs can only appear now
                self.UpdatePairs(algorithm)

        return super().Update(algorithm, data)

    def ComputeADF(self, Y, X):
        '''Regresses Y on X (with intercept) and runs the ADF test on the residual.
        Args:
            Y: Dependent series
            X: Explanatory series
        Returns:
            [ADF p-value, ADF test statistic, z-score of the last residual]'''
        # Regression model
        results = sm.OLS(Y, sm.add_constant(X)).fit()

        # Standard deviation of the residual
        sigma = np.sqrt(results.mse_resid)

        # Regression residual has mean = 0 by definition, so residual / sigma is its z-score
        residual = results.resid
        zscore = residual / sigma

        # adfuller returns (statistic, p-value, ...); only the first two are needed
        statistic, p_value = adfuller(residual)[:2]

        return [p_value, statistic, zscore.values[-1]]

    def HasPassedTest(self, algorithm, asset1, asset2):
        '''Check whether the assets pass a pairs trading test
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            asset1: The first asset's symbol in the pair
            asset2: The second asset's symbol in the pair
        Returns:
            True if the two assets are the currently selected best pair, in either order'''
        # UpdatePairs orders symbols by ID while best_pair follows the order of
        # self.Securities, so both orientations must be accepted.
        return self.best_pair in ((asset1, asset2), (asset2, asset1))

    def get_price_dataframe(self, df):
        '''Converts a price frame into first differences of log prices.
        Args:
            df: Frame with one price column per asset
        Returns:
            The differenced log-price frame with incomplete rows dropped'''
        timezones = { x.Symbol.Value: x.Exchange.TimeZone for x in self.Securities }

        # Use log prices
        df = np.log(df)

        is_single_timeZone = len(set(timezones.values())) == 1

        if not is_single_timeZone:
            series_dict = dict()
            daily = self.resolution == Resolution.Daily

            # NOTE(review): timezones is keyed by ticker (Symbol.Value); this
            # presumes the column labels are tickers too -- confirm.
            for column in df:
                zone = timezones[column]

                # Change the dataframe index from data time to UTC time
                def to_utc(x, zone=zone):
                    utc = Extensions.ConvertToUtc(x, zone)
                    return utc.date() if daily else utc

                data = df[[column]]
                data.index = data.index.map(to_utc)
                series_dict[column] = data[column]

            df = pd.DataFrame(series_dict).dropna()

        return (df - df.shift(1)).dropna()