Overall Statistics |
Total Trades 1267 Average Win 0.08% Average Loss -0.04% Compounding Annual Return 7.764% Drawdown 6.500% Expectancy 0.308 Net Profit 7.126% Sharpe Ratio 0.772 Probabilistic Sharpe Ratio 40.699% Loss Rate 59% Win Rate 41% Profit-Loss Ratio 2.17 Alpha 0.078 Beta -0.058 Annual Standard Deviation 0.084 Annual Variance 0.007 Information Ratio -1.175 Tracking Error 0.146 Treynor Ratio -1.112 Total Fees $1426.42 |
from MyLSTM import MyLSTM


class MultidimensionalHorizontalFlange(QCAlgorithm):
    """Emit daily ETF insights driven by an LSTM forecast of a macro symbol.

    One ``MyLSTM`` model is trained per entry of ``self.macro_symbols``
    (currently only ``'Bull'`` -> SPY).  Each trading day the latest
    prediction is compared with the current price and the sign of the
    difference selects the insight direction for the S&P 500 sector ETFs,
    the inverse-treasury ETFs and the long-treasury ETFs of
    ``LiquidETFUniverse``.
    """

    def Initialize(self):
        """Configure the backtest, schedule training/prediction and set up the chart."""
        self.SetStartDate(2019, 1, 4)  # Set Start Date
        self.SetCash(100000)  # Set Strategy Cash

        self.SetBrokerageModel(AlphaStreamsBrokerageModel())
        self.SetExecution(ImmediateExecutionModel())
        self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())

        self.UniverseSettings.Resolution = Resolution.Minute
        self.SetUniverseSelection(LiquidETFUniverse())

        # Helper dictionaries: macro symbol per regime key, and the trained
        # model per key (None until TrainMyModel has stored one).
        self.macro_symbols = {'Bull': Symbol.Create('SPY', SecurityType.Equity, Market.USA)}
        self.models = {'Bull': None, 'Bear': None}

        # Use Train() method to avoid runtime error
        self.Train(self.TrainMyModel)
        self.Train(self.DateRules.MonthEnd(), self.TimeRules.At(8, 0), self.TrainMyModel)

        # Schedule prediction and plotting
        self.AddEquity('SPY')
        self.Schedule.On(self.DateRules.EveryDay('SPY'),
                         self.TimeRules.AfterMarketOpen('SPY', 5),
                         self.Predict)
        self.Schedule.On(self.DateRules.EveryDay('SPY'),
                         self.TimeRules.AfterMarketOpen('SPY', 6),
                         self.PlotMe)

        # Create custom charts
        prediction = Chart('Prediction Plot')
        prediction.AddSeries(Series('Actual Bull', SeriesType.Line, 0))
        prediction.AddSeries(Series('Predicted Bull', SeriesType.Line, 0))
        prediction.AddSeries(Series('Actual Bear', SeriesType.Line, 1))
        prediction.AddSeries(Series('Predicted Bear', SeriesType.Line, 1))
        # Register the chart; without this the series definitions above
        # (including their panel indexes) are never attached to the algorithm.
        self.AddChart(prediction)

    def TrainMyModel(self):
        """Train a fresh ``MyLSTM`` for every macro symbol and store it in ``self.models``."""
        # Fetch history once for all macro symbols
        symbols = list(self.macro_symbols.values())
        history = self.History(symbols, 1280, Resolution.Daily)

        for key, symbol in self.macro_symbols.items():
            lstm = MyLSTM()

            # Prepare data (the train/test splits returned alongside are not needed here)
            features_set, labels, _, _ = lstm.ProcessData(history.loc[symbol].close)

            # Build and fit the model
            lstm.CreateModel(features_set, labels)
            lstm.FitModel(features_set, labels)

            # Publish the model only once it is fully trained
            self.models[key] = lstm

    def Predict(self):
        """Predict each macro symbol, plot the prediction and emit ETF insights."""
        # The history request does not depend on the loop variable, so issue it once.
        symbols = list(self.macro_symbols.values())
        history = self.History(symbols, 80, Resolution.Daily)

        delta = {}
        for key, symbol in self.macro_symbols.items():
            lstm = self.models[key]
            if lstm is None:
                # Training has not stored a model for this key yet.
                self.Debug(f'No trained model for {key}; skipping prediction')
                continue

            price = self.Securities[symbol].Price
            if price == 0:
                # No price available yet; the ratio below would be meaningless.
                continue

            predictions = lstm.PredictFromModel(history.loc[symbol].close)

            # Latest prediction as a plain Python scalar rather than a
            # one-element array, so both Plot and the comparison get a number.
            latest = predictions[-1].item()

            # Relative difference between the prediction and the current price
            delta[key] = latest / price - 1

            self.Plot('Prediction Plot', f'Predicted {key}', latest)

        insights = []
        for key, change in delta.items():
            if key != 'Bull':
                continue

            bullish = change > 0
            with_market = InsightDirection.Up if bullish else InsightDirection.Flat
            against_market = InsightDirection.Flat if bullish else InsightDirection.Up

            groups = (
                (LiquidETFUniverse.SP500Sectors.Long, with_market),
                (LiquidETFUniverse.Treasuries.Inverse, with_market),
                (LiquidETFUniverse.Treasuries.Long, against_market),
            )
            for etfs, direction in groups:
                insights += [
                    Insight.Price(etf, timedelta(1), direction)
                    for etf in etfs
                    if self.Securities.ContainsKey(etf)
                ]

        if insights:
            self.EmitInsights(insights)

    def PlotMe(self):
        """Plot the current price of each macro symbol to compare against the prediction."""
        for key, symbol in self.macro_symbols.items():
            self.Plot('Prediction Plot', f'Actual {key}', self.Securities[symbol].Price)
