Overall Statistics |
Total Trades 795 Average Win 0.03% Average Loss -0.03% Compounding Annual Return -0.764% Drawdown 1.600% Expectancy -0.112 Net Profit -1.523% Sharpe Ratio -0.659 Probabilistic Sharpe Ratio 0.312% Loss Rate 56% Win Rate 44% Profit-Loss Ratio 1.02 Alpha -0.005 Beta 0.003 Annual Standard Deviation 0.008 Annual Variance 0 Information Ratio -0.228 Tracking Error 0.161 Treynor Ratio -1.539 Total Fees $2536.80 Estimated Strategy Capacity $5300000.00 Lowest Capacity Asset NVDA RHM8UTD8DT2D |
from AlgorithmImports import *
from datetime import timedelta

from symbol_data import SymbolData


class TiingoNewsImpactAlphaModel(AlphaModel):
    """Alpha model that turns Tiingo news articles into short-lived price insights.

    One `SymbolData` object is kept per security in the universe. Each
    article in the current slice is scored by the `SymbolData` of every
    asset it mentions, the scores are averaged per asset, and an insight
    lasting `PREDICTION_INTERVAL` is emitted when `should_trade` accepts
    the averaged score.
    """

    PREDICTION_INTERVAL = timedelta(minutes=30)

    def __init__(self, algorithm):
        """Store the algorithm and schedule the recurring training session."""
        self.algorithm = algorithm
        self.symbol_data_by_symbol = {}

        # Schedule model training sessions
        month_start = algorithm.DateRules.MonthStart()
        at_seven = algorithm.TimeRules.At(7, 0)
        algorithm.Train(month_start, at_seven, self.train_models)

    def train_models(self):
        """Retrain the model of every tracked symbol."""
        for tracked in self.symbol_data_by_symbol.values():
            tracked.train_model()

    def Update(self, algorithm: QCAlgorithm, slice: Slice) -> List[Insight]:
        """Return the insights produced by the news in `slice`."""
        # Collect the expected return of each Symbol, given the current news
        predictions_by_symbol = {}
        for _, article in slice.Get(TiingoNews).items():
            for asset_symbol in article.Symbols:
                # Articles can mention assets that aren't in the universe
                if asset_symbol not in self.symbol_data_by_symbol:
                    continue

                # Only trade during regular hours, otherwise market orders
                # get converted to MOO and cause margin issues
                exchange_hours = algorithm.Securities[asset_symbol].Exchange.Hours
                if not exchange_hours.IsOpen(algorithm.Time + timedelta(minutes=1), extendedMarket=False):
                    continue

                bucket = predictions_by_symbol.setdefault(asset_symbol, [])
                prediction = self.symbol_data_by_symbol[asset_symbol].get_expected_return(article)
                if prediction is not None:
                    bucket.append(prediction)

        # Aggregate per symbol and emit an insight where the model says to trade
        insights = []
        for asset_symbol, predictions in predictions_by_symbol.items():
            if not predictions:
                continue
            expected_return = self.aggregate_expected_returns(predictions)
            if not self.symbol_data_by_symbol[asset_symbol].should_trade(expected_return):
                continue
            if expected_return > 0:
                direction = InsightDirection.Up
            else:
                direction = InsightDirection.Down
            insights.append(Insight.Price(asset_symbol, self.PREDICTION_INTERVAL, direction))
        return insights

    def aggregate_expected_returns(self, expected_returns):
        """Return the arithmetic mean of `expected_returns`."""
        return sum(expected_returns) / len(expected_returns)

    def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        """Create `SymbolData` for added securities and dispose it for removed ones."""
        for added in changes.AddedSecurities:
            self.symbol_data_by_symbol[added.Symbol] = SymbolData(
                algorithm,
                added.Symbol,
                self.PREDICTION_INTERVAL,
                self.aggregate_expected_returns,
            )

        for removed in changes.RemovedSecurities:
            stale = self.symbol_data_by_symbol.pop(removed.Symbol, None)
            if stale:
                stale.dispose()
