Popular Models

Chronos-T5

Introduction

This page explains how to use Chronos-T5 in LEAN trading algorithms. The model repository provides the following description:

Chronos is a family of pretrained time series forecasting models based on language model architectures. A time series is transformed into a sequence of tokens via scaling and quantization, and a language model is trained on these tokens using the cross-entropy loss. Once trained, probabilistic forecasts are obtained by sampling multiple future trajectories given the historical context. Chronos models have been trained on a large corpus of publicly available time series data, as well as synthetic data generated using Gaussian processes.

For details on Chronos models, training data and procedures, and experimental results, please refer to the paper Chronos: Learning the Language of Time Series.
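As a rough illustration of the scaling and quantization step in the description above, the following sketch divides a series by its mean absolute value and maps the result to uniform bin indices. The helper name, the bin count, and the clipping range of -15 to 15 (borrowed from the tokenizer_kwargs defaults later on this page) are illustrative assumptions. The actual Chronos tokenizer uses far more bins, reserves special tokens, and handles missing values.

```python
import numpy as np

def mean_scale_quantize(series, n_bins=10, low=-15.0, high=15.0):
    """Hypothetical helper: scale a series by its mean absolute value, then bin it."""
    values = np.asarray(series, dtype=float)
    scale = np.mean(np.abs(values))
    if scale == 0:
        scale = 1.0  # Avoid dividing by zero for an all-zero series.
    scaled = values / scale
    # Bin edges split [low, high] into n_bins equal-width buckets.
    edges = np.linspace(low, high, n_bins + 1)
    # Use the interior edges so the indices fall in 0..n_bins-1.
    tokens = np.digitize(np.clip(scaled, low, high), edges[1:-1])
    return tokens, scale

tokens, scale = mean_scale_quantize([100.0, 102.0, 98.0, 101.0])
```

Because every value in this toy series is close to the mean, all four observations land in the same bin. A finer grid (more bins) would separate them.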

Use Cases

The Chronos-T5 model is a time series forecasting model. The following use cases explain how you might utilize it in trading algorithms:

  1. Forecast the future equity curves for a set of assets, then pass them to an optimizer to determine the weights that maximize the future Sharpe ratio of the portfolio.
  2. Forecast the future volatility of an asset to manage risk and optimize portfolio allocations.
  3. Train the model to forecast the impact of specific events on an asset and then adjust your holdings in response to the event. For example, you could train the model to forecast the impact of news, corporate actions, or financial reports.
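To make the second use case more concrete, here is a minimal sketch of one way to turn sampled price paths into a volatility estimate. It assumes the forecasts are already in a NumPy array of shape (num_samples, horizon). The helper name, the synthetic paths, and the 252-day annualization factor are assumptions for illustration, not part of Chronos or LEAN.

```python
import numpy as np

def forecast_volatility(paths, periods_per_year=252):
    """Hypothetical helper: median annualized volatility across sampled price paths."""
    paths = np.asarray(paths, dtype=float)
    # Log returns along each path (axis 1 is time).
    log_returns = np.diff(np.log(paths), axis=1)
    # Sample standard deviation of each path, annualized.
    per_path_vol = log_returns.std(axis=1, ddof=1) * np.sqrt(periods_per_year)
    return float(np.median(per_path_vol))

# Synthetic stand-in for model output: 20 paths of 63 daily prices.
rng = np.random.default_rng(1)
daily_returns = rng.normal(0.0, 0.01, size=(20, 63))
paths = 100.0 * np.exp(np.cumsum(daily_returns, axis=1))
vol = forecast_volatility(paths)
```

Taking the median across paths keeps a single extreme sample from dominating the estimate. A higher quantile would give a more conservative risk figure.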

Load Pre-Trained Model

Follow these steps to load the pre-trained Chronos-T5 model:

  1. Add the following imports to the top of your code file:

    from chronos import ChronosPipeline
    import torch

  2. Call the ChronosPipeline.from_pretrained method with the model path.

    In QuantConnect Cloud, the path to the tiny model is amazon/chronos-t5-tiny.

    # Load the ChronosPipeline model.
    self._pipeline = ChronosPipeline.from_pretrained(
        "amazon/chronos-t5-tiny",
        device_map="cuda" if torch.cuda.is_available() else "cpu",
        torch_dtype=torch.bfloat16,
    )

  3. (Optional) Set the seed to enable reproducibility.

    # Set the seed.
    from transformers import set_seed
    set_seed(1, True)

Fine-Tune Model

The Chronos-T5 model is pre-trained, so you don't need to fine-tune it. Fine-tuning the model just tailors it to your specific use case. Follow these steps to fine-tune it:

  1. Add the following imports to the top of your code file:

    import torch
    from ast import literal_eval
    from pathlib import Path
    from functools import partial
    from typing import Optional
    from transformers import Trainer, TrainingArguments, set_seed
    from gluonts.dataset.pandas import PandasDataset
    from gluonts.itertools import Filter
    from chronos import ChronosConfig, ChronosPipeline
    from chronos.scripts.training.train import ChronosDataset, has_enough_observations, load_model
    from chronos.scripts.training import train
    from logging import getLogger, INFO

  2. In the initialize method, define the model and some of its settings.

    In QuantConnect Cloud, the path to the tiny model is amazon/chronos-t5-tiny.

    def initialize(self):
        # Configure the model parameters to optimize performance.
        self._prediction_length = 3*21  # Three months of trading days
        self._device_map = "cuda" if torch.cuda.is_available() else "cpu"
        self._optimizer = 'adamw_torch_fused' if torch.cuda.is_available() else 'adamw_torch'
        self._model_name = "amazon/chronos-t5-tiny"
        self._model_path = self.object_store.get_file_path(
            f"llm/fine-tune/{self._model_name.replace('/', '-')}/"
        )

  3. Define the fine-tuning method.

    # Configure the fine-tuning method and initiate the training.
    def _train_chronos(
            self, training_data,
            probability: Optional[str] = None,
            context_length: int = 512,
            prediction_length: int = 64,
            min_past: int = 64,
            max_steps: int = 200_000,
            save_steps: int = 50_000,
            log_steps: int = 500,
            per_device_train_batch_size: int = 32,
            learning_rate: float = 1e-3,
            optim: str = "adamw_torch_fused",
            shuffle_buffer_length: int = 100,
            gradient_accumulation_steps: int = 2,
            model_id: str = "google/t5-efficient-tiny",
            model_type: str = "seq2seq",
            random_init: bool = False,
            tie_embeddings: bool = False,
            output_dir: str = "./output/",
            tf32: bool = True,
            torch_compile: bool = True,
            tokenizer_class: str = "MeanScaleUniformBins",
            tokenizer_kwargs: str = "{'low_limit': -15.0, 'high_limit': 15.0}",
            n_tokens: int = 4096,
            n_special_tokens: int = 2,
            pad_token_id: int = 0,
            eos_token_id: int = 1,
            use_eos_token: bool = True,
            lr_scheduler_type: str = "linear",
            warmup_ratio: float = 0.0,
            dataloader_num_workers: int = 1,
            max_missing_prop: float = 0.9,
            num_samples: int = 20,
            temperature: float = 1.0,
            top_k: int = 50,
            top_p: float = 1.0):
    
        # Set up logging for the train object.
        train.logger = getLogger()
        train.logger.setLevel(INFO)
        # Ensure the output_dir is a Path object.
        output_dir = Path(output_dir)
        # Convert probability from a string to a list, or set default if None.
        if isinstance(probability, str):
            probability = literal_eval(probability)
        elif probability is None:
            probability = [1.0 / len(training_data)] * len(training_data)
        # Convert the tokenizer_kwargs from a string to a dictionary.
        if isinstance(tokenizer_kwargs, str):
            tokenizer_kwargs = literal_eval(tokenizer_kwargs)
        # Enable reproducibility.
        set_seed(1, True)
        # Create datasets for training, filtered by criteria.
        train_datasets = [
            Filter(
                partial(
                    has_enough_observations,
                    min_length=min_past + prediction_length,
                    max_missing_prop=max_missing_prop,
                ),
                PandasDataset(data_frame, freq="D"),
            )
            for data_frame in training_data
        ]
        # Load the model with the specified configuration.
        model = load_model(
            model_id=model_id,
            model_type=model_type,
            vocab_size=n_tokens,
            random_init=random_init,
            tie_embeddings=tie_embeddings,
            pad_token_id=pad_token_id,
            eos_token_id=eos_token_id,
        )
        # Define the configuration for the Chronos tokenizer and other settings.
        chronos_config = ChronosConfig(
            tokenizer_class=tokenizer_class,
            tokenizer_kwargs=tokenizer_kwargs,
            n_tokens=n_tokens,
            n_special_tokens=n_special_tokens,
            pad_token_id=pad_token_id,
            eos_token_id=eos_token_id,
            use_eos_token=use_eos_token,
            model_type=model_type,
            context_length=context_length,
            prediction_length=prediction_length,
            num_samples=num_samples,
            temperature=temperature,
            top_k=top_k,
            top_p=top_p,
        )
    
        # Add extra items to model config so that it's saved in the ckpt.
        model.config.chronos_config = chronos_config.__dict__
        # Create a shuffled training dataset with the specified parameters.
        shuffled_train_dataset = ChronosDataset(
            datasets=train_datasets,
            probabilities=probability,
            tokenizer=chronos_config.create_tokenizer(),
            context_length=context_length,
            prediction_length=prediction_length,
            min_past=min_past,
            mode="training",
        ).shuffle(shuffle_buffer_length=shuffle_buffer_length)
    
        # Define the training arguments.
        training_args = TrainingArguments(
            output_dir=str(output_dir),
            per_device_train_batch_size=per_device_train_batch_size,
            learning_rate=learning_rate,
            lr_scheduler_type=lr_scheduler_type,
            warmup_ratio=warmup_ratio,
            optim=optim,
            logging_dir=str(output_dir / "train-logs"),
            logging_strategy="steps",
            logging_steps=log_steps,
            save_strategy="steps",
            save_steps=save_steps,
            report_to=["tensorboard"],
            max_steps=max_steps,
            gradient_accumulation_steps=gradient_accumulation_steps,
            dataloader_num_workers=dataloader_num_workers,
            tf32=tf32,  # remove this if not using Ampere GPUs (e.g., A100)
            torch_compile=torch_compile,
            ddp_find_unused_parameters=False,
            remove_unused_columns=False,
        )
    
        # Create a Trainer instance for training the model.
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=shuffled_train_dataset,
        )
        # Start the training process.
        trainer.train()
        # Save the trained model to the output directory.
        model.save_pretrained(output_dir)
        # Return the path to the output directory.
        return output_dir
    
  4. Create DataFrame(s) that contain your training samples.

    Each DataFrame should have one column, named "target". The rows should be spaced at a consistent time interval. If there is no data for a time step (for example, the weekend price of an Equity asset), use NaN as the value for that time step. The following example demonstrates how to create DataFrames that contain one year of trailing prices for a set of Equities:

    # Get historical closing prices.
    history = self.history(symbols, timedelta(365), Resolution.DAILY)['close'].unstack(0)
    
    # Gather the training data.
    training_data_by_symbol = {}
    for symbol in symbols:
        df = history[[symbol]].dropna()
        if df.shape[0] < 10: # Skip this asset if there is very little data.
            continue
        adjusted_df = df.reset_index()[['time', symbol]]
        adjusted_df = adjusted_df.rename(columns={str(symbol.id): 'target'})
        adjusted_df['time'] = pd.to_datetime(adjusted_df['time'])
        adjusted_df.set_index('time', inplace=True)
        adjusted_df = adjusted_df.resample('D').asfreq()
        training_data_by_symbol[symbol] = adjusted_df

    For more information about history requests, see History Requests.

  5. Call the fine-tuning method with the training data.

    # Train the model with set parameters and save it to the output directory.
    output_dir_path = self._train_chronos(
        list(training_data_by_symbol.values()),
        context_length=int(252/2), # 6 months
        prediction_length=self._prediction_length,
        optim=self._optimizer,
        model_id=self._model_name,
        output_dir=self._model_path,
        learning_rate=1e-5,
        # TF32 requires Ampere GPUs (e.g., A100), so disable it here.
        tf32=False,
        max_steps=3
    )

  6. Load the fine-tuned model.

    # Load the trained model from the output directory.
    pipeline = ChronosPipeline.from_pretrained(
        output_dir_path,
        device_map=self._device_map,
        torch_dtype=torch.bfloat16,
    )
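
The training-data format described in the steps above can be checked outside of LEAN. The following sketch builds a single-column "target" DataFrame from made-up business-day prices and reindexes it to calendar days so that weekends appear as NaN. The dates and prices are hypothetical and only illustrate the expected shape.

```python
import numpy as np
import pandas as pd

# Hypothetical closing prices on business days only (no weekend rows).
dates = pd.bdate_range("2024-01-01", periods=10)
closes = pd.Series(np.linspace(100.0, 109.0, 10), index=dates)

# One column named "target", indexed by time.
frame = closes.to_frame("target")
frame.index.name = "time"

# Reindex to calendar days; days without data become NaN.
frame = frame.resample("D").asfreq()
```

The ten business days span twelve calendar days, so the resulting frame has twelve rows, two of which (the weekend) hold NaN.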

Forecast Time Series

  1. Get some historical data.

    # Get historical closing prices.
    history = self.history(symbols, timedelta(365), Resolution.DAILY)['close'].unstack(0)

  2. Load the pre-trained or fine-tuned model.
  3. Forecast the future time series.

    # Generate forecasts for the assets using the trained pipeline.
    all_forecasts = pipeline.predict(
        [
            torch.tensor(history[symbol].dropna())
            for symbol in symbols
        ],
        self._prediction_length
    )

    In this example, the model returns several future price paths for each asset.

  4. Aggregate the future price paths of each asset into a single price path for each asset.

    For example, take the median value of each time step in the future price paths.

    # Create a DataFrame with median forecasts for each asset.
    forecasts_df = pd.DataFrame(
        {
            symbol: np.quantile(
                all_forecasts[i].numpy(), 0.5, axis=0   # 0.5 = median
            )
            for i, symbol in enumerate(symbols)
        }
    )
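
The median is only one choice of aggregate. As a sketch of how the same sampled paths could also yield an uncertainty band, the following example computes the 10th, 50th, and 90th percentiles at each time step. The array is synthetic and stands in for one asset's forecast samples with shape (num_samples, prediction_length). The quantile levels are arbitrary.

```python
import numpy as np

# Synthetic stand-in for one asset's forecast samples: 5 paths, 3 steps.
samples = np.array([
    [10.0, 11.0, 12.0],
    [11.0, 12.0, 13.0],
    [12.0, 13.0, 14.0],
    [13.0, 14.0, 15.0],
    [14.0, 15.0, 16.0],
])

# Quantiles across paths (axis 0) for each future time step.
low, median, high = np.quantile(samples, [0.1, 0.5, 0.9], axis=0)
```

Here the median path is 12, 13, 14, and the low and high arrays bracket it at each step. A wide band relative to the median suggests the forecast for that step is uncertain.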

Examples

The following algorithm selects the most liquid assets at the beginning of each month. Once a quarter, it gets the trailing year of prices for all the assets in the universe and then forecasts the price paths of all the assets over the upcoming quarter. It then uses the SciPy package to find the weights that maximize the future Sharpe ratio of the portfolio and rebalances the portfolio to those weights.
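
The optimization step on its own might look like the following sketch. It is not the full algorithm. It assumes a long-only portfolio whose weights sum to one, ignores the risk-free rate, and uses synthetic forecast prices in place of model output. The helper name and these constraints are illustrative assumptions.

```python
import numpy as np
from scipy.optimize import minimize

def max_sharpe_weights(price_paths):
    """Hypothetical helper: long-only weights that maximize the Sharpe ratio
    of forecast returns. price_paths has shape (steps, assets)."""
    prices = np.asarray(price_paths, dtype=float)
    returns = prices[1:] / prices[:-1] - 1.0
    mean = returns.mean(axis=0)
    cov = np.cov(returns, rowvar=False)
    n = prices.shape[1]

    def negative_sharpe(weights):
        volatility = np.sqrt(weights @ cov @ weights)
        # Small constant guards against division by zero.
        return -(weights @ mean) / (volatility + 1e-12)

    result = minimize(
        negative_sharpe,
        x0=np.full(n, 1.0 / n),  # Start from equal weights.
        method="SLSQP",
        bounds=[(0.0, 1.0)] * n,
        constraints={"type": "eq", "fun": lambda w: w.sum() - 1.0},
    )
    return result.x

# Synthetic median forecasts for three assets over 63 steps.
rng = np.random.default_rng(1)
drift = np.array([0.002, 0.0005, -0.001])
steps = drift + rng.normal(0.0, 0.01, size=(63, 3))
forecast = 100.0 * np.cumprod(1.0 + steps, axis=0)
weights = max_sharpe_weights(forecast)
```

Minimizing the negative Sharpe ratio is a common way to express a maximization problem for scipy.optimize.minimize, which only minimizes.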


The following algorithm expands the preceding algorithm by fine-tuning the model before each forecast:
