Overall Statistics
Total Trades
795
Average Win
0.03%
Average Loss
-0.03%
Compounding Annual Return
-0.764%
Drawdown
1.600%
Expectancy
-0.112
Net Profit
-1.523%
Sharpe Ratio
-0.659
Probabilistic Sharpe Ratio
0.312%
Loss Rate
56%
Win Rate
44%
Profit-Loss Ratio
1.02
Alpha
-0.005
Beta
0.003
Annual Standard Deviation
0.008
Annual Variance
0
Information Ratio
-0.228
Tracking Error
0.161
Treynor Ratio
-1.539
Total Fees
$2536.80
Estimated Strategy Capacity
$5300000.00
Lowest Capacity Asset
NVDA RHM8UTD8DT2D
from AlgorithmImports import *
from datetime import timedelta
from symbol_data import SymbolData

class TiingoNewsImpactAlphaModel(AlphaModel):
    """Alpha model that turns incoming TiingoNews articles into price insights.

    For every article in the current slice, each mentioned asset that is
    tracked by this model contributes one expected-return prediction. The
    predictions of an asset are aggregated and, when the asset's `SymbolData`
    says the aggregated value is worth trading, an insight lasting
    `PREDICTION_INTERVAL` is emitted in the direction of the expected return.
    """

    PREDICTION_INTERVAL = timedelta(minutes=30)

    def __init__(self, algorithm):
        self.algorithm = algorithm
        # Maps each asset Symbol in the universe to its SymbolData helper
        self.symbol_data_by_symbol = {}

        # Register a recurring training session (MonthStart date rule, 7:00 time rule)
        algorithm.Train(algorithm.DateRules.MonthStart(), algorithm.TimeRules.At(7, 0), self.train_models)

    def train_models(self):
        """Retrain the model of every Symbol currently being tracked."""
        for symbol_data in self.symbol_data_by_symbol.values():
            symbol_data.train_model()

    def Update(self, algorithm: QCAlgorithm, slice: Slice) -> List[Insight]:
        """Create insights from the news articles contained in `slice`.

        Returns a list with at most one insight per asset Symbol.
        """
        one_minute = timedelta(minutes=1)

        # Collect the individual predictions of each asset, given the current news
        predictions_by_asset = {}
        for _, article in slice.Get(TiingoNews).items():
            for asset_symbol in article.Symbols:
                symbol_data = self.symbol_data_by_symbol.get(asset_symbol)
                if symbol_data is None:
                    # Articles can mention assets that aren't in the universe
                    continue

                exchange_hours = algorithm.Securities[asset_symbol].Exchange.Hours
                if not exchange_hours.IsOpen(algorithm.Time + one_minute, extendedMarket=False):
                    # Only trade during regular hours, otherwise market orders
                    # get converted to MOO and cause margin issues
                    continue

                predictions = predictions_by_asset.setdefault(asset_symbol, [])
                prediction = symbol_data.get_expected_return(article)
                if prediction is not None:
                    predictions.append(prediction)

        # Aggregate the predictions of each asset and emit insights for the tradable ones
        insights = []
        for asset_symbol, predictions in predictions_by_asset.items():
            if len(predictions) == 0:
                continue
            expected_return = self.aggregate_expected_returns(predictions)
            if not self.symbol_data_by_symbol[asset_symbol].should_trade(expected_return):
                continue
            if expected_return > 0:
                direction = InsightDirection.Up
            else:
                direction = InsightDirection.Down
            insights.append(Insight.Price(asset_symbol, self.PREDICTION_INTERVAL, direction))

        return insights

    def aggregate_expected_returns(self, expected_returns):
        """Return the arithmetic mean of `expected_returns` (must be non-empty)."""
        total = sum(expected_returns)
        return total / len(expected_returns)

    def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        """Create a SymbolData for each added security and dispose those of removed ones."""
        for added in changes.AddedSecurities:
            self.symbol_data_by_symbol[added.Symbol] = SymbolData(
                algorithm, added.Symbol, self.PREDICTION_INTERVAL, self.aggregate_expected_returns
            )

        for removed in changes.RemovedSecurities:
            symbol_data = self.symbol_data_by_symbol.pop(removed.Symbol, None)
            if symbol_data:
                symbol_data.dispose()
