Popular Models
Chronos-T5
Introduction
This page explains how to use Chronos-T5 in LEAN trading algorithms. The model repository provides the following description:
Chronos is a family of pretrained time series forecasting models based on language model architectures. A time series is transformed into a sequence of tokens via scaling and quantization, and a language model is trained on these tokens using the cross-entropy loss. Once trained, probabilistic forecasts are obtained by sampling multiple future trajectories given the historical context. Chronos models have been trained on a large corpus of publicly available time series data, as well as synthetic data generated using Gaussian processes.
For details on Chronos models, training data and procedures, and experimental results, please refer to the paper Chronos: Learning the Language of Time Series.
Use Cases
The Chronos-T5 model is a time series forecasting model. The following use cases explain how you might utilize it in trading algorithms:
- Forecast the future equity curves for a set of assets, then pass them to an optimizer to determine the weights that maximize the future Sharpe ratio of the portfolio.
- Forecast the future volatility of an asset to manage risk and optimize portfolio allocations (see the sketch after this list).
- Train the model to forecast the impact of specific events on an asset and then adjust your holdings in response to the event. For example, you could train the model to forecast the impact of news, corporate actions, or financial reports.
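The volatility use case above hinges on feeding the model a volatility series rather than raw prices. The following is a minimal sketch of that idea, assuming a pipeline object loaded as shown in the next section and a self._prediction_length setting; the 21-day rolling window and annualization factor are illustrative assumptions, not requirements of the model.
import numpy as np
import torch

# Sketch: forecast realized volatility instead of raw prices.
# Assumes `pipeline` is a loaded ChronosPipeline (see the next section)
# and `symbol` is an asset in your algorithm.
closes = self.history(symbol, timedelta(365), Resolution.DAILY)['close']
returns = closes.pct_change().dropna()
# 21-day rolling realized volatility, annualized (illustrative choices).
realized_vol = (returns.rolling(21).std() * np.sqrt(252)).dropna()
# Forecast the volatility series the same way you forecast prices.
vol_paths = pipeline.predict(
    torch.tensor(realized_vol.values), self._prediction_length
)
median_vol_forecast = np.quantile(vol_paths[0].numpy(), 0.5, axis=0)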
Load Pre-Trained Model
Follow these steps to load the pre-trained Chronos-T5 model:
- Add the following imports to the top of your code file:
- Call the ChronosPipeline.from_pretrained method with the model path.
- (Optional) Set the seed to enable reproducibility.
from chronos import ChronosPipeline
import torch
In QuantConnect Cloud, the path to the tiny model is amazon/chronos-t5-tiny.
# Load the ChronosPipeline model.
self._pipeline = ChronosPipeline.from_pretrained(
    "amazon/chronos-t5-tiny",
    device_map="cuda" if torch.cuda.is_available() else "cpu",
    torch_dtype=torch.bfloat16,
)
# Set the seed.
from transformers import set_seed
set_seed(1, True)
Fine-Tune Model
The Chronos-T5 model is pre-trained, so you don't need to fine-tune it. Fine-tuning is optional; it simply tailors the model to your specific use case. Follow these steps to fine-tune it:
- Add the following imports to the top of your code file:
- In the initialize method, define the model and some of its settings.
- Define the fine-tuning method.
- Create DataFrame(s) that contain your training samples.
- Call the fine-tuning method with the training data.
- Load the fine-tuned model.
import torch
from ast import literal_eval
from pathlib import Path
from functools import partial
from typing import Optional
from transformers import Trainer, TrainingArguments, set_seed
from gluonts.dataset.pandas import PandasDataset
from gluonts.itertools import Filter
from chronos import ChronosConfig, ChronosPipeline
from chronos.scripts.training.train import ChronosDataset, has_enough_observations, load_model
from chronos.scripts.training import train
from logging import getLogger, INFO
In QuantConnect Cloud, the path to the tiny model is amazon/chronos-t5-tiny.
def initialize(self):
    # Configure model parameters to optimize performance.
    self._prediction_length = 3*21  # Three months of trading days
    self._device_map = "cuda" if torch.cuda.is_available() else "cpu"
    self._optimizer = 'adamw_torch_fused' if torch.cuda.is_available() else 'adamw_torch'
    self._model_name = "amazon/chronos-t5-tiny"
    self._model_path = self.object_store.get_file_path(
        f"llm/fine-tune/{self._model_name.replace('/', '-')}/"
    )
# Configure the fine-tuning method and initiate model training.
def _train_chronos(
        self,
        training_data,
        probability: Optional[str] = None,
        context_length: int = 512,
        prediction_length: int = 64,
        min_past: int = 64,
        max_steps: int = 200_000,
        save_steps: int = 50_000,
        log_steps: int = 500,
        per_device_train_batch_size: int = 32,
        learning_rate: float = 1e-3,
        optim: str = "adamw_torch_fused",
        shuffle_buffer_length: int = 100,
        gradient_accumulation_steps: int = 2,
        model_id: str = "google/t5-efficient-tiny",
        model_type: str = "seq2seq",
        random_init: bool = False,
        tie_embeddings: bool = False,
        output_dir: str = "./output/",
        tf32: bool = True,
        torch_compile: bool = True,
        tokenizer_class: str = "MeanScaleUniformBins",
        tokenizer_kwargs: str = "{'low_limit': -15.0, 'high_limit': 15.0}",
        n_tokens: int = 4096,
        n_special_tokens: int = 2,
        pad_token_id: int = 0,
        eos_token_id: int = 1,
        use_eos_token: bool = True,
        lr_scheduler_type: str = "linear",
        warmup_ratio: float = 0.0,
        dataloader_num_workers: int = 1,
        max_missing_prop: float = 0.9,
        num_samples: int = 20,
        temperature: float = 1.0,
        top_k: int = 50,
        top_p: float = 1.0):
    # Set up logging for the train object.
    train.logger = getLogger()
    train.logger.setLevel(INFO)
    # Ensure output_dir is a Path object.
    output_dir = Path(output_dir)
    # Convert probability from string to a list, or set default if None.
    if isinstance(probability, str):
        probability = literal_eval(probability)
    elif probability is None:
        probability = [1.0 / len(training_data)] * len(training_data)
    # Convert tokenizer_kwargs from string to a dictionary.
    if isinstance(tokenizer_kwargs, str):
        tokenizer_kwargs = literal_eval(tokenizer_kwargs)
    # Enable reproducibility.
    set_seed(1, True)
    # Create datasets for training, filtered by criteria.
    train_datasets = [
        Filter(
            partial(
                has_enough_observations,
                min_length=min_past + prediction_length,
                max_missing_prop=max_missing_prop,
            ),
            PandasDataset(data_frame, freq="D"),
        )
        for data_frame in training_data
    ]
    # Load the model with the specified configuration.
    model = load_model(
        model_id=model_id,
        model_type=model_type,
        vocab_size=n_tokens,
        random_init=random_init,
        tie_embeddings=tie_embeddings,
        pad_token_id=pad_token_id,
        eos_token_id=eos_token_id,
    )
    # Define the configuration for the Chronos tokenizer and other settings.
    chronos_config = ChronosConfig(
        tokenizer_class=tokenizer_class,
        tokenizer_kwargs=tokenizer_kwargs,
        n_tokens=n_tokens,
        n_special_tokens=n_special_tokens,
        pad_token_id=pad_token_id,
        eos_token_id=eos_token_id,
        use_eos_token=use_eos_token,
        model_type=model_type,
        context_length=context_length,
        prediction_length=prediction_length,
        num_samples=num_samples,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
    )
    # Add extra items to the model config so that it's saved in the ckpt.
    model.config.chronos_config = chronos_config.__dict__
    # Create a shuffled training dataset with the specified parameters.
    shuffled_train_dataset = ChronosDataset(
        datasets=train_datasets,
        probabilities=probability,
        tokenizer=chronos_config.create_tokenizer(),
        context_length=context_length,
        prediction_length=prediction_length,
        min_past=min_past,
        mode="training",
    ).shuffle(shuffle_buffer_length=shuffle_buffer_length)
    # Define the training arguments.
    training_args = TrainingArguments(
        output_dir=str(output_dir),
        per_device_train_batch_size=per_device_train_batch_size,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        warmup_ratio=warmup_ratio,
        optim=optim,
        logging_dir=str(output_dir / "train-logs"),
        logging_strategy="steps",
        logging_steps=log_steps,
        save_strategy="steps",
        save_steps=save_steps,
        report_to=["tensorboard"],
        max_steps=max_steps,
        gradient_accumulation_steps=gradient_accumulation_steps,
        dataloader_num_workers=dataloader_num_workers,
        tf32=tf32,  # remove this if not using Ampere GPUs (e.g., A100)
        torch_compile=torch_compile,
        ddp_find_unused_parameters=False,
        remove_unused_columns=False,
    )
    # Create a Trainer instance for training the model.
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=shuffled_train_dataset,
    )
    # Start the training process.
    trainer.train()
    # Save the trained model to the output directory.
    model.save_pretrained(output_dir)
    # Return the path to the output directory.
    return output_dir
Each DataFrame should have a single column named "target", and its rows should be spaced at a consistent time interval. If there is no data for a time step (for example, the weekend price of an Equity asset), use NaN as the value for that time step. The following example demonstrates how to create DataFrames that contain one year of trailing prices for a set of Equities:
# Get historical equity curves.
history = self.history(symbols, timedelta(365), Resolution.DAILY)['close'].unstack(0)
# Gather the training data.
training_data_by_symbol = {}
for symbol in symbols:
    df = history[[symbol]].dropna()
    if df.shape[0] < 10:  # Skip this asset if there is very little data
        continue
    adjusted_df = df.reset_index()[['time', symbol]]
    adjusted_df = adjusted_df.rename(columns={str(symbol.id): 'target'})
    adjusted_df['time'] = pd.to_datetime(adjusted_df['time'])
    adjusted_df.set_index('time', inplace=True)
    adjusted_df = adjusted_df.resample('D').asfreq()
    training_data_by_symbol[symbol] = adjusted_df
For more information about history requests, see History Requests.
# Train the model with set parameters and save to the output directory.
output_dir_path = self._train_chronos(
    list(training_data_by_symbol.values()),
    context_length=int(252/2),  # 6 months
    prediction_length=self._prediction_length,
    optim=self._optimizer,
    model_id=self._model_name,
    output_dir=self._model_path,
    learning_rate=1e-5,
    tf32=False,  # tf32=True requires Ampere GPUs (e.g., A100)
    max_steps=3
)
# Load the trained model from the output directory.
pipeline = ChronosPipeline.from_pretrained(
    output_dir_path,
    device_map=self._device_map,
    torch_dtype=torch.bfloat16,
)
Forecast Time Series
Follow these steps to forecast a time series:
- Get some historical data.
- Load the pre-trained or fine-tuned model.
- Forecast the future time series.
- Aggregate the future price paths of each asset into a single price path per asset.
# Get historical closing prices.
history = self.history(symbols, timedelta(365), Resolution.DAILY)['close'].unstack(0)
# Generate forecasts for symbols using the trained pipeline.
all_forecasts = pipeline.predict(
    [torch.tensor(history[symbol].dropna()) for symbol in symbols],
    self._prediction_length
)
In this example, the model returns several future price paths for each asset. To aggregate them into a single price path per asset, take the median value of each time step across the sampled paths.
# Create a DataFrame with median forecasts for each symbol.
forecasts_df = pd.DataFrame(
    {
        symbol: np.quantile(
            all_forecasts[i].numpy(), 0.5, axis=0  # 0.5 = median
        )
        for i, symbol in enumerate(symbols)
    }
)
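The median is just one way to aggregate the sample paths. Because the forecasts are probabilistic, you can also extract other quantiles to gauge the uncertainty around the median path. The following is a minimal sketch that assumes the all_forecasts tensor from the previous step; the 10% and 90% levels are illustrative choices, not prescribed by the model.
# Extract lower and upper quantile bands to gauge forecast uncertainty.
# The 10%/90% levels are illustrative assumptions.
bands_by_symbol = {}
for i, symbol in enumerate(symbols):
    samples = all_forecasts[i].numpy()  # Shape: (num_samples, prediction_length)
    bands_by_symbol[symbol] = {
        'low': np.quantile(samples, 0.1, axis=0),
        'median': np.quantile(samples, 0.5, axis=0),
        'high': np.quantile(samples, 0.9, axis=0),
    }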
Examples
The following algorithm selects the most liquid assets at the beginning of each month. Once a quarter, it gets the trailing year of prices for all the assets in the universe and then forecasts the price paths of all the assets over the upcoming quarter. It then uses the SciPy package to find the weights that maximize the future Sharpe ratio of the portfolio and rebalances the portfolio to those weights.
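The full example algorithm isn't reproduced here, but the following is a minimal sketch of how the Sharpe-maximizing step might look with SciPy, assuming a forecasts_df DataFrame of median price forecasts like the one built in the previous section. The long-only bounds and fully-invested constraint are illustrative assumptions, not necessarily the example's exact settings.
import numpy as np
from scipy.optimize import minimize

# Convert the forecasted price paths into forecasted returns.
forecast_returns = forecasts_df.pct_change().dropna()

def negative_sharpe(weights):
    # Negate the Sharpe ratio because `minimize` minimizes its objective.
    portfolio_returns = forecast_returns.values @ weights
    return -portfolio_returns.mean() / portfolio_returns.std()

n = forecasts_df.shape[1]
result = minimize(
    negative_sharpe,
    x0=np.ones(n) / n,  # Start from equal weights.
    bounds=[(0, 1)] * n,  # Long-only (illustrative assumption).
    constraints=[{'type': 'eq', 'fun': lambda w: w.sum() - 1}],  # Fully invested.
    method='SLSQP'
)
optimal_weights = result.x
You could then rebalance by passing a list of PortfolioTarget objects built from optimal_weights to self.set_holdings.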
The following algorithm expands the preceding algorithm by fine-tuning the model before each forecast:
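As a rough outline of that expansion, the following sketch combines the fine-tuning and forecasting steps from the preceding sections into one scheduled method. The method name _rebalance, the _symbols attribute, and the _prepare_training_data helper (wrapping the DataFrame-building loop shown earlier) are hypothetical names for illustration.
# Sketch: fine-tune, reload, and forecast in one scheduled step.
# `_rebalance`, `_symbols`, and `_prepare_training_data` are hypothetical
# names; the latter wraps the DataFrame-building loop shown earlier.
def _rebalance(self):
    training_data_by_symbol = self._prepare_training_data(self._symbols)
    # Fine-tune the model on the latest trailing data.
    output_dir_path = self._train_chronos(
        list(training_data_by_symbol.values()),
        prediction_length=self._prediction_length,
        optim=self._optimizer,
        model_id=self._model_name,
        output_dir=self._model_path,
        tf32=False,
        max_steps=3
    )
    # Load the freshly fine-tuned model before forecasting.
    pipeline = ChronosPipeline.from_pretrained(
        output_dir_path,
        device_map=self._device_map,
        torch_dtype=torch.bfloat16,
    )
    history = self.history(self._symbols, timedelta(365), Resolution.DAILY)['close'].unstack(0)
    all_forecasts = pipeline.predict(
        [torch.tensor(history[symbol].dropna()) for symbol in self._symbols],
        self._prediction_length
    )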