Overall Statistics
Total Orders
6100
Average Win
0.14%
Average Loss
-0.12%
Compounding Annual Return
-6.222%
Drawdown
31.800%
Expectancy
-0.087
Start Equity
1000000
End Equity
722906.82
Net Profit
-27.709%
Sharpe Ratio
-0.919
Sortino Ratio
-0.563
Probabilistic Sharpe Ratio
0.000%
Loss Rate
59%
Win Rate
41%
Profit-Loss Ratio
1.22
Alpha
-0.064
Beta
0.066
Annual Standard Deviation
0.063
Annual Variance
0.004
Information Ratio
-0.94
Tracking Error
0.159
Treynor Ratio
-0.882
Total Fees
$54325.55
Estimated Strategy Capacity
$22000000.00
Lowest Capacity Asset
SPY R735QTJ8XC9X
Portfolio Turnover
154.25%
#region imports
from AlgorithmImports import *
#endregion


class CustomBollingerBand(PythonIndicator):
    """
    An extension of the BollingerBands indicator where the indicator value is
    (close - middle_band) / (2 * std)
    """
    def __init__(self, period, k):
        """
        Input:
         - period
            Period of BollingerBands indicator
         - k
            k of BollingerBands indicator
        """
        self.bb = BollingerBands(period, k)
        self.time = datetime.min
        self.value = 0
        self.warm_up_period = self.bb.warm_up_period


    def update(self, *args):
        """
        Called each time an indicator should be updated with new data
        
        Input:
         - *args
            (1) IndicatorDataPoint
            (2) Timestamp, Float
        """
        if len(args) == 1: # Called with IndicatorDataPoint
            input = args[0]
            self.bb.update(input.time, input.close)
            self.time = input.end_time
            self.set_value()
            return self.bb.is_ready
        else:              # Called with time and close arguments
            time, close = args[0], args[1]
            self.bb.update(time, close)
            self.set_value()
        
    @property
    def is_ready(self):
        """
        Signals if the indicator is ready
        """
        return self.bb.is_ready
    
    def set_value(self):
        """
        Sets the current value of the indicator
        """
        std = self.bb.standard_deviation.current.value
        if std == 0:
            self.value = 0
        else:
            close = self.bb.current.value
            middle_band = self.bb.middle_band.current.value
            self.value = (close - middle_band) / (2 * std)
        
#region imports
from AlgorithmImports import *
#endregion
from SymbolData import SymbolData


class GradientBoostingAlphaModel(AlphaModel):
    """
    Emits insights in the direction of the prediction made by the Symbol Data objects.
    """
    _symbol_data_by_symbol = {}
    
    def __init__(self, hold_duration = 10):
        """
        Input:
         - hold_duration
            The duration of the insights emitted
        """
        self._hold_duration = hold_duration 
        self._weight = 1
    
    def update(self, algorithm, data):
        """
        Called each time the alpha model receives a new data slice.
        
        Input:
         - algorithm
            Algorithm instance running the backtest
         - data
            A data structure for all of an algorithm's data at a single time step
        
        Returns a list of Insights to the portfolio construction model.
        """
        insights = []
        for symbol, symbol_data in self._symbol_data_by_symbol.items():
            direction = symbol_data.predict_direction()
            if direction:
                hold_duration = timedelta(minutes=self._hold_duration) # Should match universe resolution
                insights.append(Insight.price(symbol, hold_duration, direction, None, None, None, self._weight))

        return insights
        
    def on_securities_changed(self, algorithm, changes):
        """
        Called each time the universe has changed.
        
        Input:
         - algorithm
            Algorithm instance running the backtest
         - changes
            The additions and removals of the algorithm's security subscriptions
        """
        for security in changes.added_securities:
            symbol = security.symbol
            self._symbol_data_by_symbol[symbol] = SymbolData(symbol, algorithm, self._hold_duration)
            
        for security in changes.removed_securities:
            symbol_data = self._symbol_data_by_symbol.pop(security.symbol, None)
            if symbol_data:
                symbol_data.dispose()
        
        self._weight = 1 / len(self._symbol_data_by_symbol)
        
#region imports
from AlgorithmImports import *
#endregion
from CustomBollingerBand import CustomBollingerBand

import lightgbm as lgb
import numpy as np
import pandas as pd

class SymbolData:
    """
    This class holds all of the data for a security. It's responsible for training the
    gradient boosting model and making predictions. 
    """
    
    def __init__(self, symbol, algorithm, hold_duration, k_start=0.5, k_end=5, 
                 k_step=0.25, training_weeks=4, max_depth=1, num_leaves=2, num_trees=20,
                 commission=0.02, spread_cost=0.03):
        """
        Input:
         - symbol
            Represents a unique security identifier
         - algorithm
            Algorithm instance running the backtest
         - hold_duration
            Number of timesteps ahead to predict
         - k_start
            Starting k for indicator parameter loop
         - k_end
            Ending k for indicator parameter loop
         - k_step
            Stepping k for indicator parameter loop
         - training_weeks
            Number of weeks of historical data to train on
         - max_depth
            Maximum depth of the trees built
         - num_leaves
            Number of leaves for each tree
         - num_trees
            Number of trees to build
         - commission
            Commission cost of trading round-trip
         - spread_cost
            Spread cost of trading round-trip
        """
        self._symbol = symbol
        self._algorithm = algorithm
        self._hold_duration = hold_duration
        self._resolution = algorithm.universe_settings.resolution
        self._training_length = int(training_weeks * 5 * 6.5 * 60) # training_weeks in minutes
        self._max_depth = max_depth
        self._num_leaves = num_leaves
        self._num_trees = num_trees
        self._cost = commission + spread_cost
        
        self._indicator_consolidators = []
    
        # Train a model at the end of each month
        self._model = None
        algorithm.train(algorithm.date_rules.month_end(symbol), 
                        algorithm.time_rules.before_market_close(symbol), 
                        self._train)
        
        # Avoid overnight holds
        self._allow_predictions = False
        self._events = [
            algorithm.schedule.on(algorithm.date_rules.every_day(symbol), 
                                  algorithm.time_rules.after_market_open(symbol, 0), 
                                  self._start_predicting),
            algorithm.schedule.on(algorithm.date_rules.every_day(symbol), 
                                  algorithm.time_rules.before_market_close(symbol, hold_duration + 1),
                                  self._stop_predicting)
        ]
        
        self._setup_indicators(k_start, k_end, k_step)
        self._train()
        
        
    def _setup_indicators(self, k_start, k_end, k_step):
        """
        Initializes all the technical indicators and their historical windows.
        
        Input:
         - k_start
            Starting k for indicator parameter loop
         - k_end
            Ending k for indicator parameter loop
         - k_step
            Stepping k for indicator parameter loop
        """
        self._indicators_by_indicator_type = {}
        self._indicators_history_by_indicator_type = {}
        self._max_warm_up_period = 0
        
        for k in np.arange(k_start, k_end + k_step, k_step):
            indicators = {
                'rsi' : RelativeStrengthIndex(int(14*k)),
                'macd': MovingAverageConvergenceDivergence(int(12*k), int(26*k), 9),
                'bb'  : CustomBollingerBand(int(20*k), 2)
            }
            
            for indicator_type, indicator in indicators.items():
                # Register indicators for automatic updates
                consolidator = self._algorithm.resolve_consolidator(self._symbol, self._resolution)
                self._algorithm.register_indicator(self._symbol, indicator, consolidator)
                self._indicator_consolidators.append(consolidator)
                
                # Save reference to indicators
                if indicator_type not in self._indicators_by_indicator_type:
                    self._indicators_by_indicator_type[indicator_type] = []
                    self._indicators_history_by_indicator_type[indicator_type] = []
                self._indicators_by_indicator_type[indicator_type].append(indicator)
                
                # Create empty lookback window for indicator history
                self._indicators_history_by_indicator_type[indicator_type].append(np.array([]))
        
                # Find max warmup period
                self._max_warm_up_period = max(self._max_warm_up_period, indicator.warm_up_period)
                
        self._history_length = self._training_length + self._max_warm_up_period
        
    
    def _reset_state(self):
        """
        Resets all the technical indicators and their histories.
        """
        for indicator_type, indicators_history in self._indicators_history_by_indicator_type.items():
            self._indicators_history_by_indicator_type[indicator_type] = [np.array([]) for _ in range(len(indicators_history))]
            for indicator in self._indicators_by_indicator_type[indicator_type]:
                indicator.reset()
        
        
    def _train(self):
        """
        Trains the gradient boosting model using indicator values as input and 
        future return as output.
        """
        self._reset_state()
        
        # Request history for indicator warm up
        history = self._algorithm.history(self._symbol, self._history_length, self._resolution)
        if history.empty or history.shape[0] < self._history_length:
            self._algorithm.log(f"Not enough history for {self._symbol} to train yet.")
            return
        history = history.loc[self._symbol].close
        
        # Warm up indicators and history of indicators
        for indicator_type, indicators in self._indicators_by_indicator_type.items():
            for idx, indicator in enumerate(indicators):
                warm_up_length = self._training_length + indicator.warm_up_period - 1
                warm_up_data = history.iloc[-warm_up_length:]
                for time, close in warm_up_data.items():
                    # Update indicator
                    indicator.update(time, close)
        
                    # Update indicator history
                    if indicator.is_ready:
                        current_history = self._indicators_history_by_indicator_type[indicator_type][idx]
                        appended = np.append(current_history, indicator.current.value)
                        self._indicators_history_by_indicator_type[indicator_type][idx] = appended
        
        history = history.iloc[self._max_warm_up_period:]
        label = history.shift(-self._hold_duration) - history
        
        
        ##################
        ## Clean Training Data
        ##################
        # Remove last `hold_duration` minutes of each day to avoid overnight holdings
        
        # Get clean indices
        data_points_per_day = [len(g) for _, g in label.groupby(pd.Grouper(freq='D')) if g.shape[0] > 0]
        clean_indices = []
        for i in range(len(data_points_per_day)):
            from_index = 0 if i == 0 else data_points_per_day[i-1]
            to_index = sum(data_points_per_day[:i+1]) - self._hold_duration
            clean_indices.append((from_index, to_index))
        
        # Clean label history
        label = pd.concat([label[from_index:to_index] for from_index, to_index in clean_indices])
        
        # Clean indicator history
        for indicator_type, indicators_history in self._indicators_history_by_indicator_type.items():
            for idx, indicator_history in enumerate(indicators_history):
                clean_indicator = np.concatenate([indicator_history[from_index:to_index] for from_index, to_index in clean_indices])
                self._indicators_history_by_indicator_type[indicator_type][idx] = clean_indicator
        
        
        ##################
        ## Format data for training
        ##################
        data = np.empty(shape=(len(label), 0))
        feature_name = []
        for indicator_type, indicators_history in self._indicators_history_by_indicator_type.items():
            for k_step, indicator_history in enumerate(indicators_history):
                data = np.append(data, indicator_history.reshape(len(indicator_history), 1), axis=1)
                feature_name.append(f"{indicator_type}-{k_step}")
        data_set = lgb.Dataset(data=data, label=label, feature_name=feature_name, free_raw_data=False).construct()
        
        
        ######################
        ## Training
        ######################
        params = {'max_depth' : self._max_depth, 'num_leaves': self._num_leaves, 'seed' : 1234}
        self._model = lgb.train(params, train_set = data_set, num_boost_round = self._num_trees, feature_name = feature_name)
        
        
    def predict_direction(self):
        """
        Predicts the direction of future returns
        """
        if self._model is None or not self._allow_predictions:
            return 0
        
        input_data = [[]]
        for _, indicators in self._indicators_by_indicator_type.items():
            for indicator in indicators:
                input_data[0].append(indicator.current.value)
                
        return_prediction = self._model.predict(input_data)
        if return_prediction > self._cost:
            return 1
        if return_prediction < -self._cost:
            return -1
        return 0
        
        
    def dispose(self):
        """
        Removes the indicator consolidators
        
        Input:
         - remove_events
            Flag to remove scheduled events
        """
        for consolidator in self._indicator_consolidators:
            self._algorithm.subscription_manager.remove_consolidator(self._symbol, consolidator)
        
        for event in self._events:
            self._algorithm.schedule.remove(event)
        
            
    def _start_predicting(self):
        """
        Enable the gradient boosting model to generate predictions
        """
        self._allow_predictions = True
        
        
    def _stop_predicting(self):
        """
        Disable the gradient boosting model from generating predictions
        """
        self._allow_predictions = False
        
#region imports
from AlgorithmImports import *

from GradientBoostingAlphaModel import GradientBoostingAlphaModel
#endregion


class GradientBoostingModelAlgorithm(QCAlgorithm):

    def initialize(self):
        self.set_start_date(2015, 9, 1)
        self.set_end_date(2020, 9, 17)
        
        self.set_cash(1000000)
        
        symbols = [ Symbol.create("SPY", SecurityType.EQUITY, Market.USA) ]
        self.set_universe_selection( ManualUniverseSelectionModel(symbols) )
        self.universe_settings.resolution = Resolution.MINUTE
        
        self.set_alpha(GradientBoostingAlphaModel())
        
        self.set_portfolio_construction(InsightWeightingPortfolioConstructionModel())
        
        self.set_execution(ImmediateExecutionModel())