# Overall Statistics
#   Total Trades:                6807
#   Average Win:                 0.28%
#   Average Loss:                -0.23%
#   Compounding Annual Return:   5.040%
#   Drawdown:                    12.100%
#   Expectancy:                  0.062
#   Net Profit:                  48.200%
#   Sharpe Ratio:                0.704
#   Loss Rate:                   51%
#   Win Rate:                    49%
#   Profit-Loss Ratio:           1.19
#   Alpha:                       0.064
#   Beta:                        -0.608
#   Annual Standard Deviation:   0.074
#   Annual Variance:             0.005
#   Information Ratio:           0.433
#   Tracking Error:              0.074
#   Treynor Ratio:               -0.085
#   Total Fees:                  $13416.29
import bisect
import math
import operator
import random

import numpy as np
import pandas as pd
import sklearn as sn
import talib as tb
from scipy.optimize import brute
from sklearn.ensemble import RandomForestRegressor as RFR
from sklearn.linear_model import LinearRegression as LR
from sklearn.metrics import mean_absolute_error as mae
from sklearn.metrics import mean_squared_error as mse
from sklearn.model_selection import train_test_split
from sklearn.model_selection import train_test_split as TTS
from sklearn.neighbors import KNeighborsRegressor as KNR
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeRegressor as DTR

from clr import AddReference

# The .NET assemblies must be referenced before their namespaces are imported.
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")

from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *

# Fixed seeds for the Python and NumPy random generators.
seed = 1
random.seed(seed)
np.random.seed(seed)


