Overall Statistics |
Total Trades 102 Average Win 1.35% Average Loss -1.03% Compounding Annual Return 13.148% Drawdown 13.400% Expectancy 0.311 Net Profit 16.723% Sharpe Ratio 0.922 Probabilistic Sharpe Ratio 43.467% Loss Rate 43% Win Rate 57% Profit-Loss Ratio 1.31 Alpha 0.095 Beta -0.004 Annual Standard Deviation 0.103 Annual Variance 0.011 Information Ratio -0.309 Tracking Error 0.276 Treynor Ratio -22.752 Total Fees $78.89 Estimated Strategy Capacity $690000.00 |
import datetime
import pytz


class VirtualBlackDinosaur(QCAlgorithm):
    """Long-only BTCUSD dip-buying strategy on minute data.

    Entry (all must hold, and no position ticket may be open):
      * 15-minute EMA(100) above 15-minute EMA(200),
      * 15-minute RSI(14) below 30,
      * 5-minute RSI(7) below 20,
      * weekly EMA(9) above weekly EMA(20).

    Exit (any one, while a position ticket is open):
      * 15-minute RSI(2) above 90,
      * 5-minute RSI(7) at or above 70,
      * more than four hours after the entry, the close is below the
        lowest 5-minute low recorded during the first four hours.
    """

    # Length of the post-entry period in which new lows are tracked; after
    # it has elapsed the tracked low acts as a stop level.
    LOW_TRACKING_PERIOD = datetime.timedelta(hours=4)

    def Initialize(self):
        """Configure dates, cash, data, indicators, consolidators and state."""
        self.SetStartDate(2020, 1, 1)  # Set Start Date
        self.SetEndDate(2021, 4, 1)
        self.SetCash(1000)  # Set Strategy Cash

        self.ticker = "BTCUSD"
        self.BTC = self.AddCrypto(self.ticker, Resolution.Minute).Symbol

        # Commission
        self.Securities[self.ticker].SetFeeModel(CustomFeeModel())

        # Indicators ("Cinquanta"/"Cento"/"Duecento" = 50/100/200)
        self.rsiFourteen = RelativeStrengthIndex(14)
        self.rsiSeven = RelativeStrengthIndex(7)
        self.rsiTwo = RelativeStrengthIndex(2)
        self.emaCinquanta = ExponentialMovingAverage(50)
        self.emaCento = ExponentialMovingAverage(100)
        self.emaDuecento = ExponentialMovingAverage(200)
        self.wEmaFast = ExponentialMovingAverage(9)
        self.wEmaSlow = ExponentialMovingAverage(20)

        # Consolidators. `datetime.timedelta` is spelled out because only the
        # `datetime` module is imported in this file.
        consolidatorWeek = self.Consolidate(
            self.BTC, datetime.timedelta(minutes=10080), self.WeeklyBarHandler)
        consolidatorFifteen = self.Consolidate(
            self.BTC, datetime.timedelta(minutes=15), self.FifteenMinuteBarHandler)
        consolidatorFive = self.Consolidate(
            self.BTC, datetime.timedelta(minutes=5), self.FiveMinuteBarHandler)

        # Register each indicator on the consolidator that feeds it.
        # NOTE(review): the bar handlers are attached before the indicators,
        # so a handler may read the indicator value from the previous bar --
        # confirm against LEAN's event ordering.
        for indicator in (self.rsiFourteen, self.rsiTwo, self.emaCento,
                          self.emaDuecento, self.emaCinquanta):
            self.RegisterIndicator(self.ticker, indicator, consolidatorFifteen)
        self.RegisterIndicator(self.ticker, self.rsiSeven, consolidatorFive)
        for indicator in (self.wEmaFast, self.wEmaSlow):
            self.RegisterIndicator(self.ticker, indicator, consolidatorWeek)

        # Rolling windows holding the most recent indicator values
        self.rsiSevenWindow = RollingWindow[float](2)
        self.rsiFourWindow = RollingWindow[float](2)  # holds RSI(14) values
        self.rsiTwoWindow = RollingWindow[float](2)
        self.emaCinquantaWindow = RollingWindow[float](2)
        self.emaCentoWindow = RollingWindow[float](2)
        self.emaDuecentoWindow = RollingWindow[float](2)
        self.fastWindow = RollingWindow[float](2)
        self.slowWindow = RollingWindow[float](2)
        self.loWindow = RollingWindow[float](1)  # lowest low since entry

        # Warm up
        self.SetWarmUp(46000)

        # State
        self.buyMarketTicket = None  # ticket of the open entry order, if any
        self.newLow = False          # True while a position is being tracked
        self.now = None              # current UTC time, refreshed in OnData
        self.BuyTime = None

    def _utc_now(self):
        """Return the current algorithm time as a timezone-aware UTC datetime.

        `self.Time` is expressed in the algorithm's local time zone, whereas
        order-ticket times are UTC, so `UtcTime` is the value that can be
        compared with `OrderTicket.Time`. The tzinfo check covers both a naive
        and an already-aware conversion of `UtcTime`.
        """
        utc = self.UtcTime
        if utc.tzinfo is not None:
            return utc
        return pytz.utc.localize(utc)

    def FifteenMinuteBarHandler(self, consolidated):
        """Store the latest 15-minute indicator values."""
        self.rsiFourWindow.Add(self.rsiFourteen.Current.Value)
        self.rsiTwoWindow.Add(self.rsiTwo.Current.Value)
        self.emaCentoWindow.Add(self.emaCento.Current.Value)
        self.emaDuecentoWindow.Add(self.emaDuecento.Current.Value)
        self.emaCinquantaWindow.Add(self.emaCinquanta.Current.Value)

    def FiveMinuteBarHandler(self, consolidated):
        """Store the 5-minute RSI(7) and track new lows after an entry."""
        self.rsiSevenWindow.Add(self.rsiSeven.Current.Value)

        if not self.newLow:
            return

        # Only lows made within the tracking period after the entry count.
        if consolidated.Low < self.loWindow[0] and (
                self.buyMarketTicket.Time
                < self.now
                < (self.buyMarketTicket.Time + self.LOW_TRACKING_PERIOD)):
            self.loWindow.Add(consolidated.Low)
            self.Log(consolidated.Low)
            self.Log(self.now)
            self.Log(self.buyMarketTicket.Time)

    def WeeklyBarHandler(self, consolidated):
        """Store the latest weekly EMA values."""
        self.fastWindow.Add(self.wEmaFast.Current.Value)
        self.slowWindow.Add(self.wEmaSlow.Current.Value)

    def OnData(self, data):
        """Evaluate the entry and exit rules on every minute slice."""
        required_windows = (
            self.rsiSevenWindow,
            self.rsiFourWindow,
            self.rsiTwoWindow,
            self.emaCentoWindow,
            self.emaDuecentoWindow,
            self.fastWindow,
            self.slowWindow,
        )
        if not all(window.IsReady for window in required_windows):
            return
        if self.IsWarmingUp:
            return

        self.now = self._utc_now()

        # Entry
        if (self.emaCentoWindow[0] > self.emaDuecentoWindow[0]
                and self.rsiFourWindow[0] < 30
                and self.rsiSevenWindow[0] < 20
                and self.buyMarketTicket is None
                and self.fastWindow[0] > self.slowWindow[0]):
            dollarShare = self.CalculateOrderQuantity(self.BTC, 1)
            self.buyMarketTicket = self.MarketOrder(self.BTC, dollarShare)
            # Seed the tracked low with the entry bar's low.
            self.loWindow.Add(data[self.BTC].Low)
            self.newLow = True

        # Exit. The stop clause is evaluated last so that the slice is only
        # indexed when neither RSI exit has already fired.
        if self.buyMarketTicket is not None and (
                self.rsiTwoWindow[0] > 90
                or self.rsiSevenWindow[0] >= 70
                or (self.loWindow.IsReady
                    and self.now > (self.buyMarketTicket.Time
                                    + self.LOW_TRACKING_PERIOD)
                    and data[self.BTC].Close < self.loWindow[0])):
            Held = self.Portfolio[self.BTC].Quantity
            self.MarketOrder(self.BTC, -Held)
            self.buyMarketTicket = None
            self.newLow = False