# region imports
from AlgorithmImports import *
from universe import QQQETFUniverseSelectionModel
from alpha import TiingoNewsImpactAlphaModel
from portfolio import PartitionedPortfolioConstructionModel
# endregion

class QQQConstituentsNewsImpactAlgorithm(QCAlgorithm):
    """Algorithm that wires together the universe, alpha, portfolio
    construction, risk management and execution models of the strategy."""

    def Initialize(self):
        """Configure the backtest period, the starting cash and the framework models."""
        # Backtest window and starting capital
        self.SetStartDate(2021, 1, 1)
        self.SetEndDate(2023, 1, 1)
        self.SetCash(1_000_000)

        # Universe selection
        universe_model = QQQETFUniverseSelectionModel(self.UniverseSettings)
        self.AddUniverseSelection(universe_model)

        # Alpha: news-driven insights
        alpha_model = TiingoNewsImpactAlphaModel(self)
        self.AddAlpha(alpha_model)

        # Portfolio construction with 10 partitions
        num_partitions = 10
        portfolio_model = PartitionedPortfolioConstructionModel(self, num_partitions)
        self.SetPortfolioConstruction(portfolio_model)

        # No risk management overlay; orders are executed immediately
        self.AddRiskManagement(NullRiskManagementModel())
        self.SetExecution(ImmediateExecutionModel())
#region imports
from AlgorithmImports import *
#endregion
#region imports
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import numpy as np
#endregion

class IntradayNewsReturnGenerator:
    """Word-level model that maps article words to the returns that followed them.

    Training simulates, for every article, a trade entered at the first price
    at or after the article time and exited at the first price at or after
    the article time plus the prediction interval. The simulated return is
    credited to each distinct filtered word of the article, and the returns of
    each word are then reduced to one value with the aggregation method.
    """

    STOP_WORDS = set(stopwords.words('english'))
    PUNCTUATION = "!()-[]{};:'\",<>./?@#%^&*_~"

    def __init__(self, prediction_interval, aggregation_method):
        self.PREDICTION_INTERVAL = prediction_interval
        self.aggregate_future_returns = aggregation_method
        # Aggregated simulated return of every word seen during the last training
        self.future_return_by_word = {}

    @staticmethod
    def _first_price_from(security_history, moment):
        """Return the first price whose index is >= `moment`, or None if there is none."""
        candidates = security_history.loc[security_history.index >= moment]
        return None if candidates.empty else candidates.iloc[0]

    def update_word_sentiments(self, words_collection, security_history):
        """Rebuild `future_return_by_word` from scratch.

        `words_collection` is a sequence of (time, text) pairs and
        `security_history` a time-indexed pandas object of prices. The model
        is left empty when either input is empty.
        """
        self.future_return_by_word = {}
        if len(words_collection) == 0 or security_history.empty:
            return

        observed_returns = {}
        for published_at, text in words_collection:
            # Simulated entry: first price at or after the article time
            entry_price = self._first_price_from(security_history, np.datetime64(published_at))
            if entry_price is None:
                continue

            # Simulated exit: first price at or after the end of the prediction interval
            exit_price = self._first_price_from(
                security_history, np.datetime64(published_at + self.PREDICTION_INTERVAL)
            )
            if exit_price is None:
                continue

            simulated_return = (exit_price - entry_price) / entry_price

            # Credit the simulated trade return to each distinct word of the article
            for word in self.filter_words(text):
                observed_returns.setdefault(word, []).append(simulated_return)

        # Reduce the returns of each word to a single value
        self.future_return_by_word = {
            word: self.aggregate_future_returns(returns)
            for word, returns in observed_returns.items()
        }

    def filter_words(self, words):
        """Tokenize `words` and return the distinct lower-cased tokens,
        leaving out stop words and tokens found in `PUNCTUATION`."""
        distinct_tokens = set()
        for token in word_tokenize(words):
            lowered = token.lower()
            if lowered in self.STOP_WORDS or token in self.PUNCTUATION:
                continue
            distinct_tokens.add(lowered)
        return list(distinct_tokens)

    def get_expected_return(self, words):
        """Return the aggregated return of the known words in `words`.

        Returns None when the model is untrained or no word of the text is known.
        """
        if len(self.future_return_by_word) == 0:
            return None
        known_returns = [
            self.future_return_by_word[word]
            for word in self.filter_words(words)
            if word in self.future_return_by_word
        ]
        if len(known_returns) == 0:
            return None
        return self.aggregate_future_returns(known_returns)
#region imports
from AlgorithmImports import *
#endregion

class PartitionedPortfolioConstructionModel(PortfolioConstructionModel):
    """Portfolio construction model that divides the portfolio into partitions.

    At most `num_partitions` Symbols are targeted at any time. Positions that
    already match the direction of their latest insight are left untouched;
    the remaining (free) portfolio value is split evenly across the vacant
    partitions and assigned to new or direction-flipped positions.
    """

    def __init__(self, algorithm, num_partitions):
        """
        algorithm: the algorithm whose Portfolio is inspected.
        num_partitions: maximum number of Symbols to hold simultaneously.
        """
        self.algorithm = algorithm
        self.NUM_PARTITIONS = num_partitions
        # Symbols removed by the most recent universe change. Created here so
        # DetermineTargetPercent never hits a missing attribute if it runs
        # before the first OnSecuritiesChanged call.
        self.removed_symbols = []

    # REQUIRED: Will determine the target percent for each insight
    def DetermineTargetPercent(self, activeInsights: List[Insight]) -> Dict[Insight, float]:
        """Return the target portfolio percentage for each insight.

        Insights whose Symbol is already held in the insight's direction are
        omitted from the result, so those holdings are not adjusted.
        """
        target_pct_by_insight = {}

        # Sort insights by the time they were emitted
        insights_sorted_by_time = sorted(activeInsights, key=lambda x: x.GeneratedTimeUtc)

        # Find target securities and group insights by Symbol
        target_symbols = []
        insight_by_symbol = {}
        for insight in insights_sorted_by_time:
            insight_by_symbol[insight.Symbol] = insight
            # Liquidate securities that are removed from the universe
            if insight.Symbol in self.removed_symbols:
                continue
            if len(target_symbols) < self.NUM_PARTITIONS:
                target_symbols.append(insight.Symbol)

        occupied_portfolio_value = 0
        occupied_partitions = 0
        # Get the last insight emitted for each target Symbol
        for symbol, insight in insight_by_symbol.items():
            # Only invest in Symbols in `target_symbols`
            if symbol not in target_symbols:
                target_pct_by_insight[insight] = 0
            else:
                security_holding = self.algorithm.Portfolio[symbol]
                # If we're invested in the security in the proper direction, do nothing
                if security_holding.IsShort and insight.Direction == InsightDirection.Down \
                    or security_holding.IsLong and insight.Direction == InsightDirection.Up:
                    occupied_portfolio_value += security_holding.AbsoluteHoldingsValue
                    occupied_partitions += 1
                    continue

                # If currently invested but the insight direction has changed,
                #  change the portfolio weight of the security and reset the partition size
                if security_holding.IsShort and insight.Direction == InsightDirection.Up \
                    or security_holding.IsLong and insight.Direction == InsightDirection.Down:
                    target_pct_by_insight[insight] = int(insight.Direction)

                # If not currently invested, set portfolio weight of security with partition size
                if not security_holding.Invested:
                    target_pct_by_insight[insight] = int(insight.Direction)

        # Scale down target percentages to respect partitions (account for liquidations from insight expiry + universe removals)
        total_portfolio_value = self.algorithm.Portfolio.TotalPortfolioValue
        vacant_partitions = self.NUM_PARTITIONS - occupied_partitions
        if vacant_partitions != 0 and total_portfolio_value != 0:
            free_portfolio_pct = (total_portfolio_value - occupied_portfolio_value) / total_portfolio_value
            scaling_factor = free_portfolio_pct / vacant_partitions
        else:
            # No vacant partition, or nothing to allocate: avoid dividing by zero
            scaling_factor = 0
        for insight, target_pct in target_pct_by_insight.items():
            target_pct_by_insight[insight] = target_pct * scaling_factor

        return target_pct_by_insight

    # Determines if the portfolio should be rebalanced based on the current holdings and insights
    def IsRebalanceDue(self, insights: List[Insight], algorithmUtc: datetime) -> bool:
        """Return True when the holdings no longer agree with the target insights."""
        # Rebalance when any of the following cases are true:
        #  Case 1: A security we're invested in was removed from the universe
        #  Case 2: The latest insight for a Symbol we're invested in has expired
        #  Case 3: The insight direction for a security we're invested in has changed
        #  Case 4: There is an insight for a security we're not currently invested in AND there is an available partition in the portfolio

        last_active_insights = self.GetTargetInsights() # Warning: This assumes that all insights have the same duration
        insight_symbols = [insight.Symbol for insight in last_active_insights]
        num_investments = 0
        for symbol, security_holding in self.algorithm.Portfolio.items():
            if not security_holding.Invested:
                continue
            num_investments += 1
            #  Case 1: A security we're invested in was removed from the universe
            #  Case 2: The latest insight for a Symbol we're invested in has expired
            if symbol not in insight_symbols:
                return True

        for insight in last_active_insights:
            security_holding = self.algorithm.Portfolio[insight.Symbol]
            #  Case 3: The insight direction for a security we're invested in has changed
            if security_holding.IsShort and insight.Direction == InsightDirection.Up \
                or security_holding.IsLong and insight.Direction == InsightDirection.Down:
                return True

            #  Case 4: There is an insight for a security we're not currently invested in AND there is an available partition in the portfolio
            if not security_holding.Invested and num_investments < self.NUM_PARTITIONS:
                return True

        return False

    def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        """Record the removed Symbols and drop their insights from the collection."""
        super().OnSecuritiesChanged(algorithm, changes)
        self.removed_symbols = []
        for security in changes.RemovedSecurities:
            self.removed_symbols.append(security.Symbol)
            if not self.InsightCollection.ContainsKey(security.Symbol):
                continue
            # Iterate over a snapshot: removing from the collection while
            # iterating the sequence it handed out could invalidate the iteration
            for insight in list(self.InsightCollection[security.Symbol]):
                self.InsightCollection.Remove(insight)
                
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
import pytz
from news_return_generator import IntradayNewsReturnGenerator

class SymbolData:
    """Per-asset state: the TiingoNews subscription, the word-return model
    and the Bollinger Bands over the model's recent predictions."""

    LOOKBACK_PERIOD = timedelta(days=30)  # History window used for training
    STANDARD_DEVS = 6                     # Band width (k) passed to BollingerBands
    STANDARD_DEV_PERIOD = 30              # Period passed to BollingerBands

    def __init__(self, algorithm, symbol, prediction_interval, aggregation_method):
        """Subscribe to TiingoNews for `symbol`, build the model and request an initial training."""
        self.algorithm = algorithm
        self.symbol = symbol

        self.dataset_symbol = algorithm.AddData(TiingoNews, symbol).Symbol
        self.news_return_generator = IntradayNewsReturnGenerator(prediction_interval, aggregation_method)
        self.predictions_bb = BollingerBands(self.STANDARD_DEV_PERIOD, self.STANDARD_DEVS)

        algorithm.Train(self.train_model)

    def train_model(self):
        """Retrain the word-return model on the lookback window and feed the
        Bollinger Bands with the predictions of the most recent articles."""
        # Exchange hours and time zone don't change between articles, so look them up once
        exchange_hours = self.algorithm.Securities[self.symbol].Exchange.Hours
        asset_time_zone = pytz.timezone(exchange_hours.TimeZone.ToString())
        one_minute = timedelta(minutes=1)

        # Get and structure news history
        news_history = self.algorithm.History[TiingoNews](self.dataset_symbol, self.LOOKBACK_PERIOD)
        filtered_news_history = []
        words_collection = []
        for article in news_history:
            # Convert article timestamp (GMT) into the asset time zone
            timestamp = article.Time.astimezone(asset_time_zone).replace(tzinfo=None)

            # Skip articles that were released outside of regular trading hours.
            # If the model is trained with articles released outside of RTH, the
            # simulated entry price and exit price can be the same (the market
            # opening price).
            if not exchange_hours.IsOpen(timestamp + one_minute, extendedMarket=False):
                continue
            filtered_news_history.append(article)

            words_collection.append( (timestamp, self.get_words(article)) )

        # Get security history
        security_history = self.algorithm.History(self.symbol, self.LOOKBACK_PERIOD)
        if not security_history.empty:
            security_history = security_history.loc[self.symbol]['close']

        # Train news return generator
        self.news_return_generator.update_word_sentiments(words_collection, security_history)

        # Warm up STD of predictions using the most recent articles
        for article in filtered_news_history[-self.predictions_bb.WarmUpPeriod:]:
            expected_return = self.get_expected_return(article)
            if expected_return is None:
                continue # The expected return can be None if the article has no title (data issue)
            self.predictions_bb.Update(article.Time, expected_return)

    def get_expected_return(self, article):
        """Return the model's expected return for `article` (may be None)."""
        return self.news_return_generator.get_expected_return(self.get_words(article))

    def get_words(self, article):
        """Return the article's title and description as one text.

        The two parts are separated by a space so that the last word of the
        title and the first word of the description stay distinct tokens.
        Missing or empty parts are skipped.
        """
        return " ".join(part for part in (article.Title, article.Description) if part)

    def dispose(self):
        """Remove the TiingoNews subscription of this asset."""
        self.algorithm.RemoveSecurity(self.dataset_symbol)

    def should_trade(self, expected_return):
        """Return True when the bands are ready and `expected_return` lies
        outside them; None and 0 never trigger a trade."""
        if expected_return in [None, 0]:
            return False
        return self.predictions_bb.IsReady \
            and (expected_return > self.predictions_bb.UpperBand.Current.Value \
            or expected_return < self.predictions_bb.LowerBand.Current.Value)
from AlgorithmImports import *

class QQQETFUniverseSelectionModel(ETFConstituentsUniverseSelectionModel):
    """Universe of the ten highest-weighted constituents of the QQQ ETF."""

    def __init__(self, universe_settings: UniverseSettings = None) -> None:
        qqq = Symbol.Create("QQQ", SecurityType.Equity, Market.USA)
        super().__init__(qqq, universe_settings, self.ETFConstituentsFilter)

    def ETFConstituentsFilter(self, constituents: List[ETFConstituentData]) -> List[Symbol]:
        """Return the Symbols of the (up to) ten constituents with the largest weight.

        Constituents with a falsy weight (missing or zero) are ignored.
        """
        weighted = [constituent for constituent in constituents if constituent.Weight]
        weighted.sort(key=lambda constituent: constituent.Weight, reverse=True)
        return [constituent.Symbol for constituent in weighted[:10]]