Overall Statistics |
Total Trades 4 Average Win 0.06% Average Loss -0.07% Compounding Annual Return -0.017% Drawdown 0.000% Expectancy -0.061 Net Profit -0.009% Sharpe Ratio -1.425 Probabilistic Sharpe Ratio 0.033% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 0.88 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -3.726 Tracking Error 0.112 Treynor Ratio 3.634 Total Fees $16.06 |
from Execution.ImmediateExecutionModel import ImmediateExecutionModel from datetime import timedelta, datetime import numpy as np import pandas as pd import statsmodels.tsa.stattools as ts import statsmodels.api from clr import AddReference AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm.Framework") from QuantConnect import Resolution, Extensions from QuantConnect.Algorithm.Framework.Alphas import * from QuantConnect.Algorithm.Framework.Portfolio import * from itertools import groupby class ParticleNadionRegulators(QCAlgorithm): def Initialize(self): self.SetStartDate(2019, 1, 7) # Set Start Date self.SetEndDate(2019, 7, 7) self.SetCash(100000) # Set Strategy Cash #lookback = self.GetParameter("look-back") symbols = [Symbol.Create("PEP", SecurityType.Equity, Market.USA), Symbol.Create("KO", SecurityType.Equity, Market.USA)] stock1 = symbols[0] stock2 = symbols[1] self.AddUniverseSelection(ManualUniverseSelectionModel(symbols)) self.UniverseSettings.Resolution = Resolution.Hour self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw self.SetAlpha(SinglePairsTrade()) # works when addalpha #self.SetAlpha(SinglePairsTrade(self, lookback)) # works when addalpha self.SetExecution(ImmediateExecutionModel()) self.SetPortfolioConstruction(HedgeWeightingPortfolioConstructionModel()) #self.Settings.RebalancePortfolioOnInsightChanges = False #self.Settings.RebalancePortfolioOnSecurityChanges = True self.Settings.FreePortfolioValuePercentage = 0.1 ''' def OnEndOfDay(self): self.Log("MV of stock 1 is "+str((self.Portfolio["PEP"].Quantity)*(self.Portfolio["PEP"].Price))+", and MV of stock 2 is "+str((self.Portfolio["KO"].Quantity)*(self.Portfolio["KO"].Price))) ''' class SinglePairsTrade(AlphaModel): def __init__(self): #self.lookback1 = int(lookback) self.lookback1 = 100 self.pair = [] self.spreadMean = SimpleMovingAverage(self.lookback1) self.spreadStd = StandardDeviation(self.lookback1) self.period = timedelta(hours=1) self.w1 = 0 self.w2 = 0 pass def Update(self,algorithm,data): #Calculate pearson correlation coefficient h1 = algorithm.History(self.pair[0].Symbol, self.lookback1, Resolution.Hour) h2 = algorithm.History(self.pair[1].Symbol, self.lookback1, Resolution.Hour) h1 = h1['close'].unstack(level=0) h2 = h2['close'].unstack(level=0) h3 = pd.merge(h1, h2, on = "time", how="inner") correlation = h3.corr(method="pearson") pearsoncoef = correlation.iloc[0,1] algorithm.Plot("Pearson corr", "Pearsoncoef", pearsoncoef*100) #Calculate cointegration header1 = h3.columns[0] header2 = h3.columns[1] res = statsmodels.api.OLS(endog=h3[str(header1)], exog=h3[str(header2)]).fit() #algorithm.Debug("Original parameter output:" +str(res.params)) rsq = res.rsquared #algorithm.Debug("R squared is: "+str(res.rsquared)) #algorithm.Debug(res.params) #algorithm.Debug(res.params.loc[str(header2)]) beta_hr = res.params.loc[str(header2)] self.w1 = 1.0/(1.0+beta_hr) self.w2 = 1.0-self.w1 h3["res"] = h3[str(header1)] - beta_hr*h3[str(header2)] cadf = ts.adfuller(h3["res"]) #algorithm.Debug(cadf) #algorithm.Debug("pvalue is "+str(cadf[1])) significance = 0 pval = cadf[1] algorithm.Plot("P value", "Pvalue", cadf[1]*100) if cadf[1] < 0.05: significance = 1 algorithm.Plot("Hedge ratio", "Ratio of asset 2", beta_hr*100) algorithm.Plot("R squared plot", "Rsqaured", rsq*100) #Calculate spread based on cointegration spread = (self.w2*self.pair[1].Price) - (self.w1*self.pair[0].Price) self.spreadMean.Update(algorithm.Time, spread) self.spreadStd.Update(algorithm.Time, spread) upperthreshold = self.spreadMean.Current.Value + (2*self.spreadStd.Current.Value) lowerthreshold = self.spreadMean.Current.Value - (2*self.spreadStd.Current.Value) algorithm.Plot("Spread plot", "Spread", spread) #Check if we have any holdings. If we do and spread converged, then liquidate fund and take profit: if ((algorithm.Portfolio[self.pair[0].Symbol].Quantity != 0) or (algorithm.Portfolio[self.pair[1].Symbol].Quantity != 0)): if (spread < 0.5*self.spreadStd.Current.Value and spread > -0.5*self.spreadStd.Current.Value and algorithm.Portfolio[self.pair[0].Symbol].Quantity>0): return Insight.Group( [ Insight.Price(self.pair[0].Symbol, self.period, InsightDirection.Down, None, None, None, 0), Insight.Price(self.pair[1].Symbol, self.period, InsightDirection.Up, None, None, None, 0) ]) if (spread < 0.5*self.spreadStd.Current.Value and spread > -0.5*self.spreadStd.Current.Value and algorithm.Portfolio[self.pair[0].Symbol].Quantity<0): return Insight.Group( [ Insight.Price(self.pair[0].Symbol, self.period, InsightDirection.Up, None, None, None, 0), Insight.Price(self.pair[1].Symbol, self.period, InsightDirection.Down, None, None, None, 0) ]) return[] #Check if we don't have any holdings. If so, then if ((algorithm.Portfolio[self.pair[0].Symbol].Quantity == 0) and (algorithm.Portfolio[self.pair[1].Symbol].Quantity == 0)): if ((rsq>0.75) and (pval<0.05)): #Generate insights if spread > upperthreshold: #algorithm.SetHoldings([PortfolioTarget(self.pair[0].Symbol, self.w1), PortfolioTarget(self.pair[1].Symbol, -1*self.w2)]) return Insight.Group( [ Insight.Price(self.pair[0].Symbol, self.period, InsightDirection.Up,None, None, None, self.w1), Insight.Price(self.pair[1].Symbol, self.period, InsightDirection.Down, None, None, None, self.w2) ]) if spread < lowerthreshold: #algorithm.SetHoldings([PortfolioTarget(self.pair[0].Symbol, -1*self.w1), PortfolioTarget(self.pair[1].Symbol, self.w2)]) return Insight.Group( [ Insight.Price(self.pair[0].Symbol, self.period, InsightDirection.Down, None, None, None, self.w1), Insight.Price(self.pair[1].Symbol, self.period, InsightDirection.Up, None, None, None, self.w2) ]) return[] def OnSecuritiesChanged(self, algorithm, changes): self.pair = [x for x in changes.AddedSecurities] #1. Call for bars of history data for each symbol in the pair and save to the variable history history = algorithm.History([x.Symbol for x in self.pair], self.lookback1) #2. Unstack the Pandas data frame to reduce it to the history close price history = history.close.unstack(level=0) #3. Iterate through the history tuple and update the mean and standard deviation with historical data for tuple in history.itertuples(): self.spreadMean.Update(tuple[0], (self.w2*tuple[2])-(tuple[1]*self.w1)) self.spreadStd.Update(tuple[0], (self.w2*tuple[2])-(tuple[1]*self.w1)) class HedgeWeightingPortfolioConstructionModel(PortfolioConstructionModel): def __init__(self, rebalance = Resolution.Daily, portfolioBias = PortfolioBias.LongShort): self.portfolioBias = portfolioBias # If the argument is an instance of Resolution or Timedelta # Redefine rebalancingFunc rebalancingFunc = rebalance if isinstance(rebalance, int): rebalance = Extensions.ToTimeSpan(rebalance) if isinstance(rebalance, timedelta): rebalancingFunc = lambda dt: dt + rebalance if rebalancingFunc: self.SetRebalancingFunc(rebalancingFunc) def DetermineTargetPercent(self, activeInsights): result = {} # give equal weighting to each security count = sum(x.Direction != InsightDirection.Flat and self.RespectPortfolioBias(x) for x in activeInsights) for insight in activeInsights: #self.Log("Insight weights" + str(insight.Weight)) if count == 0: result[insight] = (insight.Direction if self.RespectPortfolioBias(insight) else InsightDirection.Flat)*0 else: result[insight] = (insight.Direction if self.RespectPortfolioBias(insight) else InsightDirection.Flat)*self.GetValue(insight) return result def RespectPortfolioBias(self, insight): return self.portfolioBias == PortfolioBias.LongShort or insight.Direction == self.portfolioBias def GetValue(self, insight): return insight.Weight