class CustomFeeModel:
    """Fee model charging 0.075% of the order's notional value, in USD."""

    def GetOrderFee(self, parameters):
        """Return the fee for `parameters.Order` priced at the security price."""
        fee = (((parameters.Security.Price * parameters.Order.AbsoluteQuantity) * 0.075) / 100)
        return OrderFee(CashAmount(fee, 'USD'))
''' Supervised classifier to handle IN/OUT predictions '''
from scipy.signal import savgol_filter as sg
from sklearn.tree import DecisionTreeRegressor as dtr
from sklearn import preprocessing
from sklearn.decomposition import FastICA
from sklearn.model_selection import cross_val_score
import pandas as pd
import numpy as np
import scipy.stats as stats
from sklearn.model_selection import RandomizedSearchCV as opt
from sklearn.cluster import KMeans as kmean
from sklearn.ensemble import RandomForestClassifier as clf


class IOalphaModel:
    """Random-forest model predicting the discretized next-step 'SPY' slope.

    Inputs are mappings of name -> series that include a 'SPY' entry. Every
    series is replaced by its Savitzky-Golay derivatives, each resulting row
    is discretized into z-score bins, and a `RandomForestClassifier` tuned by
    `RandomizedSearchCV` is fitted to predict the discretized first
    derivative of 'SPY' one step ahead.
    """

    # Bin edges, in units of standard deviations, used by `Discretize`.
    _Z_BIN_EDGES = (-3, -2, -1, -1 / 2, 0, 1 / 2, 1, 2, 3)

    def __init__(self, algo, signalPeriod = 1, bounceThreshold = 0.01, resolution = Resolution.Daily):
        """Build the estimators.

        Args:
            algo: Algorithm object; `IsWarmingUp` and `Debug` are used.
            signalPeriod: Currently unused.
            bounceThreshold: Currently unused.
            resolution: Currently unused.
        """
        # Constructed but not used by any method of this class.
        self.ica = FastICA(
            n_components=None,
            algorithm='parallel',
            whiten=True,
            fun='logcosh',
            fun_args=None,
            max_iter=200,
            tol=0.0001,
            w_init=None,
            random_state=0
        )

        # Search space for the random-forest hyper-parameters.
        self.params = {
            'n_estimators' : stats.randint(2,300),
            'ccp_alpha' : stats.expon(scale=0.01),
            'bootstrap' : [True, False]
        }
        learner = clf()
        self.model = opt(
            learner,
            self.params,
            n_iter=10,
            scoring=None,
            n_jobs=None,
            refit=True,
            cv=None,
            verbose=0,
            pre_dispatch='2*n_jobs',
            random_state=1,
            return_train_score=False)

        # Constructed but not used by any method of this class.
        self.cluster = kmean(
            n_clusters=8,
            init='k-means++',
            n_init=10,
            max_iter=300,
            tol=0.0001,
            verbose=0,
            random_state=1,
            copy_x=True,
            algorithm='auto'
        )

        self.algo = algo
        self.isTrained = False
        self.mu = -1
        self.sigma = -1

    def TrainModel(self, data, window_length=7, polyorder=5):
        """Fit the model on `data`; does nothing while the algorithm warms up.

        Args:
            data: Mapping of name -> series containing a 'SPY' entry. It is
                copied, so the caller's mapping is not modified.
            window_length: Savitzky-Golay window length.
            polyorder: Savitzky-Golay polynomial order.
        """
        if self.algo.IsWarmingUp:
            return

        series = data.copy()
        y = series.pop('SPY')
        X_1, X_2, X_3, y_1 = self.Preprocess(series, y, window_length, polyorder)

        # Shift the target one step to the left so that X(n) predicts y(n+1);
        # the matching last sample of X is dropped below.
        y_transformed = self.Discretize(y_1[1:])
        self.mu = np.mean(y_transformed)
        self.sigma = np.std(y_transformed)

        X_transformed = self._build_features(X_1, X_2, X_3, y_1)[:-1]

        self.model.fit(X_transformed, y_transformed)
        scores = cross_val_score(self.model, X_transformed, y_transformed, cv=3)
        self.algo.Debug(f"accuracy:{scores.mean()}, sigma:{scores.std()}")
        self.isTrained = True

    def Preprocess(self, X, y=None, window_length=7, polyorder=5):
        """Return Savitzky-Golay derivatives of the inputs.

        Args:
            X: Mapping of name -> series.
            y: Optional target series.
            window_length: Savitzky-Golay window length.
            polyorder: Savitzky-Golay polynomial order.

        Returns:
            `(X', X'', X''', y')`, where the first three are lists holding one
            derivative array per entry of `X`. When `y` is omitted or empty it
            is returned unfiltered (an empty list when omitted).
        """
        if y is None:
            y = []

        def derivative(series, order):
            return sg(series, window_length, polyorder, deriv=order,
                      delta=1.0, axis=-1, mode='interp', cval=0.0)

        X_1 = [derivative(x, 1) for x in X.values()]
        X_2 = [derivative(x, 2) for x in X.values()]
        X_3 = [derivative(x, 3) for x in X.values()]

        if len(y) > 0:
            return X_1, X_2, X_3, derivative(y, 1)
        return X_1, X_2, X_3, y

    def _build_features(self, X_1, X_2, X_3, y_1):
        """Stack the derivative rows, discretize each, return samples x features."""
        stacked = np.vstack([X_1, X_2, X_3, y_1])
        # Build a new array: assigning to the loop variable would leave the
        # stacked rows untouched.
        discretized = np.array([self.Discretize(row) for row in stacked])
        return np.transpose(discretized)

    def Update(self, X):
        """Predict from `X`, training first when the model is not yet fitted.

        Args:
            X: Mapping of name -> series containing a 'SPY' entry. It is
                copied, so the caller's mapping is not modified.

        Returns:
            A list with one value per sample: the model prediction passed
            through `InverseZ`. An empty list when the model could not be
            trained yet (training is skipped during warm-up).

        Raises:
            KeyError: If `X` has no 'SPY' entry.
        """
        Xcopy = X.copy()
        if not self.isTrained:
            self.TrainModel(Xcopy)
        if not self.isTrained:
            # Training was skipped; predicting with an unfitted model would fail.
            return []

        y = Xcopy.pop("SPY")
        X1, X2, X3, y1 = self.Preprocess(Xcopy, y)

        # TODO: here and in the train method, this next bit goes in Preprocess.
        # NOTE(review): this overwrites the statistics stored by TrainModel
        # with those of the undiscretized derivative -- confirm it is intended.
        self.mu = np.mean(y1)
        self.sigma = np.std(y1)

        X_transformed = self._build_features(X1, X2, X3, y1)
        return self.InverseZ(self.model.predict(X_transformed))

    def Discretize(self, x):
        """Map each value of `x` to the index of its z-score bin.

        The values are standardized with their own mean and standard
        deviation and compared with the edges -3, -2, -1, -1/2, 0, 1/2, 1, 2
        and 3, which are already expressed in standard deviations and are
        therefore not rescaled. A series without variation is treated as all
        zeros instead of being divided by zero.

        Returns:
            Integer array of bin indices in the range 0..9.
        """
        values = np.asarray(x, dtype=float)
        if values.size == 0:
            return np.array([], dtype=int)

        std = np.std(values)
        if std == 0:
            z_scores = np.zeros_like(values)
        else:
            z_scores = (values - np.mean(values)) / std
        return np.digitize(z_scores, self._Z_BIN_EDGES, right=False)

    def InverseZ(self, Z_array):
        """Rescale each value as `sigma * z + mu` using the stored statistics."""
        return [self.sigma * z + self.mu for z in Z_array]