Overall Statistics |
Total Trades 352 Average Win 0.42% Average Loss -0.32% Compounding Annual Return 7.613% Drawdown 5.400% Expectancy 0.172 Net Profit 7.656% Sharpe Ratio 0.727 Probabilistic Sharpe Ratio 36.171% Loss Rate 49% Win Rate 51% Profit-Loss Ratio 1.30 Alpha 0 Beta 0 Annual Standard Deviation 0.076 Annual Variance 0.006 Information Ratio 0.727 Tracking Error 0.076 Treynor Ratio 0 Total Fees $71989.90 Estimated Strategy Capacity $3000000.00 Lowest Capacity Asset ATVI R735QTJ8XC9X Portfolio Turnover 16.04% |
from AlgorithmImports import *
import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from scipy.stats import norm
from sklearn.ensemble import GradientBoostingRegressor


class XGP(QCAlgorithm):
    """Gradient-boosted return forecasts allocated with a linear program.

    For every tracked security the algorithm fits a mean-return model plus a
    5% and a 95% quantile model on price, Bollinger/RSI and news-sentiment
    features, then solves a linear program for signed portfolio weights.
    """

    def Initialize(self):
        """Configure dates, cash, reality models, universe and schedules."""
        # set up
        self.SetStartDate(2017, 1, 1)
        self.SetEndDate(2017, 3, 1)
        self.InitCash = 10000000
        self.SetCash(self.InitCash)
        self.lookback = 25

        # setting brokerage and reality modeling params
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetSecurityInitializer(lambda s: s.SetSlippageModel(VolumeShareSlippageModel()))

        # benchmarking against SPY
        self.MKT = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.mkt = []

        # manually keeping track of securities
        self.securities = []

        # We're getting an error when this is removed
        self.weights = []
        self.trained = True

        # Model state. The three model lists are parallel to trained_symbols so
        # a model can be matched to its security even after the universe list
        # has been rebuilt in a different order.
        self.return_mods = []
        self.quantile_mods_lg = []
        self.quantile_mods_st = []
        self.trained_symbols = []
        self.predicted_symbols = []

        # (year, month) of the last completed universe refresh.
        self._last_universe_period = None

        # Requesting data
        self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
        self.UniverseSettings.Resolution = Resolution.Daily
        self.num_coarse_symbols = 40
        self.num_fine_symbols = 7  # the [:7] slice keeps at most 7 symbols

        # Train immediately
        self.Train(self.classifier_training)

        # Train every Sunday at 4am or first day of month (because new universe)
        self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training)
        self.Train(self.DateRules.MonthStart(daysOffset=0), self.TimeRules.At(4, 0), self.classifier_training)

        # Rebalance shortly after the open on the same cadence.
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday),
                         self.TimeRules.AfterMarketOpen("SPY", 10), self.actions)
        self.Schedule.On(self.DateRules.MonthStart(daysOffset=0),
                         self.TimeRules.AfterMarketOpen("SPY", 10), self.actions)

    def _universe_refresh_due(self):
        """Return True when no universe refresh has completed this month."""
        return self._last_universe_period != (self.Time.year, self.Time.month)

    def SelectCoarse(self, coarse):
        """selecting CoarseFundamental objects based on criteria in paper"""
        # Gate on "not yet refreshed this month" rather than `Time.day == 1`:
        # a selection pass does not necessarily happen on the 1st, and the old
        # test then skipped the whole month (including the first one).
        if not self._universe_refresh_due():
            return Universe.Unchanged

        selected = [c for c in coarse if c.HasFundamentalData and c.Price > 5]
        sorted_by_dollar_volume = sorted(selected, key=lambda c: c.DollarVolume, reverse=True)
        return [c.Symbol for c in sorted_by_dollar_volume[:self.num_coarse_symbols]]

    def SelectFine(self, fine):
        """selecting FineFundamental objects based on our criteria"""
        if not self._universe_refresh_due():
            return Universe.Unchanged

        selected = [
            f for f in fine
            if f.ValuationRatios.PERatio < 100
            and f.MarketCap > 300000000
            and f.ValuationRatios.PEGRatio < 3
            and f.OperationRatios.TotalDebtEquityRatio.Value < 2
            and f.OperationRatios.CurrentRatio.Value > 1
        ]
        sorted_by_pe_ratio = sorted(selected, key=lambda f: f.ValuationRatios.PERatio, reverse=True)

        # Mark the month as done only once the fine stage has actually run.
        self._last_universe_period = (self.Time.year, self.Time.month)
        return [f.Symbol for f in sorted_by_pe_ratio[:self.num_fine_symbols]]

    def OnSecuritiesChanged(self, changes):
        """triggers when Universe changes as result of filtering"""
        added = changes.AddedSecurities
        removed = changes.RemovedSecurities

        for security in added:
            self.Debug(f"{self.Time}: Added {security}")
        for security in removed:
            self.Debug(f"{self.Time}: Removed {security}")

        self.securities = list(set(self.securities).union(set(added)).difference(set(removed)))

    def OnData(self, data):
        """No per-bar logic; trading happens in the scheduled `actions`."""
        return

    def actions(self):
        """Scheduled rebalance: liquidate, predict, optimise, place targets.

        This handler is registered in `Initialize`; its steps are the ones
        that previously existed only as commented-out code in `OnData`.
        """
        if not self.securities:
            return

        self.Liquidate()  # Liquidate the whole portfolio
        self.make_predictions()
        self.LinOptProg()
        self.Debug(self.weights)

        # predicted_symbols is the snapshot the weights were computed for, so
        # the pairing stays correct even if self.securities changes meanwhile.
        for symbol, wt in zip(self.predicted_symbols, np.ravel(self.weights)):
            if wt != 0:
                self.Debug(f"{symbol}: {wt}")
                self.SetHoldings(symbol, float(wt))
        self.trained = False

    def OnEndOfDay(self):
        """Plot SPY, rescaled to the initial cash, on the equity chart."""
        history = self.History(self.MKT, 2, Resolution.Daily)
        if history.empty:
            return

        # Latest close as a plain float so the plotted value is a scalar.
        self.mkt.append(float(history['close'].iloc[-1]))
        mkt_perf = self.InitCash * self.mkt[-1] / self.mkt[0]
        self.Plot('Strategy Equity', self.MKT, mkt_perf)

    def OnOrderEvent(self, orderEvent):
        """logs the details of an order"""
        self.Log(f'{orderEvent}')

    # =====================================================================================================================
    # begin custom functions
    # =====================================================================================================================

    def classifier_training(self):
        """Fit the return and quantile models for every tracked security.

        A security whose data retrieval or fit fails gets the string
        placeholder "NoModel" in all three model lists; the failure is logged
        instead of being swallowed silently.
        """
        symbols = [s.Symbol for s in self.securities]
        return_mods, quantile_mods_lg, quantile_mods_st = [], [], []

        self.Log(f"Training Started at {self.Time}")
        for symbol in symbols:
            try:
                data = self.get_all_data([symbol], training=True, backtesting=False)
                y_reg = data["return"]
                X = data.drop(["direction", "return", "symbol"], axis=1)
                ret, qut_lg, qut_st = self.gb_returns(X, y_reg)
            except Exception as err:
                self.Log(f"Training failed for {symbol}: {err}")
                ret = qut_lg = qut_st = "NoModel"
            return_mods.append(ret)
            quantile_mods_lg.append(qut_lg)
            quantile_mods_st.append(qut_st)

        # Publish everything together so the lists always stay parallel.
        self.return_mods = return_mods
        self.quantile_mods_lg = quantile_mods_lg
        self.quantile_mods_st = quantile_mods_st
        self.trained_symbols = symbols
        self.trained = True

    def make_predictions(self):
        """Predict the return and matching risk quantile for each security.

        Fills `self.returns` and `self.quantiles` in the order recorded in
        `self.predicted_symbols`. A security without a usable model, or whose
        prediction fails, contributes 0 for both values.
        """
        self.returns = []
        self.quantiles = []
        self.predicted_symbols = [s.Symbol for s in self.securities]
        slot_of = {symbol: i for i, symbol in enumerate(self.trained_symbols)}

        for symbol in self.predicted_symbols:
            r_pred, q_pred = 0, 0
            slot = slot_of.get(symbol)
            if slot is not None and not isinstance(self.return_mods[slot], str):
                try:
                    data = self.get_all_data([symbol], training=False)
                    data = data[data.index == data.index.max()]
                    prediction_data = data.drop(["direction", "return", "symbol"], axis=1)
                    r_pred = self.return_mods[slot].predict(prediction_data)[0]
                    if r_pred > 0:
                        q_pred = self.quantile_mods_lg[slot].predict(prediction_data)[0]
                    elif r_pred < 0:
                        q_pred = self.quantile_mods_st[slot].predict(prediction_data)[0]
                except Exception as err:
                    self.Log(f"Prediction failed for {symbol}: {err}")
                    r_pred, q_pred = 0, 0
            self.returns.append(r_pred)
            self.quantiles.append(q_pred)

    def gb_returns(self, X, y, security=None):
        """Fit the mean-return model and both tail-quantile models.

        Parameters
        ----------
        X : pd.DataFrame
            Feature matrix.
        y : pd.Series
            Target returns.
        security : optional
            Unused; kept so existing calls that pass it keep working.

        Returns
        -------
        tuple
            (mean model, 5% quantile model for long positions,
            95% quantile model for short positions), all fitted.
        """
        def quantile_model(alpha):
            return GradientBoostingRegressor(n_estimators=150,
                                             loss="quantile",
                                             alpha=alpha,
                                             n_iter_no_change=20,
                                             learning_rate=0.05,
                                             criterion="friedman_mse",
                                             random_state=585)

        mean_clf = GradientBoostingRegressor(n_estimators=150,
                                             loss="squared_error",
                                             criterion="friedman_mse",
                                             learning_rate=0.05,
                                             random_state=585,
                                             n_iter_no_change=20)
        mean_fit_out = mean_clf.fit(X, y)

        # The direction is only known at prediction time, so both tails are
        # fitted here and make_predictions picks the one it needs.
        quantile_lg_fit_out = quantile_model(0.05).fit(X, y)
        quantile_st_fit_out = quantile_model(0.95).fit(X, y)
        return (mean_fit_out, quantile_lg_fit_out, quantile_st_fit_out)

    def LinOptProg(self):
        """
        Convex optimization Function

        Maximises predicted return over signed weights subject to a tail-loss
        cap of 0.02 and gross exposure of at most 1. The result is stored in
        `self.weights` as an (n, 1) array; `self.returns` and `self.quantiles`
        are consumed (deleted) by the call.
        """
        returns = np.asarray(self.returns, dtype=float).reshape(-1, 1)
        quantiles = np.asarray(self.quantiles, dtype=float).reshape(-1, 1)
        del self.returns
        del self.quantiles

        n_assets = len(returns)
        if n_assets == 0:
            self.weights = np.zeros((0, 1))
            return

        dirs = np.sign(returns).astype(int)
        self.Debug(dirs)

        cap = min(0.6, 2.5 / n_assets)
        bounds = []
        for d in dirs.ravel():
            if d > 0:
                bounds.append((0, cap))
            elif d < 0:
                bounds.append((-cap, 0))
            else:
                # No predicted edge: pin to zero instead of leaving the solver
                # free to pick an arbitrary short position at zero cost.
                bounds.append((0, 0))

        # vstack keeps A two-dimensional even for a single asset (the previous
        # squeeze() collapsed it to 1-D, which linprog rejects).
        A = np.vstack([-quantiles.ravel(), dirs.ravel(), -dirs.ravel()])
        b = np.array([0.02, 1, 0])

        res = linprog(-returns.ravel(), A_ub=A, b_ub=b, bounds=bounds)
        if res.status == 0:
            self.weights = res.x.reshape(-1, 1)
        else:
            self.Log("Optimization failed")
            # If optimization fails, give uniform weight 0 (buy nothing)
            self.weights = np.zeros((n_assets, 1))

    def bollinger_bands(self, data, window=20, num_std=2):
        """Add 'MA', 'STD', 'Upper_BB' and 'Lower_BB' columns to `data` in place."""
        rolling_close = data['close'].rolling(window=window)
        # Calculate the moving average and standard deviation
        data['MA'] = rolling_close.mean()
        data['STD'] = rolling_close.std()
        # Calculate the Bollinger Bands
        data['Upper_BB'] = data['MA'] + (data['STD'] * num_std)
        data['Lower_BB'] = data['MA'] - (data['STD'] * num_std)
        return data

    def calculate_rsi(self, data, period=20):
        """Add an 'rsi' column (exponentially smoothed RSI of 'close') to `data`."""
        # Calculate the daily price changes (gains and losses)
        delta = data['close'].diff().dropna()
        # Separate gains and losses into their own series
        gains = delta.where(delta > 0, 0)
        losses = -delta.where(delta < 0, 0)
        # Calculate the average gain and average loss
        avg_gain = gains.ewm(com=period - 1, min_periods=period).mean()
        avg_loss = losses.ewm(com=period - 1, min_periods=period).mean()
        # Relative Strength (RS) and Relative Strength Index (RSI)
        rs = avg_gain / avg_loss
        data['rsi'] = 100 - (100 / (1 + rs))
        return data

    def get_all_data(self, tickers, historical=True, training=False, backtesting=True):
        """
        Gets historical data for training and prediction

        Parameters:
        -----------
        tickers : list
            symbols to retrieve data for
        historical : Bool, default True
            False selects the short (8-bar) window intended for live trading
        training : Bool, default False
            With historical=True and backtesting=False, selects the training
            window of `self.lookback + 25` bars
        backtesting : Bool, default True
            With historical=True, selects the 31-bar prediction window and
            takes precedence over `training`

        Return:
        -------
        pd.DataFrame
            One row per usable bar with price, indicator, optional news
            sentiment and lagged (t-1..t-4) features plus a 'symbol' column.
            Empty when no symbol returned any history.

        Raises:
        -------
        ValueError
            If historical is True while training and backtesting are both False.
        """
        if historical:
            if backtesting:
                shift_factor = 30  # overshooting and select the maximum
                hist_lookback = 1 + shift_factor
                tiingo_lookback = 12  # in case of weekends?
            elif training:
                hist_lookback = self.lookback + 25
                tiingo_lookback = self.lookback * 1.5
            else:
                raise ValueError("Please train or backtest if using historical = True")
        else:
            shift_factor = 7  # needed so we can calculate lagged data
            hist_lookback = 1 + shift_factor
            tiingo_lookback = 1 + shift_factor  # in case of weekends?

        analyzer = None  # built lazily, only if some symbol has news
        frames = []
        for symbol in tickers:
            # Get Price History
            history = pd.DataFrame(self.History(symbol, hist_lookback))
            if history.empty:
                continue

            history['direction'] = np.where(history['close'] > history['open'], 1, 0)
            history['return'] = history['close'].pct_change(periods=5)
            history = self.bollinger_bands(history)
            history = self.calculate_rsi(history)

            # Add relevant columns
            history['price_diff'] = history["open"] - history["MA"]
            history['band_diff_up'] = history["open"] - history["Upper_BB"]
            history['band_diff_lower'] = history["open"] - history["Lower_BB"]

            # Add Tiingo Data
            data = self.AddData(TiingoNews, symbol).Symbol
            tiingo = self.History(data, int(tiingo_lookback), Resolution.Daily)
            if len(tiingo) != 0 and {'description', 'publisheddate'}.issubset(tiingo.columns):
                if analyzer is None:
                    analyzer = SentimentIntensityAnalyzer()
                tiingo['polarity'] = tiingo['description'].dropna().apply(analyzer.polarity_scores)
                tiingo = pd.concat([tiingo.drop(['polarity'], axis=1),
                                    tiingo['polarity'].apply(pd.Series)], axis=1)
                tiingo = tiingo[['publisheddate', 'compound']]
                tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'], utc=True).dt.date
                # Sum the compound scores of all articles published the same day.
                tiingo = tiingo.groupby(by=['publisheddate'], as_index=False).sum()
                tiingo.rename(columns={'publisheddate': 'time'}, inplace=True)
                tiingo.set_index('time', inplace=True)
                history = history.join(tiingo)

            # Lagged copies of every column, then drop incomplete rows.
            lags = range(1, 5)
            history = history.assign(**{
                f'{col} (t-{lag})': history[col].shift(lag)
                for lag in lags
                for col in history
            }).dropna().drop(columns=['close', 'high', 'low', 'volume'], errors='ignore')

            history['symbol'] = symbol.Value
            frames.append(history)

        full_data = pd.concat(frames) if frames else pd.DataFrame()
        self.Log(full_data)
        return full_data

    def value_at_risk(self, returns, weights, conf_level=0.05, num_days=30):
        """
        Calculates the (absolute) value-at-risk of the portfolio.
        ---------------------------------------------------
        Parameters
        returns : pd.DataFrame
            periodic returns
        weights : np.array
            portfolio weights
        conf_level : float
            confidence level. 0.05 by default
        num_days : int
            number of days the VaR is calculated over. 30 by default
        """
        cov_matrix = returns.cov()
        avg_return = returns.mean()
        portfolio_mean = avg_return.dot(weights)
        portfolio_stdev = np.sqrt(weights.T.dot(cov_matrix).dot(weights))
        cutoff = norm.ppf(conf_level, portfolio_mean, portfolio_stdev)

        # n-Day VaR
        VaR = cutoff * np.sqrt(num_days)
        return VaR

    def cvar(self, returns, stdev, conf_level=0.05):
        """
        Calculates the portfolio CVaR under a normal assumption
        ------------------------------------
        Parameters
        returns : pd.DataFrame
            portfolio returns
        stdev :
            portfolio standard deviation
        conf_level : float
            confidence level
        """
        # Gaussian expected shortfall: the density is evaluated at the
        # alpha-quantile of the standard normal, not at alpha itself.
        CVaR = conf_level ** -1 * norm.pdf(norm.ppf(conf_level)) * stdev - returns
        return CVaR
from AlgorithmImports import *
import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from sklearn.ensemble import GradientBoostingRegressor


class XGP(QCAlgorithm):
    """Daily-retrained gradient-boosting forecasts with an LP allocation.

    Each day the algorithm refits a mean-return and a 5%-quantile model per
    tracked security, predicts on the latest bar in `OnData`, and solves a
    linear program for the position sizes.
    """

    def Initialize(self):
        """Configure dates, cash, reality models, universe and training."""
        # set up
        self.SetStartDate(2017, 1, 1)
        self.SetEndDate(2017, 2, 1)
        self.InitCash = 10000000
        self.SetCash(self.InitCash)
        self.lookback = 25

        # setting brokerage and reality modeling params
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetSecurityInitializer(lambda s: s.SetSlippageModel(VolumeShareSlippageModel()))

        # benchmarking against SPY
        self.MKT = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.mkt = []

        # manually keeping track of securities
        self.securities = []

        # We're getting an error when this is removed
        self.weights = []
        self.no_trade = True

        # Model state. The model lists are parallel to trained_tickers so a
        # model can be matched to its ticker even if self.securities has been
        # rebuilt in a different order since training.
        self.return_mods = []
        self.quantile_mods = []
        self.trained_tickers = []
        self.predicted_securities = []

        # (year, month) of the last completed universe refresh.
        self._last_universe_period = None

        # Requesting data
        # small number of coarse / fine symbols for testing, eventually increase
        self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
        self.UniverseSettings.Resolution = Resolution.Daily
        self.num_coarse_symbols = 40
        self.num_fine_symbols = 4  # the [:4] slice keeps at most 4 symbols

        # Train immediately
        self.Train(self.classifier_training)

        # Retrain every day at 4am
        self.Train(self.DateRules.EveryDay(), self.TimeRules.At(4, 0), self.classifier_training)

    def _universe_refresh_due(self):
        """Return True when no universe refresh has completed this month."""
        return self._last_universe_period != (self.Time.year, self.Time.month)

    def SelectCoarse(self, coarse):
        """selecting CoarseFundamental objects based on criteria in paper"""
        # Gate on "not yet refreshed this month" rather than `Time.day == 1`:
        # a selection pass does not necessarily happen on the 1st, and the old
        # test then skipped the whole month (including the first one).
        if not self._universe_refresh_due():
            return Universe.Unchanged

        selected = [c for c in coarse if c.HasFundamentalData and c.Price > 5]
        sorted_by_dollar_volume = sorted(selected, key=lambda c: c.DollarVolume, reverse=True)
        # filtering reduces run time, may want to choose a max number to select based on some criteria?
        return [c.Symbol for c in sorted_by_dollar_volume[:self.num_coarse_symbols]]

    def SelectFine(self, fine):
        """selecting FineFundamental objects based on our criteria"""
        if not self._universe_refresh_due():
            return Universe.Unchanged

        selected = [
            f for f in fine
            if f.ValuationRatios.PERatio < 100
            and f.MarketCap > 300000000
            and f.ValuationRatios.PEGRatio < 3
            and f.OperationRatios.TotalDebtEquityRatio.Value < 2
            and f.OperationRatios.CurrentRatio.Value > 1
        ]
        # condition to select top n stocks
        sorted_by_pe_ratio = sorted(selected, key=lambda f: f.ValuationRatios.PERatio, reverse=True)

        # Mark the month as done only once the fine stage has actually run.
        self._last_universe_period = (self.Time.year, self.Time.month)
        return [f.Symbol for f in sorted_by_pe_ratio[:self.num_fine_symbols]]

    def OnSecuritiesChanged(self, changes):
        """triggers when Universe changes as result of filtering"""
        added = changes.AddedSecurities
        removed = changes.RemovedSecurities

        for security in added:
            self.Debug(f"{self.Time}: Added {security}")
        for security in removed:
            self.Debug(f"{self.Time}: Removed {security}")

        self.securities = list(set(self.securities).union(set(added)).difference(set(removed)))

    def OnData(self, data):
        """Rebalance once per day: predict, optimise, then reset positions."""
        if not self.no_trade:
            return

        self.make_predictions()
        self.LinOptProg()
        self.Debug(self.weights)

        # predicted_securities is the snapshot the weights were computed for.
        for security, wt in zip(self.predicted_securities, np.ravel(self.weights)):
            self.Liquidate(security.Symbol)
            if wt != 0:
                self.Debug(f"{security.Symbol}: {wt}")
                self.SetHoldings(security.Symbol, float(wt))
        self.no_trade = False

    def OnEndOfDay(self):
        """Plot SPY, rescaled to the initial cash, and re-arm trading."""
        # code here plots benchmark against our portfolio performance on the equity chart
        mkt_price = self.History(self.MKT, 2, Resolution.Daily)['close'].unstack(level=0).iloc[-1]
        self.mkt.append(mkt_price)
        mkt_perf = self.InitCash * self.mkt[-1] / self.mkt[0]
        self.Plot('Strategy Equity', self.MKT, mkt_perf)
        self.no_trade = True

    def OnOrderEvent(self, orderEvent):
        """logs the details of an order"""
        self.Log(f'{orderEvent}')

    # =====================================================================================================================
    # begin custom functions
    # =====================================================================================================================

    def classifier_training(self):
        """Fit the return and quantile models for every tracked security.

        A ticker whose data retrieval or fit fails gets the string placeholder
        "NoModel" in both model lists; the failure is logged instead of being
        swallowed silently.
        """
        active_securities = [s.Symbol.Value for s in self.securities]
        self.Debug(active_securities)
        self.Log(f"Training Started at {self.Time}")

        return_mods, quantile_mods = [], []
        for ticker in active_securities:
            try:
                data = self.get_all_data([ticker])
                y_reg = data["return"]
                X = data.drop(["direction", "return", "symbol"], axis=1)
                # Train the models
                ret, qut = self.gb_returns(X, y_reg)
            except Exception as err:
                # If the training fails, return No Model
                self.Log(f"Training failed for {ticker}: {err}")
                ret = "NoModel"
                qut = "NoModel"
            return_mods.append(ret)
            quantile_mods.append(qut)

        # Publish everything together so the lists always stay parallel.
        self.return_mods = return_mods
        self.quantile_mods = quantile_mods
        self.trained_tickers = active_securities
        if hasattr(self, "dat"):
            del self.dat

    def make_predictions(self):
        """Predict the return and 5% quantile for each tracked security.

        Fills `self.returns` and `self.quantiles` with scalars in the order
        recorded in `self.predicted_securities`. A security without a usable
        model, or whose prediction fails, contributes 0 for both values. The
        trained models are kept so a later call can reuse them.
        """
        self.returns = []
        self.quantiles = []
        self.predicted_securities = list(self.securities)
        slot_of = {ticker: i for i, ticker in enumerate(self.trained_tickers)}

        for security in self.predicted_securities:
            ticker = security.Symbol.Value
            r_pred, q_pred = 0, 0
            slot = slot_of.get(ticker)
            if slot is not None and not isinstance(self.return_mods[slot], str):
                try:
                    data = self.will_get_all_data([ticker], training=False)
                    data = data[data.index == data.index.max()]
                    prediction_data = data.drop(["direction", "return", "symbol"], axis=1)
                    r_pred = self.return_mods[slot].predict(prediction_data)[0]
                    q_pred = self.quantile_mods[slot].predict(prediction_data)[0]
                except Exception as err:
                    self.Log(f"Prediction failed for {ticker}: {err}")
                    r_pred, q_pred = 0, 0
            self.returns.append(r_pred)
            self.quantiles.append(q_pred)

        if hasattr(self, "dat"):
            del self.dat

    def gb_returns(self, X, y):
        """
        Function to calculate expected returns and quantile loss

        Returns the fitted (mean model, 5% quantile model) pair.
        """
        shared = dict(n_estimators=100,
                      learning_rate=0.05,
                      criterion="friedman_mse",
                      n_iter_no_change=20,
                      random_state=1693)
        mean_clf = GradientBoostingRegressor(loss="squared_error", **shared)
        quantile_clf = GradientBoostingRegressor(loss="quantile", alpha=0.05, **shared)

        quantile_fit_out = quantile_clf.fit(X, y)
        mean_fit_out = mean_clf.fit(X, y)
        return (mean_fit_out, quantile_fit_out)

    def LinOptProg(self):
        """
        Convex optimization Function

        Solves for non-negative position sizes that maximise the absolute
        predicted return subject to a tail-loss cap of 0.02 and total size of
        at most 1, then applies the predicted direction so that
        `self.weights` holds signed weights as an (n, 1) array.
        `self.returns` and `self.quantiles` are consumed (deleted).
        """
        returns = np.asarray(self.returns, dtype=float).reshape(-1, 1)
        quantiles = np.asarray(self.quantiles, dtype=float).reshape(-1, 1)
        del self.returns
        del self.quantiles

        n_assets = len(returns)
        if n_assets == 0:
            # Nothing to allocate; also avoids dividing by zero below.
            self.weights = np.zeros((0, 1))
            return

        dirs = np.sign(returns).astype(int)
        bounds = (0, min(0.6, 2 / n_assets))

        # vstack keeps A two-dimensional even for a single asset (the previous
        # squeeze() collapsed it to 1-D, which linprog rejects).
        A = np.vstack([-np.multiply(dirs, quantiles).ravel(), np.ones(n_assets)])
        b = np.array([0.02, 1])

        res = linprog(-np.abs(returns).ravel(), A_ub=A, b_ub=b, bounds=bounds)
        if res.status == 0:
            # The solver returns magnitudes; the sign comes from the predicted
            # direction, otherwise predicted losers would be bought long.
            self.weights = dirs * res.x.reshape(-1, 1)
        else:
            self.Log("Optimization failed")
            # If optimization fails, give uniform weight 0 (buy nothing)
            self.weights = np.zeros((n_assets, 1))

    def get_all_data(self, tickers):
        """
        Gets historical data for training from tickers argument, pass in tickers in active universe

        Delegates to `will_get_all_data` in training mode, which uses the same
        lookbacks this method used to hard-code (lookback + 6 bars of prices,
        2 * lookback + 20 days of indicators, 2 * lookback days of news).
        """
        self.dat = self.will_get_all_data(tickers, historical=True, training=True, backtesting=False)
        self.Debug(self.dat.shape)
        return self.dat

    def _daily_sentiment(self, tiingo, analyzer):
        """Return per-day summed VADER compound scores, indexed by date ('time')."""
        tiingo['polarity'] = tiingo['description'].dropna().apply(analyzer.polarity_scores)
        tiingo = pd.concat([tiingo.drop(['polarity'], axis=1),
                            tiingo['polarity'].apply(pd.Series)], axis=1)
        tiingo = tiingo[['publisheddate', 'compound']]
        tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'], utc=True).dt.date
        tiingo = tiingo.groupby(by=['publisheddate'], as_index=False).sum()
        tiingo.rename(columns={'publisheddate': 'time'}, inplace=True)
        tiingo.set_index('time', inplace=True)
        return tiingo

    def will_get_all_data(self, tickers, historical=True, training=False, backtesting=True):
        """
        Gets historical data for training and prediction

        Parameters:
        -----------
        tickers : list
            list of ticker strings to retrieve data for
        historical : Bool, default True
            True reads prices and news through the algorithm's History;
            False reads them through a QuantBook with short lookbacks
        training : Bool, default False
            With historical=True and backtesting=False, selects the training
            lookbacks derived from `self.lookback`
        backtesting : Bool, default True
            With historical=True, selects the 26-bar prediction window and
            takes precedence over `training`

        Return:
        -------
        self.dat : pd.DataFrame
            DataFrame containing price, indicator, news-sentiment and lagged
            (t-1..t-4) features plus a 'symbol' column

        Raises:
        -------
        ValueError
            If historical is True while training and backtesting are both False.
        """
        if historical:
            if backtesting:
                shift_factor = 25  # overshooting and select the maximum
                hist_lookback = 1 + shift_factor
                ind_lookback = 20 + shift_factor
                tiingo_lookback = 1 + shift_factor  # in case of weekends?
            elif training:
                hist_lookback = self.lookback + 6
                ind_lookback = self.lookback * 2 + 20
                tiingo_lookback = self.lookback * 2
            else:
                raise ValueError("Please train or backtest if using historical = True")
        else:
            shift_factor = 7  # needed so we can calculate lagged data
            hist_lookback = 1 + shift_factor
            ind_lookback = 20 + shift_factor
            tiingo_lookback = 1 + shift_factor  # in case of weekends?

        analyzer = SentimentIntensityAnalyzer()
        self.dat = pd.DataFrame()
        for ticker in tickers:
            # Create a Quant Book Object as in Research
            qb = QuantBook()
            symbol = qb.AddEquity(ticker, Resolution.Daily).Symbol

            # Get Price History
            if historical:
                history = self.History(symbol, hist_lookback)
            else:
                history = qb.History(symbol, hist_lookback)

            # convert the historical data to a pandas DataFrame
            df = pd.DataFrame(history)
            df['direction'] = np.where(df['close'] > df['open'], 1, 0)
            df['return'] = df['close'].pct_change()

            # Prepare indicator data
            if historical:
                start = self.Time - timedelta(days=ind_lookback)
                end = self.Time
                rsi = qb.Indicator(RelativeStrengthIndex(20), symbol, Resolution.Daily, start=start, end=end)
                bbdf = qb.Indicator(BollingerBands(20, 2), symbol, Resolution.Daily, start=start, end=end)
                sma = qb.Indicator(SimpleMovingAverage(20), symbol, Resolution.Daily, start=start, end=end)
            else:
                rsi = qb.Indicator(RelativeStrengthIndex(20), symbol, ind_lookback + 1, Resolution.Daily)
                bbdf = qb.Indicator(BollingerBands(20, 2), symbol, ind_lookback + 1, Resolution.Daily)
                # NOTE(review): one bar fewer than rsi/bbdf above - confirm intended.
                sma = qb.Indicator(SimpleMovingAverage(20), symbol, ind_lookback, Resolution.Daily)

            # Add relevant columns
            final_df = df.droplevel(0).join(rsi).join(bbdf).join(sma)
            final_df['price_diff'] = final_df["open"] - final_df["simplemovingaverage"]
            final_df['band_diff_up'] = final_df["open"] - final_df["upperband"]
            final_df['band_diff_mid'] = final_df["open"] - final_df["middleband"]
            final_df['band_diff_lower'] = final_df["open"] - final_df["lowerband"]

            # Add tiingo data
            if historical:
                data = self.AddData(TiingoNews, symbol).Symbol
                tiingo = self.History(data, tiingo_lookback, Resolution.Daily)
            else:
                data = qb.AddData(TiingoNews, symbol).Symbol
                tiingo = qb.History(data, tiingo_lookback, Resolution.Daily)
            hist = final_df.join(self._daily_sentiment(tiingo, analyzer))

            # Lagged (t-1..t-4) copies of every pre-news column, then drop
            # incomplete rows and the raw price/volume columns.
            lags = range(1, 5)
            final_df = hist.assign(**{
                f'{col} (t-{lag})': final_df[col].shift(lag)
                for lag in lags
                for col in final_df
            }).dropna().drop(columns=['close', 'high', 'low', 'volume'], errors='ignore')

            final_df['symbol'] = ticker
            self.dat = pd.concat([self.dat, final_df])
        return self.dat
from AlgorithmImports import *
import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from sklearn.ensemble import GradientBoostingRegressor
from scipy.stats import norm
import math


class XGP(QCAlgorithm):
    """Gradient-boosting long/short equity strategy with an LP allocator.

    On every rebalance the algorithm liquidates, predicts a return and a tail
    quantile for each universe member with per-symbol gradient-boosting
    models, solves a linear program for the target weights, and scales the
    orders down when the parametric value-at-risk breaches
    ``risk_threshold``.
    """

    def Initialize(self):
        """Configure dates, cash, brokerage, universe and the schedules."""
        # set up
        self.SetStartDate(2017, 1, 1)
        self.SetEndDate(2017, 6, 1)
        self.InitCash = 10000000
        self.SetCash(self.InitCash)
        self.lookback = 50

        # setting brokerage and reality modeling params
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetSecurityInitializer(lambda s: s.SetSlippageModel(VolumeShareSlippageModel()))

        # manually keeping track of securities
        self.securities = []

        # NOTE: the original author reports an error when these are removed.
        self.weights = []
        self.trained = True

        # Risk threshold: one-day VaR (as a return) below which orders shrink.
        self.risk_threshold = -0.015

        # State read by actions() / get_daily_realized_pnl(); created here so
        # neither can hit an AttributeError before it is first written.
        self.prices_at_order = {}
        self.yesterday_total_profit = 0
        self.yesterday_total_fees = 0

        # Per-symbol models; the four lists are index-aligned.
        self.model_symbols = []
        self.return_mods = []
        self.quantile_mods_lg = []
        self.quantile_mods_st = []

        # Requesting data
        self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
        self.UniverseSettings.Resolution = Resolution.Daily
        self.num_coarse_symbols = 100
        self.num_fine_symbols = 10

        # Train immediately
        self.Train(self.classifier_training)
        # NOTE(review): this date lies outside the configured backtest window;
        # it looks like a leftover from a live/debug run -- confirm.
        self.Schedule.On(self.DateRules.On(2023, 4, 19), self.TimeRules.Now, self.actions)

        # Train every Sunday at 4am or first day of month (because new universe)
        self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training)
        self.Train(self.DateRules.MonthStart(daysOffset=0), self.TimeRules.At(4, 0), self.classifier_training)

        # Rebalance every Monday and on the first day of the month at 10:00.
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday), self.TimeRules.At(10, 0), self.actions)
        self.Schedule.On(self.DateRules.MonthStart(daysOffset=0), self.TimeRules.At(10, 0), self.actions)

    def SelectCoarse(self, coarse):
        """Return the most liquid symbols with fundamental data and price > 5.

        The universe is only refreshed on the first day of a month, except
        while no security is tracked yet, in which case it is selected
        immediately so the algorithm does not idle until the next month.
        """
        if self.securities and self.Time.day != 1:
            return Universe.Unchanged

        selected = [c for c in coarse if c.HasFundamentalData and c.Price > 5]
        sorted_by_dollar_volume = sorted(selected, key=lambda c: c.DollarVolume, reverse=True)
        return [c.Symbol for c in sorted_by_dollar_volume[:self.num_coarse_symbols]]

    def SelectFine(self, fine):
        """Apply the fundamental filters and keep the highest-P/E names.

        Uses the same refresh rule as ``SelectCoarse``.
        """
        if self.securities and self.Time.day != 1:
            return Universe.Unchanged

        selected = [
            f for f in fine
            if f.ValuationRatios.PERatio < 100
            and f.MarketCap > 300000000
            and f.ValuationRatios.PEGRatio < 3
            and f.OperationRatios.TotalDebtEquityRatio.Value < 2
            and f.OperationRatios.CurrentRatio.Value > 1
        ]
        sorted_by_pe_ratio = sorted(selected, key=lambda f: f.ValuationRatios.PERatio, reverse=True)
        return [f.Symbol for f in sorted_by_pe_ratio[:self.num_fine_symbols]]

    def OnSecuritiesChanged(self, changes):
        """Log universe changes and keep ``self.securities`` in sync."""
        added = changes.AddedSecurities
        removed = changes.RemovedSecurities
        for security in added:
            self.Debug(f"{self.Time}: Added {security}")
        for security in removed:
            self.Debug(f"{self.Time}: Removed {security}")

        # Rebuilt from a set, so the ORDER of self.securities is not stable
        # across calls; never rely on positions surviving a universe change.
        self.securities = list(set(self.securities).union(set(added)).difference(set(removed)))

    def OnData(self, data):
        """Unused: all trading happens in the scheduled ``actions``."""
        return

    def OnEndOfDay(self):
        """Snapshot cumulative profit and fees for the daily P&L helper."""
        self.yesterday_total_profit = self.Portfolio.TotalProfit
        self.yesterday_total_fees = self.Portfolio.TotalFees

    def OnOrderEvent(self, orderEvent):
        """logs the details of an order"""
        self.Log(f'{orderEvent}')

    # =====================================================================================================================
    # begin custom functions
    # =====================================================================================================================

    def actions(self):
        """Rebalance: liquidate, predict, optimise, then place risk-checked orders.

        When the one-day parametric VaR of the target portfolio is at or below
        ``self.risk_threshold`` every order is scaled by ``position_resize``;
        otherwise the optimiser's weights are applied in full.
        """
        self.Liquidate()  # Liquidate the whole portfolio
        self.make_predictions()
        self.LinOptProg()

        lookback = 30
        position_resize = 0.6

        # Weights come back as an (n, 1) column in self.securities order.
        targets = [(s.Symbol, row[0]) for s, row in zip(self.securities, self.weights)]
        if not targets:
            return
        active_securities = [symbol for symbol, _ in targets]

        # risk management
        history = self.History(active_securities, timedelta(days=lookback), resolution=Resolution.Daily)
        history = history['close'].unstack(level=0)
        # NOTE(review): this only relabels the columns. It assumes unstack()
        # yields one column per requested symbol in the same order as
        # active_securities -- confirm, otherwise VaR pairs weights with the
        # wrong assets (or raises on a length mismatch).
        history.columns = active_securities
        returns = history.pct_change()
        w = np.array([wt for _, wt in targets])
        VaR = self.value_at_risk(returns, w)  # calculation of value-at-risk limit
        self.Debug(f"VaR={VaR}")

        # position sizing
        if VaR <= self.risk_threshold:
            # estimated next-day loss is worse than the maximum we accept
            self.Debug(f"estimated risk {VaR} exceeds threshold")
            for symbol, wt in targets:
                quantity = self.CalculateOrderQuantity(symbol, wt)
                reduced_quantity = math.ceil(quantity * position_resize)
                if reduced_quantity != 0:
                    self.Debug(
                        f"VaR limit reached; expected loss is {VaR}. Reducing position size of "
                        f"{symbol} from {quantity} to {reduced_quantity}"
                    )
                    self.MarketOrder(symbol, reduced_quantity)
        else:
            # A NaN VaR (e.g. too little history) also lands here because the
            # comparison above is False for NaN.
            for symbol, wt in targets:
                if wt != 0:
                    self.SetHoldings(symbol, wt)
                    self.prices_at_order[symbol] = self.Securities[symbol].Price

    def classifier_training(self):
        """Fit mean and quantile models for every tracked security.

        Results are stored in four index-aligned lists (``model_symbols``,
        ``return_mods``, ``quantile_mods_lg``, ``quantile_mods_st``). A symbol
        whose data retrieval or fit fails gets the string ``"NoModel"`` in
        place of each estimator.
        """
        symbols, return_mods, quantile_mods_lg, quantile_mods_st = [], [], [], []
        self.Log(f"Training Started at {self.Time}")

        for security in list(self.securities):
            symbol = security.Symbol
            try:
                data = self.get_all_data([symbol], training=True, backtesting=False)
                y_reg = data["return"]
                X = data.drop(["direction", "return", "symbol"], axis=1)
                ret, qut_lg, qut_st = self.gb_returns(X, y_reg)
            except Exception as err:
                # Best effort: one bad symbol must not abort the whole run.
                self.Log(f"Training failed for {symbol}: {err}")
                ret = qut_lg = qut_st = "NoModel"
            symbols.append(symbol)
            return_mods.append(ret)
            quantile_mods_lg.append(qut_lg)
            quantile_mods_st.append(qut_st)

        # Publish only complete results so readers never see half-built lists.
        self.model_symbols = symbols
        self.return_mods = return_mods
        self.quantile_mods_lg = quantile_mods_lg
        self.quantile_mods_st = quantile_mods_st
        self.trained = True

    def _models_for(self, symbol):
        """Return (mean, long-quantile, short-quantile) models for *symbol*, or None."""
        try:
            idx = getattr(self, "model_symbols", []).index(symbol)
        except ValueError:
            return None
        models = (self.return_mods[idx], self.quantile_mods_lg[idx], self.quantile_mods_st[idx])
        # "NoModel" placeholders mean training failed for this symbol.
        return None if any(isinstance(m, str) for m in models) else models

    def make_predictions(self):
        """Fill ``self.returns`` / ``self.quantiles`` in ``self.securities`` order.

        Models are looked up by symbol rather than by list position because
        the order of ``self.securities`` can change between training and
        prediction. Securities without a usable model, or whose prediction
        fails, get a return and quantile of 0.
        """
        self.returns = []
        self.quantiles = []

        for security in self.securities:
            symbol = security.Symbol
            r_pred, q_pred = 0, 0
            models = self._models_for(symbol)
            if models is not None:
                mean_model, long_model, short_model = models
                try:
                    data = self.get_all_data([symbol], training=False)
                    latest = data[data.index == data.index.max()]
                    prediction_data = latest.drop(["direction", "return", "symbol"], axis=1)
                    r_pred = mean_model.predict(prediction_data)[0]
                    if r_pred > 0:
                        q_pred = long_model.predict(prediction_data)[0]
                    elif r_pred < 0:
                        q_pred = short_model.predict(prediction_data)[0]
                except Exception as err:
                    self.Log(f"Prediction failed for {symbol}: {err}")
                    r_pred, q_pred = 0, 0
            self.returns.append(r_pred)
            self.quantiles.append(q_pred)

        self.Debug(self.returns)

    def gb_returns(self, X, y):
        """Fit the mean model and the 5% / 95% quantile models on (X, y).

        Returns
        -------
        tuple
            ``(mean_model, quantile_model_5pct, quantile_model_95pct)``; the
            5% model is used for long candidates, the 95% one for shorts.
        """
        mean_clf = GradientBoostingRegressor(n_estimators=150,
                                             loss="squared_error",
                                             criterion="friedman_mse",
                                             learning_rate=0.05,
                                             random_state=1693,
                                             n_iter_no_change=15)
        mean_fit_out = mean_clf.fit(X, y)

        quantile_clf_lg = GradientBoostingRegressor(n_estimators=150,
                                                    loss="quantile",
                                                    alpha=0.05,
                                                    n_iter_no_change=15,
                                                    learning_rate=0.05,
                                                    criterion="friedman_mse",
                                                    random_state=1693)
        quantile_clf_st = GradientBoostingRegressor(n_estimators=150,
                                                    loss="quantile",
                                                    alpha=0.95,
                                                    n_iter_no_change=15,
                                                    learning_rate=0.05,
                                                    criterion="friedman_mse",
                                                    random_state=1693)
        quantile_fit_lg = quantile_clf_lg.fit(X, y)
        quantile_fit_st = quantile_clf_st.fit(X, y)
        return (mean_fit_out, quantile_fit_lg, quantile_fit_st)

    def LinOptProg(self):
        """Solve the allocation LP and store an (n, 1) column in ``self.weights``.

        Maximises predicted return subject to a tail-loss budget
        (``-quantiles . w <= 0.01``) and a directional exposure between 0 and
        1. Longs are bounded to ``[0, min(0.6, 3/n)]``, shorts to
        ``[max(-0.6, -1.5/n), 0]`` and assets with a zero predicted return
        are pinned to 0. ``self.returns`` and ``self.quantiles`` are consumed
        (deleted) by this call.
        """
        self.weights = []
        returns = np.array(self.returns, dtype=float).reshape(-1, 1)
        quantiles = np.array(self.quantiles, dtype=float).reshape(-1, 1)
        del self.returns
        del self.quantiles

        n_assets = len(returns)
        if n_assets == 0:
            # Nothing to allocate; avoids 1/0 and an empty LP.
            return

        dirs = np.sign(returns).astype(int)
        long_bound = (0, min(0.6, 3 / n_assets))
        short_bound = (max(-0.6, -1.5 / n_assets), 0)
        # No signal -> no position. Without the (0, 0) pin the solver may
        # short such an asset freely: its objective coefficient is 0.
        bounds = [long_bound if d > 0 else short_bound if d < 0 else (0, 0) for d in dirs.ravel()]

        # Rows must stay 2-D even for one asset (squeeze() would flatten them).
        A = np.vstack([-quantiles.T, dirs.T, -dirs.T])
        b = np.array([0.01, 1, 0])
        res = linprog(-returns.ravel(), A_ub=A, b_ub=b, bounds=bounds)

        if res.status == 0:
            self.weights = res.x.reshape(-1, 1)
        else:
            self.Log("Optimization failed")
            # Fallback: equal weight of 1/n in each predicted direction.
            self.weights = dirs * (1 / n_assets)

    def bollinger_bands(self, data, window=20, num_std=2):
        """Add ``MA``, ``STD``, ``Upper_BB`` and ``Lower_BB`` columns to *data*.

        *data* must have a ``close`` column; it is modified in place and
        returned.
        """
        rolling_close = data['close'].rolling(window=window)
        data['MA'] = rolling_close.mean()
        data['STD'] = rolling_close.std()
        data['Upper_BB'] = data['MA'] + (data['STD'] * num_std)
        data['Lower_BB'] = data['MA'] - (data['STD'] * num_std)
        return data

    def calculate_rsi(self, data, period=20):
        """Add an ``rsi`` column (exponentially smoothed RSI of ``close``).

        *data* is modified in place and returned.
        """
        # Daily price changes, split into gains and (positive) losses.
        delta = data['close'].diff().dropna()
        gains = delta.where(delta > 0, 0)
        losses = -delta.where(delta < 0, 0)

        avg_gain = gains.ewm(com=period - 1, min_periods=period).mean()
        avg_loss = losses.ewm(com=period - 1, min_periods=period).mean()

        rs = avg_gain / avg_loss
        data['rsi'] = 100 - (100 / (1 + rs))
        return data

    def get_all_data(self, tickers, historical=True, training=False, backtesting=True):
        """Build the feature frame used for training and prediction.

        Parameters
        ----------
        tickers : list
            Symbols to retrieve data for.
        historical : bool, default True
            When False a short 8-bar window is used.
        training : bool, default False
            With ``historical=True, backtesting=False`` requests
            ``self.lookback + 25`` bars for fitting.
        backtesting : bool, default True
            With ``historical=True`` requests 31 bars; callers keep only the
            most recent row for prediction.

        Returns
        -------
        pd.DataFrame
            One row per usable bar with price, Bollinger/RSI, optional news
            sentiment (``compound``), lags 1-4 of every column, and a
            ``symbol`` column.

        Raises
        ------
        ValueError
            If ``historical`` is True but neither mode flag is set.
        """
        if historical:
            if backtesting:
                shift_factor = 30  # overshooting and select the maximum
                hist_lookback = 1 + shift_factor
                tiingo_lookback = 12  # in case of weekends?
            elif training:
                hist_lookback = self.lookback + 25
                tiingo_lookback = int(self.lookback * 1.5)
            else:
                raise ValueError("Please train or backtest if using historical = True")
        else:
            shift_factor = 7  # needed so we can calculate lagged data
            hist_lookback = 1 + shift_factor
            tiingo_lookback = 1 + shift_factor  # in case of weekends?

        analyzer = None  # built lazily: loading the lexicon once is enough
        full_data = pd.DataFrame()

        for symbol in tickers:
            # Get Price History
            history = pd.DataFrame(self.History(symbol, hist_lookback))
            history['direction'] = np.where(history['close'] > history['open'], 1, 0)
            # NOTE(review): this is the TRAILING 5-bar return of the same row
            # the features come from, not a forward return -- confirm intent.
            history['return'] = history['close'].pct_change(periods=5)
            history = self.bollinger_bands(history)
            history = self.calculate_rsi(history)

            # Add relevant columns
            history['price_diff'] = history["open"] - history["MA"]
            history['band_diff_up'] = history["open"] - history["Upper_BB"]
            history['band_diff_lower'] = history["open"] - history["Lower_BB"]

            # Add Tiingo Data
            data = self.AddData(TiingoNews, symbol).Symbol
            tiingo = self.History(data, int(tiingo_lookback), Resolution.Daily)
            if len(tiingo) != 0 and {'description', 'publisheddate'}.issubset(tiingo.columns):
                if analyzer is None:
                    analyzer = SentimentIntensityAnalyzer()
                tiingo['polarity'] = tiingo['description'].dropna().apply(analyzer.polarity_scores)
                tiingo = pd.concat([tiingo.drop(['polarity'], axis=1),
                                    tiingo['polarity'].apply(pd.Series)], axis=1)
                tiingo = tiingo[['publisheddate', 'compound']].copy()
                tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'], utc=True).dt.date
                # Daily sentiment = sum of the compound scores published that day.
                tiingo = tiingo.groupby(by=['publisheddate'], as_index=False).sum()
                tiingo.rename(columns={'publisheddate': 'time'}, inplace=True)
                tiingo.set_index('time', inplace=True)
                # NOTE(review): joins on the 'time' index; presumably the
                # price index and these dates share a type -- verify.
                history = history.join(tiingo)

            # Lags 1-4 of every column; rows without a full set are dropped.
            lagged = {
                f'{col} (t-{lag})': history[col].shift(lag)
                for lag in range(1, 5)
                for col in history
            }
            history = (
                history.assign(**lagged)
                .dropna()
                .drop(columns=['close', 'high', 'low', 'volume'], errors='ignore')
            )
            history['symbol'] = symbol.Value
            full_data = pd.concat([full_data, history])

        # NOTE(review): logs the whole frame on every call; looks like a
        # debugging leftover -- confirm before relying on log volume.
        self.Log(full_data)
        return full_data

    def get_daily_realized_pnl(self):
        """Return profit minus fees accrued since the last ``OnEndOfDay``."""
        daily_gross_profit = self.Portfolio.TotalProfit - self.yesterday_total_profit
        daily_fees = self.Portfolio.TotalFees - self.yesterday_total_fees
        return daily_gross_profit - daily_fees

    def value_at_risk(self, returns, weights, conf_level=0.05, num_days=1):
        """Parametric (normal) value-at-risk of the portfolio, as a return.

        Parameters
        ----------
        returns : pd.DataFrame
            Periodic returns, one column per asset.
        weights : np.array
            Portfolio weights, in the column order of *returns*.
        conf_level : float
            Tail probability, 0.05 by default.
        num_days : int
            Horizon; the one-period quantile is scaled by ``sqrt(num_days)``.

        Returns
        -------
        float
            The ``conf_level`` quantile of the fitted normal distribution
            (negative for a loss).
        """
        cov_matrix = returns.cov()
        avg_return = returns.mean()
        portfolio_mean = avg_return.dot(weights)
        portfolio_stdev = np.sqrt(weights.T.dot(cov_matrix).dot(weights))
        cutoff = norm.ppf(conf_level, portfolio_mean, portfolio_stdev)

        # n-Day VaR
        return cutoff * np.sqrt(num_days)

    def cvar(self, returns, weights, conf_level=0.05):
        """Parametric (normal) conditional VaR / expected shortfall.

        Under the normal model used by ``value_at_risk`` the mean return in
        the worst ``conf_level`` tail is ``mu - sigma * pdf(z) / conf_level``
        with ``z = norm.ppf(conf_level)``.

        Parameters
        ----------
        returns : pd.DataFrame
            Periodic returns, one column per asset.
        weights : np.array
            Portfolio weights, in the column order of *returns*.
        conf_level : float
            Tail probability.

        Returns
        -------
        float
            Expected return conditional on being in the tail (always at or
            below the VaR for the same ``conf_level``).
        """
        portfolio_mean = returns.mean().dot(weights)
        portfolio_stdev = np.sqrt(weights.T.dot(returns.cov()).dot(weights))
        z_score = norm.ppf(conf_level)
        return portfolio_mean - portfolio_stdev * norm.pdf(z_score) / conf_level
from AlgorithmImports import *
import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from sklearn.ensemble import GradientBoostingRegressor
from scipy.stats import norm
import math


