Machine Learning
Training Models
Introduction
Algorithms usually must process each timeslice within 10 minutes, but the Train (train in Python) method lets you extend this limit to train machine learning models. The length of time you can train depends on your training quotas.
Train Models
To train models immediately, call the Train (train in Python) method and pass in the name of your training method.
// Run an ML model training session and extend method call timeouts.
Train(MyTrainingMethod);

# Run an ML model training session and extend method call timeouts.
self.train(self.my_training_method)
Immediate training is most useful for training your model when you first deploy your strategy to production or when the model's performance begins to degrade.
Schedule Training Sessions
You can schedule model training sessions in a similar way to a Scheduled Event. To schedule a training session, pass DateRules and TimeRules arguments to the Train (train in Python) method.
// Schedule MyTrainingMethod to run at 8:00 AM every Sunday.
Train(DateRules.Every(DayOfWeek.Sunday), TimeRules.At(8, 0), MyTrainingMethod);

# Schedule my_training_method to run at 8:00 AM every Sunday.
self.train(self.date_rules.every(DayOfWeek.SUNDAY), self.time_rules.at(8, 0), self.my_training_method)
We recommend you schedule your training sessions for when the market is closed to get the best compute allocation. While the market is open, your CPU is occupied with processing incoming tick data and handling other LEAN events.
Training Quotas
Training resources are allocated with a leaky bucket algorithm: you can use at most n minutes in a single training session, and the number of available minutes refills over time. This design gives you burst allocations when you need them and recharges the allowance to prepare for the next training session.
Cloud Quotas
If you execute algorithms in QuantConnect Cloud, see Training Quotas for more information about the training quotas.
Local Quotas
If you execute algorithms locally, the following table shows the default settings for the leaky bucket algorithm:
Setting | Value |
---|---|
Capacity (minutes) | 120 |
Time interval (minutes) | 1440 |
Refill amount (minutes per time interval) | Capacity / 7 |
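As a rough illustration of how these settings interact, the following Python sketch models a leaky bucket with the default local values from the table. The LeakyBucket class and its method names are hypothetical and are not LEAN's implementation. The sketch also assumes that refills happen once per whole elapsed interval, which may differ from how LEAN accounts for time.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LeakyBucket:
    """Hypothetical model of a training-time allowance (not LEAN's class)."""
    capacity: float                    # maximum minutes the bucket can hold
    interval: float                    # minutes between refills
    refill: float                      # minutes added per interval
    available: Optional[float] = None  # minutes currently available

    def __post_init__(self) -> None:
        # Assumption: the bucket starts full.
        if self.available is None:
            self.available = self.capacity

    def consume(self, minutes: float) -> bool:
        """Spend training minutes if enough remain; report success."""
        if minutes > self.available:
            return False
        self.available -= minutes
        return True

    def advance(self, elapsed_minutes: float) -> None:
        """Refill once per whole elapsed interval, capped at capacity."""
        intervals = int(elapsed_minutes // self.interval)
        self.available = min(self.capacity, self.available + intervals * self.refill)


# Default local settings from the table above.
bucket = LeakyBucket(capacity=120, interval=1440, refill=120 / 7)
first = bucket.consume(120)   # a full 120-minute burst succeeds
second = bucket.consume(1)    # the bucket is now empty, so this fails
bucket.advance(1440)          # one day later, Capacity / 7 minutes return
third = bucket.consume(10)    # a short session fits in the refilled amount
```

Under this reading of the defaults, a fully drained bucket takes seven days to return to its 120-minute capacity.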
To allow virtually unlimited training for local algorithms, add the following key-value pairs to your Lean/Launcher/config.json file:
"scheduled-event-leaky-bucket-capacity": 99999999,
"scheduled-event-leaky-bucket-time-interval-minutes": 1,
"scheduled-event-leaky-bucket-refill-amount": 999999,
Check Model Readiness
In backtests, the Train (train in Python) method is synchronous, so it blocks your algorithm execution while the model trains. In live trading, the method is asynchronous, so ensure your model is trained before you continue the algorithm execution. Training occurs on a separate thread, so set a boolean flag to notify your algorithm of the model state. A semaphore is a thread-safe flag you can use to synchronize program operations across different threads.
# Example of using a flag to signal the model is ready to use:
class SemaphoreTrainingAlgorithm(QCAlgorithm):
    model = None  # Model object
    model_is_training = False  # Model state flag

    def initialize(self) -> None:
        self.train(self.my_training_method)

    def my_training_method(self) -> None:
        self.model_is_training = True
        # Train the model here.
        self.model_is_training = False

    def on_data(self, slice: Slice) -> None:
        # Don't use the model while it is training.
        if self.model_is_training:
            return
        # Once training is complete, use the model.
        result = self.model.predict()
// Example of using a flag to signal the model is ready to use:
public class SemaphoreTrainingAlgorithm : QCAlgorithm
{
    // Initialize the model.
    private MachineLearningModel _model;
    // Initialize the training state flag.
    private bool _modelIsTraining;

    public override void Initialize()
    {
        Train(MyTrainingMethod);
    }

    private void MyTrainingMethod()
    {
        _modelIsTraining = true;
        // Train the model here.
        _modelIsTraining = false;
    }

    public override void OnData(Slice slice)
    {
        // Don't use the model while it is training.
        if (_modelIsTraining)
        {
            return;
        }
        // Once training is complete, use the model.
        var result = _model.Predict();
    }
}
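The flag pattern above can also be tried outside LEAN. The following minimal sketch uses threading.Event from the Python standard library as the thread-safe flag. The ModelHolder class, its placeholder model, and the sleep that stands in for training are all hypothetical and only illustrate the synchronization, not a LEAN API.

```python
import threading
import time


class ModelHolder:
    """Hypothetical stand-in for an algorithm that trains on another thread."""

    def __init__(self) -> None:
        self.model = None
        self.model_ready = threading.Event()  # thread-safe flag, initially unset

    def train(self) -> None:
        self.model_ready.clear()        # mark the model as unavailable
        time.sleep(0.1)                 # placeholder for real training work
        self.model = lambda x: 2 * x    # placeholder "model"
        self.model_ready.set()          # signal that the model can be used

    def predict(self, x):
        # Skip prediction until training has finished.
        if not self.model_ready.is_set():
            return None
        return self.model(x)


holder = ModelHolder()
early = holder.predict(3)   # no model has been trained yet, so this is skipped

worker = threading.Thread(target=holder.train)
worker.start()
worker.join()               # wait for the training thread to finish
late = holder.predict(3)    # the flag is set, so the model is used
```

An Event is used here instead of a plain boolean because its set, clear, and is_set methods are documented as safe to call from multiple threads.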
Examples
The following examples demonstrate some common practices for training machine learning models.
Example 1: Scikit-Learn
The following algorithm uses the Scikit-Learn library to predict future price movement from the previous 5 days of OHLCV data. The model is trained on a rolling 2-year window of data. To keep the model applicable to the current market environment, we recalibrate it every Sunday.
from AlgorithmImports import *  # LEAN types such as QCAlgorithm, TradeBar, and RollingWindow
import numpy as np
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV
import joblib

class ScikitLearnExampleAlgorithm(QCAlgorithm):
    def initialize(self) -> None:
        self.set_start_date(2022, 7, 4)
        self.set_cash(100000)
        # Request SPY data for model training, prediction and trading.
        self.symbol = self.add_equity("SPY", Resolution.DAILY).symbol

        # 2-year data to train the model.
        training_length = 252*2
        self.training_data = RollingWindow[TradeBar](training_length)
        # Warm up the training dataset to train the model immediately.
        history = self.history[TradeBar](self.symbol, training_length, Resolution.DAILY)
        for trade_bar in history:
            self.training_data.add(trade_bar)

        # Retrieve already trained model from object store to use immediately.
        if self.object_store.contains_key("sklearn_model"):
            file_name = self.object_store.get_file_path("sklearn_model")
            self.model = joblib.load(file_name)
        # Otherwise, grid search with hyperparameter choices to create an optimal support vector regressor to predict price movement.
        else:
            param_grid = {'C': [.05, .1, .5, 1, 5, 10],
                          'epsilon': [0.001, 0.005, 0.01, 0.05, 0.1],
                          'gamma': ['auto', 'scale']}
            self.model = GridSearchCV(SVR(), param_grid, scoring='neg_mean_squared_error', cv=5)

        # Train the model to use the prediction right away.
        self.train(self.my_training_method)
        # Recalibrate the model weekly to ensure its accuracy on the updated domain.
        self.train(self.date_rules.every(DayOfWeek.SUNDAY), self.time_rules.at(8, 0), self.my_training_method)

    def get_features_and_labels(self, n_steps=5) -> tuple[np.ndarray, np.ndarray]:
        # Train and predict the return data, which is more normalized and stationary.
        training_df = self.pandas_converter.get_data_frame[TradeBar](list(self.training_data)[::-1])
        daily_pct_change = training_df.pct_change().dropna()

        # Stack the data for 5-day OHLCV data per each sample to train with.
        features = []
        labels = []
        for i in range(len(daily_pct_change)-n_steps):
            features.append(daily_pct_change.iloc[i:i+n_steps].values.flatten())
            labels.append(daily_pct_change['close'].iloc[i+n_steps])
        features = np.array(features)
        labels = np.array(labels)

        return features, labels

    def my_training_method(self) -> None:
        # Prepare the processed training data.
        features, labels = self.get_features_and_labels()
        # Recalibrate the model based on updated data.
        if isinstance(self.model, GridSearchCV):
            self.model = self.model.fit(features, labels).best_estimator_
        else:
            self.model = self.model.fit(features, labels)

    def on_data(self, slice: Slice) -> None:
        if self.symbol in slice.bars:
            self.training_data.add(slice.bars[self.symbol])

            # Get prediction by the updated features.
            features, _ = self.get_features_and_labels()
            prediction = self.model.predict(features[-1].reshape(1, -1))
            # predict() returns a one-element array; extract the scalar.
            prediction = float(prediction[0])

            # If the predicted direction is going upward, buy SPY.
            if prediction > 0:
                self.set_holdings(self.symbol, 1)
            # If the predicted direction is going downward, sell SPY.
            elif prediction < 0:
                self.set_holdings(self.symbol, -1)

    def on_end_of_algorithm(self) -> None:
        # Store the model to object store to retrieve it in other instances in case the algorithm stops.
        model_key = "sklearn_model"
        file_name = self.object_store.get_file_path(model_key)
        joblib.dump(self.model, file_name)
        self.object_store.save(model_key)
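To make the feature stacking in get_features_and_labels easier to follow, here is a minimal standalone sketch of the same windowing idea using plain Python lists. The make_windows helper, the label_index parameter, and the sample rows are hypothetical. The algorithm above does this with a pandas DataFrame of percentage changes instead.

```python
def make_windows(rows, n_steps=5, label_index=3):
    """Flatten n_steps consecutive rows into one feature vector.

    The label for each window is one column (here, index 3 for "close")
    of the row that immediately follows the window.
    """
    features, labels = [], []
    for i in range(len(rows) - n_steps):
        window = rows[i:i + n_steps]
        # Concatenate the rows of the window into a single flat vector.
        features.append([value for row in window for value in row])
        labels.append(rows[i + n_steps][label_index])
    return features, labels


# Eight days of made-up [open, high, low, close, volume] percentage changes.
rows = [[day + 0.1 * k for k in range(5)] for day in range(8)]
features, labels = make_windows(rows)
```

With 8 rows and a 5-row window, this produces 3 samples of 25 values each. Because every window needs a following row as its label, the last window ends one row before the most recent row.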
Other Examples
For more examples, see the following algorithms: