Overall Statistics |
Total Trades 895 Average Win 0.37% Average Loss -0.31% Compounding Annual Return 47.339% Drawdown 19.500% Expectancy 0.257 Net Profit 40.017% Sharpe Ratio 1.025 Probabilistic Sharpe Ratio 51.130% Loss Rate 42% Win Rate 58% Profit-Loss Ratio 1.18 Alpha 0.328 Beta 0.253 Annual Standard Deviation 0.374 Annual Variance 0.14 Information Ratio 0.429 Tracking Error 0.383 Treynor Ratio 1.517 Total Fees $4030.38 |
import numpy as np import pandas as pd from statsmodels.tsa.stattools import adfuller def TestStationartiy(returns): # Return pandas Series with True/False for each symbol return pd.Series([adfuller(values)[1] < 0.05 for columns, values in returns.iteritems()], index = returns.columns) def GetZScores(returns): # Return pandas DataFrame containing z-scores return returns.subtract(returns.mean()).div(returns.std())
from HMM import * import numpy as np from StationarityAndZScores import * from QuantConnect.Data.UniverseSelection import * class ModulatedDynamicCircuit(QCAlgorithm): def Initialize(self): self.SetStartDate(2019, 1, 1) # Set Start Date self.SetCash(100000) # Set Strategy Cash self.SetBrokerageModel(AlphaStreamsBrokerageModel()) self.SetExecution(ImmediateExecutionModel()) self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel()) self.SetUniverseSelection(LiquidETFUniverse()) self.models = {} self.AddEquity('GLD') self.Schedule.On(self.DateRules.EveryDay('GLD'), self.TimeRules.AfterMarketOpen('GLD', 5), self.GenerateInsights) self.Schedule.On(self.DateRules.MonthStart('GLD'), self.TimeRules.At(19,0), self.RefitModels) def RefitModels(self): for symbol, model in self.models.items(): RefitModel(self, symbol, model) def GenerateInsights(self): insights = [] qb = self symbols = [x.Symbol for x in qb.ActiveSecurities.Values] # Copy and paste from research notebook # ----------------------------------------------------------------------------- # Fetch history history = qb.History(symbols, 500, Resolution.Hour) # Convert to returns returns = history.unstack(level = 1).close.transpose().pct_change().dropna() # Test for stationarity stationarity = TestStationartiy(returns) # Get z-scores z_scores = GetZScores(returns) # ----------------------------------------------------------------------------- insights = [] # Iterate over symbols for symbol, value in stationarity.iteritems(): # Only emit Insights for those whose returns exhibit stationary behavior if value: # Get Hidden Markov model model = self.CheckForHMM(symbol) # Predict current state state_prediction = PredictState(self, model, symbol) # Get most recent z_score z_score = z_scores[symbol].tail(1).values[0] # Determine if we want to invest or not if (z_score < -1) and (state_prediction == 0): insights.append(Insight.Price(symbol, timedelta(1), InsightDirection.Up)) elif z_score > 1: if self.Portfolio[symbol].Invested: insights.append(Insight.Price(symbol, timedelta(1), InsightDirection.Flat)) elif self.Portfolio[symbol].Invested and (state_prediction == 1): insights.append(Insight.Price(symbol, timedelta(1), InsightDirection.Flat)) self.EmitInsights(insights) def CheckForHMM(self, symbol): if self.models.get(symbol, None) is None: self.models[symbol] = CreateHMM(self, symbol) return self.models.get(symbol, None) def OnSecuritiesChanged(self, changes): symbols = [x.Symbol for x in changes.AddedSecurities] # Build model for each symbol for symbol in symbols: self.models[symbol] = CreateHMM(self, symbol)
import numpy as np import pandas as pd from hmmlearn.hmm import GaussianHMM def CreateHMM(algorithm, symbol): history = algorithm.History([symbol], 900, Resolution.Daily) returns = np.array(history.loc[symbol].close.pct_change().dropna()) # Reshape returns returns = np.array(returns).reshape((len(returns),1)) # Initialize Gaussian Hidden Markov Model return GaussianHMM(n_components=2, covariance_type="full", n_iter=1000).fit(returns) def PredictState(algorithm, model, symbol): # Predict current state if algorithm.CurrentSlice.ContainsKey(symbol): price = np.array(algorithm.CurrentSlice[symbol].Close).reshape((1,1)) else: price = np.array(algorithm.Securities[symbol].Price).reshape((1,1)) return model.predict(price)[0] def RefitModel(algorithm, symbol, model): history = algorithm.History([symbol], 900, Resolution.Daily) returns = np.array(history.loc[symbol].close.pct_change().dropna()) # Reshape returns returns = np.array(returns).reshape((len(returns),1)) return model.fit(returns)