class Model():
    """Indicator-based k-nearest-neighbours regressor with a grid search.

    The owner is expected to assign ``hist_open``, ``hist_high``,
    ``hist_low``, ``hist_close`` and ``hist_vol`` (iterables of numbers) and
    then call :meth:`preprocessing_market`, which builds ``self.market``.
    Every other method reads ``self.market``.
    """

    # Number of indicator periods that make up one candidate solution.
    _N_PARAMS = 10
    # Upper bound of every period searched by find_solution().
    _MAX_PERIOD = 500
    # Number of bars averaged to form the regression target.
    _TARGET_HORIZON = 200
    # Leading rows dropped from the training set; presumably chosen to cover
    # the longest indicator warm-up (_MAX_PERIOD) -- TODO confirm.
    _SKIP_ROWS = 500
    # Parameter slots pinned to zero (feature disabled) after the first pass.
    _PINNED = (4, 8, 9)

    def __init__(self):
        # Length of history.
        self.eval_lookback = 500
        self.train_lookback = 3000
        self.n_features = 5
        self.init = 0
        self.warmup_count = self.train_lookback + self.eval_lookback
        # 1 means "retrain on the next bar"; reset to 0 by the algorithm.
        self.ttrig = 1

    def func(self, x):
        """Objective minimised by the grid search.

        Args:
            x: Sequence of ``_N_PARAMS`` indicator periods; 0 disables a slot.

        Returns:
            ``10000.0`` when fewer than 2 or more than 5 slots are non-zero.
            Otherwise the mean absolute error between held-out targets and
            predictions, after zeroing every sample whose prediction has the
            same sign as its target (so only sign mismatches contribute).
        """
        active = np.count_nonzero(x)
        if active < 2 or active > 5:
            return 10000.0

        features, target = self.indicators(x)
        X_train, X_test, Y_train, Y_test = TTS(
            features, target, test_size=0.33, shuffle=False)

        scaler_x = StandardScaler()
        X_train = scaler_x.fit_transform(X_train)
        X_test = scaler_x.transform(X_test)

        reg = KNR(n_neighbors=5, weights='distance')
        reg.fit(X_train, Y_train)
        Y_pred = reg.predict(X_test)

        # Masks are computed before any element is overwritten.
        same_sign = (((Y_test >= 0) & (Y_pred >= 0))
                     | ((Y_test < 0) & (Y_pred < 0)))
        Y_pred[same_sign] = 0
        Y_test[same_sign] = 0
        return mae(Y_test, Y_pred)

    def indicators(self, x, how='fit'):
        """Build the TA-Lib feature matrix (and target) from ``self.market``.

        Columns 1..4 of ``self.market`` are read as high, low, close and
        volume. Feature column ``i`` stays all-zero when ``x[i] == 0``.

        Args:
            x: Sequence of ``_N_PARAMS`` indicator periods.
            how: ``'predict'`` returns only the latest feature row; any other
                value returns the training pair.

        Returns:
            For ``how == 'predict'``: an array of shape ``(1, 10)`` holding
            the last feature row (not clipped).
            Otherwise ``(features, target)``, where ``target`` has shape
            ``(n, 1)`` and is the relative difference between the
            ``_TARGET_HORIZON``-bar SMA ending ``_TARGET_HORIZON`` bars later
            and the current close.
        """
        high = self.market[:, 1]
        low = self.market[:, 2]
        close = self.market[:, 3]
        vol = self.market[:, 4]

        # One builder per parameter slot, in slot order.
        builders = (
            lambda p: tb.CCI(high, low, close, timeperiod=p),
            lambda p: tb.AROONOSC(high, low, timeperiod=p),
            lambda p: tb.SMA(close, timeperiod=p) / close,
            lambda p: tb.SMA(close, timeperiod=p) / close,
            lambda p: tb.ATR(high, low, close, timeperiod=p),
            lambda p: tb.NATR(high, low, close, timeperiod=p),
            lambda p: tb.CCI(high, low, close, timeperiod=p),
            lambda p: tb.MFI(high, low, close, vol, timeperiod=p),
            lambda p: tb.ATR(high, low, close, timeperiod=p),
            lambda p: tb.ATR(high, low, close, timeperiod=p),
        )

        df_xy = np.zeros((self.market.shape[0], len(builders)), float)
        for col, build in enumerate(builders):
            if x[col] != 0:
                # TA-Lib expects integer periods; the optimiser hands over
                # NumPy scalars, so convert explicitly.
                df_xy[:, col] = build(int(x[col]))

        if how == 'predict':
            # The clipping below never touches the last row, so returning
            # here yields the same values without building the target.
            return df_xy[-1, :].reshape(1, -1)

        horizon = self._TARGET_HORIZON
        base = close[:-horizon]
        y = (tb.SMA(close, timeperiod=horizon)[horizon:] - base) / base
        y = y[self._SKIP_ROWS:].reshape(-1, 1)

        # Rows aligned with y: same start offset, last `horizon` rows dropped
        # because their target would lie beyond the available history.
        features = df_xy[-y.shape[0] - horizon:-horizon, :]
        features[abs(features) > 1e5] = 0
        y[abs(y) > 1] = 0
        return features, y

    def find_solution(self):
        """Grid-search the indicator periods, then fit the final regressor.

        Runs ``scipy.optimize.brute`` seven times. The first pass scans
        ``0..500`` in steps of 250 for every slot; each later pass shrinks
        the step by a factor of 1.5 and scans ``best +/- step`` (clamped to
        ``0..500``), while the slots in ``_PINNED`` are fixed at 0.

        Returns:
            ``(x0, reg, scaler)``: the best period vector, a default
            ``KNeighborsRegressor`` fitted on the standardised features for
            ``x0``, and the fitted ``StandardScaler``.
        """
        step = 250
        ranges = (slice(0, self._MAX_PERIOD, step),) * self._N_PARAMS

        for _ in range(7):
            x0, fval, grid, Jout = brute(
                self.func, ranges=ranges, disp=True, finish=None,
                full_output=True)
            step = int(step / 1.5)
            ranges = tuple(
                (0, 1, 2) if slot in self._PINNED
                else (np.max([x0[slot] - step, 0]),
                      np.min([x0[slot] + step, self._MAX_PERIOD]),
                      step)
                for slot in range(self._N_PARAMS)
            )

        features, target = self.indicators(x0)
        scaler = StandardScaler()
        features = scaler.fit_transform(features)

        # NOTE(review): the final model uses KNR defaults, whereas func()
        # scores candidates with weights='distance' -- confirm intent.
        reg = KNR()
        reg.fit(features, target)
        return x0, reg, scaler

    def preprocessing_market(self):
        """Rebuild ``self.market`` as an ``(n, 5)`` float array.

        Columns are open, high, low, close, volume. Each ``hist_*`` series is
        reversed before being stored (presumably because the rolling windows
        iterate newest-first -- TODO confirm).
        """
        series = (self.hist_open, self.hist_high, self.hist_low,
                  self.hist_close, self.hist_vol)
        self.market = np.column_stack(
            [np.array(list(values)[::-1], float) for values in series])