# region imports
from AlgorithmImports import *

from universe import QQQETFUniverseSelectionModel
from alpha import TiingoNewsImpactAlphaModel
from portfolio import PartitionedPortfolioConstructionModel
# endregion


class QQQConstituentsNewsImpactAlgorithm(QCAlgorithm):
    """Framework algorithm wiring the universe, alpha, portfolio, risk and execution models."""

    def Initialize(self):
        """Configure the backtest window, the cash and the framework models."""
        # Backtest window and starting capital
        self.SetStartDate(2021, 1, 1)
        self.SetEndDate(2023, 1, 1)
        self.SetCash(1_000_000)

        # Each model is built right before it is registered, in the same
        # order: universe, alpha, portfolio construction, risk, execution.
        universe_model = QQQETFUniverseSelectionModel(self.UniverseSettings)
        self.AddUniverseSelection(universe_model)

        alpha_model = TiingoNewsImpactAlphaModel(self)
        self.AddAlpha(alpha_model)

        portfolio_model = PartitionedPortfolioConstructionModel(self, 10)
        self.SetPortfolioConstruction(portfolio_model)

        risk_model = NullRiskManagementModel()
        self.AddRiskManagement(risk_model)

        execution_model = ImmediateExecutionModel()
        self.SetExecution(execution_model)
#region imports
from AlgorithmImports import *
#endregion

#region imports
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import numpy as np
#endregion


class IntradayNewsReturnGenerator:
    """Word-level model mapping news text to an expected short-term return.

    Training simulates, for every article, a trade entered at the first
    price at or after the article time and exited at the first price at or
    after `article time + prediction_interval`. The simulated return is
    credited to every filtered word of the article, and the returns of each
    word are then combined with `aggregation_method`.
    """

    STOP_WORDS = set(stopwords.words('english'))
    PUNCTUATION = "!()-[]{};:'\",<>./?@#%^&*_~"

    def __init__(self, prediction_interval, aggregation_method):
        """
        Args:
            prediction_interval: `timedelta` between simulated entry and exit.
            aggregation_method: Callable reducing a non-empty list of returns
                to a single value.
        """
        self.PREDICTION_INTERVAL = prediction_interval
        self.aggregate_future_returns = aggregation_method
        self.future_return_by_word = {}

    def update_word_sentiments(self, words_collection, security_history):
        """Rebuild `future_return_by_word` from scratch.

        Args:
            words_collection: Sequence of `(time, text)` pairs, one per article.
            security_history: Price series indexed by time (it is filtered
                with `index >= time` and read with `.iloc[0]`).

        The previous model is always discarded; it stays empty when either
        input is empty.
        """
        self.future_return_by_word = {}
        if not words_collection or security_history.empty:
            return

        future_returns_by_word = {}
        for time, words in words_collection:
            # Get entry price: first price at or after the article time
            start_time = np.datetime64(time)
            entry_prices = security_history.loc[security_history.index >= start_time]
            if entry_prices.empty:
                continue
            entry_price = entry_prices.iloc[0]

            # Get exit price: first price at or after the end of the interval
            end_time = np.datetime64(time + self.PREDICTION_INTERVAL)
            exit_prices = security_history.loc[security_history.index >= end_time]
            if exit_prices.empty:
                continue
            exit_price = exit_prices.iloc[0]

            # Calculate trade return
            future_return = (exit_price - entry_price) / entry_price

            # Save simulated trade return for each word
            for word in self.filter_words(words):
                future_returns_by_word.setdefault(word, []).append(future_return)

        # Aggregate future returns for each word
        self.future_return_by_word = {
            word: self.aggregate_future_returns(future_returns)
            for word, future_returns in future_returns_by_word.items()
        }

    def filter_words(self, words):
        """Return the unique, lower-cased, meaningful tokens of `words`.

        Stop words are dropped, and so is every token made up entirely of
        `PUNCTUATION` characters. The result is sorted so that downstream
        float aggregation does not depend on set iteration order.
        """
        stop_words = self.STOP_WORDS
        punctuation = self.PUNCTUATION
        unique_words = set()
        for token in word_tokenize(words):
            lowered = token.lower()
            if lowered in stop_words:
                continue
            # Test character by character: a substring test against the
            # PUNCTUATION string lets multi-character punctuation tokens such
            # as "--" or "..." through as if they were words.
            if all(char in punctuation for char in token):
                continue
            unique_words.add(lowered)
        return sorted(unique_words)

    def get_expected_return(self, words):
        """Return the aggregated return of the known words in `words`.

        Returns:
            The aggregated value, or `None` when the model is untrained or
            none of the filtered words has been seen during training.
        """
        if not self.future_return_by_word:
            return None

        future_returns = [
            self.future_return_by_word[word]
            for word in self.filter_words(words)
            if word in self.future_return_by_word
        ]
        if not future_returns:
            return None
        return self.aggregate_future_returns(future_returns)
#region imports
from AlgorithmImports import *
#endregion


class PartitionedPortfolioConstructionModel(PortfolioConstructionModel):
    """Portfolio construction model that divides the portfolio into partitions.

    At most `num_partitions` symbols are targeted at a time, chosen in the
    order their insights were generated. Positions that already match the
    direction of their insight are left untouched; new or reversed positions
    share the portfolio value not occupied by those untouched positions,
    split evenly across the vacant partitions.
    """

    def __init__(self, algorithm, num_partitions):
        """
        Args:
            algorithm: Algorithm instance; its `Portfolio` is read here.
            num_partitions: Maximum number of simultaneously targeted symbols.
        """
        self.algorithm = algorithm
        self.NUM_PARTITIONS = num_partitions
        # Symbols removed by the latest universe change. Initialised here so
        # DetermineTargetPercent can read it before any universe change.
        self.removed_symbols = []

    # REQUIRED: Will determine the target percent for each insight
    def DetermineTargetPercent(self, activeInsights: List[Insight]) -> Dict[Insight, float]:
        """Return the signed target portfolio weight for each insight that needs an order.

        Insights whose symbol is already held in the matching direction get
        no entry in the result.
        """
        target_pct_by_insight = {}

        # Sort insights by the time they were emitted
        insights_sorted_by_time = sorted(activeInsights, key=lambda x: x.GeneratedTimeUtc)

        # Find target securities and group insights by Symbol
        target_symbols = []
        insight_by_symbol = {}
        for insight in insights_sorted_by_time:
            insight_by_symbol[insight.Symbol] = insight

            # Liquidate securities that are removed from the universe: they
            # never become targets, so they get a zero weight below
            if insight.Symbol in self.removed_symbols:
                continue

            if len(target_symbols) < self.NUM_PARTITIONS:
                target_symbols.append(insight.Symbol)

        occupied_portfolio_value = 0
        occupied_partitions = 0
        # Get the last insight emitted for each target Symbol
        for symbol, insight in insight_by_symbol.items():
            # Only invest in Symbols in `target_symbols`
            if symbol not in target_symbols:
                target_pct_by_insight[insight] = 0
                continue

            security_holding = self.algorithm.Portfolio[symbol]

            # If we're invested in the security in the proper direction, do nothing
            if security_holding.IsShort and insight.Direction == InsightDirection.Down \
                    or security_holding.IsLong and insight.Direction == InsightDirection.Up:
                occupied_portfolio_value += security_holding.AbsoluteHoldingsValue
                occupied_partitions += 1
                continue

            # If currently invested but the insight direction has changed,
            # change the portfolio weight of the security
            if security_holding.IsShort and insight.Direction == InsightDirection.Up \
                    or security_holding.IsLong and insight.Direction == InsightDirection.Down:
                target_pct_by_insight[insight] = int(insight.Direction)

            # If not currently invested, set the portfolio weight of the security
            if not security_holding.Invested:
                target_pct_by_insight[insight] = int(insight.Direction)

        # Scale down target percentages to respect partitions (account for
        # liquidations from insight expiry + universe removals)
        total_portfolio_value = self.algorithm.Portfolio.TotalPortfolioValue
        vacant_partitions = self.NUM_PARTITIONS - occupied_partitions
        if vacant_partitions != 0 and total_portfolio_value > 0:
            free_portfolio_pct = (total_portfolio_value - occupied_portfolio_value) / total_portfolio_value
            scaling_factor = free_portfolio_pct / vacant_partitions
        else:
            # No room left, or no equity to allocate: target nothing new
            scaling_factor = 0

        return {
            insight: target_pct * scaling_factor
            for insight, target_pct in target_pct_by_insight.items()
        }

    # Determines if the portfolio should be rebalanced
    def IsRebalanceDue(self, insights: List[Insight], algorithmUtc: datetime) -> bool:
        """Return True when the holdings no longer match the target insights.

        Rebalance when any of the following cases is true:
          Case 1: A security we're invested in was removed from the universe
          Case 2: The latest insight for a Symbol we're invested in has expired
          Case 3: The insight direction for a security we're invested in has changed
          Case 4: There is an insight for a security we're not currently invested in
                  AND there is an available partition in the portfolio
        """
        last_active_insights = self.GetTargetInsights()  # Warning: This assumes that all insights have the same duration
        insight_symbols = [insight.Symbol for insight in last_active_insights]

        num_investments = 0
        for symbol, security_holding in self.algorithm.Portfolio.items():
            if not security_holding.Invested:
                continue
            num_investments += 1
            # Case 1 and Case 2: an invested symbol has no target insight
            if symbol not in insight_symbols:
                return True

        for insight in last_active_insights:
            security_holding = self.algorithm.Portfolio[insight.Symbol]
            # Case 3: The insight direction for a security we're invested in has changed
            if security_holding.IsShort and insight.Direction == InsightDirection.Up \
                    or security_holding.IsLong and insight.Direction == InsightDirection.Down:
                return True

            # Case 4: There is an insight for a security we're not currently
            # invested in AND there is an available partition in the portfolio
            if not security_holding.Invested and num_investments < self.NUM_PARTITIONS:
                return True

        return False

    def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        """Record the removed symbols and drop their insights from the collection."""
        super().OnSecuritiesChanged(algorithm, changes)
        self.removed_symbols = []
        for security in changes.RemovedSecurities:
            self.removed_symbols.append(security.Symbol)
            if not self.InsightCollection.ContainsKey(security.Symbol):
                continue
            # Iterate over a snapshot: removing from the collection while
            # iterating the sequence it handed out may invalidate the iterator
            for insight in list(self.InsightCollection[security.Symbol]):
                self.InsightCollection.Remove(insight)