import numpy as np
import pandas as pd
from keras.layers import LSTM
from keras.layers import Dense
from keras.layers import Dropout
from keras.models import Sequential
from sklearn.preprocessing import MinMaxScaler


class MyLSTM:
    """Stacked-LSTM regressor over a min-max-scaled univariate series.

    Typical use: ``ProcessData`` -> ``CreateModel`` -> ``FitModel`` ->
    ``PredictFromModel``.  The scaler fitted in ``ProcessData`` and the
    window length chosen there are reused by ``PredictFromModel``.
    """

    def __init__(self):
        self.model = None
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        # Window length of the training samples; updated by ProcessData and
        # reused by PredictFromModel so both sides build same-sized windows.
        self.lookback = 60

    def ProcessData(self, data, n=60, train_size=1260):
        """Split, scale and window ``data`` into LSTM training samples.

        Args:
            data: One-dimensional sliceable series of values, oldest first.
            n: Number of consecutive observations in each input window.
            train_size: Number of leading rows used for training; the rest
                is returned untouched as ``test_data``.

        Returns:
            ``(features_set, labels, training_data, test_data)`` where
            ``features_set`` has shape ``(samples, n, 1)`` and ``labels``
            holds the scaled value that follows each window.

        Raises:
            ValueError: If ``n`` is not positive or the training split has
                no more than ``n`` rows (no complete window plus label).
        """
        if n < 1:
            raise ValueError(f"n must be at least 1, got {n}")

        # Split the data
        training_data = data[:train_size]
        test_data = data[train_size:]

        n_rows = len(training_data)
        if n_rows <= n:
            raise ValueError(
                f"need more than {n} training rows to build a window, got {n_rows}"
            )

        # Transform data (fits the scaler on the training split only)
        training_data_array = np.array(training_data).reshape((n_rows, 1))
        training_data_scaled = self.scaler.fit_transform(training_data_array)
        self.lookback = n

        # Each sample is the n values preceding row i; its label is row i.
        features_set = np.array(
            [training_data_scaled[i - n:i, 0] for i in range(n, n_rows)]
        )
        labels = np.array(training_data_scaled[n:n_rows, 0])
        features_set = np.reshape(
            features_set, (features_set.shape[0], features_set.shape[1], 1)
        )

        return features_set, labels, training_data, test_data

    def CreateModel(self, features_set, labels):
        """Build and compile the network: four 50-unit LSTM layers, each
        followed by 20% dropout, and a single-unit dense output.

        Args:
            features_set: Training features; only ``shape[1]`` (the window
                length) is read, to size the input layer.
            labels: Unused; accepted so the call mirrors ``FitModel``.
        """
        window = features_set.shape[1]

        self.model = Sequential()
        self.model.add(LSTM(units=50, return_sequences=True, input_shape=(window, 1)))
        self.model.add(Dropout(0.2))
        self.model.add(LSTM(units=50, return_sequences=True))
        self.model.add(Dropout(0.2))
        self.model.add(LSTM(units=50, return_sequences=True))
        self.model.add(Dropout(0.2))
        # Last recurrent layer returns only its final state for the dense head
        self.model.add(LSTM(units=50))
        self.model.add(Dropout(0.2))
        self.model.add(Dense(units=1))

        self.model.compile(optimizer='adam', loss='mean_squared_error')

    def FitModel(self, features_set, labels, epochs=50, batch_size=32):
        """Fit the compiled model on the windowed training data.

        Args:
            features_set: Array of shape ``(samples, window, 1)``.
            labels: Target value for each sample.
            epochs: Number of training epochs.
            batch_size: Samples per gradient update.
        """
        self.model.fit(features_set, labels, epochs=epochs, batch_size=batch_size)

    def PredictFromModel(self, test_data, n_predictions=20):
        """Predict the most recent values of ``test_data`` in original units.

        Uses the trailing ``lookback + n_predictions`` rows (80 with the
        defaults) and slides a ``lookback``-long window over them.

        Args:
            test_data: One-dimensional series of unscaled values, oldest first.
            n_predictions: Maximum number of trailing predictions to produce;
                fewer are returned when ``test_data`` is shorter.

        Returns:
            Array of shape ``(k, 1)`` with the inverse-scaled predictions,
            oldest first.

        Raises:
            ValueError: If ``n_predictions`` is not positive or ``test_data``
                has no more than ``lookback`` rows.
        """
        if n_predictions < 1:
            raise ValueError(f"n_predictions must be at least 1, got {n_predictions}")

        lookback = self.lookback
        values = np.asarray(test_data)[-(lookback + n_predictions):]

        n_rows = len(values)
        if n_rows <= lookback:
            raise ValueError(
                f"need more than {lookback} rows to predict, got {n_rows}"
            )

        test_inputs = self.scaler.transform(values.reshape(-1, 1))

        # NOTE(review): the last window ends one row before the final
        # observation, so predictions[-1] estimates the final supplied row,
        # not the row after it -- confirm this is the intended target.
        test_features = np.array(
            [test_inputs[i - lookback:i, 0] for i in range(lookback, n_rows)]
        )
        test_features = np.reshape(
            test_features, (test_features.shape[0], test_features.shape[1], 1)
        )

        predictions = self.model.predict(test_features)
        predictions = self.scaler.inverse_transform(predictions)

        return predictions