class Position():
    """Tracks a pyramided position and decides when to add to it or exit."""

    def __init__(self):
        self.max_position = 4.0   # maximum absolute number of units
        self.num_of_stds = 14.0   # band width, in multiples of volatility
        self.price = 0.0          # reference price of the last entry/add
        self.position = 0.0       # signed unit count (>0 long, <0 short)
        self.volatility = 0.0
        self.current_price = 0.0
        self.pl_fac = 1.0         # divides the band for the exit threshold

    def manage(self):
        """Evaluate the current price against the add/exit bands.

        When the price has moved in favour of the position by more than
        ``num_of_stds * volatility``, the reference price is moved to the
        current price and, if below ``max_position``, one unit is added.

        Returns:
            ``1.0`` to add a unit, ``-1.0`` to close the position,
            ``0.0`` to do nothing (including when flat).
        """
        # NOTE(review): the collapsed original is ambiguous about whether a
        # flat position returned 0.0 or None; callers only compare against
        # 1.0 and -1.0, so both behave the same.
        if self.position == 0.0:
            return 0.0

        is_long = self.position > 0.0
        is_short = self.position < 0.0
        band = self.num_of_stds * self.volatility

        if is_long and self.price + band < self.current_price:
            self.price = self.current_price
            if abs(self.position) < self.max_position:
                self.position += 1.0
                return 1.0
            return 0.0

        if is_short and self.price - band > self.current_price:
            self.price = self.current_price
            if abs(self.position) < self.max_position:
                self.position -= 1.0
                return 1.0
            return 0.0

        exit_band = self.volatility * self.num_of_stds / self.pl_fac
        if is_long and self.price - exit_band > self.current_price:
            return -1.0
        if is_short and self.price + exit_band < self.current_price:
            return -1.0
        return 0.0


class BasicTemplateAlgorithm(QCAlgorithm):
    """Intraday SPY strategy driven by :class:`Model` predictions."""

    def Initialize(self):
        """Configure dates, cash, data, indicators and scheduled events."""
        self.model = Model()
        self.position1 = Position()
        self.cash = 100000
        self.SetStartDate(2010, 1, 1)   # Set Start Date
        self.SetEndDate(2018, 1, 1)     # Set End Date
        self.SetCash(self.cash)         # Set Strategy Cash
        self.multiplier = 100.0
        # Initial trade size - fraction of the deposit.
        self.init_coef = 0.6
        # Position management trade size - fraction of the deposit.
        self.add_coef = 0.2
        # Find more symbols here: http://quantconnect.com/data
        self.symbol = "SPY"
        self.model.symbol = self.symbol
        self.granularity = Resolution.Minute
        self.position_size = 0.6

        window = self.model.warmup_count
        self.HighBar = RollingWindow[float](window)
        self.LowBar = RollingWindow[float](window)
        self.OpenBar = RollingWindow[float](window)
        self.CloseBar = RollingWindow[float](window)
        self.VolBar = RollingWindow[float](window)

        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage,
                               AccountType.Margin)
        mm = self.AddEquity(self.symbol, self.granularity)
        mm.MarginModel = PatternDayTradingMarginModel()
        self.SetWarmUp(self.model.warmup_count)

        # Presumably aggregates five minute bars into one -- TODO confirm.
        self.consolidator = TradeBarConsolidator(5)
        self.consolidator.DataConsolidated += self.OnDataConsolidated
        self.SubscriptionManager.AddConsolidator(self.symbol,
                                                 self.consolidator)

        self.atr_slow = self.ATR(self.symbol, 160, MovingAverageType.Simple,
                                 Resolution.Minute)
        self.atr_fast = self.ATR(self.symbol, 30, MovingAverageType.Simple,
                                 Resolution.Minute)

        # Request a retrain at the first market open of each month.
        self.Schedule.On(self.DateRules.MonthStart(self.symbol),
                         self.TimeRules.AfterMarketOpen(self.symbol),
                         Action(self.reset_train))
        # Flatten the book 20 minutes before every close.
        self.Schedule.On(self.DateRules.EveryDay(self.symbol),
                         self.TimeRules.BeforeMarketClose(self.symbol, 20),
                         Action(self.liqui))
        # Consolidated bars seen since the last training run.
        self.counter = 0

    def liqui(self):
        """Close all holdings in the symbol and reset the position tracker."""
        self.Liquidate(self.symbol)
        self.position1.position = 0.0

    def OnDataConsolidated(self, sender, TradeBar):
        """Handle one consolidated bar: store it, (re)train, predict, trade.

        Args:
            sender: The consolidator that raised the event (unused).
            TradeBar: The consolidated bar; its High, Low, Open, Close and
                Volume are appended to the rolling windows. (The name shadows
                the ``TradeBar`` type but is kept for interface stability.)
        """
        try:
            self.HighBar.Add(float(TradeBar.High))
            self.LowBar.Add(float(TradeBar.Low))
            self.OpenBar.Add(float(TradeBar.Open))
            self.CloseBar.Add(float(TradeBar.Close))
            self.VolBar.Add(float(TradeBar.Volume))
        except Exception as err:
            # Best effort: report the reason and carry on with the bars
            # collected so far, as the original did.
            self.Debug('Failed to retrieve the quotes: ' + str(err))

        self.counter += 1
        if self.counter >= self.model.warmup_count:
            self.model.ttrig = 1

        model = self.model
        model.hist_open = self.OpenBar
        model.hist_close = self.CloseBar
        model.hist_high = self.HighBar
        model.hist_low = self.LowBar
        model.hist_vol = self.VolBar
        model.preprocessing_market()
        if model.market.shape[0] < model.warmup_count:
            return

        if model.ttrig == 1:
            self.x0, self.reg, self.scaler = model.find_solution()
            model.ttrig = 0
            self.counter = 0

        X_current = model.indicators(self.x0, how='predict')
        self.Debug(str(self.x0))
        X_curr_scaled = self.scaler.transform(X_current)
        # predict() returns a one-element array; take it as a scalar.
        prediction = float(np.ravel(self.reg.predict(X_curr_scaled))[0])

        # Number of shares the fixed starting cash buys at the current price.
        stock_coef = self.cash / float(self.Securities[self.symbol].Price)
        holdings = float(self.Portfolio[self.symbol].Quantity)
        pos = self.position1
        pos.volatility = float(self.atr_fast.Current.Value)
        pos.current_price = float(self.Securities[self.symbol].Price)

        # Position management.
        if pos.position != 0.0 and holdings != 0.0:
            action = pos.manage()
            # NOTE(review): the original computed an OrderDirection here but
            # never passed it to GetMarginRemaining -- confirm whether a
            # direction-aware margin check was intended.
            bp = float(self.Portfolio.GetMarginRemaining(self.symbol))
            ordr = -1 if pos.position < 0 else 1
            add_qty = stock_coef * self.add_coef
            if (action == 1.0
                    and abs(bp) > abs(add_qty) * 1.5 * pos.current_price):
                self.MarketOrder(self.symbol, ordr * add_qty)
                pos.price = float(self.Securities[self.symbol].Price)
            # NOTE(review): "else: return" is attached to the exit check;
            # attaching it to the outer "if" would make entries unreachable.
            if action != -1.0:
                return
            pos.position = 0.0
            self.Liquidate(self.symbol)

        bp = float(self.Portfolio.GetMarginRemaining(self.symbol))
        if pos.position == 0.0 and holdings == 0.0:
            entry_qty = stock_coef * self.init_coef
            affordable = abs(bp) > entry_qty * pos.current_price
            if prediction > 0.0 and affordable:
                pos.position = 1.0
                self.MarketOrder(self.symbol, entry_qty)
                pos.price = float(self.Securities[self.symbol].Price)
                pos.current_price = float(self.Securities[self.symbol].Price)
                pos.volatility = float(self.atr_fast.Current.Value)
            elif prediction < 0.0 and affordable:
                pos.position = -1.0
                self.MarketOrder(self.symbol, -entry_qty)
                pos.volatility = float(self.atr_fast.Current.Value)
                pos.price = float(self.Securities[self.symbol].Price)
                pos.current_price = float(self.Securities[self.symbol].Price)

        # Re-synchronise the tracker if the portfolio holds shares that the
        # tracker does not know about.
        quantity = float(self.Portfolio[self.symbol].Quantity)
        if quantity != 0 and pos.position == 0:
            pos.position = abs(quantity) / quantity
            self.Debug('ACHTUNG!!!!')

        self.previous = self.Time

    def reset_train(self):
        """Request that the model be retrained on the next consolidated bar."""
        self.model.ttrig = 1