from AlgorithmImports import *
from datetime import timedelta

import numpy as np
import pytz

from news_return_generator import IntradayNewsReturnGenerator


class SymbolData:
    """Per-security state: news subscription, word model and prediction bands.

    The word model (`IntradayNewsReturnGenerator`) is trained on the last
    `LOOKBACK_PERIOD` of news and prices. A Bollinger Band over recent
    predictions decides whether a new prediction is extreme enough to trade.
    """

    LOOKBACK_PERIOD = timedelta(days=30)
    STANDARD_DEVS = 6
    STANDARD_DEV_PERIOD = 30

    def __init__(self, algorithm, symbol, prediction_interval, aggregation_method):
        """Subscribe to Tiingo news for `symbol` and queue an initial training run.

        Args:
            algorithm: Algorithm instance used for data, history and training.
            symbol: Symbol of the asset the news relates to.
            prediction_interval: Holding period passed to the word model.
            aggregation_method: Callable reducing a list of returns to one value.
        """
        self.algorithm = algorithm
        self.symbol = symbol
        self.dataset_symbol = algorithm.AddData(TiingoNews, symbol).Symbol
        self.news_return_generator = IntradayNewsReturnGenerator(prediction_interval, aggregation_method)
        self.predictions_bb = BollingerBands(self.STANDARD_DEV_PERIOD, self.STANDARD_DEVS)
        algorithm.Train(self.train_model)

    def train_model(self):
        """Retrain the word model and feed recent predictions into the bands."""
        # These lookups are the same for every article, so resolve them once
        exchange_hours = self.algorithm.Securities[self.symbol].Exchange.Hours
        asset_time_zone = pytz.timezone(exchange_hours.TimeZone.ToString())

        # Get and structure news history
        news_history = self.algorithm.History[TiingoNews](self.dataset_symbol, self.LOOKBACK_PERIOD)
        filtered_news_history = []
        words_collection = []
        for article in news_history:
            # Convert article timestamp (GMT) into the asset time zone
            timestamp = article.Time.astimezone(asset_time_zone).replace(tzinfo=None)

            # Skip articles that were released outside of regular trading hours.
            # If you train the model with articles that were released outside of
            # RTH, the simulated entry price and exit price can be the same
            # (the market opening price)
            if not exchange_hours.IsOpen(timestamp + timedelta(minutes=1), extendedMarket=False):
                continue

            filtered_news_history.append(article)
            words_collection.append((timestamp, self.get_words(article)))

        # Get security history
        security_history = self.algorithm.History(self.symbol, self.LOOKBACK_PERIOD)
        if not security_history.empty:
            security_history = security_history.loc[self.symbol]['close']

        # Train news return generator
        self.news_return_generator.update_word_sentiments(words_collection, security_history)

        # Warm up STD of predictions using the most recent articles
        for article in filtered_news_history[-self.predictions_bb.WarmUpPeriod:]:
            expected_return = self.get_expected_return(article)
            if expected_return is None:
                continue  # The expected return can be None if the article has no usable text (data issue)
            self.predictions_bb.Update(article.Time, expected_return)

    def get_expected_return(self, article):
        """Return the word model's expected return for `article`, or None."""
        return self.news_return_generator.get_expected_return(self.get_words(article))

    def get_words(self, article):
        """Return the article title and description as one string.

        The parts are joined with a space so the last word of the title is
        not fused with the first word of the description; missing or empty
        parts are skipped.
        """
        parts = (article.Title, article.Description)
        return " ".join(part for part in parts if part)

    def dispose(self):
        """Remove the news subscription created in the constructor."""
        self.algorithm.RemoveSecurity(self.dataset_symbol)

    def should_trade(self, expected_return):
        """Return True when `expected_return` lies outside the prediction bands.

        Always False for `None` or zero, and while the bands are not ready.
        """
        if expected_return in [None, 0]:
            return False
        if not self.predictions_bb.IsReady:
            return False
        return (expected_return > self.predictions_bb.UpperBand.Current.Value
                or expected_return < self.predictions_bb.LowerBand.Current.Value)
from AlgorithmImports import *


class QQQETFUniverseSelectionModel(ETFConstituentsUniverseSelectionModel):
    """Universe made of the ten highest-weighted QQQ constituents."""

    def __init__(self, universe_settings: UniverseSettings = None) -> None:
        """Build the selection model around the QQQ ETF symbol."""
        etf_symbol = Symbol.Create("QQQ", SecurityType.Equity, Market.USA)
        super().__init__(etf_symbol, universe_settings, self.ETFConstituentsFilter)

    def ETFConstituentsFilter(self, constituents: List[ETFConstituentData]) -> List[Symbol]:
        """Return the symbols of the ten constituents with the largest weight.

        Constituents with a falsy weight (missing or zero) are ignored.
        """
        weighted = [constituent for constituent in constituents if constituent.Weight]
        weighted.sort(key=lambda constituent: constituent.Weight, reverse=True)
        return [constituent.Symbol for constituent in weighted[:10]]