class XGP(QCAlgorithm):
    """Gradient-boosting long/short equity strategy with an LP allocator.

    On every rebalance the algorithm liquidates, predicts a return and a tail
    quantile for each universe member with per-symbol gradient-boosting
    models, solves a linear program for the target weights, and scales the
    orders down when the parametric value-at-risk breaches
    ``risk_threshold``.
    """

    def Initialize(self):
        """Configure dates, cash, brokerage, universe and the schedules."""
        # set up
        self.SetStartDate(2016, 1, 1)
        self.SetEndDate(2017, 1, 1)
        self.InitCash = 10000000
        self.SetCash(self.InitCash)
        self.lookback = 50

        # setting brokerage and reality modeling params
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetSecurityInitializer(lambda s: s.SetSlippageModel(VolumeShareSlippageModel()))

        # manually keeping track of securities
        self.securities = []

        # NOTE: the original author reports an error when these are removed.
        self.weights = []
        self.trained = True

        # One-day VaR (as a return) below which orders are scaled down.
        self.risk_threshold = -0.015

        self.prices_at_order = {}
        self.yesterday_total_profit = 0
        self.yesterday_total_fees = 0

        # Per-symbol models; the four lists are index-aligned.
        self.model_symbols = []
        self.return_mods = []
        self.quantile_mods_lg = []
        self.quantile_mods_st = []

        # Requesting data
        self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
        self.UniverseSettings.Resolution = Resolution.Daily
        self.num_coarse_symbols = 100
        self.num_fine_symbols = 10

        # Train immediately
        self.Train(self.classifier_training)

        # Train every Sunday at 4am or first day of month (because new universe)
        self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training)
        self.Train(self.DateRules.MonthStart(daysOffset=0), self.TimeRules.At(4, 0), self.classifier_training)

        # Rebalance every Monday and on the first day of the month at 10:00.
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday), self.TimeRules.At(10, 0), self.actions)
        self.Schedule.On(self.DateRules.MonthStart(daysOffset=0), self.TimeRules.At(10, 0), self.actions)

    def SelectCoarse(self, coarse):
        """Return the most liquid symbols with fundamental data and price > 5.

        The universe is only refreshed on the first day of a month, except
        while no security is tracked yet: then it is selected immediately, so
        a first selection that does not fall on day 1 cannot leave the
        universe empty until the following month.
        """
        if self.securities and self.Time.day != 1:
            return Universe.Unchanged

        selected = [c for c in coarse if c.HasFundamentalData and c.Price > 5]
        sorted_by_dollar_volume = sorted(selected, key=lambda c: c.DollarVolume, reverse=True)
        return [c.Symbol for c in sorted_by_dollar_volume[:self.num_coarse_symbols]]

    def SelectFine(self, fine):
        """Apply the fundamental filters and keep the highest-P/E names.

        Uses the same refresh rule as ``SelectCoarse``.
        """
        if self.securities and self.Time.day != 1:
            return Universe.Unchanged

        selected = [
            f for f in fine
            if f.ValuationRatios.PERatio < 100
            and f.MarketCap > 300000000
            and f.ValuationRatios.PEGRatio < 3
            and f.OperationRatios.TotalDebtEquityRatio.Value < 2
            and f.OperationRatios.CurrentRatio.Value > 1
        ]
        sorted_by_pe_ratio = sorted(selected, key=lambda f: f.ValuationRatios.PERatio, reverse=True)
        return [f.Symbol for f in sorted_by_pe_ratio[:self.num_fine_symbols]]

    def OnSecuritiesChanged(self, changes):
        """Log universe changes and keep ``self.securities`` in sync."""
        added = changes.AddedSecurities
        removed = changes.RemovedSecurities
        for security in added:
            self.Debug(f"{self.Time}: Added {security}")
        for security in removed:
            self.Debug(f"{self.Time}: Removed {security}")

        # Rebuilt from a set, so the ORDER of self.securities is not stable
        # across calls; never rely on positions surviving a universe change.
        self.securities = list(set(self.securities).union(set(added)).difference(set(removed)))

    def OnData(self, data):
        """Unused: all trading happens in the scheduled ``actions``."""
        return

    def OnEndOfDay(self):
        """Snapshot cumulative profit and fees for the daily P&L helper."""
        self.yesterday_total_profit = self.Portfolio.TotalProfit
        self.yesterday_total_fees = self.Portfolio.TotalFees

    def OnOrderEvent(self, orderEvent):
        """logs the details of an order"""
        self.Log(f'{orderEvent}')

    # =====================================================================================================================
    # begin custom functions
    # =====================================================================================================================

    def actions(self):
        """Rebalance: liquidate, predict, optimise, then place risk-checked orders.

        When the one-day parametric VaR of the target portfolio is at or below
        ``self.risk_threshold`` every order is scaled by ``position_resize``;
        otherwise the optimiser's weights are applied in full.
        """
        self.Liquidate()  # Liquidate the whole portfolio
        self.make_predictions()
        self.LinOptProg()

        lookback = 30
        position_resize = 0.6

        # Weights come back as an (n, 1) column in self.securities order.
        targets = [(s.Symbol, row[0]) for s, row in zip(self.securities, self.weights)]
        if not targets:
            return
        active_securities = [symbol for symbol, _ in targets]

        # risk management
        history = self.History(active_securities, timedelta(days=lookback), resolution=Resolution.Daily)
        history = history['close'].unstack(level=0)
        # NOTE(review): this only relabels the columns. It assumes unstack()
        # yields one column per requested symbol in the same order as
        # active_securities -- confirm, otherwise VaR pairs weights with the
        # wrong assets (or raises on a length mismatch).
        history.columns = active_securities
        returns = history.pct_change()
        w = np.array([wt for _, wt in targets])
        VaR = self.value_at_risk(returns, w)  # calculation of value-at-risk limit
        self.Debug(f"VaR={VaR}")

        # position sizing
        if VaR <= self.risk_threshold:
            # estimated next-day loss is worse than the maximum we accept
            self.Debug(f"estimated risk {VaR} exceeds threshold")
            for symbol, wt in targets:
                quantity = self.CalculateOrderQuantity(symbol, wt)
                reduced_quantity = math.ceil(quantity * position_resize)
                if reduced_quantity != 0:
                    self.Debug(
                        f"VaR limit reached; expected loss is {VaR}. Reducing position size of "
                        f"{symbol} from {quantity} to {reduced_quantity}"
                    )
                    self.MarketOrder(symbol, reduced_quantity)
        else:
            # A NaN VaR (e.g. too little history) also lands here because the
            # comparison above is False for NaN.
            for symbol, wt in targets:
                if wt != 0:
                    self.SetHoldings(symbol, wt)
                    self.prices_at_order[symbol] = self.Securities[symbol].Price

    def classifier_training(self):
        """Fit mean and quantile models for every tracked security.

        Results are stored in four index-aligned lists (``model_symbols``,
        ``return_mods``, ``quantile_mods_lg``, ``quantile_mods_st``). A symbol
        whose data retrieval or fit fails gets the string ``"NoModel"`` in
        place of each estimator.
        """
        symbols, return_mods, quantile_mods_lg, quantile_mods_st = [], [], [], []
        self.Log(f"Training Started at {self.Time}")

        for security in list(self.securities):
            symbol = security.Symbol
            try:
                data = self.get_all_data([symbol], training=True, backtesting=False)
                y_reg = data["return"]
                X = data.drop(["direction", "return", "symbol"], axis=1)
                ret, qut_lg, qut_st = self.gb_returns(X, y_reg)
            except Exception as err:
                # Best effort: one bad symbol must not abort the whole run.
                self.Log(f"Training failed for {symbol}: {err}")
                ret = qut_lg = qut_st = "NoModel"
            symbols.append(symbol)
            return_mods.append(ret)
            quantile_mods_lg.append(qut_lg)
            quantile_mods_st.append(qut_st)

        # Publish only complete results so readers never see half-built lists.
        self.model_symbols = symbols
        self.return_mods = return_mods
        self.quantile_mods_lg = quantile_mods_lg
        self.quantile_mods_st = quantile_mods_st
        self.trained = True

    def _models_for(self, symbol):
        """Return (mean, long-quantile, short-quantile) models for *symbol*, or None."""
        try:
            idx = getattr(self, "model_symbols", []).index(symbol)
        except ValueError:
            return None
        models = (self.return_mods[idx], self.quantile_mods_lg[idx], self.quantile_mods_st[idx])
        # "NoModel" placeholders mean training failed for this symbol.
        return None if any(isinstance(m, str) for m in models) else models

    def make_predictions(self):
        """Fill ``self.returns`` / ``self.quantiles`` in ``self.securities`` order.

        Models are looked up by symbol rather than by list position because
        the order of ``self.securities`` can change between training and
        prediction. Securities without a usable model, or whose prediction
        fails, get a return and quantile of 0.
        """
        self.returns = []
        self.quantiles = []

        for security in self.securities:
            symbol = security.Symbol
            r_pred, q_pred = 0, 0
            models = self._models_for(symbol)
            if models is not None:
                mean_model, long_model, short_model = models
                try:
                    data = self.get_all_data([symbol], training=False)
                    latest = data[data.index == data.index.max()]
                    prediction_data = latest.drop(["direction", "return", "symbol"], axis=1)
                    r_pred = mean_model.predict(prediction_data)[0]
                    if r_pred > 0:
                        q_pred = long_model.predict(prediction_data)[0]
                    elif r_pred < 0:
                        q_pred = short_model.predict(prediction_data)[0]
                except Exception as err:
                    self.Log(f"Prediction failed for {symbol}: {err}")
                    r_pred, q_pred = 0, 0
            self.returns.append(r_pred)
            self.quantiles.append(q_pred)

        self.Debug(self.returns)

    def gb_returns(self, X, y):
        """Fit the mean model and the 5% / 95% quantile models on (X, y).

        Returns
        -------
        tuple
            ``(mean_model, quantile_model_5pct, quantile_model_95pct)``; the
            5% model is used for long candidates, the 95% one for shorts.
        """
        mean_clf = GradientBoostingRegressor(n_estimators=150,
                                             loss="squared_error",
                                             criterion="friedman_mse",
                                             learning_rate=0.05,
                                             random_state=1693,
                                             n_iter_no_change=15)
        mean_fit_out = mean_clf.fit(X, y)

        quantile_clf_lg = GradientBoostingRegressor(n_estimators=150,
                                                    loss="quantile",
                                                    alpha=0.05,
                                                    n_iter_no_change=15,
                                                    learning_rate=0.05,
                                                    criterion="friedman_mse",
                                                    random_state=1693)
        quantile_clf_st = GradientBoostingRegressor(n_estimators=150,
                                                    loss="quantile",
                                                    alpha=0.95,
                                                    n_iter_no_change=15,
                                                    learning_rate=0.05,
                                                    criterion="friedman_mse",
                                                    random_state=1693)
        quantile_fit_lg = quantile_clf_lg.fit(X, y)
        quantile_fit_st = quantile_clf_st.fit(X, y)
        return (mean_fit_out, quantile_fit_lg, quantile_fit_st)

    def LinOptProg(self):
        """Solve the allocation LP and store an (n, 1) column in ``self.weights``.

        Maximises predicted return subject to a tail-loss budget
        (``-quantiles . w <= 0.01``) and a directional exposure between 0 and
        1. Longs are bounded to ``[0, min(0.6, 3/n)]``, shorts to
        ``[max(-0.6, -1.5/n), 0]`` and assets with a zero predicted return
        are pinned to 0. ``self.returns`` and ``self.quantiles`` are consumed
        (deleted) by this call.
        """
        self.weights = []
        returns = np.array(self.returns, dtype=float).reshape(-1, 1)
        quantiles = np.array(self.quantiles, dtype=float).reshape(-1, 1)
        del self.returns
        del self.quantiles

        n_assets = len(returns)
        if n_assets == 0:
            # Nothing to allocate; avoids 1/0 and an empty LP.
            return

        dirs = np.sign(returns).astype(int)
        long_bound = (0, min(0.6, 3 / n_assets))
        short_bound = (max(-0.6, -1.5 / n_assets), 0)
        # No signal -> no position. Without the (0, 0) pin the solver may
        # short such an asset freely: its objective coefficient is 0.
        bounds = [long_bound if d > 0 else short_bound if d < 0 else (0, 0) for d in dirs.ravel()]

        # Rows must stay 2-D even for one asset (squeeze() would flatten them).
        A = np.vstack([-quantiles.T, dirs.T, -dirs.T])
        b = np.array([0.01, 1, 0])
        res = linprog(-returns.ravel(), A_ub=A, b_ub=b, bounds=bounds)

        if res.status == 0:
            self.weights = res.x.reshape(-1, 1)
        else:
            self.Log("Optimization failed")
            # Fallback: equal weight of 1/n in each predicted direction.
            self.weights = dirs * (1 / n_assets)

    def bollinger_bands(self, data, window=20, num_std=2):
        """Add ``MA``, ``STD``, ``Upper_BB`` and ``Lower_BB`` columns to *data*.

        *data* must have a ``close`` column; it is modified in place and
        returned.
        """
        rolling_close = data['close'].rolling(window=window)
        data['MA'] = rolling_close.mean()
        data['STD'] = rolling_close.std()
        data['Upper_BB'] = data['MA'] + (data['STD'] * num_std)
        data['Lower_BB'] = data['MA'] - (data['STD'] * num_std)
        return data

    def calculate_rsi(self, data, period=20):
        """Add an ``rsi`` column (exponentially smoothed RSI of ``close``).

        *data* is modified in place and returned.
        """
        # Daily price changes, split into gains and (positive) losses.
        delta = data['close'].diff().dropna()
        gains = delta.where(delta > 0, 0)
        losses = -delta.where(delta < 0, 0)

        avg_gain = gains.ewm(com=period - 1, min_periods=period).mean()
        avg_loss = losses.ewm(com=period - 1, min_periods=period).mean()

        rs = avg_gain / avg_loss
        data['rsi'] = 100 - (100 / (1 + rs))
        return data

    def get_all_data(self, tickers, historical=True, training=False, backtesting=True):
        """Build the feature frame used for training and prediction.

        Parameters
        ----------
        tickers : list
            Symbols to retrieve data for.
        historical : bool, default True
            When False a short 8-bar window is used.
        training : bool, default False
            With ``historical=True, backtesting=False`` requests
            ``self.lookback + 25`` bars for fitting.
        backtesting : bool, default True
            With ``historical=True`` requests 31 bars; callers keep only the
            most recent row for prediction.

        Returns
        -------
        pd.DataFrame
            One row per usable bar with price, Bollinger/RSI, optional news
            sentiment (``compound``), lags 1-4 of every column, and a
            ``symbol`` column.

        Raises
        ------
        ValueError
            If ``historical`` is True but neither mode flag is set.
        """
        if historical:
            if backtesting:
                shift_factor = 30  # overshooting and select the maximum
                hist_lookback = 1 + shift_factor
                tiingo_lookback = 12  # in case of weekends
            elif training:
                hist_lookback = self.lookback + 25
                tiingo_lookback = int(self.lookback * 1.5)
            else:
                raise ValueError("Please train or backtest if using historical = True")
        else:
            shift_factor = 7  # needed so we can calculate lagged data
            hist_lookback = 1 + shift_factor
            tiingo_lookback = 1 + shift_factor  # in case of weekends

        analyzer = None  # built lazily: loading the lexicon once is enough
        full_data = pd.DataFrame()

        for symbol in tickers:
            # Get Price History
            history = pd.DataFrame(self.History(symbol, hist_lookback))
            history['direction'] = np.where(history['close'] > history['open'], 1, 0)
            # NOTE(review): this is the TRAILING 5-bar return of the same row
            # the features come from, not a forward return -- confirm intent.
            history['return'] = history['close'].pct_change(periods=5)
            history = self.bollinger_bands(history)
            history = self.calculate_rsi(history)

            # Add relevant columns
            history['price_diff'] = history["open"] - history["MA"]
            history['band_diff_up'] = history["open"] - history["Upper_BB"]
            history['band_diff_lower'] = history["open"] - history["Lower_BB"]

            # Add Tiingo Data
            data = self.AddData(TiingoNews, symbol).Symbol
            tiingo = self.History(data, int(tiingo_lookback), Resolution.Daily)
            if len(tiingo) != 0 and {'description', 'publisheddate'}.issubset(tiingo.columns):
                if analyzer is None:
                    analyzer = SentimentIntensityAnalyzer()
                tiingo['polarity'] = tiingo['description'].dropna().apply(analyzer.polarity_scores)
                tiingo = pd.concat([tiingo.drop(['polarity'], axis=1),
                                    tiingo['polarity'].apply(pd.Series)], axis=1)
                tiingo = tiingo[['publisheddate', 'compound']].copy()
                tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'], utc=True).dt.date
                # Daily sentiment = sum of the compound scores published that day.
                tiingo = tiingo.groupby(by=['publisheddate'], as_index=False).sum()
                tiingo.rename(columns={'publisheddate': 'time'}, inplace=True)
                tiingo.set_index('time', inplace=True)
                # NOTE(review): joins on the 'time' index; presumably the
                # price index and these dates share a type -- verify.
                history = history.join(tiingo)

            # Lags 1-4 of every column; rows without a full set are dropped.
            lagged = {
                f'{col} (t-{lag})': history[col].shift(lag)
                for lag in range(1, 5)
                for col in history
            }
            history = (
                history.assign(**lagged)
                .dropna()
                .drop(columns=['close', 'high', 'low', 'volume'], errors='ignore')
            )
            history['symbol'] = symbol.Value
            full_data = pd.concat([full_data, history])

        return full_data

    def get_daily_realized_pnl(self):
        """Return profit minus fees accrued since the last ``OnEndOfDay``."""
        daily_gross_profit = self.Portfolio.TotalProfit - self.yesterday_total_profit
        daily_fees = self.Portfolio.TotalFees - self.yesterday_total_fees
        return daily_gross_profit - daily_fees

    def value_at_risk(self, returns, weights, conf_level=0.05, num_days=1):
        """Parametric (normal) value-at-risk of the portfolio, as a return.

        Parameters
        ----------
        returns : pd.DataFrame
            Periodic returns, one column per asset.
        weights : np.array
            Portfolio weights, in the column order of *returns*.
        conf_level : float
            Tail probability, 0.05 by default.
        num_days : int
            Horizon; the one-period quantile is scaled by ``sqrt(num_days)``.

        Returns
        -------
        float
            The ``conf_level`` quantile of the fitted normal distribution
            (negative for a loss).
        """
        cov_matrix = returns.cov()
        avg_return = returns.mean()
        portfolio_mean = avg_return.dot(weights)
        portfolio_stdev = np.sqrt(weights.T.dot(cov_matrix).dot(weights))
        cutoff = norm.ppf(conf_level, portfolio_mean, portfolio_stdev)

        # n-Day VaR
        return cutoff * np.sqrt(num_days)

    def cvar(self, returns, weights, conf_level=0.05):
        """Parametric (normal) conditional VaR / expected shortfall.

        Under the normal model used by ``value_at_risk`` the mean return in
        the worst ``conf_level`` tail is ``mu - sigma * pdf(z) / conf_level``
        with ``z = norm.ppf(conf_level)``.

        Parameters
        ----------
        returns : pd.DataFrame
            Periodic returns, one column per asset.
        weights : np.array
            Portfolio weights, in the column order of *returns*.
        conf_level : float
            Tail probability.

        Returns
        -------
        float
            Expected return conditional on being in the tail (always at or
            below the VaR for the same ``conf_level``).
        """
        portfolio_mean = returns.mean().dot(weights)
        portfolio_stdev = np.sqrt(weights.T.dot(returns.cov()).dot(weights))
        z_score = norm.ppf(conf_level)
        return portfolio_mean - portfolio_stdev * norm.pdf(z_score) / conf_level
from AlgorithmImports import * import numpy as np import pandas as pd from nltk.sentiment import SentimentIntensityAnalyzer from scipy.optimize import linprog from sklearn.ensemble import GradientBoostingRegressor """ Parameters to consider: ----------------------- number of fine / coarse boosting hyperparameters fundamental criteria (changing them did worse) training / buying frequency lookback period (careful with periods needed for bb / rsi) To Do: ------ maybe remove mkt benchmark remove unused comments """ class XGP(QCAlgorithm): def Initialize(self): # set up self.SetStartDate(2017, 1, 1) self.SetEndDate(2017, 6, 1) self.InitCash = 10000000 self.SetCash(self.InitCash) self.lookback = 25 # setting brokerage and reality modeling params self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin) self.SetSecurityInitializer(lambda s : s.SetSlippageModel(VolumeShareSlippageModel())) # benchmarking against SPY self.MKT = self.AddEquity("SPY", Resolution.Daily).Symbol self.mkt = [] # manually keeping track of securities self.securities = [] # We're getting an error when this is removed self.weights = [] self.trained = True # Requesting data self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine)) self.UniverseSettings.Resolution = Resolution.Daily self.num_coarse_symbols = 40 self.num_fine_symbols = 7 # keeps 8 since python is zero-indexed # Train immediately self.Train(self.classifier_training) # Train every Sunday at 4am or first day of month (because new universe) self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training) self.Train(self.DateRules.MonthStart(daysOffset = 0), self.TimeRules.At(4, 0), self.classifier_training) #self.Train(self.DateRules.MonthStart(daysOffset = 10), self.TimeRules.At(4, 0), self.classifier_training) #self.Train(self.DateRules.MonthStart(daysOffset = 20), self.TimeRules.At(4, 0), self.classifier_training) 
self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday), self.TimeRules.AfterMarketOpen("SPY", 10), self.actions) self.Schedule.On(self.DateRules.MonthStart(daysOffset = 0), self.TimeRules.AfterMarketOpen("SPY", 10), self.actions) def SelectCoarse(self, coarse): """selecting CoarseFundamental objects based on criteria in paper""" if self.Time.day != 1: return Universe.Unchanged selected = [c for c in coarse if c.HasFundamentalData and c.Price > 5] sorted_by_dollar_volume = sorted(selected, key=lambda c: c.DollarVolume, reverse=True) return [c.Symbol for c in sorted_by_dollar_volume[:self.num_coarse_symbols]] def SelectFine(self, fine): """selecting FineFundamental objects based on our criteria""" if self.Time.day != 1: return Universe.Unchanged selected = [f for f in fine if f.ValuationRatios.PERatio < 100 and f.MarketCap > 300000000 and f.ValuationRatios.PEGRatio < 3 and f.OperationRatios.TotalDebtEquityRatio.Value < 2 and f.OperationRatios.CurrentRatio.Value > 1] sorted_by_pe_ratio = sorted(selected, key=lambda f: f.ValuationRatios.PERatio, reverse=True) return [f.Symbol for f in sorted_by_pe_ratio[:self.num_fine_symbols]] def OnSecuritiesChanged(self, changes): """triggers when Universe changes as result of filtering""" for security in changes.AddedSecurities: self.Debug(f"{self.Time}: Added {security}") for security in changes.RemovedSecurities: self.Debug(f"{self.Time}: Removed {security}") added = changes.AddedSecurities removed = changes.RemovedSecurities self.securities = list(set(self.securities).union(set(added)).difference(set(removed))) def OnData(self, data): return # Put CVaR and VaR here def OnEndOfDay(self): # code here plots benchmark against our portfolio performance on the equity chart mkt_price = self.History(self.MKT, 2, Resolution.Daily)['close'].unstack(level= 0).iloc[-1] self.mkt.append(mkt_price) mkt_perf = self.InitCash * self.mkt[-1] / self.mkt[0] self.Plot('Strategy Equity', self.MKT, mkt_perf) def OnOrderEvent(self, orderEvent): """logs 
the details of an order""" self.Log(f'{orderEvent}') # ===================================================================================================================== # begin custom functions # ===================================================================================================================== def actions(self): self.Liquidate() #Liquidate the whole portfolio self.make_predictions() self.LinOptProg() a_securities = [s for s in self.securities] for (security, wt) in zip(a_securities, [i[0] for i in self.weights]): if wt != 0: self.SetHoldings(security.Symbol, wt) def classifier_training(self): self.return_mods = [] self.quantile_mods_lg = [] self.quantile_mods_st = [] #active_securities = [s.Symbol.Value for s in self.securities] active_securities = [s.Symbol for s in self.securities] self.Log(f"Training Started at {self.Time}") for security in active_securities: data = self.get_all_data([security], training=True, backtesting=False) # get tickers try: y_reg = data["return"] X = data.drop(["direction", "return", "symbol"], axis = 1) (ret, qut_lg, qut_st) = self.gb_returns(X, y_reg) except: ret = "NoModel" qut_lg = "NoModel" qut_st = "NoModel" self.return_mods.append(ret) self.quantile_mods_lg.append(qut_lg) self.quantile_mods_st.append(qut_st) self.trained = True def make_predictions(self): self.returns = [] self.quantiles = [] act_securities = [s.Symbol for s in self.securities] for i in range(len(act_securities)): security = act_securities[i] data = self.get_all_data([security], training=False) data = data[data.index == data.index.max()] prediction_data = data.drop(["direction", "return", "symbol"], axis = 1) try: r_pred = self.return_mods[i].predict(prediction_data)[0] except: r_pred = 0 if r_pred > 0: q_pred = self.quantile_mods_lg[i].predict(prediction_data)[0] elif r_pred < 0: q_pred = self.quantile_mods_st[i].predict(prediction_data)[0] else: q_pred = 0 self.returns.append(r_pred) self.quantiles.append(q_pred) self.Debug(self.returns) 
def gb_returns(self, X, y):
    """Fit the mean and the two quantile gradient-boosting models.

    Parameters
    ----------
    X : pd.DataFrame
        Feature matrix.
    y : pd.Series
        Regression target.

    Returns
    -------
    tuple
        ``(mean_model, quantile_model_lg, quantile_model_st)``: a
        squared-error regressor, a 5% quantile regressor (used for long
        candidates) and a 95% quantile regressor (used for short
        candidates), all fitted on the same ``X``/``y``.
    """
    # Hyper-parameters shared by all three models.
    shared = dict(
        n_estimators=150,
        learning_rate=0.05,
        criterion="friedman_mse",
        random_state=1693,
        n_iter_no_change=15,
    )
    mean_fit_out = GradientBoostingRegressor(loss="squared_error", **shared).fit(X, y)
    quantile_fit_lg = GradientBoostingRegressor(loss="quantile", alpha=0.05, **shared).fit(X, y)
    quantile_fit_st = GradientBoostingRegressor(loss="quantile", alpha=0.95, **shared).fit(X, y)
    return (mean_fit_out, quantile_fit_lg, quantile_fit_st)


def LinOptProg(self):
    """Solve the linear program that turns forecasts into portfolio weights.

    Reads ``self.returns`` (predicted returns) and ``self.quantiles``
    (predicted tail quantiles), one entry per security, and writes
    ``self.weights`` as an ``(n, 1)`` array. Both inputs are deleted from
    the instance afterwards, as before.

    The program maximises ``returns . w`` subject to::

        -quantiles . w <= 0.01
         dirs . w      <= 1
        -dirs . w      <= 0

    where ``dirs`` is the sign of each predicted return. Long candidates
    are bounded to ``[0, min(0.6, 3/n)]``, short candidates to
    ``[max(-0.6, -1.5/n), 0]``, and securities with a zero forecast are
    pinned to weight 0.
    """
    expected = np.asarray(self.returns, dtype=float).reshape(-1)
    tail = np.asarray(self.quantiles, dtype=float).reshape(-1)
    n_assets = expected.size

    # The forecasts are single-use; drop them so stale values are never reused.
    del self.returns
    del self.quantiles

    if n_assets == 0:
        # Nothing to allocate (and 1/n below would divide by zero).
        self.weights = []
        return

    dirs = np.array([1 if r > 0 else 0 if r == 0 else -1 for r in expected])

    long_cap = min(0.6, 3 / n_assets)
    short_cap = max(-0.6, -1.5 / n_assets)
    # A zero forecast means "no model / no view": its LP column is all zeros,
    # so without a (0, 0) bound the solver would be free to pick any short
    # weight for it.
    bounds = [
        (0, long_cap) if d == 1 else (short_cap, 0) if d == -1 else (0, 0)
        for d in dirs
    ]

    # vstack keeps A two-dimensional even when there is a single security;
    # squeezing a (3, 1, 1) array yields a 1-D vector that linprog rejects.
    A = np.vstack([-tail, dirs, -dirs])
    b = np.array([0.01, 1, 0])

    res = linprog(-expected, A_ub=A, b_ub=b, bounds=bounds)

    if res.status == 0:
        self.weights = res.x.reshape(-1, 1)
    else:
        self.Log("Optimization failed")
        # Fallback: equal-sized positions in the predicted direction
        # (zero-forecast securities get weight 0).
        self.weights = dirs.reshape(-1, 1) * (1 / n_assets)


def bollinger_bands(self, data, window=20, num_std=2):
    """Add Bollinger-band columns to ``data`` in place and return it.

    Parameters
    ----------
    data : pd.DataFrame
        Must contain a ``close`` column.
    window : int
        Rolling window length for the mean and standard deviation.
    num_std : float
        Band half-width in rolling standard deviations.

    Returns
    -------
    pd.DataFrame
        The same object with ``MA``, ``STD``, ``Upper_BB`` and ``Lower_BB``
        columns added.
    """
    rolling_close = data['close'].rolling(window=window)
    data['MA'] = rolling_close.mean()
    data['STD'] = rolling_close.std()
    band = data['STD'] * num_std
    data['Upper_BB'] = data['MA'] + band
    data['Lower_BB'] = data['MA'] - band
    return data
def calculate_rsi(self, data, period=20):
    """Add an ``rsi`` column to ``data`` in place and return it.

    Parameters
    ----------
    data : pd.DataFrame
        Must contain a ``close`` column.
    period : int
        Smoothing period; used as ``com=period - 1`` and as ``min_periods``
        of the exponentially weighted means.

    Returns
    -------
    pd.DataFrame
        The same object with the ``rsi`` column added.
    """
    # Calculate the daily price changes (gains and losses)
    delta = data['close'].diff().dropna()

    # Separate gains and losses into their own series
    gains = delta.where(delta > 0, 0)
    losses = -delta.where(delta < 0, 0)

    # Calculate the average gain and average loss
    avg_gain = gains.ewm(com=period - 1, min_periods=period).mean()
    avg_loss = losses.ewm(com=period - 1, min_periods=period).mean()

    # Relative Strength (RS) and Relative Strength Index (RSI)
    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))

    data['rsi'] = rsi
    return data


def get_all_data(self, tickers, historical=True, training=False, backtesting=True):
    """Build the feature frame used for training and prediction.

    For every symbol the price history is fetched, technical features
    (direction, 5-bar return, Bollinger bands, RSI, distances of the open to
    the bands) are added, Tiingo news sentiment is joined when available,
    and lags 1-4 of every column are appended. Rows with any NaN are
    dropped.

    Parameters
    ----------
    tickers : list
        Symbols to retrieve data for.
    historical : bool, default True
        When True, one of ``backtesting`` / ``training`` must be set.
        When False a short 8-bar lookback is used.
    training : bool, default False
        With ``backtesting=False``, request ``self.lookback + 25`` bars of
        prices and ``self.lookback * 1.5`` bars of news.
    backtesting : bool, default True
        Request 31 bars of prices and 12 bars of news. Takes precedence
        over ``training``.

    Returns
    -------
    pd.DataFrame
        Concatenated per-symbol feature frames, including a ``symbol`` column.

    Raises
    ------
    ValueError
        If ``historical`` is True and neither ``backtesting`` nor
        ``training`` is set.
    """
    if historical:
        if backtesting:
            shift_factor = 30  # overshooting and select the maximum
            hist_lookback = 1 + shift_factor
            tiingo_lookback = 12  # in case of weekends?
        elif training:
            hist_lookback = self.lookback + 25
            tiingo_lookback = self.lookback * 1.5
        else:
            raise ValueError("Please train or backtest if using historical = True")
    else:
        shift_factor = 7  # needed so we can calculate lagged data
        hist_lookback = 1 + shift_factor
        tiingo_lookback = 1 + shift_factor  # in case of weekends?

    full_data = pd.DataFrame()

    for symbol in tickers:
        # Get Price History
        history = self.History(symbol, hist_lookback)
        history = pd.DataFrame(history)

        history['direction'] = np.where(history['close'] > history['open'], 1, 0)
        history['return'] = history['close'].pct_change(periods=5)
        history = self.bollinger_bands(history)
        history = self.calculate_rsi(history)

        # Add relevant columns
        history['price_diff'] = history["open"] - history["MA"]
        history['band_diff_up'] = history["open"] - history["Upper_BB"]
        history['band_diff_lower'] = history["open"] - history["Lower_BB"]

        # Add Tiingo Data
        data = self.AddData(TiingoNews, symbol).Symbol
        tiingo = self.History(data, int(tiingo_lookback), Resolution.Daily)
        if len(tiingo) != 0 and set(['description', 'publisheddate']).issubset(tiingo.columns):
            analyzer = SentimentIntensityAnalyzer()
            tiingo['polarity'] = tiingo['description'].dropna().apply(lambda x: analyzer.polarity_scores(x))
            # Expand the polarity dicts into one column per score.
            tiingo = pd.concat([tiingo.drop(['polarity'], axis=1), tiingo['polarity'].apply(pd.Series)], axis=1)
            tiingo = tiingo[['publisheddate', 'compound']]
            tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'], utc=True).dt.date
            # One summed compound score per publication date.
            tiingo = tiingo.groupby(by=['publisheddate'], as_index=False).sum()
            tiingo.rename(columns={'publisheddate': 'time'}, inplace=True)
            tiingo.set_index('time', inplace=True)
            history = history.join(tiingo)

        lags = range(1, 5)
        history = history.assign(**{
            f'{col} (t-{lag})': history[col].shift(lag)
            for lag in lags
            for col in history
        }).dropna().drop(columns=['close', 'high', 'low', 'volume'], errors='ignore')

        history['symbol'] = symbol.Value
        full_data = pd.concat([full_data, history])

    self.Log(full_data)
    return full_data


def value_at_risk(self, returns, weights, conf_level=0.05, num_days=30):
    """Calculate the parametric (normal) value-at-risk of the portfolio.

    Parameters
    ----------
    returns : pd.DataFrame
        Periodic returns, one column per asset.
    weights : np.array
        Portfolio weights, in the same order as the columns of ``returns``.
    conf_level : float
        Tail probability. 0.05 by default.
    num_days : int
        Number of periods the VaR is scaled to. 30 by default.

    Returns
    -------
    float
        The ``conf_level`` quantile of the fitted normal portfolio-return
        distribution, multiplied by ``sqrt(num_days)``.
    """
    # Imported locally: the file-level imports visible for this copy of the
    # algorithm do not bring ``norm`` into scope.
    from scipy.stats import norm

    cov_matrix = returns.cov()
    avg_return = returns.mean()
    portfolio_mean = avg_return.dot(weights)
    portfolio_stdev = np.sqrt(weights.T.dot(cov_matrix).dot(weights))
    cutoff = norm.ppf(conf_level, portfolio_mean, portfolio_stdev)

    # n-Day VaR
    VaR = cutoff * np.sqrt(num_days)
    return VaR


def cvar(self, returns, stdev, conf_level=0.05):
    """Calculate the parametric (normal) portfolio CVaR / expected shortfall.

    Parameters
    ----------
    returns : float or pd.Series
        Portfolio mean return(s).
    stdev : float
        Portfolio standard deviation.
    conf_level : float
        Tail probability. 0.05 by default.

    Returns
    -------
    float or pd.Series
        ``stdev * pdf(ppf(conf_level)) / conf_level - returns``, i.e. the
        expected loss in the worst ``conf_level`` tail, as a positive number.
    """
    from scipy.stats import norm

    # The density must be evaluated at the conf_level quantile of the
    # standard normal, not at conf_level itself.
    tail_density = norm.pdf(norm.ppf(conf_level))
    CVaR = conf_level ** -1 * tail_density * stdev - returns
    return CVaR
from AlgorithmImports import *
import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from sklearn.ensemble import GradientBoostingRegressor
from scipy.stats import norm
import math

"""
Parameters to consider:
-----------------------
number of fine / coarse
boosting hyperparameters
fundamental criteria (changing them did worse)
training / buying frequency
lookback period (careful with periods needed for bb / rsi)

To Do:
------
maybe remove mkt benchmark
remove unused comments
"""


class XGP(QCAlgorithm):
    """Gradient-boosting return forecasts allocated by a linear program.

    Models are retrained on a schedule, positions are rebuilt by
    ``actions`` and a parametric VaR check runs in ``OnData``.
    """

    def Initialize(self):
        """Configure dates, cash, reality models, universe and schedules."""
        # set up
        self.SetStartDate(2017, 1, 1)
        self.SetEndDate(2017, 6, 1)
        self.InitCash = 10000000
        self.SetCash(self.InitCash)
        self.lookback = 25

        # setting brokerage and reality modeling params
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetSecurityInitializer(lambda s: s.SetSlippageModel(VolumeShareSlippageModel()))

        # benchmarking against SPY
        self.MKT = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.mkt = []

        # manually keeping track of securities
        self.securities = []
        # We're getting an error when this is removed
        self.weights = []
        self.trained = True

        # Value-at-Risk limit: maximum risk we are willing to take on any one trade,
        # as a percentage loss of the investment capital
        self.var_limit = -0.015

        # Requesting data
        self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
        self.UniverseSettings.Resolution = Resolution.Daily
        self.num_coarse_symbols = 40
        # Used as a slice end in SelectFine, so at most 7 symbols are kept.
        self.num_fine_symbols = 7

        # Train immediately
        self.Train(self.classifier_training)

        # Train every Sunday at 4am or first day of month (because new universe)
        self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training)
        self.Train(self.DateRules.MonthStart(daysOffset=0), self.TimeRules.At(4, 0), self.classifier_training)

        # Rebalance every Monday and on the first day of the month.
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday),
                         self.TimeRules.AfterMarketOpen("SPY", 10),
                         self.actions)
        self.Schedule.On(self.DateRules.MonthStart(daysOffset=0),
                         self.TimeRules.AfterMarketOpen("SPY", 10),
                         self.actions)

    def SelectCoarse(self, coarse):
        """selecting CoarseFundamental objects based on criteria in paper"""
        # Only reselect on the first calendar day of the month.
        if self.Time.day != 1:
            return Universe.Unchanged

        selected = [c for c in coarse if c.HasFundamentalData and c.Price > 5]
        sorted_by_dollar_volume = sorted(selected, key=lambda c: c.DollarVolume, reverse=True)
        return [c.Symbol for c in sorted_by_dollar_volume[:self.num_coarse_symbols]]

    def SelectFine(self, fine):
        """selecting FineFundamental objects based on our criteria"""
        if self.Time.day != 1:
            return Universe.Unchanged

        selected = [
            f for f in fine
            if f.ValuationRatios.PERatio < 100
            and f.MarketCap > 300000000
            and f.ValuationRatios.PEGRatio < 3
            and f.OperationRatios.TotalDebtEquityRatio.Value < 2
            and f.OperationRatios.CurrentRatio.Value > 1
        ]
        sorted_by_pe_ratio = sorted(selected, key=lambda f: f.ValuationRatios.PERatio, reverse=True)
        return [f.Symbol for f in sorted_by_pe_ratio[:self.num_fine_symbols]]

    def OnSecuritiesChanged(self, changes):
        """triggers when Universe changes as result of filtering"""
        for security in changes.AddedSecurities:
            self.Debug(f"{self.Time}: Added {security}")

        for security in changes.RemovedSecurities:
            self.Debug(f"{self.Time}: Removed {security}")

        added = changes.AddedSecurities
        removed = changes.RemovedSecurities
        # NOTE(review): rebuilding the list from a set does not preserve order,
        # while models and weights are matched to securities by position.
        self.securities = list(set(self.securities).union(set(added)).difference(set(removed)))

    def OnData(self, data):
        """Risk management: check the portfolio VaR against ``self.var_limit``."""
        lookback = 30
        reduction_size = 0.8  # by how much to reduce the portion size when VaR limit is exceeded

        active_securities = [s.Symbol for s in self.securities]
        if len(self.weights) > 0:
            history = self.History(active_securities, timedelta(days=lookback), resolution=Resolution.Daily)
            history = history['close'].unstack(level=0)
            # NOTE(review): this relabels the columns positionally; confirm the
            # unstacked column order matches active_securities.
            history.columns = active_securities
            returns = history.pct_change()

            w = np.array([i[0] for i in self.weights])
            VaR = self.value_at_risk(returns, w)

            # VaR is expressed as a negative quantity, so this is when the var_limit is reached
            if VaR <= self.var_limit:
                self.Debug(f"VaR limit reached; expected loss is {VaR}")
                for (security, wt) in zip(self.securities, [i[0] for i in self.weights]):
                    # NOTE(review): presumably CalculateOrderQuantity returns the
                    # order needed to reach target `wt`; confirm that trading 80%
                    # of it actually reduces the position as intended.
                    quantity = self.CalculateOrderQuantity(security.Symbol, wt)
                    reduced_quantity = math.ceil(quantity * reduction_size)
                    if reduced_quantity != 0:
                        # reduce position size
                        self.MarketOrder(security.Symbol, reduced_quantity)

    def OnEndOfDay(self):
        """Plot a SPY buy-and-hold benchmark on the equity chart."""
        mkt_price = self.History(self.MKT, 2, Resolution.Daily)['close'].unstack(level=0).iloc[-1]
        self.mkt.append(mkt_price)
        # Benchmark value: initial cash scaled by SPY's move since the first sample.
        mkt_perf = self.InitCash * self.mkt[-1] / self.mkt[0]
        self.Plot('Strategy Equity', self.MKT, mkt_perf)

    def OnOrderEvent(self, orderEvent):
        """logs the details of an order"""
        self.Log(f'{orderEvent}')

    # =====================================================================================================================
    # begin custom functions
    # =====================================================================================================================

    def actions(self):
        """Liquidate, re-forecast, re-optimise and enter the new weights."""
        self.Liquidate()  # Liquidate the whole portfolio
        self.make_predictions()
        self.LinOptProg()
        a_securities = list(self.securities)

        for (security, wt) in zip(a_securities, [i[0] for i in self.weights]):
            if wt != 0:
                self.SetHoldings(security.Symbol, wt)

    def classifier_training(self):
        """Fit one set of models per tracked security.

        Populates ``self.return_mods``, ``self.quantile_mods_lg`` and
        ``self.quantile_mods_st`` in the order of ``self.securities``. The
        string ``"NoModel"`` is stored when fitting fails for a security.
        """
        self.return_mods = []
        self.quantile_mods_lg = []
        self.quantile_mods_st = []

        active_securities = [s.Symbol for s in self.securities]
        self.Log(f"Training Started at {self.Time}")

        for security in active_securities:
            data = self.get_all_data([security], training=True, backtesting=False)
            try:
                y_reg = data["return"]
                X = data.drop(["direction", "return", "symbol"], axis=1)
                (ret, qut_lg, qut_st) = self.gb_returns(X, y_reg)
            except Exception as err:
                # Best effort: one failed security must not abort the others,
                # but record why it has no model.
                self.Log(f"Training failed for {security}: {err}")
                ret = "NoModel"
                qut_lg = "NoModel"
                qut_st = "NoModel"
            self.return_mods.append(ret)
            self.quantile_mods_lg.append(qut_lg)
            self.quantile_mods_st.append(qut_st)

        self.trained = True

    def make_predictions(self):
        """Predict the return and tail quantile for each tracked security.

        Fills ``self.returns`` and ``self.quantiles``. A security without a
        usable model gets a return and quantile of 0.
        """
        self.returns = []
        self.quantiles = []

        act_securities = [s.Symbol for s in self.securities]
        for i, security in enumerate(act_securities):
            data = self.get_all_data([security], training=False)
            # Keep only the most recent row.
            data = data[data.index == data.index.max()]
            prediction_data = data.drop(["direction", "return", "symbol"], axis=1)

            try:
                r_pred = self.return_mods[i].predict(prediction_data)[0]
            except Exception:
                # "NoModel" placeholder, or no model at this position.
                r_pred = 0

            if r_pred > 0:
                q_pred = self.quantile_mods_lg[i].predict(prediction_data)[0]
            elif r_pred < 0:
                q_pred = self.quantile_mods_st[i].predict(prediction_data)[0]
            else:
                q_pred = 0

            self.returns.append(r_pred)
            self.quantiles.append(q_pred)

        self.Debug(self.returns)

    def gb_returns(self, X, y):
        """Fit the mean, 5% quantile and 95% quantile boosting models.

        Returns
        -------
        tuple
            ``(mean_model, quantile_model_lg, quantile_model_st)``, all
            fitted on the same ``X``/``y``.
        """
        shared = dict(
            n_estimators=150,
            learning_rate=0.05,
            criterion="friedman_mse",
            random_state=1693,
            n_iter_no_change=15,
        )
        mean_fit_out = GradientBoostingRegressor(loss="squared_error", **shared).fit(X, y)
        quantile_fit_lg = GradientBoostingRegressor(loss="quantile", alpha=0.05, **shared).fit(X, y)
        quantile_fit_st = GradientBoostingRegressor(loss="quantile", alpha=0.95, **shared).fit(X, y)
        return (mean_fit_out, quantile_fit_lg, quantile_fit_st)

    def LinOptProg(self):
        """Solve the linear program that turns forecasts into weights.

        Reads and then deletes ``self.returns`` / ``self.quantiles``; writes
        ``self.weights`` as an ``(n, 1)`` array. Maximises ``returns . w``
        subject to ``-quantiles . w <= 0.01``, ``dirs . w <= 1`` and
        ``-dirs . w <= 0``, where ``dirs`` is the sign of each forecast.
        """
        expected = np.asarray(self.returns, dtype=float).reshape(-1)
        tail = np.asarray(self.quantiles, dtype=float).reshape(-1)
        n_assets = expected.size

        del self.returns
        del self.quantiles

        if n_assets == 0:
            # Nothing to allocate (and 1/n below would divide by zero).
            self.weights = []
            return

        dirs = np.array([1 if r > 0 else 0 if r == 0 else -1 for r in expected])

        long_cap = min(0.6, 3 / n_assets)
        short_cap = max(-0.6, -1.5 / n_assets)
        # Zero forecast = no view: pin the weight to 0 instead of leaving the
        # solver free to choose a short position for an unconstrained column.
        bounds = [
            (0, long_cap) if d == 1 else (short_cap, 0) if d == -1 else (0, 0)
            for d in dirs
        ]

        # vstack keeps A two-dimensional even with a single security.
        A = np.vstack([-tail, dirs, -dirs])
        b = np.array([0.01, 1, 0])

        res = linprog(-expected, A_ub=A, b_ub=b, bounds=bounds)

        if res.status == 0:
            self.weights = res.x.reshape(-1, 1)
        else:
            self.Log("Optimization failed")
            # Fallback: equal-sized positions in the predicted direction.
            self.weights = dirs.reshape(-1, 1) * (1 / n_assets)

    def bollinger_bands(self, data, window=20, num_std=2):
        """Add ``MA``, ``STD``, ``Upper_BB`` and ``Lower_BB`` columns in place."""
        rolling_close = data['close'].rolling(window=window)
        data['MA'] = rolling_close.mean()
        data['STD'] = rolling_close.std()
        band = data['STD'] * num_std
        data['Upper_BB'] = data['MA'] + band
        data['Lower_BB'] = data['MA'] - band
        return data

    def calculate_rsi(self, data, period=20):
        """Add an ``rsi`` column (EWM-smoothed gains vs. losses) in place."""
        # Calculate the daily price changes (gains and losses)
        delta = data['close'].diff().dropna()

        # Separate gains and losses into their own series
        gains = delta.where(delta > 0, 0)
        losses = -delta.where(delta < 0, 0)

        # Calculate the average gain and average loss
        avg_gain = gains.ewm(com=period - 1, min_periods=period).mean()
        avg_loss = losses.ewm(com=period - 1, min_periods=period).mean()

        rs = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + rs))

        data['rsi'] = rsi
        return data

    def get_all_data(self, tickers, historical=True, training=False, backtesting=True):
        """Build the feature frame used for training and prediction.

        Parameters
        ----------
        tickers : list
            Symbols to retrieve data for.
        historical : bool, default True
            When True, one of ``backtesting`` / ``training`` must be set.
            When False a short 8-bar lookback is used.
        training : bool, default False
            With ``backtesting=False``, request ``self.lookback + 25`` bars
            of prices and ``self.lookback * 1.5`` bars of news.
        backtesting : bool, default True
            Request 31 bars of prices and 12 bars of news. Takes precedence
            over ``training``.

        Returns
        -------
        pd.DataFrame
            Concatenated per-symbol feature frames with a ``symbol`` column.

        Raises
        ------
        ValueError
            If ``historical`` is True and neither mode flag is set.
        """
        if historical:
            if backtesting:
                shift_factor = 30  # overshooting and select the maximum
                hist_lookback = 1 + shift_factor
                tiingo_lookback = 12  # in case of weekends?
            elif training:
                hist_lookback = self.lookback + 25
                tiingo_lookback = self.lookback * 1.5
            else:
                raise ValueError("Please train or backtest if using historical = True")
        else:
            shift_factor = 7  # needed so we can calculate lagged data
            hist_lookback = 1 + shift_factor
            tiingo_lookback = 1 + shift_factor  # in case of weekends?

        full_data = pd.DataFrame()

        for symbol in tickers:
            # Get Price History
            history = self.History(symbol, hist_lookback)
            history = pd.DataFrame(history)

            history['direction'] = np.where(history['close'] > history['open'], 1, 0)
            history['return'] = history['close'].pct_change(periods=5)
            history = self.bollinger_bands(history)
            history = self.calculate_rsi(history)

            # Add relevant columns
            history['price_diff'] = history["open"] - history["MA"]
            history['band_diff_up'] = history["open"] - history["Upper_BB"]
            history['band_diff_lower'] = history["open"] - history["Lower_BB"]

            # Add Tiingo Data
            data = self.AddData(TiingoNews, symbol).Symbol
            tiingo = self.History(data, int(tiingo_lookback), Resolution.Daily)
            if len(tiingo) != 0 and set(['description', 'publisheddate']).issubset(tiingo.columns):
                analyzer = SentimentIntensityAnalyzer()
                tiingo['polarity'] = tiingo['description'].dropna().apply(lambda x: analyzer.polarity_scores(x))
                # Expand the polarity dicts into one column per score.
                tiingo = pd.concat([tiingo.drop(['polarity'], axis=1), tiingo['polarity'].apply(pd.Series)], axis=1)
                tiingo = tiingo[['publisheddate', 'compound']]
                tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'], utc=True).dt.date
                # One summed compound score per publication date.
                tiingo = tiingo.groupby(by=['publisheddate'], as_index=False).sum()
                tiingo.rename(columns={'publisheddate': 'time'}, inplace=True)
                tiingo.set_index('time', inplace=True)
                history = history.join(tiingo)

            lags = range(1, 5)
            history = history.assign(**{
                f'{col} (t-{lag})': history[col].shift(lag)
                for lag in lags
                for col in history
            }).dropna().drop(columns=['close', 'high', 'low', 'volume'], errors='ignore')

            history['symbol'] = symbol.Value
            full_data = pd.concat([full_data, history])

        self.Log(full_data)
        return full_data

    def value_at_risk(self, returns, weights, conf_level=0.05, num_days=1):
        """Calculate the parametric (normal) value-at-risk of the portfolio.

        Parameters
        ----------
        returns : pd.DataFrame
            Periodic returns, one column per asset.
        weights : np.array
            Portfolio weights, in the column order of ``returns``.
        conf_level : float
            Tail probability. 0.05 by default.
        num_days : int
            Length of the period the VaR is scaled to.

        Returns
        -------
        float
            The ``conf_level`` quantile of the fitted normal distribution,
            multiplied by ``sqrt(num_days)``.
        """
        cov_matrix = returns.cov()
        avg_return = returns.mean()
        portfolio_mean = avg_return.dot(weights)
        portfolio_stdev = np.sqrt(weights.T.dot(cov_matrix).dot(weights))
        cutoff = norm.ppf(conf_level, portfolio_mean, portfolio_stdev)

        # n-Day VaR
        VaR = cutoff * np.sqrt(num_days)
        return VaR

    def cvar(self, returns, weights, conf_level=0.05):
        """Return the mean of the portfolio VaR.

        Parameters
        ----------
        returns : pd.DataFrame
            Periodic returns, one column per asset.
        weights : np.array
            Portfolio weights.
        conf_level : float
            Accepted for interface compatibility; not forwarded to
            ``value_at_risk``, which uses its own default.

        NOTE(review): this averages the VaR value rather than computing an
        expected shortfall -- confirm whether a true CVaR is wanted.
        """
        # Must go through ``self``: value_at_risk is a method, not a
        # module-level function.
        VaR = self.value_at_risk(returns, weights)
        return VaR.mean()
from AlgorithmImports import *
import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from sklearn.ensemble import GradientBoostingRegressor
from scipy.stats import norm
import math


class XGP(QCAlgorithm):
    """Gradient-boosting return forecasts allocated by a linear program.

    Models are retrained on a schedule; ``actions`` rebuilds the portfolio
    and compares the day's realised PnL with a freshly computed VaR.
    """

    def Initialize(self):
        """Configure dates, cash, reality models, universe and schedules."""
        # set up
        self.SetStartDate(2017, 1, 1)
        self.SetEndDate(2017, 6, 1)
        self.InitCash = 10000000
        self.SetCash(self.InitCash)
        self.lookback = 50

        # setting brokerage and reality modeling params
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetSecurityInitializer(lambda s: s.SetSlippageModel(VolumeShareSlippageModel()))

        # manually keeping track of securities
        self.securities = []
        # We're getting an error when this is removed
        self.weights = []
        self.trained = True

        # Value-at-Risk limit: maximum risk we are willing to take on any one trade,
        # as a percentage loss of the investment capital
        self.var_limit = -0.015

        # Portfolio totals captured at the previous end of day, used to
        # derive the current day's realised PnL.
        self.yesterday_total_profit = 0
        self.yesterday_total_fees = 0

        # Requesting data
        self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
        self.UniverseSettings.Resolution = Resolution.Daily
        self.num_coarse_symbols = 100
        self.num_fine_symbols = 10

        # Train immediately
        self.Train(self.classifier_training)

        # Train every Sunday at 4am or first day of month (because new universe)
        self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training)
        self.Train(self.DateRules.MonthStart(daysOffset=0), self.TimeRules.At(4, 0), self.classifier_training)

        # Rebalance every Monday and on the first day of the month at 10:00.
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday),
                         self.TimeRules.At(10, 0),
                         self.actions)
        self.Schedule.On(self.DateRules.MonthStart(daysOffset=0),
                         self.TimeRules.At(10, 0),
                         self.actions)

    def SelectCoarse(self, coarse):
        """selecting CoarseFundamental objects based on criteria in paper"""
        # Only reselect on the first calendar day of the month.
        if self.Time.day != 1:
            return Universe.Unchanged

        selected = [c for c in coarse if c.HasFundamentalData and c.Price > 5]
        sorted_by_dollar_volume = sorted(selected, key=lambda c: c.DollarVolume, reverse=True)
        return [c.Symbol for c in sorted_by_dollar_volume[:self.num_coarse_symbols]]

    def SelectFine(self, fine):
        """selecting FineFundamental objects based on our criteria"""
        if self.Time.day != 1:
            return Universe.Unchanged

        selected = [
            f for f in fine
            if f.ValuationRatios.PERatio < 100
            and f.MarketCap > 300000000
            and f.ValuationRatios.PEGRatio < 3
            and f.OperationRatios.TotalDebtEquityRatio.Value < 2
            and f.OperationRatios.CurrentRatio.Value > 1
        ]
        sorted_by_pe_ratio = sorted(selected, key=lambda f: f.ValuationRatios.PERatio, reverse=True)
        return [f.Symbol for f in sorted_by_pe_ratio[:self.num_fine_symbols]]

    def OnSecuritiesChanged(self, changes):
        """triggers when Universe changes as result of filtering"""
        for security in changes.AddedSecurities:
            self.Debug(f"{self.Time}: Added {security}")

        for security in changes.RemovedSecurities:
            self.Debug(f"{self.Time}: Removed {security}")

        added = changes.AddedSecurities
        removed = changes.RemovedSecurities
        # NOTE(review): rebuilding the list from a set does not preserve order,
        # while models and weights are matched to securities by position.
        self.securities = list(set(self.securities).union(set(added)).difference(set(removed)))

    def OnData(self, data):
        """No per-slice logic; trading is driven by the scheduled ``actions``."""
        return

    def OnEndOfDay(self):
        """Snapshot portfolio profit and fees for the next day's PnL calculation."""
        self.yesterday_total_profit = self.Portfolio.TotalProfit
        self.yesterday_total_fees = self.Portfolio.TotalFees

    def OnOrderEvent(self, orderEvent):
        """logs the details of an order"""
        self.Log(f'{orderEvent}')

    # =====================================================================================================================
    # begin custom functions
    # =====================================================================================================================

    def actions(self):
        """Liquidate, re-forecast, re-optimise and enter positions.

        After computing new weights the portfolio VaR is stored in
        ``self.var_limit``. If the day's realised PnL is at or below it,
        reduced orders are sent; otherwise the full target weights are set.
        """
        self.Liquidate()  # Liquidate the whole portfolio
        self.make_predictions()
        self.LinOptProg()
        a_securities = list(self.securities)

        pnl = self.get_daily_realized_pnl()

        # risk management (value-at-risk)
        lookback = 30
        reduction_size = 0.8  # by how much to reduce the portion size when VaR limit is exceeded

        active_securities = [s.Symbol for s in self.securities]
        if len(self.weights) > 0:
            history = self.History(active_securities, timedelta(days=lookback), resolution=Resolution.Daily)
            history = history['close'].unstack(level=0)
            # NOTE(review): this relabels the columns positionally; confirm the
            # unstacked column order matches active_securities.
            history.columns = active_securities
            returns = history.pct_change()

            w = np.array([i[0] for i in self.weights])
            self.var_limit = self.value_at_risk(returns, w)

            # VaR is expressed as a negative quantity, so this is when the var_limit is reached
            # NOTE(review): pnl is a currency amount while var_limit is derived
            # from fractional returns -- confirm the units are meant to match.
            if pnl <= self.var_limit:
                # The original message formatted an undefined name `VaR`,
                # which raised NameError whenever this branch was taken.
                self.Debug(f"VaR limit ({self.var_limit}) reached; realized daily PnL is {pnl}")
                for (security, wt) in zip(self.securities, [i[0] for i in self.weights]):
                    quantity = self.CalculateOrderQuantity(security.Symbol, wt)
                    reduced_quantity = math.ceil(quantity * reduction_size)
                    if reduced_quantity != 0:
                        # reduce position size
                        self.MarketOrder(security.Symbol, reduced_quantity)
            else:
                for (security, wt) in zip(a_securities, [i[0] for i in self.weights]):
                    if wt != 0:
                        self.SetHoldings(security.Symbol, wt)

    def classifier_training(self):
        """Fit one set of models per tracked security.

        Populates ``self.return_mods``, ``self.quantile_mods_lg`` and
        ``self.quantile_mods_st`` in the order of ``self.securities``. The
        string ``"NoModel"`` is stored when fitting fails for a security.
        """
        self.return_mods = []
        self.quantile_mods_lg = []
        self.quantile_mods_st = []

        active_securities = [s.Symbol for s in self.securities]
        self.Log(f"Training Started at {self.Time}")

        for security in active_securities:
            data = self.get_all_data([security], training=True, backtesting=False)
            try:
                y_reg = data["return"]
                X = data.drop(["direction", "return", "symbol"], axis=1)
                (ret, qut_lg, qut_st) = self.gb_returns(X, y_reg)
            except Exception as err:
                # Best effort: one failed security must not abort the others,
                # but record why it has no model.
                self.Log(f"Training failed for {security}: {err}")
                ret = "NoModel"
                qut_lg = "NoModel"
                qut_st = "NoModel"
            self.return_mods.append(ret)
            self.quantile_mods_lg.append(qut_lg)
            self.quantile_mods_st.append(qut_st)

        self.trained = True

    def make_predictions(self):
        """Predict the return and tail quantile for each tracked security.

        Fills ``self.returns`` and ``self.quantiles``. A security without a
        usable model gets a return and quantile of 0.
        """
        self.returns = []
        self.quantiles = []

        act_securities = [s.Symbol for s in self.securities]
        for i, security in enumerate(act_securities):
            data = self.get_all_data([security], training=False)
            # Keep only the most recent row.
            data = data[data.index == data.index.max()]
            prediction_data = data.drop(["direction", "return", "symbol"], axis=1)

            try:
                r_pred = self.return_mods[i].predict(prediction_data)[0]
            except Exception:
                # "NoModel" placeholder, or no model at this position.
                r_pred = 0

            if r_pred > 0:
                q_pred = self.quantile_mods_lg[i].predict(prediction_data)[0]
            elif r_pred < 0:
                q_pred = self.quantile_mods_st[i].predict(prediction_data)[0]
            else:
                q_pred = 0

            self.returns.append(r_pred)
            self.quantiles.append(q_pred)

        self.Debug(self.returns)

    def gb_returns(self, X, y):
        """Fit the mean, 5% quantile and 95% quantile boosting models.

        Returns
        -------
        tuple
            ``(mean_model, quantile_model_lg, quantile_model_st)``, all
            fitted on the same ``X``/``y``.
        """
        shared = dict(
            n_estimators=150,
            learning_rate=0.05,
            criterion="friedman_mse",
            random_state=1693,
            n_iter_no_change=15,
        )
        mean_fit_out = GradientBoostingRegressor(loss="squared_error", **shared).fit(X, y)
        quantile_fit_lg = GradientBoostingRegressor(loss="quantile", alpha=0.05, **shared).fit(X, y)
        quantile_fit_st = GradientBoostingRegressor(loss="quantile", alpha=0.95, **shared).fit(X, y)
        return (mean_fit_out, quantile_fit_lg, quantile_fit_st)

    def LinOptProg(self):
        """Solve the linear program that turns forecasts into weights.

        Reads and then deletes ``self.returns`` / ``self.quantiles``; writes
        ``self.weights`` as an ``(n, 1)`` array. Maximises ``returns . w``
        subject to ``-quantiles . w <= 0.01``, ``dirs . w <= 1`` and
        ``-dirs . w <= 0``, where ``dirs`` is the sign of each forecast.
        """
        expected = np.asarray(self.returns, dtype=float).reshape(-1)
        tail = np.asarray(self.quantiles, dtype=float).reshape(-1)
        n_assets = expected.size

        del self.returns
        del self.quantiles

        if n_assets == 0:
            # Nothing to allocate (and 1/n below would divide by zero).
            self.weights = []
            return

        dirs = np.array([1 if r > 0 else 0 if r == 0 else -1 for r in expected])

        long_cap = min(0.6, 3 / n_assets)
        short_cap = max(-0.6, -1.5 / n_assets)
        # Zero forecast = no view: pin the weight to 0 instead of leaving the
        # solver free to choose a short position for an unconstrained column.
        bounds = [
            (0, long_cap) if d == 1 else (short_cap, 0) if d == -1 else (0, 0)
            for d in dirs
        ]

        # vstack keeps A two-dimensional even with a single security.
        A = np.vstack([-tail, dirs, -dirs])
        b = np.array([0.01, 1, 0])

        res = linprog(-expected, A_ub=A, b_ub=b, bounds=bounds)

        if res.status == 0:
            self.weights = res.x.reshape(-1, 1)
        else:
            self.Log("Optimization failed")
            # Fallback: equal-sized positions in the predicted direction.
            self.weights = dirs.reshape(-1, 1) * (1 / n_assets)

    def bollinger_bands(self, data, window=20, num_std=2):
        """Add ``MA``, ``STD``, ``Upper_BB`` and ``Lower_BB`` columns in place."""
        rolling_close = data['close'].rolling(window=window)
        data['MA'] = rolling_close.mean()
        data['STD'] = rolling_close.std()
        band = data['STD'] * num_std
        data['Upper_BB'] = data['MA'] + band
        data['Lower_BB'] = data['MA'] - band
        return data

    def calculate_rsi(self, data, period=20):
        """Add an ``rsi`` column (EWM-smoothed gains vs. losses) in place."""
        # Calculate the daily price changes (gains and losses)
        delta = data['close'].diff().dropna()

        # Separate gains and losses into their own series
        gains = delta.where(delta > 0, 0)
        losses = -delta.where(delta < 0, 0)

        # Calculate the average gain and average loss
        avg_gain = gains.ewm(com=period - 1, min_periods=period).mean()
        avg_loss = losses.ewm(com=period - 1, min_periods=period).mean()

        rs = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + rs))

        data['rsi'] = rsi
        return data

    def get_all_data(self, tickers, historical=True, training=False, backtesting=True):
        """Build the feature frame used for training and prediction.

        Parameters
        ----------
        tickers : list
            Symbols to retrieve data for.
        historical : bool, default True
            When True, one of ``backtesting`` / ``training`` must be set.
            When False a short 8-bar lookback is used.
        training : bool, default False
            With ``backtesting=False``, request ``self.lookback + 25`` bars
            of prices and ``self.lookback * 1.5`` bars of news.
        backtesting : bool, default True
            Request 31 bars of prices and 12 bars of news. Takes precedence
            over ``training``.

        Returns
        -------
        pd.DataFrame
            Concatenated per-symbol feature frames with a ``symbol`` column.

        Raises
        ------
        ValueError
            If ``historical`` is True and neither mode flag is set.
        """
        if historical:
            if backtesting:
                shift_factor = 30  # overshooting and select the maximum
                hist_lookback = 1 + shift_factor
                tiingo_lookback = 12  # in case of weekends?
            elif training:
                hist_lookback = self.lookback + 25
                tiingo_lookback = self.lookback * 1.5
            else:
                raise ValueError("Please train or backtest if using historical = True")
        else:
            shift_factor = 7  # needed so we can calculate lagged data
            hist_lookback = 1 + shift_factor
            tiingo_lookback = 1 + shift_factor  # in case of weekends?

        full_data = pd.DataFrame()

        for symbol in tickers:
            # Get Price History
            history = self.History(symbol, hist_lookback)
            history = pd.DataFrame(history)

            history['direction'] = np.where(history['close'] > history['open'], 1, 0)
            history['return'] = history['close'].pct_change(periods=5)
            history = self.bollinger_bands(history)
            history = self.calculate_rsi(history)

            # Add relevant columns
            history['price_diff'] = history["open"] - history["MA"]
            history['band_diff_up'] = history["open"] - history["Upper_BB"]
            history['band_diff_lower'] = history["open"] - history["Lower_BB"]

            # Add Tiingo Data
            data = self.AddData(TiingoNews, symbol).Symbol
            tiingo = self.History(data, int(tiingo_lookback), Resolution.Daily)
            if len(tiingo) != 0 and set(['description', 'publisheddate']).issubset(tiingo.columns):
                analyzer = SentimentIntensityAnalyzer()
                tiingo['polarity'] = tiingo['description'].dropna().apply(lambda x: analyzer.polarity_scores(x))
                # Expand the polarity dicts into one column per score.
                tiingo = pd.concat([tiingo.drop(['polarity'], axis=1), tiingo['polarity'].apply(pd.Series)], axis=1)
                tiingo = tiingo[['publisheddate', 'compound']]
                tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'], utc=True).dt.date
                # One summed compound score per publication date.
                tiingo = tiingo.groupby(by=['publisheddate'], as_index=False).sum()
                tiingo.rename(columns={'publisheddate': 'time'}, inplace=True)
                tiingo.set_index('time', inplace=True)
                history = history.join(tiingo)

            lags = range(1, 5)
            history = history.assign(**{
                f'{col} (t-{lag})': history[col].shift(lag)
                for lag in lags
                for col in history
            }).dropna().drop(columns=['close', 'high', 'low', 'volume'], errors='ignore')

            history['symbol'] = symbol.Value
            full_data = pd.concat([full_data, history])

        self.Log(full_data)
        return full_data

    def get_daily_realized_pnl(self):
        """Return today's realised profit net of fees.

        Both terms are differences between the current portfolio totals and
        the values captured in ``OnEndOfDay``.
        """
        daily_gross_profit = self.Portfolio.TotalProfit - self.yesterday_total_profit
        daily_fees = self.Portfolio.TotalFees - self.yesterday_total_fees
        return daily_gross_profit - daily_fees

    def value_at_risk(self, returns, weights, conf_level=0.05, num_days=1):
        """Calculate the parametric (normal) value-at-risk of the portfolio.

        Parameters
        ----------
        returns : pd.DataFrame
            Periodic returns, one column per asset.
        weights : np.array
            Portfolio weights, in the column order of ``returns``.
        conf_level : float
            Tail probability. 0.05 by default.
        num_days : int
            Length of the period the VaR is scaled to.

        Returns
        -------
        float
            The ``conf_level`` quantile of the fitted normal distribution,
            multiplied by ``sqrt(num_days)``.
        """
        cov_matrix = returns.cov()
        avg_return = returns.mean()
        portfolio_mean = avg_return.dot(weights)
        portfolio_stdev = np.sqrt(weights.T.dot(cov_matrix).dot(weights))
        cutoff = norm.ppf(conf_level, portfolio_mean, portfolio_stdev)

        # n-Day VaR
        VaR = cutoff * np.sqrt(num_days)
        return VaR

    def cvar(self, returns, weights, conf_level=0.05):
        """Return the mean of the portfolio VaR.

        Parameters
        ----------
        returns : pd.DataFrame
            Periodic returns, one column per asset.
        weights : np.array
            Portfolio weights.
        conf_level : float
            Accepted for interface compatibility; not forwarded to
            ``value_at_risk``, which uses its own default.

        NOTE(review): this averages the VaR value rather than computing an
        expected shortfall -- confirm whether a true CVaR is wanted.
        """
        # Must go through ``self``: value_at_risk is a method, not a
        # module-level function.
        VaR = self.value_at_risk(returns, weights)
        return VaR.mean()
#region imports
from AlgorithmImports import *
#endregion

# Your New Python File


class FocusedGreenDonkey(QCAlgorithm):
    """Universe-selection algorithm trading sub-$10 stocks that print a new
    252-bar high while their 10-period RSI is above 80.

    Every symbol entering the universe is bought with a 100-share market
    order; every symbol leaving it is liquidated.
    """

    def Initialize(self):
        """Configure the backtest start date, cash and the coarse universe."""
        self.SetStartDate(2019, 10, 12)
        self.SetCash(100000)
        self.AddUniverse(self.MyCoarseFilterFunction)
        # Indicator state per symbol, kept across universe-selection calls.
        self.symbolDataBySymbol = {}

    def MyCoarseFilterFunction(self, coarse):
        """Return the symbols that currently satisfy the new-max / RSI rule.

        Parameters
        ----------
        coarse : iterable
            Coarse fundamental objects; ``Price``, ``Symbol``, ``EndTime``
            and ``AdjustedPrice`` are read from each element.

        Returns
        -------
        list
            Symbols whose ``SymbolData`` reports ``HasNewMax`` and an RSI
            value above 80.
        """
        StocksUnder10 = [c for c in coarse if c.Price < 10]
        # Only symbols that are not tracked yet need a history warm-up.
        symbols = [c.Symbol for c in StocksUnder10
                   if c.Symbol not in self.symbolDataBySymbol]
        history = self.History(symbols, 252, Resolution.Daily)
        if not history.empty:
            # One column of closes per symbol.
            history = history.close.unstack(0)
            for symbol in symbols:
                if str(symbol) not in history:
                    continue
                df = history[symbol].dropna()
                if not df.empty:
                    self.symbolDataBySymbol[symbol] = SymbolData(self, symbol, df)

        # Feed today's adjusted price into every tracked symbol.
        for x in coarse:
            symbol = x.Symbol
            if symbol in self.symbolDataBySymbol:
                self.symbolDataBySymbol[symbol].Update(x.EndTime, x.AdjustedPrice)

        selectedSymbols = [
            symbol
            for symbol, symbolData in self.symbolDataBySymbol.items()
            if symbolData.HasNewMax and symbolData.rsi.Current.Value > 80
        ]
        return selectedSymbols

    def OnData(self, data):
        """No per-bar logic; trading happens in ``OnSecuritiesChanged``."""
        pass

    def OnSecuritiesChanged(self, changes):
        """Buy 100 shares of each added security and liquidate removed ones."""
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            self.MarketOrder(symbol, 100)
        for security in changes.RemovedSecurities:
            symbol = security.Symbol
            self.Liquidate(symbol)


class SymbolData:
    """Per-symbol indicator state: a 252-period maximum, a 10-period simple
    RSI, and a two-element window holding the latest maximum updates."""

    def __init__(self, algorithm, symbol, history):
        """Create the indicators and warm them up from ``history``.

        Parameters
        ----------
        algorithm : QCAlgorithm
            Owning algorithm (accepted but not used here).
        symbol : Symbol
            Symbol these indicators belong to.
        history : pd.Series
            Close prices indexed by time, fed to ``Update`` in order.
        """
        self.symbol = symbol
        self.max = Maximum(252)
        self.rsi = RelativeStrengthIndex(10, MovingAverageType.Simple)
        self.maxWindow = RollingWindow[IndicatorDataPoint](2)
        self.max.Updated += self.OnMax
        # ``Series.iteritems`` was removed in pandas 2.0; ``items`` yields the
        # same (index, value) pairs on every pandas version.
        for time, close in history.items():
            self.Update(time, close)

    def OnMax(self, sender, updated):
        """Record a maximum update in the window once the indicator is ready."""
        if self.max.IsReady:
            self.maxWindow.Add(updated)

    def Update(self, time, close):
        """Push one (time, close) observation into both indicators."""
        self.max.Update(time, close)
        self.rsi.Update(time, close)

    @property
    def HasNewMax(self):
        """True when the newest recorded maximum exceeds the previous one;
        False while the window holds fewer than two entries."""
        if self.maxWindow.IsReady:
            return self.maxWindow[0] > self.maxWindow[1]
        else:
            return False
from AlgorithmImports import *


class USCoarseUniverseConstituentsDataAlgorithm(QCAlgorithm):
    """Hold an equal-weight portfolio of the top symbols by dollar volume."""

    # Number of symbols kept by the coarse selection.
    _number_of_symbols = 3
    # Most recent unprocessed universe change, or None when there is none.
    _changes = None

    def Initialize(self):
        """Set the backtest window, cash and the coarse universe."""
        self.SetStartDate(2021, 1, 1)
        self.SetEndDate(2021, 7, 1)
        self.SetCash(100000)

        # Requesting data
        self.AddUniverse(self.CoarseSelectionFunction)

    def CoarseSelectionFunction(self, coarse):
        """Return the ``_number_of_symbols`` symbols with the highest
        ``DollarVolume``."""
        sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
        return [x.Symbol for x in sortedByDollarVolume[:self._number_of_symbols]]

    def OnData(self, data):
        """Apply any pending universe change: liquidate removed securities
        and allocate 1/N to each added one."""
        # if we have no changes, do nothing
        if self._changes is None:
            return

        # liquidate removed securities
        for security in self._changes.RemovedSecurities:
            if security.Invested:
                self.Liquidate(security.Symbol)

        # we want 1/N allocation in each security in our universe
        for security in self._changes.AddedSecurities:
            self.SetHoldings(security.Symbol, 1 / self._number_of_symbols)
            # The original logged the set literal ``{security.Symbol}`` after
            # the loops, relying on a leaked loop variable; log a formatted
            # string for each security that was actually added instead.
            self.Log(f"{security.Symbol}")

        self._changes = None

    def OnSecuritiesChanged(self, changes):
        """Remember the change for ``OnData`` and report how many history
        rows are available for each added security."""
        self._changes = changes

        for security in changes.AddedSecurities:
            # Historical data
            history = self.History(security.Symbol, 7, Resolution.Daily)
            self.Debug(f"We got {len(history)} from our history request for {security.Symbol}")
from QuantConnect.Data.Custom.Tiingo import *
from AlgorithmImports import *

# ref: https://www.quantconnect.com/forum/discussion/6638/new-data-source-tiingo-news-data/p1


class TiingoNLPDemonstration(QCAlgorithm):
    """Size an AAPL position from a keyword score of Tiingo news text."""

    def Initialize(self):
        """Build the keyword table and subscribe to AAPL and its news feed."""
        negative_words = ("bad", "negative", "fail", "failed", "missed")
        positive_words = ("good", "great", "growth", "success", "nailed", "beat")
        # Each keyword contributes -0.5 or +0.5 to the article score.
        self.wordSentiment = {word: -0.5 for word in negative_words}
        self.wordSentiment.update({word: 0.5 for word in positive_words})

        self.SetStartDate(2022, 1, 1)
        self.SetCash(8000)

        equity_symbol = self.AddEquity("AAPL", Resolution.Hour).Symbol
        self.aaplCustom = self.AddData(TiingoNews, equity_symbol).Symbol

    def OnData(self, data):
        """Score the article description and set AAPL holdings to that score.

        Each keyword is counted at most once per article, however many times
        it appears in the description.
        """
        if data.ContainsKey(self.aaplCustom):
            article = data[self.aaplCustom]
            tokens = set(article.Description.lower().split(" "))
            score = sum(
                weight
                for word, weight in self.wordSentiment.items()
                if word in tokens
            )
            self.SetHoldings(self.aaplCustom.Underlying, score)
from AlgorithmImports import *
import numpy as np
import pandas as pd
from scipy.stats import norm


class Algorithm(QCAlgorithm):
    """Scratch algorithm holding a parametric value-at-risk helper."""

    def Initialize(self):
        pass

    def value_at_risk(self, returns, weights, conf=0.05, num_days=30, init_cash=None):
        """
        Parametric (normal) value-at-risk in currency units for horizons of
        1..num_days days.

        Need to modify -- check if returns are approximately normally
        distributed or needs modifications on this assumption
        ---------------------------------------------------
        Parameters
        returns: pd.DataFrame
            periodic returns
        weights: np.array
            portfolio weights
        conf: float
            tail probability used for the cutoff. 0.05 by default
        num_days: int
            longest horizon, in days, to report. 30 by default
        init_cash: float or None
            capital the VaR is expressed against. When None, ``self.InitCash``
            is used.
            NOTE(review): the visible ``Initialize`` never sets ``InitCash``;
            confirm it is assigned before calling without ``init_cash``.

        Returns
        list
            ``num_days`` values; entry ``k`` is the 1-day VaR scaled by
            ``sqrt(k + 1)`` and rounded to 2 decimals
        """
        capital = self.InitCash if init_cash is None else init_cash

        cov_matrix = returns.cov()
        avg_return = returns.mean()
        portfolio_mean = avg_return.dot(weights)
        portfolio_stdev = np.sqrt(weights.T.dot(cov_matrix).dot(weights))

        # Distribution of the invested capital after one period.
        mean_investment = (1 + portfolio_mean) * capital
        stdev_investment = capital * portfolio_stdev

        # Capital level that is undershot with probability ``conf``.
        cutoff = norm.ppf(conf, mean_investment, stdev_investment)
        var_1d1 = capital - cutoff

        # Calculate n Day VaR with square-root-of-time scaling.
        return [np.round(var_1d1 * np.sqrt(day), 2) for day in range(1, num_days + 1)]
from AlgorithmImports import *
import numpy as np
import pandas as pd
from nltk.sentiment import SentimentIntensityAnalyzer
from scipy.optimize import linprog
from sklearn.ensemble import GradientBoostingRegressor
from scipy.stats import norm
import math


