Overall Statistics |
Total Orders 29911 Average Win 0.09% Average Loss -0.08% Compounding Annual Return 11.704% Drawdown 12.900% Expectancy -0.053 Start Equity 1000000 End Equity 1246636.84 Net Profit 24.664% Sharpe Ratio 0.491 Sortino Ratio 0.536 Probabilistic Sharpe Ratio 21.329% Loss Rate 55% Win Rate 45% Profit-Loss Ratio 1.11 Alpha 0.079 Beta 0.331 Annual Standard Deviation 0.174 Annual Variance 0.03 Information Ratio 0.325 Tracking Error 0.198 Treynor Ratio 0.258 Total Fees $17163.76 Estimated Strategy Capacity $8000.00 Lowest Capacity Asset GE R735QTJ8XC9X Portfolio Turnover 23.64% |
#region imports from AlgorithmImports import * from collections import deque import numpy as np import scipy as sp #endregion class TSZscore_VwapReversion(AlphaModel): def __init__(self): self.period = 20 self.securities_list = [] self.day = -1 self.historical_VwapReversion_by_symbol = {} def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: # Register each security in the universe for security in changes.added_securities: if security not in self.securities_list: self.historical_VwapReversion_by_symbol[security.symbol] = deque(maxlen=self.period) self.securities_list.append(security) for security in changes.removed_securities: if security in self.securities_list: self.securities_list.remove(security) def update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]: if data.quote_bars.count == 0: # Only emit insights when there is quote data, not when a corporate action occurs (at midnight) return [] if self.day == algorithm.time.day: # Only emit insights once per day return [] self.day = algorithm.time.day # Neutralize Vwap/Close of securities so it's mean 0, then append them to the list temp_list = {} for security in self.securities_list: if security.Close != 0: temp_list[security.symbol] = algorithm.vwap(security.symbol).Current.Value/security.Close else: temp_list[security.symbol] = 1 temp_mean = sum(temp_list.values())/len(temp_list.values()) for security in self.securities_list: self.historical_VwapReversion_by_symbol[security.symbol].appendleft(temp_list[security.symbol]-temp_mean) # Compute ts_zscore of current Vwap/Close zscore_by_symbol = {} for security in self.securities_list: zscore_by_symbol[security.symbol] = sp.stats.zscore(self.historical_VwapReversion_by_symbol[security.symbol])[0] # create insights to long / short the asset insights = [] weights = {} for symbol, zscore in zscore_by_symbol.items(): if not np.isnan(zscore): weight = zscore else: weight = 0 weights[symbol] = weight # Make scale similar across alphas abs_weight = {key: abs(val) for key, val in weights.items()} weights_sum = sum(abs_weight.values()) if weights_sum != 0: for symbol, weight in weights.items(): weights[symbol] = weight/ weights_sum for symbol, weight in weights.items(): if weight > 0: insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.UP, weight=weight)) elif weight < 0: insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.DOWN, weight=weight)) return insights class TSZscore_DividendGrowth(AlphaModel): def __init__(self): self.period = 252 self.day = -1 self.securities_list = [] self.dps = {} def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: # Register each security in the universe for security in changes.added_securities: if security not in self.securities_list: self.dps[security.symbol] = deque(maxlen=self.period) self.securities_list.append(security) for security in changes.removed_securities: if security in self.securities_list: self.securities_list.remove(security) def update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]: if data.quote_bars.count == 0: # Only emit insights when there is quote data, not when a corporate action occurs (at midnight) return [] if self.day == algorithm.time.day: # Only emit insights once per day return [] self.day = algorithm.time.day # Append dividend to the list, compute ts_zscore of current dividend zscore_by_symbol = {} for security in self.securities_list: if not np.isnan(security.fundamentals.earning_reports.dividend_per_share.Value): self.dps[security.symbol].appendleft(security.fundamentals.earning_reports.dividend_per_share.Value) zscore_by_symbol[security.symbol] = sp.stats.zscore(self.dps[security.symbol])[0] # create insights to long / short the asset insights = [] weights = {} for symbol, zscore in zscore_by_symbol.items(): if not np.isnan(zscore): weight = zscore else: weight = 0 weights[symbol] = weight # Make scale similar across alphas abs_weight = {key: abs(val) for key, val in weights.items()} weights_sum = sum(abs_weight.values()) if weights_sum != 0: for symbol, weight in weights.items(): weights[symbol] = weight/ weights_sum for symbol, weight in weights.items(): if weight >= 0: insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.UP, weight=weight)) else: insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.DOWN, weight=weight)) return insights class Conditional_Reversion(AlphaModel): def __init__(self): self.condition_period = 5 self.period = 3 self.securities_list = [] self.day = -1 self.historical_volume_by_symbol = {} self.historical_close_by_symbol = {} def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: # Register each security in the universe for security in changes.added_securities: if security not in self.securities_list: self.historical_volume_by_symbol[security.symbol] = deque(maxlen=self.condition_period) self.historical_close_by_symbol[security.symbol] = deque(maxlen=self.period) self.securities_list.append(security) for security in changes.removed_securities: if security in self.securities_list: self.securities_list.remove(security) def update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]: if data.quote_bars.count == 0: # Only emit insights when there is quote data, not when a corporate action occurs (at midnight) return [] if self.day == algorithm.time.day: # Only emit insights once per month return [] self.day = algorithm.time.day # Append volume and close to the list zscore_by_symbol = {} return_by_symbol = {} for security in self.securities_list: if (security.Close != 0 and security.Volume != 0): self.historical_close_by_symbol[security.symbol].appendleft(security.Close) self.historical_volume_by_symbol[security.symbol].appendleft(security.Volume) return_by_symbol[security.symbol] = (self.historical_close_by_symbol[security.symbol][0] - self.historical_close_by_symbol[security.symbol][-1]) if return_by_symbol == {}: # Don't emit insight if there's no valid data return [] # Rank the 3 days return among securities to return value from 0 to 1 sorted_return_by_symbol = sorted(return_by_symbol.items(), key=lambda x: x[1]) return_rank_by_symbol = {} for item in sorted_return_by_symbol: # item is a key-value pair. [0] is the security symbol and [1] is the return return_rank_by_symbol[item[0]] = (sorted_return_by_symbol.index(item))/ len(sorted_return_by_symbol) # Calculating the final weight weights = {} for security in self.securities_list: # If condition is met, assign weight if len(self.historical_volume_by_symbol[security.symbol]) != 0 and max(self.historical_volume_by_symbol[security.symbol]) == security.Volume: weight = -return_rank_by_symbol[security.symbol] # Change this sign and complete different behaviour if purely long. Investigate else: weight = 0 weights[security.symbol] = weight weights_mean = sum(weights.values())/len(weights.values()) for symbol, weight in weights.items(): weights[symbol] = weight - weights_mean # Make scale similar across alphas abs_weight = {key: abs(val) for key, val in weights.items()} weights_sum = sum(abs_weight.values()) if weights_sum != 0: for symbol, weight in weights.items(): weights[symbol] = weight/ weights_sum # Create insights to long / short the asset insights = [] for symbol, weight in weights.items(): if weight > 0: insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.UP, weight=weight)) elif weight < 0: insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.DOWN, weight=weight)) #Expiry.END_OF_DAY return insights def __init__(self): self.day = -1 self.securities_list = [] self.sentiment_indicator_by_symbol = {} self.sentiment_by_symbol = {} self.past_close_by_symbol = {} def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: # Register each security in the universe for security in changes.added_securities: if security not in self.securities_list: self.securities_list.append(security) self.sentiment_indicator_by_symbol[security.symbol] = algorithm.add_data(BrainSentimentIndicator7Day, security.symbol) for security in changes.removed_securities: if security in self.securities_list: self.securities_list.remove(security) self.sentiment_indicator_by_symbol.pop(security.symbol, None) def update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]: if data.quote_bars.count == 0: # Only emit insights when there is quote data, not when a corporate action occurs (at midnight) return [] if self.day == algorithm.time.day: # Only emit insights once per day return [] self.day = algorithm.time.day # Get sentiment value for symbol, indicator in self.sentiment_indicator_by_symbol.items(): if slice.ContainsKey(indicator.dataset_symbol): self.sentiment_by_symbol[symbol] = slice[indicator.dataset_symbol].sentiment # Calculate dividend growth mean for neutralizing dps_growth_by_symbol = {} for security in self.securities_list: if not np.isnan(security.fundamentals.earning_ratios.dps_growth.Value): dps_growth_by_symbol[security.symbol] = security.fundamentals.earning_ratios.dps_growth.Value dps_growth_mean = sum(dps_growth_by_symbol.values())/len(dps_growth_by_symbol.values()) # Neutralized the dividend growth for security in self.securities_list: if not np.isnan(security.fundamentals.earning_ratios.dps_growth.Value): dps_growth_by_symbol[security.symbol] = security.fundamentals.earning_ratios.dps_growth.Value - dps_growth_mean # Compute ts_zscore of current dividend growth keys, vals = zip(*dps_growth_by_symbol.items()) z = sp.stats.zscore(vals) zscore_by_symbol = dict(zip(keys,z)) # create insights to long / short the asset insights = [] weights = {} for symbol, zscore in zscore_by_symbol.items(): if not np.isnan(zscore): weight = zscore else: weight = 0 weights[symbol] = weight for symbol, weight in weights.items(): if weight >= 0: insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.UP, weight=weight)) else: insights.append(Insight.price(symbol, Expiry.END_OF_DAY, InsightDirection.DOWN, weight=weight)) return insights
from AlgorithmImports import * from alpha import * import tensorflow as tf import numpy as np import pandas as pd class MLPPredictionAlgorithm(QCAlgorithm): def Initialize(self): self.set_start_date(2021, 1, 3) self.set_end_date(2023, 1, 1) self.set_cash(1_000_000) # initialize dictionary to hold historical data, lastest features for prediction self.data = {} self.latest_features = {} self.predictions = {} # initialize data storage for Alpha outputs self.alpha_outputs = {} # set stock universe self.symbols = [] self.UniverseSettings.Resolution = Resolution.Daily self.add_universe(self.CoarseSelectionFilter) # add alphas self.add_alpha(TSZscore_VwapReversion()) # initialize ML model self.model = None self.lookback = 19 self.SetWarmUp(self.lookback) # schedule the training event, and prediction event self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At( 12, 0), self.TrainModel) self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At( 14, 30), self.MakePredictions) # set portfolio construction, risk management, and execution self.set_portfolio_construction(InsightWeightingPortfolioConstructionModel(rebalance=Expiry.EndOfMonth)) self.add_risk_management(NullRiskManagementModel()) self.set_execution(ImmediateExecutionModel()) def CoarseSelectionFilter(self, coarse): # sort descending by daily dollar volume sorted_by_dollar_volume = sorted( coarse, key=lambda x: x.DollarVolume, reverse=True) # select only Symbols with a price of more than $10 per share self.symbols = [ c.Symbol for c in sorted_by_dollar_volume if c.Price > 10] self.symbols = self.symbols[:100] # initialize the historical data dictionary for symbol in self.symbols: if symbol not in self.data: self.data[symbol] = pd.DataFrame() return self.symbols # def OnSecuritiesChanged(self, changes): # for security in changes.AddedSecurities: # symbol = security.Symbol # if symbol not in self.data: # self.data[symbol] = self.History( # symbol, self.lookback, Resolution.Daily) # for security in changes.RemovedSecurities: # symbol = security.Symbol # if symbol in self.data: # del self.data[symbol] def OnData(self, data): if self.is_warming_up: return for symbol in self.symbols: if symbol not in data.Bars: self.Debug("Missing data for symbol: " + str(symbol)) continue # retrieve the historical data history = self.History(symbol, timedelta( days=self.lookback), Resolution.DAILY) # update self.data with historical data if not history.empty: self.data[symbol] = history def ModelConstruction(self): model = tf.keras.models.Sequential() model.add(tf.keras.layers.Dense( 64, input_shape=(5,), activation=tf.nn.relu)) model.add(tf.keras.layers.Dense(64, activation=tf.nn.relu)) model.add(tf.keras.layers.Dense(64, activation=tf.nn.relu)) model.add(tf.keras.layers.Dense(1, activation='linear')) optimizer = tf.keras.optimizers.Adam(0.001) model.compile(optimizer=optimizer, loss='mse', metrics=['accuracy']) return model def TrainModel(self): X = [] y = [] for symbol in self.symbols: # self.debug(f"symbol: {symbol}, shape: {self.data[symbol].shape}") if self.data[symbol].shape[0] == 0: continue # drop NA, normalize data normalized_data = self.data[symbol].dropna() normalized_data = (normalized_data - normalized_data.mean()) / normalized_data.std() # self.debug(f"normalized data: {normalized_data.iloc[1, :]}") for i in range(0, self.data[symbol].shape[0]): # flatten the 3d standardize data into 2d, lose symbol information, question: what about NA value? features = normalized_data.iloc[i] # self.debug(f"symbol: {symbol}, i: {i}, features: {features}") features = features.values.flatten() if len(features) == 5: X.append(features) else: self.Debug(f"Inconsistent feature length for {symbol} at index {i}") for i in range(0, self.data[symbol].shape[0] - 1): # get the target label (return) symbol_return = (self.data[symbol].iloc[i + 1]['close'] - self.data[symbol].iloc[i]['close']) / self.data[symbol].iloc[i]['close'] y.append(symbol_return) # last row will be stored for the prediction self.latest_features[symbol] = normalized_data.iloc[-1].values.flatten() # X discard the last row X = X[:-1] # self.debug(f"X: {X}") # self.debug(f"y: {y}") # train the MLP model if len(X) > 0 and len(y) > 0: X = np.array(X) y = np.array(y) X = X.reshape(-1, 5) # y = y * 10000 # scale the return self.debug(f"X shape: {X.shape}") self.debug(f"y length: {len(y)}") # Train the MLP model self.model = self.ModelConstruction() self.model.fit(X, y, epochs=10, batch_size=5, verbose=1) self.Debug("Model trained with {} samples.".format(len(X))) else: self.Debug("Not enough data to train the model.") def MakePredictions(self): self.debug("prediction") if not self.model: self.Debug("Model is not trained yet") return # clear previous predictions self.predictions.clear() for symbol in self.symbols: if symbol not in self.latest_features: self.debug(f"Missing latest features for symbol {symbol}") continue features = self.latest_features[symbol].reshape(1, -1) # self.debug(f"prediction input shape: {features.shape}") # self.debug(f"prediction input for {symbol}: {features}") prediction = self.model.predict(features) self.predictions[symbol] = prediction[0][0] self.debug(f"Prediction for symbol {symbol}: {self.predictions[symbol]}") self.debug(f"Predictions: {self.predictions}") if len(self.predictions) > 0: self.TradeOnPredictions() def TradeOnPredictions(self): # rank the stocks sorted_predictions = sorted(self.predictions.items(), key=lambda item: item[1]) n = len(sorted_predictions) long_threshold = int(n * 0.8) short_threshold = int(n * 0.2) for i, (symbol, prediction) in enumerate(sorted_predictions): if i < short_threshold: self.SetHoldings(symbol, -1.0 / short_threshold) self.Debug(f"Short: {symbol}") elif i >= long_threshold: self.SetHoldings(symbol, 1.0 / long_threshold) self.Debug(f"Long: {symbol}") else: self.SetHoldings(symbol, 0) self.Debug(f"No Position: {symbol}")