Overall Statistics |
Total Trades 20 Average Win 2.17% Average Loss -2.19% Compounding Annual Return 0.935% Drawdown 16.000% Expectancy 0.193 Net Profit 3.910% Sharpe Ratio 0.182 Probabilistic Sharpe Ratio 5.130% Loss Rate 40% Win Rate 60% Profit-Loss Ratio 0.99 Alpha -0.023 Beta 0.232 Annual Standard Deviation 0.062 Annual Variance 0.004 Information Ratio -1.232 Tracking Error 0.112 Treynor Ratio 0.048 Total Fees $20.00 |
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller


def TestStationartiy(returns, significance=0.05):
    """Run an Augmented Dickey-Fuller test on every column of *returns*.

    The (misspelled) function name is kept as-is because other modules
    import it under this name.

    Args:
        returns: pandas DataFrame with one column of returns per symbol.
        significance: p-value threshold below which a column is flagged
            as stationary. Defaults to 0.05.

    Returns:
        pandas Series of booleans indexed by ``returns.columns``; True where
        the ADF p-value is below *significance*.
    """
    # DataFrame.iteritems() was removed in pandas 2.0; items() is the
    # long-standing equivalent and yields (column_name, column_series).
    # adfuller(...)[1] is the p-value of the test.
    return pd.Series(
        [adfuller(values)[1] < significance for _, values in returns.items()],
        index=returns.columns,
    )


def GetZScores(returns):
    """Standardise each column of *returns* to z-scores.

    Args:
        returns: pandas DataFrame with one column of returns per symbol.

    Returns:
        pandas DataFrame of the same shape where every column has had its
        own mean subtracted and been divided by its own standard deviation
        (pandas' default sample standard deviation). A column with zero
        variance produces NaN/inf values; no special handling is applied.
    """
    return returns.subtract(returns.mean()).div(returns.std())
from HMM import *
import numpy as np
from StationarityAndZScores import *
from QuantConnect.Data.UniverseSelection import *


class ModulatedDynamicCircuit(QCAlgorithm):
    """Regime-filtered SPY strategy driven by a two-state Gaussian HMM.

    Once per trading day the algorithm tests hourly SPY returns for
    stationarity, asks the cached Hidden Markov model for the current state
    and emits an Up insight in state 0 or a Flat insight to exit an existing
    position. Models are cached per symbol in ``self.models`` and refitted
    at the start of every month.

    NOTE(review): the trading logic assumes HMM state 0 is the favourable
    regime. State labels from a fitted HMM are presumably arbitrary and may
    swap between fits -- confirm before relying on this.
    """

    def Initialize(self):
        """Configure dates, cash, framework models and scheduled events."""
        self.SetStartDate(2016, 1, 1)  # Set Start Date
        self.SetCash(10000)  # Set Strategy Cash
        self.SetBrokerageModel(AlphaStreamsBrokerageModel())
        self.SetExecution(ImmediateExecutionModel())
        self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
        self.UniverseSettings.Resolution = Resolution.Minute
        # Universe selection is currently disabled; SPY is added manually.
        # self.SetUniverseSelection(LiquidETFUniverse())
        # self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)

        # Cache of fitted HMMs, keyed by symbol.
        self.models = {}

        self.AddEquity('SPY')

        # Generate insights 5 minutes after the SPY market open every day.
        self.Schedule.On(self.DateRules.EveryDay('SPY'),
                         self.TimeRules.AfterMarketOpen('SPY', 5),
                         self.GenerateInsights)
        # Refit every cached model at 19:00 on the first trading day of
        # each month.
        self.Schedule.On(self.DateRules.MonthStart('SPY'),
                         self.TimeRules.At(19, 0),
                         self.RefitModels)

    def RefitModels(self):
        """Refit every cached HMM on fresh history."""
        for symbol, model in self.models.items():
            RefitModel(self, symbol, model)

    def GenerateInsights(self):
        """Scheduled handler: build and emit the day's insights."""
        # TODO: Insert fundamental selection criteria here, then pass that
        # to the symbols variable below. Use the regime filter on SPY, not
        # on individual stocks.
        symbols = ["SPY"]  # <--- update this to take input from above, not ActiveSecurities

        # Fetch history
        history = self.History(symbols, 500, Resolution.Hour)
        if history.empty:
            # Nothing to analyse; unstacking an empty frame below would fail.
            return

        # Convert to returns: rows are timestamps, columns are symbols.
        returns = history.unstack(level=1).close.transpose().pct_change().dropna()

        # Test for stationarity
        stationarity = TestStationartiy(returns)

        # Get z-scores
        z_scores = GetZScores(returns)

        insights = []

        # Series.iteritems() was removed in pandas 2.0; items() is equivalent.
        for symbol, is_stationary in stationarity.items():
            # Only emit Insights for those whose returns exhibit stationary behavior
            if not is_stationary:
                continue

            # Get Hidden Markov model
            model = self.CheckForHMM(symbol)

            # Predict current state
            state_prediction = PredictState(self, model, symbol)

            # Get most recent z_score
            z_score = z_scores[symbol].iloc[-1]

            # Determine if we want to invest or not.
            # Alternative entry rule kept for reference:
            #   (z_score < -1) and (state_prediction == 0)
            if state_prediction == 0:
                insights.append(
                    Insight.Price(symbol, timedelta(1), InsightDirection.Up))
            elif self.Portfolio[symbol].Invested and (
                    z_score > 1 or state_prediction == 1):
                # Exit an open position when returns are stretched to the
                # upside or the model reports state 1.
                insights.append(
                    Insight.Price(symbol, timedelta(1), InsightDirection.Flat))

            self.Log("State = " + str(state_prediction))

        self.EmitInsights(insights)

    def CheckForHMM(self, symbol):
        """Return the cached HMM for *symbol*, creating it on first use."""
        if self.models.get(symbol) is None:
            self.models[symbol] = CreateHMM(self, symbol)
        return self.models[symbol]

    def OnSecuritiesChanged(self, changes):
        """Build (or rebuild) the SPY model whenever the universe changes.

        *changes* is currently ignored; only the hard-coded symbol list is
        processed.
        """
        symbols = ["SPY"]

        # Build model for each symbol
        for symbol in symbols:
            self.models[symbol] = CreateHMM(self, symbol)
import numpy as np
import pandas as pd
from hmmlearn.hmm import GaussianHMM


def _daily_returns(algorithm, symbol, periods=900):
    """Return daily close-to-close returns for *symbol* as an (n, 1) array.

    Requests *periods* daily bars from ``algorithm.History`` and converts the
    closes to percentage changes, dropping the leading NaN.

    Raises:
        ValueError: if the history request yields no usable returns.
    """
    history = algorithm.History([symbol], periods, Resolution.Daily)
    if history.empty:
        raise ValueError("No daily history available for {}".format(symbol))

    closes = history.loc[symbol].close
    returns = np.asarray(closes.pct_change().dropna()).reshape(-1, 1)
    if len(returns) == 0:
        raise ValueError("Not enough daily history to compute returns for {}".format(symbol))
    return returns


def CreateHMM(algorithm, symbol):
    """Fit a new two-state Gaussian HMM on 900 daily bars of returns.

    Args:
        algorithm: object exposing ``History(symbols, periods, resolution)``.
        symbol: symbol to fetch history for.

    Returns:
        The fitted ``GaussianHMM`` instance.

    Raises:
        ValueError: if no returns can be computed from the history.
    """
    returns = _daily_returns(algorithm, symbol)

    # Initialize Gaussian Hidden Markov Model
    return GaussianHMM(n_components=2, covariance_type="full", n_iter=10000).fit(returns)


def PredictState(algorithm, model, symbol):
    """Predict the current hidden state of *symbol*.

    The model is trained on daily returns, so the observation passed to
    ``model.predict`` must also be a daily return. The previous
    implementation passed the raw price level, which is on a completely
    different scale from the training data and made the decoded state
    meaningless. The most recent completed daily return is used instead.

    Args:
        algorithm: object exposing ``History(symbols, periods, resolution)``.
        model: fitted model exposing ``predict``.
        symbol: symbol to predict the state for.

    Returns:
        The predicted state index for the latest observation.

    Raises:
        ValueError: if no recent return can be computed.
    """
    # Two daily bars yield exactly one close-to-close return.
    latest_return = _daily_returns(algorithm, symbol, periods=2)[-1:]
    return model.predict(latest_return)[0]


def RefitModel(algorithm, symbol, model):
    """Refit an existing model on the latest 900 daily bars of returns.

    Args:
        algorithm: object exposing ``History(symbols, periods, resolution)``.
        symbol: symbol to fetch history for.
        model: model exposing ``fit``.

    Returns:
        Whatever ``model.fit`` returns.

    Raises:
        ValueError: if no returns can be computed from the history.
    """
    returns = _daily_returns(algorithm, symbol)
    return model.fit(returns)