

Training Models

Introduction

Algorithms usually must process each timeslice within 10 minutes, but the Train (C#) or train (Python) method lets you extend this limit so you can train machine learning models. How long you can train depends on your training quotas.

Train Models

To train a model immediately, call the Train/train method and pass in your training method.

// Run an ML model training session and extend method call timeouts.
Train(MyTrainingMethod);
# Run an ML model training session and extend method call timeouts.
self.train(self.my_training_method)

Immediate training is most useful when you first deploy your strategy to production or when the model's performance begins to degrade.

Schedule Training Sessions

You can schedule model training sessions in a similar way to a Scheduled Event. To schedule a training session, pass a DateRules and a TimeRules argument to the Train/train method.

// Run MyTrainingMethod at 8:00 AM every Sunday.
Train(DateRules.Every(DayOfWeek.Sunday), TimeRules.At(8, 0), MyTrainingMethod);
# Run my_training_method at 8:00 AM every Sunday.
self.train(self.date_rules.every(DayOfWeek.SUNDAY), self.time_rules.at(8, 0), self.my_training_method)

We recommend you schedule your training sessions for when the market is closed to get the best compute allocation. While the market is open, your CPU is occupied with processing incoming tick data and handling other LEAN events.
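LEAN evaluates date and time rules itself. Purely to illustrate which moment a rule such as "every Sunday at 8:00" selects, here is a hypothetical standalone helper, next_weekly_run, that returns the next matching time after a given moment. It is a sketch under simplifying assumptions: it uses naive datetimes and ignores time zones and market calendars, which LEAN does take into account.

```python
from datetime import datetime, timedelta


def next_weekly_run(now: datetime, weekday: int = 6, hour: int = 8, minute: int = 0) -> datetime:
    """Return the first datetime strictly after `now` that falls on `weekday`
    (Monday=0 ... Sunday=6) at hour:minute. Hypothetical helper, not a LEAN API."""
    # Days until the target weekday (0 if today is that weekday).
    days_ahead = (weekday - now.weekday()) % 7
    candidate = (now + timedelta(days=days_ahead)).replace(
        hour=hour, minute=minute, second=0, microsecond=0
    )
    # If today's slot has already passed (or is right now), use next week's slot.
    if candidate <= now:
        candidate += timedelta(days=7)
    return candidate


# Wednesday 2024-01-03 at noon -> the following Sunday at 8:00.
print(next_weekly_run(datetime(2024, 1, 3, 12, 0)))  # 2024-01-07 08:00:00
```

Because the Sunday 8:00 slot falls outside regular US equity trading hours, a rule like this lines up with the recommendation to train while the market is closed.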

Training Quotas

Training resources are allocated with a leaky bucket algorithm: you can use at most n minutes in a single training session, and the number of available minutes refills over time. This design gives you burst allocations when you need them and recharges the allowance for the next training session.
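The bucket behavior can be sketched in a few lines of Python. This is a simplified model of the allowance described above, not LEAN's implementation; the class name LeakyBucket and the choices that refills arrive once per completed interval and that a session may use every available minute are assumptions for illustration.

```python
class LeakyBucket:
    """Simplified model of a refilling training-time allowance (hypothetical).

    capacity:      maximum minutes that can be banked.
    refill_amount: minutes added back after each completed interval.
    interval:      length of one refill interval, in minutes.
    """

    def __init__(self, capacity: float, refill_amount: float, interval: float) -> None:
        self.capacity = capacity
        self.refill_amount = refill_amount
        self.interval = interval
        self.available = capacity  # assumption: the bucket starts full
        self._elapsed = 0.0  # minutes since the last refill

    def advance(self, minutes: float) -> None:
        """Let time pass and add a refill for each completed interval."""
        self._elapsed += minutes
        intervals, self._elapsed = divmod(self._elapsed, self.interval)
        self.available = min(self.capacity, self.available + intervals * self.refill_amount)

    def consume(self, minutes: float) -> bool:
        """Spend `minutes` of training time; return False if not enough is available."""
        if minutes > self.available:
            return False
        self.available -= minutes
        return True


bucket = LeakyBucket(capacity=120, refill_amount=120 / 7, interval=1440)
print(bucket.consume(100))  # True: 20 minutes remain
print(bucket.consume(30))   # False: only 20 minutes are available
bucket.advance(1440)        # one day passes and about 17.14 minutes flow back
print(round(bucket.available, 2))  # 37.14
```

The cap in advance is what makes this a bucket rather than a running total: long idle periods never bank more than capacity minutes.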

Cloud Quotas

If you execute algorithms in QuantConnect Cloud, see Training Quotas for more information about the training quotas.

Local Quotas

If you execute algorithms locally, the following table shows the default settings for the leaky bucket algorithm:

Setting                                      Value
Capacity (minutes)                           120
Time interval (minutes)                      1440
Refill amount (minutes per time interval)    Capacity / 7
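A quick calculation with these defaults shows how fast the local allowance recovers. It assumes the refill amount is added once per time interval, as the table states; it only restates the table's numbers.

```python
capacity_minutes = 120                 # Capacity (minutes)
interval_minutes = 1440                # Time interval: one day
refill_minutes = capacity_minutes / 7  # Refill amount per interval

# Training minutes regained per day.
print(round(refill_minutes, 2))  # 17.14

# Intervals (days) for an empty bucket to refill completely.
intervals_to_full = capacity_minutes / refill_minutes
print(round(intervals_to_full))  # 7

# The same recovery time expressed in hours.
print(round(intervals_to_full * interval_minutes / 60))  # 168
```

In other words, with the defaults a full 120-minute session is available roughly once a week, which fits a weekly retraining schedule.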

To allow virtually unlimited training for local algorithms, add the following key-value pairs to your Lean/Launcher/config.json file:

"scheduled-event-leaky-bucket-capacity" : 99999999,
"scheduled-event-leaky-bucket-time-interval-minutes" : 1,
"scheduled-event-leaky-bucket-refill-amount": 999999,

Check Model Readiness

In backtests, the Train/train method is synchronous, so it blocks algorithm execution while the model trains. In live trading, the method is asynchronous: training runs on a separate thread while your algorithm keeps processing data, so make sure the model is trained before you use it. Set a boolean flag to tell the rest of your algorithm whether the model is ready. A semaphore is a thread-safe synchronization primitive you can use to coordinate operations across threads.

# Example of using a flag to signal the model is ready to use:
class SemaphoreTrainingAlgorithm(QCAlgorithm):

    model = None  # Model object
    model_is_training = False  # Model state flag

    def initialize(self) -> None:
        self.train(self.my_training_method)

    def my_training_method(self) -> None:
        self.model_is_training = True
        # Train the model here and assign it to self.model.
        self.model_is_training = False

    def on_data(self, slice: Slice) -> None:
        # Don't use the model while it is training or before the first training run finishes.
        if self.model_is_training or self.model is None:
            return

        # Once training is complete, use the model.
        result = self.model.predict()
// Example of using a flag to signal the model is ready to use:
public class SemaphoreTrainingAlgorithm : QCAlgorithm
{
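    // The trained model (MachineLearningModel is a placeholder for your model type).
    private MachineLearningModel _model;
    // Training state flag; volatile so updates made on the training thread are visible to OnData.
    private volatile bool _modelIsTraining;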

    public override void Initialize()
    {
        Train(MyTrainingMethod);
    }

    private void MyTrainingMethod()
    {
        _modelIsTraining = true;
        // Train the model here.
        _modelIsTraining = false;
    }

    public override void OnData(Slice slice)
    {
        // Don't use the model while it is training or before the first training run finishes.
        if (_modelIsTraining || _model == null)
        {
            return;
        }
        // Once training is complete, use the model.
        var result = _model.Predict();
    }
}
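The same readiness check can be expressed with Python's standard threading.Event, which is safe to set on one thread and test on another. The sketch below is general Python, not a LEAN API: ModelHolder is a hypothetical class, the "training" is a placeholder function, and whether your algorithm needs an Event or a lock instead of the plain flag shown above is an assumption you should verify against your own setup.

```python
import threading
import time


class ModelHolder:
    """Hypothetical container that signals model readiness across threads."""

    def __init__(self) -> None:
        self.model = None
        self.ready = threading.Event()  # set once a trained model is available
        self._lock = threading.Lock()   # guards swapping the model reference

    def train(self) -> None:
        # Placeholder training: a real method would fit a model here.
        time.sleep(0.1)

        def trained(x):
            return 2 * x  # stand-in for a fitted model's predict

        with self._lock:
            self.model = trained
        self.ready.set()  # publish readiness only after the model is assigned

    def predict(self, x):
        # Non-blocking check, like the early return in on_data.
        if not self.ready.is_set():
            return None
        with self._lock:
            return self.model(x)


holder = ModelHolder()
print(holder.predict(3))  # None: training has not run yet
worker = threading.Thread(target=holder.train)
worker.start()
holder.ready.wait(timeout=5)  # blocking wait, for this demonstration only
print(holder.predict(3))  # 6
worker.join()
```

Setting the Event only after the model is assigned avoids the window where a flag says "ready" but the model reference is still None.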

Examples

The following examples demonstrate some common practices for training the machine learning model.

Example 1: Scikit-Learn

The following algorithm uses the Scikit-Learn library to predict future price movement from the previous 5 days of OHLCV data. The model trains on a rolling 2-year window of daily data. To keep the model applicable to the current market environment, the algorithm recalibrates it every Sunday.

from AlgorithmImports import *
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV
import joblib
import numpy as np

class ScikitLearnExampleAlgorithm(QCAlgorithm):
    def initialize(self) -> None:
        self.set_start_date(2022, 7, 4)
        self.set_cash(100000)
        # Request SPY data for model training, prediction and trading.
        self.symbol = self.add_equity("SPY", Resolution.DAILY).symbol

        # 2-year data to train the model.
        training_length = 252*2
        self.training_data = RollingWindow[TradeBar](training_length)
        # Warm up the training dataset to train the model immediately.
        history = self.history[TradeBar](self.symbol, training_length, Resolution.DAILY)
        for trade_bar in history:
            self.training_data.add(trade_bar)

        # Retrieve already trained model from object store to use immediately.
        if self.object_store.contains_key("sklearn_model"):
            file_name = self.object_store.get_file_path("sklearn_model")
            self.model = joblib.load(file_name)
        # Otherwise, grid search over hyperparameter choices to find an optimal support vector regressor that predicts price movement.
        else:
            param_grid = {'C': [.05, .1, .5, 1, 5, 10], 
                          'epsilon': [0.001, 0.005, 0.01, 0.05, 0.1], 
                          'gamma': ['auto', 'scale']}
            self.model = GridSearchCV(SVR(), param_grid, scoring='neg_mean_squared_error', cv=5)

        # Train the model to use the prediction right away.
        self.train(self.my_training_method)
        # Recalibrate the model weekly to ensure its accuracy on the updated domain.
        self.train(self.date_rules.every(DayOfWeek.SUNDAY), self.time_rules.at(8,0), self.my_training_method)
        
    def get_features_and_labels(self, n_steps=5) -> tuple[np.ndarray, np.ndarray]:
        # Train and predict the return data, which is more normalized and stationary.
        training_df = self.pandas_converter.get_data_frame[TradeBar](list(self.training_data)[::-1])
        daily_pct_change = training_df.pct_change().dropna()

        # Stack n_steps days of OHLCV returns into each sample; the label is the next day's close return.
        features = []
        labels = []
        for i in range(len(daily_pct_change)-n_steps):
            features.append(daily_pct_change.iloc[i:i+n_steps].values.flatten())
            labels.append(daily_pct_change['close'].iloc[i+n_steps])
        features = np.array(features)
        labels = np.array(labels)

        return features, labels

    def my_training_method(self) -> None:
        # Prepare the processed training data.
        features, labels = self.get_features_and_labels()
        # Recalibrate the model based on updated data.
        if isinstance(self.model, GridSearchCV):
            self.model = self.model.fit(features, labels).best_estimator_
        else:
            self.model = self.model.fit(features, labels)

    def on_data(self, slice: Slice) -> None:
        if self.symbol in slice.bars:
            self.training_data.add(slice.bars[self.symbol])

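        # Predict from the most recent 5 days of returns, which are not yet part of any labeled training sample.
        training_df = self.pandas_converter.get_data_frame[TradeBar](list(self.training_data)[::-1])
        latest_features = training_df.pct_change().dropna().iloc[-5:].values.flatten()
        prediction = float(self.model.predict(latest_features.reshape(1, -1))[0])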

        # If the predicted direction is going upward, buy SPY.
        if prediction > 0:
            self.set_holdings(self.symbol, 1)
        # If the predicted direction is going downward, sell SPY.
        elif prediction < 0:            
            self.set_holdings(self.symbol, -1)

    def on_end_of_algorithm(self) -> None:
        # Save the model to the Object Store so later runs can load it if the algorithm stops.
        model_key = "sklearn_model"
        file_name = self.object_store.get_file_path(model_key)
        joblib.dump(self.model, file_name)
        self.object_store.save(model_key)
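The sample construction in get_features_and_labels can be checked in isolation with synthetic data. The helper below, make_windows, is hypothetical and uses plain NumPy arrays instead of the LEAN DataFrame; it assumes the columns are ordered open, high, low, close, volume, so the close return sits at column index 3. It also makes visible that the last labeled training window ends one bar before the newest data, so predictions should use the most recent n_steps rows rather than the last training window.

```python
import numpy as np


def make_windows(returns: np.ndarray, n_steps: int = 5, close_col: int = 3):
    """Build flattened n_steps-day feature windows and next-day close labels.

    returns:   2-D array of daily returns, oldest row first, shape (T, n_columns).
    close_col: column index of the close return (assumed OHLCV ordering).
    """
    # Each training sample is n_steps consecutive rows, flattened into one vector.
    features = np.array([returns[i:i + n_steps].flatten() for i in range(len(returns) - n_steps)])
    # The label of window i is the close return of the row right after it.
    labels = returns[n_steps:, close_col]
    # The newest window has no label yet; it is the one to predict from.
    latest = returns[-n_steps:].flatten().reshape(1, -1)
    return features, labels, latest


returns = np.arange(40, dtype=float).reshape(8, 5)  # 8 synthetic days x 5 columns
features, labels, latest = make_windows(returns)
print(features.shape)  # (3, 25)
print(labels)          # [28. 33. 38.]
```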

Other Examples

For more examples, see the following algorithms:

