Custom Universes

Introduction

A custom universe lets you select a basket of assets from a custom dataset.

Data Sources

You can gather your custom data from any of the following sources:

The data source should serve data in chronological order, and each data point should have a unique timestamp. Each request has a 1-second overhead, so bundle samples together into fewer requests for fast execution.

Define Custom Universe Types

Custom universes should extend the PythonData class. Extensions of the PythonData class must implement the get_source and reader methods.

The get_source method in your custom data class instructs LEAN where to find the data. This method must return a SubscriptionDataSource object, which contains the data location and format (SubscriptionTransportMedium). You can even change source locations for backtesting and live modes. We support many different data sources.

The reader method of your custom data class takes one line of data from the source location and parses it into one of your custom objects. You can add as many properties to your custom data objects as you need, but you must set the symbol and end_time properties. When there is no usable data in a line, the method should return None. LEAN repeatedly calls the reader method until the date/time advances or it reaches the end of the file.

# Example custom universe data; it is virtually identical to other custom data types.
class MyCustomUniverseDataClass(PythonData):

    def get_source(self, config: SubscriptionDataConfig, date: datetime, is_live_mode: bool) -> SubscriptionDataSource:
        return SubscriptionDataSource("your-remote-universe-data", SubscriptionTransportMedium.REMOTE_FILE)

    def reader(self, config: SubscriptionDataConfig, line: str, date: datetime, is_live_mode: bool) -> BaseData:
        items = line.split(",")
    
        # Generate required data, then return an instance of your class.
        data = MyCustomUniverseDataClass()
        data.end_time = datetime.strptime(items[0], "%Y-%m-%d")
        # Set time to exactly 1 day before end_time.
        data.time = data.end_time - timedelta(1)
        data.symbol = Symbol.create(items[1], SecurityType.CRYPTO, Market.BITFINEX)
        data["CustomAttribute1"] = int(items[2])
        data["CustomAttribute2"] = float(items[3])
        return data
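
The parsing step of a reader method can be exercised outside LEAN before you wire it into an algorithm. The following is a minimal sketch in plain Python, assuming the same four-column line layout as the preceding example (date, ticker, integer attribute, float attribute). The ParsedRow class and the parse_line function are hypothetical stand-ins for illustration and are not part of the LEAN API.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class ParsedRow:
    """Hypothetical stand-in for the fields that a reader method sets."""
    time: datetime
    end_time: datetime
    ticker: str
    custom_attribute_1: int
    custom_attribute_2: float


def parse_line(line: str) -> Optional[ParsedRow]:
    """Parse 'YYYY-MM-DD,TICKER,int,float'; return None for unusable lines."""
    items = line.strip().split(",")
    if len(items) < 4:
        return None
    try:
        end_time = datetime.strptime(items[0], "%Y-%m-%d")
        attribute_1 = int(items[2])
        attribute_2 = float(items[3])
    except ValueError:
        # Header rows and malformed values land here.
        return None
    # The sample covers the day that ends at end_time.
    return ParsedRow(end_time - timedelta(days=1), end_time, items[1], attribute_1, attribute_2)
```

Returning None for header rows, blank lines, and malformed values mirrors the rule that the reader method should return None when a line has no usable data.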

Your reader method should return objects in chronological order. If an object has a timestamp that is the same or earlier than the timestamp of the previous object, LEAN ignores it.
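
One way to satisfy the ordering requirement is to sort the source file before you publish it. The following sketch is plain Python and makes two assumptions: each data row starts with a YYYY-MM-DD date, and any line that doesn't (such as a header) belongs at the top. The sort_rows_chronologically function is a hypothetical helper, not a LEAN method.

```python
from datetime import datetime
from typing import List


def _row_date(row: str) -> datetime:
    """Read the date from the first comma-separated column."""
    return datetime.strptime(row.split(",")[0], "%Y-%m-%d")


def sort_rows_chronologically(lines: List[str]) -> List[str]:
    """Return non-date lines first, followed by the data rows in date order."""
    header, rows = [], []
    for line in lines:
        if not line.strip():
            continue  # Drop blank lines.
        try:
            _row_date(line)
            rows.append(line)
        except ValueError:
            header.append(line)
    # list.sort is stable, so rows that share a date keep their original order.
    rows.sort(key=_row_date)
    return header + rows
```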

If you need to create multiple objects in your reader method from a single line, follow these steps:

  1. In the get_source method, pass FileFormat.UNFOLDING_COLLECTION as the third argument to the SubscriptionDataSource constructor.
  2. In the reader method, order the objects by their timestamp and then return a BaseDataCollection(end_time, config.symbol, objects) where objects is a list of your custom data objects.

import json

class MyCustomUniverseDataClass(PythonData):
    
    def get_source(self, config, date, is_live_mode):
        # FileFormat.UNFOLDING_COLLECTION tells LEAN that the reader method returns a collection.
        return SubscriptionDataSource("your-data-source-url", SubscriptionTransportMedium.REMOTE_FILE, FileFormat.UNFOLDING_COLLECTION)

    def reader(self, config, line, date, is_live_mode):
        # Order the records by date so the objects are in chronological order.
        json_response = sorted(json.loads(line), key=lambda x: x["date"])
        if not json_response:
            return None
        
        # The collection ends one day after the date of the last record.
        end_time = datetime.strptime(json_response[-1]["date"], '%Y-%m-%d') + timedelta(1)

        data = list()

        for json_datum in json_response:
            datum = MyCustomUniverseDataClass()
            datum.symbol = Symbol.create(json_datum["Ticker"], SecurityType.EQUITY, Market.USA)
            datum.time = datetime.strptime(json_datum["date"], '%Y-%m-%d') 
            datum.end_time = datum.time + timedelta(1)
            datum['CustomAttribute1'] = int(json_datum['Attr1'])
            datum.value = float(json_datum['Attr1'])
            data.append(datum)

        return BaseDataCollection(end_time, config.symbol, data)
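
To see what the unfolding step does to a single line, the following plain-Python sketch parses one JSON line into several rows. It assumes the line is a JSON array of records with the "date", "Ticker", and "Attr1" keys that the preceding example reads. The parse_collection function and the dictionary rows are hypothetical stand-ins for the custom data objects and the BaseDataCollection.

```python
import json
from datetime import datetime, timedelta


def parse_collection(line: str):
    """Return (collection_end_time, rows) with the rows ordered by end time."""
    rows = []
    for record in json.loads(line):
        time = datetime.strptime(record["date"], "%Y-%m-%d")
        rows.append({
            "ticker": record["Ticker"],
            "time": time,
            "end_time": time + timedelta(days=1),
            "value": float(record["Attr1"]),
        })
    # Order the rows by timestamp before building the collection.
    rows.sort(key=lambda row: row["end_time"])
    # The collection ends when its last row ends; an empty line yields None.
    collection_end_time = rows[-1]["end_time"] if rows else None
    return collection_end_time, rows
```

For example, a line that holds records dated 2024-01-03 and 2024-01-02 produces two rows ordered by date and a collection end time of 2024-01-04.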

Initialize Custom Universes

To add a custom universe to your algorithm, in the initialize method, pass your universe type and a selector function to the add_universe method. The selector function receives a list of your custom objects and must return a list of Symbol objects. In the selector function definition, you can use any of the properties of your custom data type. The Symbol objects that you return from the selector function set the constituents of the universe.

# In the initialize method, add the universe.
self._universe = self.add_universe(MyCustomUniverseDataClass, "myCustomUniverse", Resolution.DAILY, self.selector_function)

# Define the selector function
def selector_function(self, data: List[MyCustomUniverseDataClass]) -> List[Symbol]:
    sorted_data = sorted([ x for x in data if x["CustomAttribute1"] > 0 ],
                         key=lambda x: x["CustomAttribute2"],
                         reverse=True)
    return [x.symbol for x in sorted_data[:5]]
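
The selection logic itself is ordinary filtering and sorting, so you can check it against sample data without running a backtest. The following sketch reproduces the same rule (keep rows where CustomAttribute1 is positive, rank by CustomAttribute2, take the top five) on plain dictionaries. The select_top function and the dictionary layout are hypothetical and only stand in for the custom data objects.

```python
from typing import Dict, List


def select_top(rows: List[Dict], count: int = 5) -> List[str]:
    """Filter on CustomAttribute1, rank by CustomAttribute2, and keep the top rows."""
    positive = [row for row in rows if row["CustomAttribute1"] > 0]
    ranked = sorted(positive, key=lambda row: row["CustomAttribute2"], reverse=True)
    return [row["symbol"] for row in ranked[:count]]


sample = [
    {"symbol": "A", "CustomAttribute1": 1, "CustomAttribute2": 0.2},
    {"symbol": "B", "CustomAttribute1": 0, "CustomAttribute2": 0.9},
    {"symbol": "C", "CustomAttribute1": 2, "CustomAttribute2": 0.7},
]
selected = select_top(sample)  # "B" is filtered out; "C" ranks above "A".
```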

Historical Data

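To get historical custom universe data, call the history method with the Universe object and the lookback period. By default, the method returns a pandas.Series where each value is a list of your custom universe objects. If you pass flatten=True, it returns a pandas.DataFrame where the columns contain the custom type attributes.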

# DataFrame where the columns are the universe attributes:
history_df = self.history(self._universe, 30, flatten=True)

# Series where the values are lists of universe objects:
history_series = self.history(self._universe, 30)
# A pandas Series has no iterrows method, so iterate over its (index, value) pairs.
for index, data in history_series.items():
    for single_stock_data in data:
        self.log(f"{single_stock_data.symbol} CustomAttribute1 at {single_stock_data.end_time}: {single_stock_data['CustomAttribute1']}")

Selection Frequency

Custom universes run on a schedule based on the end_time of your custom data objects. To adjust the selection schedule, see Schedule.

Examples

The following examples demonstrate common practices for custom universes.

Example 1: Sourcing from the Object Store

This project demonstrates how to read custom data from the Object Store, and then use it to define a universe and place trades. The following research environment file generates the demo universe data, which contains a daily set of assets and their respective signals:

import random
import numpy as np
import pandas as pd

# Set a random seed to ensure reproducibility.
np.random.seed(0)
# Select the asset weights for each trading day.
indices = [[x] * 3 for x in pd.bdate_range('2015-01-01', '2024-12-31')]
weights = list(np.random.dirichlet((10, 5, 3), size=(len(indices),)).flatten())
# Select the universe for each trading day.
equities = []
for i in range(len(indices)):
    random.seed(i)
    equities.extend(list(random.sample(["SPY", "TLT", "GLD", "USO", "IWM"], 3)))
# Organize the data into a DataFrame.
df = pd.DataFrame({"Date": [x for y in indices for x in y], "Symbol": equities, "Weight": weights})
# Save the DataFrame as a CSV in the Object Store.
df.to_csv(QuantBook().object_store.get_file_path("portfolio-targets.csv"), index=False)
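
Before you run the algorithm, it can help to confirm that the generated file has the expected shape. The following sketch uses only the standard library and assumes the Date, Symbol, and Weight columns written by the preceding code. The summarize_targets function is a hypothetical helper that takes the CSV contents as a string.

```python
import csv
import io
from collections import defaultdict


def summarize_targets(csv_text: str):
    """Group the rows by date and return {date: (symbol_count, total_weight)}."""
    totals = defaultdict(lambda: [0, 0.0])
    for row in csv.DictReader(io.StringIO(csv_text)):
        entry = totals[row["Date"]]
        entry[0] += 1
        entry[1] += float(row["Weight"])
    return {date: (count, weight) for date, (count, weight) in totals.items()}
```

In the demo data, each date should have three symbols, and the weights for a date should sum to approximately 1 because they are drawn from a Dirichlet distribution.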

The following algorithm file reads the preceding CSV file from the Object Store and uses its contents to form the daily universe and place trades:

class CustomUniverseExampleAlgorithm(QCAlgorithm):

    def initialize(self) -> None:
        self.set_start_date(2015, 1, 1)
        # Add a universe that reads from the Object Store.
        self._universe = self.add_universe(
            CustomUniverseData, "CustomUniverse", Resolution.DAILY, self._selector_function
        )
        # Add a Scheduled Event to rebalance the portfolio.
        spy = Symbol.create('SPY', SecurityType.EQUITY, Market.USA)
        self.schedule.on(
            self.date_rules.every_day(spy),
            self.time_rules.after_market_open(spy, 1),
            lambda: self.set_holdings(
                [PortfolioTarget(symbol, self._weight_by_symbol[symbol]) for symbol in self._universe.selected],
                True
            )
        )
    
    def _selector_function(self, alt_coarse: List[PythonData]) -> List[Symbol]:
        # Select the symbols that have a significant weight in the custom universe data to avoid 
        # small-size trades that erode returns. Save the weight to use during the rebalance.
        self._weight_by_symbol = {d.symbol: d["weight"] for d in alt_coarse if d["weight"] > 0.05}
        return list(self._weight_by_symbol.keys())


class CustomUniverseData(PythonData):

    def get_source(self, config: SubscriptionDataConfig, date: datetime, is_live_mode: bool) -> SubscriptionDataSource:
        # Define the location and format of the data file.
        return SubscriptionDataSource(
            "portfolio-targets.csv", SubscriptionTransportMedium.OBJECT_STORE, FileFormat.CSV
        )

    def reader(self, config: SubscriptionDataConfig, line: str, date: datetime, is_live_mode: bool) -> BaseData:
        # Skip the header row and any empty lines.
        if not line or not line[0].isnumeric():
            return None
        # Split the line by each comma.
        items = line.split(",")
        # Parse the data from the CSV file.
        data = CustomUniverseData()
        data.end_time = datetime.strptime(items[0], "%Y-%m-%d")
        data.time = data.end_time - timedelta(1)
        data.symbol = Symbol.create(items[1], SecurityType.EQUITY, Market.USA)
        data["weight"] = float(items[2])
        return data

Other Examples

For more examples, see the following algorithms:
