Overall Statistics |
Total Trades 13 Average Win 17.20% Average Loss -0.04% Compounding Annual Return 23.636% Drawdown 33.400% Expectancy 216.824 Net Profit 86.306% Sharpe Ratio 1.002 Probabilistic Sharpe Ratio 44.147% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 434.65 Alpha 0.008 Beta 0.876 Annual Standard Deviation 0.17 Annual Variance 0.029 Information Ratio -0.254 Tracking Error 0.061 Treynor Ratio 0.195 Total Fees $13.00 Estimated Strategy Capacity $39000000.00 Lowest Capacity Asset SPY R735QTJ8XC9X |
# REF. https://www.quantconnect.com/forum/discussion/2657/a-simple-vix-strategy
#
# Custom data types: two CBOE daily index history downloads (VIX and the
# VIX3M file, exposed under the class name CboeVxV) and a Quandl futures type
# whose value column is "Settle".
from QuantConnect.Python import PythonQuandl  # quandl data not CLOSE
from QuantConnect.Python import PythonData  # custom data
from QuantConnect.Data import SubscriptionDataSource
# Used by GetSource below; imported explicitly so this module does not rely
# on the name being injected by the hosting environment.
from QuantConnect import SubscriptionTransportMedium

from datetime import datetime, timedelta
import decimal


def _read_cboe_row(index, config, line):
    '''Fill `index` from one row of a CBOE daily history CSV.

    Expected row layout (comma separated):
        MM/DD/YYYY, Open, High, Low, Close

    Args:
        index: freshly constructed data object to populate.
        config: subscription config; only `config.Symbol` is read.
        line: one raw text line of the downloaded file.

    Returns:
        The populated `index`, or None when the line is blank, is a header
        (does not start with a digit), or cannot be parsed.
    '''
    # Header and blank lines do not start with a digit: skip them.
    if not (line.strip() and line[0].isdigit()):
        return None

    index.Symbol = config.Symbol
    try:
        data = line.split(',')
        month_day_year = data[0].split('/')
        index.Time = datetime(int(month_day_year[2]),
                              int(month_day_year[0]),
                              int(month_day_year[1]))
        index.Value = decimal.Decimal(data[4])
        index["Open"] = float(data[1])
        index["High"] = float(data[2])
        index["Low"] = float(data[3])
        index["Close"] = float(data[4])
    except (ValueError, IndexError, decimal.InvalidOperation):
        # Malformed row: Decimal() signals InvalidOperation (not ValueError)
        # and a short row raises IndexError; skip the row either way.
        return None
    return index


class CboeVix(PythonData):
    '''CBOE Vix Download Custom Data Class'''

    def GetSource(self, config, date, isLiveMode):
        '''Return the remote CSV location of the VIX daily history.'''
        url_vix = "https://cdn.cboe.com/api/global/us_indices/daily_prices/VIX_History.csv"
        return SubscriptionDataSource(url_vix, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config, line, date, isLiveMode):
        '''Parse one CSV line into a CboeVix point, or None if unusable.

        Example File Format:
            Date        VIX Open  VIX High  VIX Low  VIX Close
            01/02/2004  17.96     18.68     17.54    18.22
        '''
        return _read_cboe_row(CboeVix(), config, line)


# NB: CboeVxV class == CboeVix class, except for the URL
class CboeVxV(PythonData):
    '''CBOE VXV Download Custom Data Class'''

    def GetSource(self, config, date, isLiveMode):
        '''Return the remote CSV location of the VIX3M daily history.'''
        url_vxv = "https://cdn.cboe.com/api/global/us_indices/daily_prices/VIX3M_History.csv"
        return SubscriptionDataSource(url_vxv, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config, line, date, isLiveMode):
        '''Parse one CSV line into a CboeVxV point, or None if unusable.

        Example File Format:
                        OPEN   HIGH   LOW    CLOSE
            12/04/2007  24.8   25.01  24.15  24.65
        '''
        return _read_cboe_row(CboeVxV(), config, line)


# for using VIX futures settle in calc. ratios like VIX/VIX1
class QuandlFuture(PythonQuandl):
    '''Custom quandl data type for setting customized value column name.
    Value column is used for the primary trading calculations and charting.'''

    def __init__(self):
        # Define ValueColumnName: cannot be None, Empty or non-existant column name
        # If ValueColumnName is "Close", do not use PythonQuandl, use Quandl:
        # self.AddData[QuandlFuture](self.VIX1, Resolution.Daily)
        self.ValueColumnName = "Settle"
# REF. https://www.quantconnect.com/forum/discussion/6931/from-research-to-production-long-short-term-memory
#
# NOTE(review): QCAlgorithm, Resolution, Chart, Series, SeriesType, Insight,
# InsightDirection, PortfolioTarget and the model classes used below are not
# imported in this file; presumably the hosting LEAN environment provides
# them -- confirm before running elsewhere.
from datetime import timedelta

from MyLSTM import MyLSTM, LOOKBACK_DAYS, FEATURE_LEN
from my_custom_data import CboeVix, CboeVxV

import numpy as np
import pandas as pd

# History window sizes (in daily bars) for the first fit, later re-fits and
# each prediction.
initial_lookback = LOOKBACK_DAYS*30
training_lookback = LOOKBACK_DAYS*10
predict_lookback = FEATURE_LEN*10  # look back seems like needs to be multiplied by a factor depend on number of symbols?


class MultidimensionalHorizontalFlange(QCAlgorithm):
    '''Switches between SPY and SHY each day based on an LSTM return forecast.

    The model is fed SPY, VIX and VXV daily closes. A positive predicted
    return puts 100% in SPY; otherwise 50% in SHY.
    '''

    def Initialize(self):
        '''Configure dates, cash, subscriptions, training and schedules.'''
        # November 12, 2007 vxv +4
        #self.SetStartDate(2014, 12, 1)
        # ^^^ Algo doesn't really trade. Likely bug, need research to debug
        # PSR: 59%, win loss rate: 100/0 is just wrong...
        self.SetStartDate(2019, 1, 1)
        # cherry picked date range...for wow factor.
        # PSR: 84% , win loss rate: 75/25.
        self.SetCash(10000)  # Set Strategy Cash

        self.SetBrokerageModel(AlphaStreamsBrokerageModel())
        self.SetExecution(ImmediateExecutionModel())
        #self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
        self.UniverseSettings.Resolution = Resolution.Daily
        #self.SetUniverseSelection(LiquidETFUniverse())

        self.short_volatility = self.AddEquity('SPY', Resolution.Daily).Symbol
        self.long_volatility = self.AddEquity('SHY', Resolution.Daily).Symbol
        self.spy = self.AddEquity('SPY', Resolution.Daily).Symbol

        # linked to my_custom_data.py
        self.vix = self.AddData(CboeVix, "VIX").Symbol
        self.vxv = self.AddData(CboeVxV, "VXV").Symbol
        # Another method
        #self.vix = self.AddData(CBOE, "VIX", Resolution.Daily).Symbol
        #self.vxv = self.AddData(CBOE, "VIX3M", Resolution.Daily).Symbol
        #self.vix = self.AddData(CBOE, "VIX").Symbol
        #self.vxv = self.AddData(CBOE, "VXV").Symbol

        self.SetWarmUp(timedelta(LOOKBACK_DAYS))

        # One model slot per traded symbol; None until the first fit succeeds.
        self.models = {
            self.spy: None,
        }
        # Label -> symbol; the label is used in chart series names.
        self.macro_symbols = {
            'Bull': self.spy,
        }

        # Use Train() method to avoid runtime error
        self.Train(self.TrainMyModel)
        self.Train(self.DateRules.MonthEnd(), self.TimeRules.At(8, 0), self.TrainMyModel)

        # Schedule prediction and plotting
        # Kept from the original: a further SPY subscription with no explicit
        # resolution argument.
        self.AddEquity('SPY')
        self.Schedule.On(self.DateRules.EveryDay(self.spy), self.TimeRules.AfterMarketOpen(self.spy, 5), self.Predict)
        #self.Schedule.On(self.DateRules.EveryDay(self.spy), self.TimeRules.BeforeMarketClose(self.spy, 5), self.ClosePosition)
        self.Schedule.On(self.DateRules.EveryDay(self.spy), self.TimeRules.BeforeMarketClose(self.spy, 5), self.PlotMe)

        # Create custom charts
        prediction = Chart('Prediction Plot')
        prediction.AddSeries(Series('Actual Bull', SeriesType.Line, 0))
        prediction.AddSeries(Series('Predicted Bull', SeriesType.Line, 0))
        prediction.AddSeries(Series('Actual Bear', SeriesType.Line, 1))
        prediction.AddSeries(Series('Predicted Bear', SeriesType.Line, 1))
        # Register the chart; without this the series layout built above is
        # never handed to the algorithm.
        self.AddChart(prediction)

        # Flipped to True once a model has been fitted.
        self.ready = False

    def get_data(self, lookback):
        '''Return a DataFrame of daily closes with columns SPY, VIX, VXV.

        Args:
            lookback: number of daily bars to request from History.

        The SPY column is assigned first, so the VIX and VXV closes are
        aligned by pandas to the SPY index; unmatched rows become NaN.
        '''
        history = self.History([self.spy, self.vix, self.vxv], lookback, Resolution.Daily)
        df = pd.DataFrame()
        #df['date'] = history.loc[self.spy].index
        df['SPY'] = history.loc[self.spy].close
        df['VIX'] = history.loc[self.vix].close
        df['VXV'] = history.loc[self.vxv].close
        return df

    def TrainMyModel(self):
        '''Fit one LSTM per symbol the first time enough history is available.

        Later calls only log; re-fitting is left disabled (see note below).
        Sets `self.ready` to False whenever the fetched history is shorter
        than `predict_lookback`, and to True after a successful first fit.
        '''
        for symbol in self.macro_symbols.values():
            first_fit = self.models[symbol] is None
            df = self.get_data(initial_lookback if first_fit else training_lookback)
            self.Log('data {}...train'.format(df.shape))

            if df.shape[0] < predict_lookback:
                # dont really have that much training_lookback, nor initial_lookback
                self.ready = False
                continue
            self.Log('shape {}'.format(df.shape))

            if first_fit:
                # Initialize LSTM class instance
                lstm = MyLSTM()
                # Prepare data
                features_set, labels, stratify = lstm.ProcessData(df)
                # Create model
                lstm.CreateModel()
                # Fit model
                lstm.FitModel(features_set, labels, stratify)
                # Add LSTM class to dictionary to store later
                self.models[symbol] = lstm
                self.ready = True
                self.Log('training done')
            else:
                # uncomment for real test, might take a while or timeout...
                # (FitModel takes a third `stratify` argument.)
                #lstm = self.models[symbol]
                #features_set, labels, stratify = lstm.ProcessData(df)
                #lstm.FitModel(features_set, labels, stratify)
                self.Log('no training done')

    # close position at the day.
    def ClosePosition(self):
        '''Set both the SPY and SHY targets to zero.'''
        self.Log('closing position')
        self.SetHoldings([
            PortfolioTarget(self.short_volatility, 0.0),
            PortfolioTarget(self.long_volatility, 0.0),
        ])

    def Predict(self):
        '''Forecast the next return per symbol, emit an insight and rebalance.

        Raises:
            ValueError: if fewer than `predict_lookback` rows of history are
                available once the model is ready.
        '''
        for key, symbol in self.macro_symbols.items():
            self.Log('predict')
            self.Log('ready is {}'.format(self.ready))
            if not self.ready:
                continue

            self.Log('fetch data.')
            # Fetch history
            df = self.get_data(predict_lookback)
            self.Log('data {}...predict'.format(df.shape))
            if df.shape[0] < predict_lookback:
                raise ValueError('not enough data {}'.format(df.shape))

            # Fetch LSTM class
            lstm = self.models[symbol]
            # Predict; cast the numpy scalar to a plain float before it is
            # handed to plotting / insight APIs.
            prediction = float(lstm.PredictFromModel(df))

            # Plot prediction
            self.Plot('Prediction Plot', f'Predicted {key}', prediction)

            #confidence = np.clip(np.abs(predictions-0.5)/0.10,0,1)
            #insight = Insight.Price(symbol, timedelta(1), InsightDirection.Up if predictions > 0.5 else InsightDirection.Down, confidence)
            confidence = float(np.clip(abs(prediction)/0.10, 0, 1))
            direction = InsightDirection.Up if prediction > 0.0 else InsightDirection.Down
            # Insight.Price takes magnitude before confidence; pass None for
            # magnitude so this value lands in the confidence slot.
            insight = Insight.Price(symbol, timedelta(1), direction, None, confidence)
            self.EmitInsights(insight)

            # Both targets go in one call with the reducing target first, so
            # the purchase is not submitted ahead of the sale that funds it.
            #if predictions > 0.5:
            if prediction > 0.0:
                self.Log('Long!')
                self.SetHoldings([
                    PortfolioTarget(self.long_volatility, 0.0),
                    PortfolioTarget(self.short_volatility, 1.0),
                ])
            else:
                self.Log('Short!')
                self.SetHoldings([
                    PortfolioTarget(self.short_volatility, 0.0),
                    PortfolioTarget(self.long_volatility, 0.5),
                ])

    def PlotMe(self):
        '''Plot the realised direction and the current price of each symbol.'''
        # Plot current price of symbols to match against prediction
        for key, symbol in self.macro_symbols.items():
            security = self.Securities[symbol]
            up = 1.0 if (security.Close - security.Open) > 0 else 0.0
            # NOTE(review): both values go to the same 'Actual ...' series, as
            # in the original; confirm whether only one of them is intended.
            self.Plot('Prediction Plot', f'Actual {key}', up)
            self.Plot('Prediction Plot', f'Actual {key}', security.Price)
# https://stackoverflow.com/questions/38714959/understanding-keras-lstms?rq=1
#
# LSTM regression model plus the feature engineering that feeds it.
# The seeding statements below are kept ahead of the keras imports, in the
# original order.
SEED = 42
import os
import random as rn
import numpy as np
from tensorflow import set_random_seed
os.environ['PYTHONHASHSEED']=str(SEED)
np.random.seed(SEED)
set_random_seed(SEED)
rn.seed(SEED)

import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from keras.models import load_model
from keras.optimizers import Adam
from keras.layers import LSTM
from keras.layers import Dense
from keras.layers import Dropout, BatchNormalization
from keras.layers import LeakyReLU
from keras.models import Sequential
from sklearn.model_selection import train_test_split
from keras import backend as K
K.clear_session()

LOOKBACK_DAYS = 1200
FEATURE_LEN = 40  #400
FEATURE_DIM = 2


def get_model():
    '''Build the (uncompiled) stacked LSTM regression network.

    Input shape is (FEATURE_LEN, FEATURE_DIM). Four LSTM layers and two
    Dense(10) layers are each followed by Dropout(0.2) and LeakyReLU; the
    output is a single linear unit.

    Returns:
        A keras Sequential model.
    '''
    #input_shape = (features_set.shape[1],features_set.shape[2])
    input_shape = (FEATURE_LEN, FEATURE_DIM)
    feature_len = input_shape[0]
    dropout_rate = 0.2
    drop = True
    norm = False

    model = Sequential()

    def add_stage(layer):
        # Append `layer` followed by the shared dropout / norm / activation
        # tail, in the same order for every stage.
        model.add(layer)
        if drop:
            model.add(Dropout(rate=dropout_rate))
        if norm:
            model.add(BatchNormalization())
        model.add(LeakyReLU())

    add_stage(LSTM(units=feature_len, return_sequences=True, input_shape=input_shape))
    add_stage(LSTM(units=feature_len, return_sequences=True))
    add_stage(LSTM(units=int(feature_len/2), return_sequences=True))
    add_stage(LSTM(units=int(feature_len/2)))
    add_stage(Dense(units=10))
    add_stage(Dense(units=10))

    model.add(Dense(units=1, activation='linear'))
    return model


class MyLSTM:
    '''Feature preparation, training and inference around `get_model()`.'''

    def __init__(self):
        self.model = None
        # Created here but not used by any method in this class.
        self.scaler = MinMaxScaler(feature_range = (0, 1))
        self.feature_len = FEATURE_LEN
        self.feature_dim = FEATURE_DIM
        self.lookback_num = LOOKBACK_DAYS

    def _transform(self, df, shift=True):
        '''Derive model features from SPY / VIX / VXV closes.

        Adds these columns:
            SPY_RET: daily log return of SPY.
            HV:      10-day rolling std of SPY_RET, scaled by sqrt(252).
            HV_MA:   5-day rolling mean of HV.
            VRP:     VIX minus HV_MA*100.
            VV:      VIX divided by VXV.

        Args:
            df: DataFrame with columns 'SPY', 'VIX', 'VXV'. Not modified.
            shift: when True, VRP and VV are lagged by one row.

        Returns:
            A new DataFrame with the added columns and NaN rows dropped.
        '''
        # Work on a copy so the caller's frame does not gain extra columns.
        df = df.copy()
        df['SPY_RET'] = np.log(df['SPY']).diff()
        df['HV'] = df['SPY_RET'].rolling(10).std() * np.sqrt(252)
        df['HV_MA'] = df['HV'].rolling(5).mean()
        df['VRP'] = df['VIX'] - df['HV_MA']*100
        df['VV'] = df['VIX']/df['VXV']
        if shift:
            df['VRP'] = df['VRP'].shift(1)
            df['VV'] = df['VV'].shift(1)
        df = df.dropna()
        #if df.shape[0] < LOOKBACK_DAYS:
        #    raise ValueError('No Data! df shape {}'.format(df.shape))
        return df

    def ProcessData(self, df):
        '''Turn price history into sliding-window training samples.

        For every row i >= feature_len, the sample is the previous
        `feature_len` rows of (VRP, VV) and the label is row i's SPY_RET.

        Args:
            df: DataFrame with columns 'SPY', 'VIX', 'VXV'.

        Returns:
            (features_set, labels, stratify):
                features_set: array of shape (n, feature_len, 2).
                labels:       array of shape (n, 1) holding SPY_RET.
                stratify:     array of shape (n,), 1.0 where SPY_RET > 0
                              else 0.0.

        Raises:
            ValueError: if there are not enough rows for a single window.
        '''
        df = self._transform(df, shift=True)
        data = df[['SPY_RET', 'VRP', 'VV']].values
        window = self.feature_len

        # Without this guard the arrays below would be empty and any shape
        # inspection would fail with an IndexError instead.
        if data.shape[0] <= window:
            raise ValueError('Debug not enough rows for one window: {} {} (need more than {})'.format(
                df.shape, data.shape, window))

        # build dataset
        rows = range(window, data.shape[0])
        features_set = np.array([data[i - window:i, 1:] for i in rows])
        # wishful thinking for predicting 1-day direction, but still fun.
        labels = np.array([[data[i, 0]] for i in rows])
        stratify = np.array([1.0 if data[i, 0] > 0 else 0.0 for i in rows])
        return features_set, labels, stratify

    def CreateModel(self):
        '''Build the network and compile it with Adam and MSE loss.'''
        lr = 0.001
        # Second positional argument of Adam is beta_1.
        beta_1 = 0.9
        # Create Model
        self.model = get_model()
        self.opt = Adam(lr, beta_1)
        #self.model.compile(optimizer=self.opt,loss='binary_crossentropy',metrics=['acc'])
        self.model.compile(optimizer=self.opt, loss='mse', metrics=['acc'])

    def FitModel(self, features_set, labels, stratify):
        '''Train for 3 epochs on a stratified 67/33 train/validation split.

        Args:
            features_set: samples as returned by ProcessData.
            labels: targets as returned by ProcessData.
            stratify: class flags used to stratify the split.
        '''
        epochs = 3
        batch_size = 32
        X_train, X_test, y_train, y_test = train_test_split(
            features_set, labels, test_size=0.33, random_state=42, stratify=stratify)
        self.model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size,
                       validation_data=(X_test, y_test))

    def PredictFromModel(self, df):
        '''Predict from the most recent `feature_len` rows of features.

        Args:
            df: DataFrame with columns 'SPY', 'VIX', 'VXV'.

        Returns:
            The single scalar output of the model for the latest window.

        Raises:
            ValueError: if fewer than `feature_len` usable rows remain after
                feature engineering.
        '''
        # shift set to True to avoid look ahead??
        features = self._transform(df, shift=True)[['VRP', 'VV']].values
        # Add the leading batch axis expected by the model.
        test_inputs = np.expand_dims(features[-1*self.feature_len:, :], axis=0)
        if test_inputs.shape[1] != self.feature_len:
            raise ValueError('Prdict... Debug {}'.format(features.shape))
        predictions = self.model.predict(test_inputs)
        return predictions[0][0]