class XGP(QCAlgorithm):
    """Gradient-boosting return forecasts combined with a linear-programming
    allocation and a value-at-risk check before orders are placed."""

    def Initialize(self):
        """Configure the backtest, universe selection and the training /
        trading schedules."""
        # set up
        self.SetStartDate(2017, 1, 1)
        self.SetEndDate(2017, 6, 1)
        self.InitCash = 10000000
        self.SetCash(self.InitCash)
        self.lookback = 50

        # setting brokerage and reality modeling params
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetSecurityInitializer(lambda s: s.SetSlippageModel(VolumeShareSlippageModel()))

        # manually keeping track of securities
        self.securities = []

        # We're getting an error when this is removed
        self.weights = []
        self.trained = True

        # VaR (as a return) at or below which positions are scaled down.
        self.risk_threshold = -0.015
        # self.SetWarmup(timedelta(days=50))
        self.prices_at_order = {}
        self.yesterday_total_profit = 0
        self.yesterday_total_fees = 0

        # Requesting data
        self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine))
        self.UniverseSettings.Resolution = Resolution.Daily
        self.num_coarse_symbols = 100
        self.num_fine_symbols = 10

        # Train immediately
        self.Train(self.classifier_training)
        self.Schedule.On(self.DateRules.On(2023, 4, 19), self.TimeRules.At(10, 0), self.actions)

        # Train every Sunday at 4am or first day of month (because new universe)
        self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(4, 0), self.classifier_training)
        self.Train(self.DateRules.MonthStart(daysOffset=0), self.TimeRules.At(4, 0), self.classifier_training)

        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday), self.TimeRules.At(10, 0), self.actions)
        self.Schedule.On(self.DateRules.MonthStart(daysOffset=0), self.TimeRules.At(10, 0), self.actions)

    def _keep_current_universe(self):
        """True when a universe already exists and today is not day 1 of the month."""
        return len(self.securities) != 0 and self.Time.day != 1

    def SelectCoarse(self, coarse):
        """selecting CoarseFundamental objects based on criteria in paper

        Keeps the existing universe unless it is empty or it is the first
        day of the month; otherwise returns the ``num_coarse_symbols``
        symbols with fundamental data and price above 5 that have the
        highest dollar volume.
        """
        if self._keep_current_universe():
            return Universe.Unchanged

        selected = [c for c in coarse if c.HasFundamentalData and c.Price > 5]
        sorted_by_dollar_volume = sorted(selected, key=lambda c: c.DollarVolume, reverse=True)
        return [c.Symbol for c in sorted_by_dollar_volume[:self.num_coarse_symbols]]

    @staticmethod
    def _passes_fine_filter(f):
        """Fundamental screen applied by ``SelectFine``."""
        return (f.ValuationRatios.PERatio < 100
                and f.MarketCap > 300000000
                and f.ValuationRatios.PEGRatio < 3
                and f.OperationRatios.TotalDebtEquityRatio.Value < 2
                and f.OperationRatios.CurrentRatio.Value > 1)

    def SelectFine(self, fine):
        """selecting FineFundamental objects based on our criteria

        Keeps the existing universe unless it is empty or it is the first
        day of the month; otherwise returns the ``num_fine_symbols`` symbols
        passing the fundamental screen, ordered by descending P/E ratio.
        """
        if self._keep_current_universe():
            return Universe.Unchanged

        selected = [f for f in fine if self._passes_fine_filter(f)]
        sorted_by_pe_ratio = sorted(selected, key=lambda f: f.ValuationRatios.PERatio, reverse=True)
        return [f.Symbol for f in sorted_by_pe_ratio[:self.num_fine_symbols]]

    def OnSecuritiesChanged(self, changes):
        """triggers when Universe changes as result of filtering"""
        for security in changes.AddedSecurities:
            self.Debug(f"{self.Time}: Added {security}")

        for security in changes.RemovedSecurities:
            self.Debug(f"{self.Time}: Removed {security}")

        added = changes.AddedSecurities
        removed = changes.RemovedSecurities
        self.securities = list(set(self.securities).union(set(added)).difference(set(removed)))

    def OnData(self, data):
        return

    def OnEndOfDay(self):
        """Snapshot cumulative profit and fees for ``get_daily_realized_pnl``."""
        self.yesterday_total_profit = self.Portfolio.TotalProfit
        self.yesterday_total_fees = self.Portfolio.TotalFees

    def OnOrderEvent(self, orderEvent):
        """logs the details of an order"""
        self.Log(f'{orderEvent}')

    # =====================================================================================================================
    # begin custom functions
    # =====================================================================================================================

    def actions(self):
        """Rebalance: liquidate, predict, optimise weights, then place orders
        subject to the value-at-risk threshold.

        When the estimated VaR is at or below ``risk_threshold`` every target
        quantity is scaled by ``1 - (risk_threshold - VaR)`` and sent as a
        market order; otherwise the optimised weights are applied directly
        with ``SetHoldings``.
        """
        self.Liquidate()  # Liquidate the whole portfolio
        self.make_predictions()
        self.LinOptProg()
        lookback = 30
        active_securities = [s.Symbol for s in self.securities]

        # risk management
        if len(self.weights) > 0:
            history = self.History(active_securities, timedelta(days=lookback), resolution=Resolution.Daily)
            history = history['close'].unstack(level=0)
            # NOTE(review): this relabels the columns positionally; it assumes
            # the unstacked columns come back in ``active_securities`` order
            # and that every symbol returned data -- confirm.
            history.columns = active_securities
            returns = history.pct_change()

            w = np.array([i[0] for i in self.weights])
            VaR = self.value_at_risk(returns, w)  # calculation of value-at-risk limit
            self.Debug(f"VaR={VaR}")

            if VaR <= self.risk_threshold:
                # estimated loss in the next day is greater than our maximum risk threshold
                self.Debug(f"estimated risk {VaR} exceeds threshold")
                reduction_size = self.risk_threshold - VaR
                for security, wt in zip(active_securities, w):
                    quantity = self.CalculateOrderQuantity(security, wt)
                    reduced_quantity = math.ceil(quantity * (1 - reduction_size))
                    if reduced_quantity != 0:
                        self.Debug(f"VaR limit reached; expected loss is {VaR}. Reducing position size of "
                                   f"{security} from {quantity} to {reduced_quantity}")
                        # The original only logged the reduced size; after the
                        # Liquidate() above that left the portfolio flat.
                        self.MarketOrder(security, reduced_quantity)
                        self.prices_at_order[security] = self.Securities[security].Price
            else:
                for security, wt in zip(self.securities, w):
                    if wt != 0:
                        self.SetHoldings(security.Symbol, wt)
                        self.prices_at_order[security.Symbol] = self.Securities[security.Symbol].Price

    def classifier_training(self):
        """Fit one (mean, lower-quantile, upper-quantile) model triple per
        tracked security.

        Results are stored positionally in ``return_mods``,
        ``quantile_mods_lg`` and ``quantile_mods_st``; a security whose
        training fails gets the placeholder string "NoModel" in all three.
        """
        self.return_mods = []
        self.quantile_mods_lg = []
        self.quantile_mods_st = []

        active_securities = [s.Symbol for s in self.securities]
        self.Log(f"Training Started at {self.Time}")
        for security in active_securities:
            data = self.get_all_data([security], training=True, backtesting=False)  # get tickers
            try:
                y_reg = data["return"]
                X = data.drop(["direction", "return", "symbol"], axis=1)
                (ret, qut_lg, qut_st) = self.gb_returns(X, y_reg)
            except Exception as err:
                # Best effort: keep training the remaining securities, but
                # say why this one has no model.
                self.Log(f"Training failed for {security}: {err}")
                ret = "NoModel"
                qut_lg = "NoModel"
                qut_st = "NoModel"
            self.return_mods.append(ret)
            self.quantile_mods_lg.append(qut_lg)
            self.quantile_mods_st.append(qut_st)

        self.trained = True

    def make_predictions(self):
        """Predict the return and matching tail quantile for each security.

        Fills ``self.returns`` and ``self.quantiles`` in the order of
        ``self.securities``. A security whose return model cannot predict
        gets 0 for both.
        """
        self.returns = []
        self.quantiles = []

        act_securities = [s.Symbol for s in self.securities]
        for i, security in enumerate(act_securities):
            data = self.get_all_data([security], training=False)
            # Only the most recent row is used for prediction.
            data = data[data.index == data.index.max()]
            prediction_data = data.drop(["direction", "return", "symbol"], axis=1)
            try:
                r_pred = self.return_mods[i].predict(prediction_data)[0]
            except Exception as err:
                # Covers a missing / "NoModel" entry as well as unusable input.
                self.Log(f"Prediction failed for {security}: {err}")
                r_pred = 0

            if r_pred > 0:
                q_pred = self.quantile_mods_lg[i].predict(prediction_data)[0]
            elif r_pred < 0:
                q_pred = self.quantile_mods_st[i].predict(prediction_data)[0]
            else:
                q_pred = 0

            self.returns.append(r_pred)
            self.quantiles.append(q_pred)

        # Format explicitly so a plain string is handed to Debug.
        self.Debug(f"{self.returns}")

    def gb_returns(self, X, y):
        """
        Fit the gradient-boosting models used for expected return and
        quantile loss.

        Returns a tuple ``(mean_model, lower_model, upper_model)`` where the
        mean model uses squared error and the other two use quantile loss
        with alpha 0.05 and 0.95 respectively.
        """
        mean_clf = GradientBoostingRegressor(n_estimators=150,
                                             loss="squared_error",
                                             criterion="friedman_mse",
                                             learning_rate=0.05,
                                             random_state=1693,
                                             n_iter_no_change=15)
        mean_fit_out = mean_clf.fit(X, y)

        quantile_clf_lg = GradientBoostingRegressor(n_estimators=150,
                                                    loss="quantile",
                                                    alpha=0.05,
                                                    n_iter_no_change=15,
                                                    learning_rate=0.05,
                                                    criterion="friedman_mse",
                                                    random_state=1693)
        quantile_clf_st = GradientBoostingRegressor(n_estimators=150,
                                                    loss="quantile",
                                                    alpha=0.95,
                                                    n_iter_no_change=15,
                                                    learning_rate=0.05,
                                                    criterion="friedman_mse",
                                                    random_state=1693)

        quantile_fit_lg = quantile_clf_lg.fit(X, y)
        quantile_fit_st = quantile_clf_st.fit(X, y)
        return (mean_fit_out, quantile_fit_lg, quantile_fit_st)

    def LinOptProg(self):
        """
        Linear-programming allocation.

        Maximises ``returns . w`` subject to ``-quantiles . w <= 0.01``,
        ``0 <= dirs . w <= 1`` and per-asset bounds that allow only a long
        position for positive predictions and only a short position
        otherwise. The result is stored in ``self.weights`` as an (n, 1)
        array; ``self.returns`` and ``self.quantiles`` are deleted afterwards.
        """
        self.weights = []
        self.returns = np.array(self.returns).reshape(-1, 1)
        self.quantiles = np.array(self.quantiles).reshape(-1, 1)
        n_assets = len(self.returns)

        if n_assets == 0:
            # Nothing to allocate; also avoids dividing by zero below.
            del self.returns
            del self.quantiles
            return

        dirs = np.array([1 if d > 0 else 0 if d == 0 else -1 for d in self.returns]).reshape(-1, 1)
        bounds = [(0, min(0.6, 3 / n_assets)) if d == 1 else (max(-0.6, -1.5 / n_assets), 0) for d in dirs]

        # One constraint per row. Stacking the transposed columns keeps A
        # two-dimensional for a single asset, where the original .squeeze()
        # collapsed it to 1-D and linprog rejected it.
        A = np.vstack([-1 * self.quantiles.T, dirs.T, -1 * dirs.T])
        b = np.array([0.01, 1, 0])

        res = linprog(-1 * self.returns.ravel(), A_ub=A, b_ub=b, bounds=bounds)
        if res.status == 0:
            self.weights = res.x.reshape(-1, 1)
        else:
            self.Log("Optimization failed")
            # If optimization fails, fall back to equal-sized weights in the
            # predicted direction (zero for a zero prediction).
            self.weights = dirs * (1 / n_assets)

        del self.returns
        del self.quantiles

    def bollinger_bands(self, data, window=20, num_std=2):
        """Add 'MA', 'STD', 'Upper_BB' and 'Lower_BB' columns computed from
        ``data['close']`` and return ``data`` (modified in place)."""
        # Calculate the moving average
        data['MA'] = data['close'].rolling(window=window).mean()
        # Calculate the standard deviation
        data['STD'] = data['close'].rolling(window=window).std()
        # Calculate the Bollinger Bands
        data['Upper_BB'] = data['MA'] + (data['STD'] * num_std)
        data['Lower_BB'] = data['MA'] - (data['STD'] * num_std)
        return data

    def calculate_rsi(self, data, period=20):
        """Add an 'rsi' column computed from ``data['close']`` using
        exponentially weighted average gains and losses, and return ``data``
        (modified in place)."""
        # Calculate the daily price changes (gains and losses)
        delta = data['close'].diff().dropna()
        # Separate gains and losses into their own series
        gains = delta.where(delta > 0, 0)
        losses = -delta.where(delta < 0, 0)
        # Calculate the average gain and average loss
        avg_gain = gains.ewm(com=period - 1, min_periods=period).mean()
        avg_loss = losses.ewm(com=period - 1, min_periods=period).mean()
        # Calculate the Relative Strength (RS)
        rs = avg_gain / avg_loss
        # Calculate the Relative Strength Index (RSI)
        rsi = 100 - (100 / (1 + rs))
        data['rsi'] = rsi
        return data

    def get_all_data(self, tickers, historical=True, training=False, backtesting=True):
        """
        Gets historical data for training and prediction

        Parameters:
        -----------
        tickers : list
            list of symbols to retrieve data for
        historical : Bool, default True
            When True, ``backtesting`` or ``training`` selects the lookback;
            when False a short 8-bar window is requested.
        training : Bool, default False
            Only consulted when ``backtesting`` is False. Requests
            ``self.lookback + 25`` price bars and ``self.lookback * 1.5``
            news bars.
        backtesting : Bool, default True
            Takes precedence over ``training``. Requests 31 price bars and
            12 news bars.

        Return:
        -------
        full_data : pd.DataFrame
            Feature rows for all tickers with lagged copies (t-1 .. t-4) of
            every column, rows containing NaN dropped, plus a 'symbol'
            column.

        Raises:
        -------
        ValueError
            If ``historical`` is True while both ``backtesting`` and
            ``training`` are False.
        """
        if historical:
            if backtesting:
                shift_factor = 30  # overshooting and select the maximum
                hist_lookback = 1 + shift_factor
                tiingo_lookback = 12  # in case of weekends?
            elif training:
                hist_lookback = self.lookback + 25
                tiingo_lookback = self.lookback * 1.5
            else:
                raise ValueError("Please train or backtest if using historical = True")
        else:
            shift_factor = 7  # needed so we can calculate lagged data
            hist_lookback = 1 + shift_factor
            tiingo_lookback = 1 + shift_factor  # in case of weekends?

        full_data = pd.DataFrame()
        analyzer = None  # built on first use and shared by all tickers

        for symbol in tickers:
            # Get Price History
            history = self.History(symbol, hist_lookback)
            history = pd.DataFrame(history)  # convert the historical data to a pandas DataFrame

            history['direction'] = np.where(history['close'] > history['open'], 1, 0)
            history['return'] = history['close'].pct_change(periods=5)
            history = self.bollinger_bands(history)
            history = self.calculate_rsi(history)

            # Add relevant columns
            history['price_diff'] = history["open"] - history["MA"]
            history['band_diff_up'] = history["open"] - history["Upper_BB"]
            history['band_diff_lower'] = history["open"] - history["Lower_BB"]

            # Add Tiingo Data
            data = self.AddData(TiingoNews, symbol).Symbol
            tiingo = self.History(data, int(tiingo_lookback), Resolution.Daily)
            if len(tiingo) != 0 and set(['description', 'publisheddate']).issubset(tiingo.columns):
                if analyzer is None:
                    analyzer = SentimentIntensityAnalyzer()
                tiingo['polarity'] = tiingo['description'].dropna().apply(lambda x: analyzer.polarity_scores(x))
                # Expand the polarity dicts into one column per score.
                tiingo = pd.concat([tiingo.drop(['polarity'], axis=1), tiingo['polarity'].apply(pd.Series)], axis=1)
                tiingo = tiingo[['publisheddate', 'compound']]
                tiingo['publisheddate'] = pd.to_datetime(tiingo['publisheddate'], utc=True).dt.date
                # Sum the compound score of all articles published the same day.
                tiingo = tiingo.groupby(by=['publisheddate'], as_index=False).sum()
                tiingo.rename(columns={'publisheddate': 'time'}, inplace=True)
                tiingo.set_index('time', inplace=True)
                history = history.join(tiingo)

            lags = range(1, 5)
            history = history.assign(**{
                f'{col} (t-{lag})': history[col].shift(lag)
                for lag in lags
                for col in history
            }).dropna().drop(columns=['close', 'high', 'low', 'volume'], errors='ignore')

            history['symbol'] = symbol.Value
            full_data = pd.concat([full_data, history])

        # Format explicitly so a plain string is handed to Log.
        self.Log(f"{full_data}")
        return full_data

    def get_daily_realized_pnl(self):
        """Profit net of fees accumulated since the last ``OnEndOfDay``."""
        daily_gross_profit = self.Portfolio.TotalProfit - self.yesterday_total_profit
        daily_fees = self.Portfolio.TotalFees - self.yesterday_total_fees
        return daily_gross_profit - daily_fees

    def value_at_risk(self, returns, weights, conf_level=0.05, num_days=1):
        """
        Calculates the value-at-risk of the portfolio.
        ---------------------------------------------------
        Parameters
        returns : pd.DataFrame
            periodic returns
        conf_level : float
            confidence level. 0.05 by default
        weights : np.array
            portfolio weights
        num_days : int
            length of the period the VaR is calculated over

        Returns the ``conf_level`` quantile of a normal distribution fitted
        to the portfolio return, scaled by ``sqrt(num_days)``.
        """
        cov_matrix = returns.cov()
        avg_return = returns.mean()
        portfolio_mean = avg_return.dot(weights)
        portfolio_stdev = np.sqrt(weights.T.dot(cov_matrix).dot(weights))
        cutoff = norm.ppf(conf_level, portfolio_mean, portfolio_stdev)

        # n-Day VaR
        VaR = cutoff * np.sqrt(num_days)
        return VaR

    def cvar(self, returns, weights, conf_level=0.05):
        """
        Calculates the portfolio CVaR (expected shortfall) under the same
        normal assumption as ``value_at_risk``.
        ------------------------------------
        Parameters
        returns : pd.DataFrame
            periodic returns
        weights : np.array
            portfolio weights
        conf_level : float
            confidence level. 0.05 by default

        Returns the expected portfolio return conditional on falling at or
        below the ``conf_level`` quantile:
        ``mean - stdev * pdf(ppf(conf_level)) / conf_level``.
        """
        cov_matrix = returns.cov()
        portfolio_mean = returns.mean().dot(weights)
        portfolio_stdev = np.sqrt(weights.T.dot(cov_matrix).dot(weights))

        # Standard-normal density at the VaR quantile.
        tail_density = norm.pdf(norm.ppf(conf_level))
        return portfolio_mean - portfolio_stdev * tail_density / conf_level