Universes
Custom Universes
Define Custom Universe Types
Custom universes should extend the BaseData
PythonData
class. Extensions of the BaseData
PythonData
class must implement a GetSource
get_source
and Reader
reader
method.
The GetSource
get_source
method in your custom data class instructs LEAN where to find the data. This method must return a SubscriptionDataSource
object, which contains the data location and format (SubscriptionTransportMedium
). You can even change source locations for backtesting and live modes. We support many different data sources.
The Reader
reader
method of your custom data class takes one line of data from the source location and parses it into one of your custom objects. You can add as many properties to your custom data objects as you need, but must set Symbol
symbol
and EndTime
end_time
properties. When there is no useable data in a line, the method should return null
None
. LEAN repeatedly calls the Reader
reader
method until the date/time advances or it reaches the end of the file.
//Example custom universe data; it is virtually identical to other custom data types. public class MyCustomUniverseDataClass : BaseData { public int CustomAttribute1; public decimal CustomAttribute2; public override DateTime EndTime { // define end time as exactly 1 day after Time get { return Time + QuantConnect.Time.OneDay; } set { Time = value - QuantConnect.Time.OneDay; } } public override SubscriptionDataSource GetSource(SubscriptionDataConfig config, DateTime date, bool isLiveMode) { return new SubscriptionDataSource(@"your-remote-universe-data", SubscriptionTransportMedium.RemoteFile); } public override BaseData Reader(SubscriptionDataConfig config, string line, DateTime date, bool isLiveMode) { var items = line.Split(","); // Generate required data, then return an instance of your class. return new MyCustomUniverseDataClass { EndTime = Parse.DateTimeExact(items[0], "yyyy-MM-dd"), Symbol = Symbol.Create(items[1], SecurityType.Equity, Market.USA), CustomAttribute1 = int.Parse(items[2]), CustomAttribute2 = decimal.Parse(items[3], NumberStyles.Any, CultureInfo.InvariantCulture) }; } }
# Example custom universe data; it is virtually identical to other custom data types. class MyCustomUniverseDataClass(PythonData): def get_source(self, config: SubscriptionDataConfig, date: datetime, is_live_mode: bool) -> SubscriptionDataSource: return SubscriptionDataSource(@"your-remote-universe-data", SubscriptionTransportMedium.REMOTE_FILE) def reader(self, config: SubscriptionDataConfig, line: str, date: datetime, is_live_mode: bool) -> BaseData: items = line.split(",") # Generate required data, then return an instance of your class. data = MyCustomUniverseDataClass() data.end_time = datetime.strptime(items[0], "%Y-%m-%d") # define Time as exactly 1 day earlier Time data.time = data.end_time - timedelta(1) data.symbol = Symbol.create(items[1], SecurityType.CRYPTO, Market.BITFINEX) data["CustomAttribute1"] = int(items[2]) data["CustomAttribute2"] = float(items[3]) return data
Your Reader
reader
method should return objects in chronological order. If an object has a timestamp that is the same or earlier than the timestamp of the previous object, LEAN ignores it.
If you need to create multiple objects in your Reader
reader
method from a single line
, follow these steps:
- In the
GetSource
get_source
method, passFileFormat.UnfoldingCollection
FileFormat.UNFOLDING_COLLECTION
as the third argument to theSubscriptionDataSource
constructor. - In the
Reader
reader
method, order the objects by their timestamp and then return aBaseDataCollection(endTime, config.Symbol, objects)
BaseDataCollection(end_time, config.symbol, objects)
whereobjects
is a list of your custom data objects.
public class MyCustomUniverseDataClass : BaseData { [JsonProperty(PropertyName = "Attr1")] public int CustomAttribute1 { get; set; } [JsonProperty(PropertyName = "Ticker")] public string Ticker { get; set; } [JsonProperty(PropertyName = "date")] public DateTime Date { get; set; } public override DateTime EndTime { // define end time as exactly 1 day after Time get { return Time + QuantConnect.Time.OneDay; } set { Time = value - QuantConnect.Time.OneDay; } } public MyCustomUniverseDataClass() { Symbol = Symbol.Empty; DataType = MarketDataType.Base; } public override SubscriptionDataSource GetSource(SubscriptionDataConfig config, DateTime date, bool isLiveMode) { return new SubscriptionDataSource(@"your-data-source-url", SubscriptionTransportMedium.RemoteFile, FileFormat.UnfoldingCollection); } public override BaseData Reader(SubscriptionDataConfig config, string line, DateTime date, bool isLiveMode) { var items = JsonConvert.DeserializeObject<List<MyCustomUniverseDataClass>>(line); var endTime = items.Last().Date; foreach (var item in items) { item.Symbol = Symbol.Create(item.Ticker, SecurityType.Equity, Market.USA); item.Time = item.Date; item.Value = (decimal) item.CustomAttribute1; } return new BaseDataCollection(endTime, config.Symbol, items); } }
class MyCustomUniverseDataClass(PythonData): def get_source(self, config, date, isLive): return SubscriptionDataSource("your-data-source-url", SubscriptionTransportMedium.REMOTE_FILE, FileFormat.UNFOLDING_COLLECTION) def reader(self, config, line, date, isLive): json_response = json.loads(line) end_time = datetime.strptime(json_response[-1]["date"], '%Y-%m-%d') + timedelta(1) data = list() for json_datum in json_response: datum = MyCustomUniverseDataClass() datum.symbol = Symbol.create(json_datum["Ticker"], SecurityType.EQUITY, Market.USA) datum.time = datetime.strptime(json_datum["date"], '%Y-%m-%d') datum.end_time = datum.time + timedelta(1) datum['CustomAttribute1'] = int(json_datum['Attr1']) datum.value = float(json_datum['Attr1']) data.append(datum) return BaseDataCollection(end_time, config.symbol, data)
Initialize Custom Universes
To add a custom universe to your algorithm, in the Initialize
initialize
method, pass your universe type and a selector function to the AddUniverse
add_universe
method. The selector function receives a list of your custom objects and must return a list of Symbol
objects. In the selector function definition, you can use any of the properties of your custom data type. The Symbol
objects that you return from the selector function set the constituents of the universe.
private Universe _universe; // In Initialize _universe = AddUniverse<MyCustomUniverseDataClass>("myCustomUniverse", Resolution.Daily, data => { return (from singleStockData in data where singleStockData.CustomAttribute1 > 0 orderby singleStockData.CustomAttribute2 descending select singleStockData.Symbol).Take(5); });
# In Initialize self._universe = self.add_universe(MyCustomUniverseDataClass, "myCustomUniverse", Resolution.DAILY, self.selector_function) # Define the selector function def selector_function(self, data: List[MyCustomUniverseDataClass]) -> List[Symbol]: sorted_data = sorted([ x for x in data if x["CustomAttribute1"] > 0 ], key=lambda x: x["CustomAttribute2"], reverse=True) return [x.symbol for x in sorted_data[:5]]
Historical Data
To get custom universe historical data, call the History
history
method with the Universe
object and the lookback period. The return type is a IEnumerable<BaseDataCollection>
and you have to cast its items to MyCustomUniverseDataClass
.
To get historical custom universe data, call the History
history
method with the Universe
object and the lookback period. The return type is a pandas.DataFrame
where the columns contain the custom type attributes.
var history = History(_universe, 30); foreach (var data in history) { foreach (MyCustomUniverseDataClass singleStockData in data) { Log($"{singleStockData.Symbol} CustomAttribute1 at {singleStockData.EndTime}: {singleStockData.CustomAttribute1}"); } }
history = self.history(self._universe, 30) for time, data in history.iterrows(): for single_stock_data in data: self.log(f"{single_stock_data.symbol} CustomAttribute1 at {single_stock_data.end_time}: {single_stock_data['CustomAttribute1']}")
Selection Frequency
Custom universes run on a schedule based on the EndTime
end_time
of your custom data objects. To adjust the selection schedule, see Schedule.
Examples
The following examples demonstrate some common practices for Custom Universe.
Example 1: Sourcing from the Object Store
This project demonstrates how to read custom data from the Object Store, and then use it to define a universe and place trades. The following research environment file generates the demo universe data, which contains a daily set of assets and their respective signals:
// Load the assembly files and data types in their own cell. #load "../Initialize.csx"
// Import the data types. #load "../QuantConnect.csx" using QuantConnect; using QuantConnect.Research; using QuantConnect.Data.Market; using MathNet.Numerics.Distributions; // Create a QuantBook. var qb = new QuantBook(); // Make a history request to get a list of trading days. var history = qb.History( Symbol.Create("SPY", SecurityType.Equity, Market.USA), new DateTime(2015, 1, 1), new DateTime(2024, 12, 31), Resolution.Daily ).ToList(); // Create a variable to store the file contents. var contents = new List<string>(); // Define the list of possible assets. var tickers = new List { "SPY", "TLT", "GLD", "USO", "IWM" }; // Loop through the list of trading days. for (int i = 0; i < history.Count; i++) { // Create a random number generator. var random = new Random(i); // Select 3 of the assets and define their target portfolio weights. var date = history[i].EndTime; var equities = tickers.OrderBy(x => random.Next()).Take(3).ToList(); var weights = Dirichlet.Sample(random, new[] { 10d, 5d, 3d }); // Append the target portfolio weights for this trading day to the file. contents.AddRange(new [] { $"{date:yyyy-MM-dd},{equities[0]},{weights[0]}", $"{date:yyyy-MM-dd},{equities[1]},{weights[1]}", $"{date:yyyy-MM-dd},{equities[2]},{weights[2]}" }); } // Save the file to the Object Store. Console.WriteLine(qb.ObjectStore.Save("portfolioTargets.csv", string.Join("\n", contents)));
# Set a random seed to ensure reproducibility. import random np.random.seed(0) # Select the asset weights for each trading day. indices = [[x] * 3 for x in pd.bdate_range('2015-01-01', '2024-12-31')] weights = list(np.random.dirichlet((10, 5, 3), size=(len(indices),)).flatten()) # Select the universe for each trading day. equities = [] for i in range(len(indices)): random.seed(i) equities.extend(list(random.sample(["SPY", "TLT", "GLD", "USO", "IWM"], 3))) # Organize the data into a DataFrame. df = pd.DataFrame({"Date": [x for y in indices for x in y], "Symbol": equities, "Weight": weights}) # Save the DataFrame as a CSV in the Object Store. df.to_csv(QuantBook().object_store.get_file_path("portfolio-targets.csv"), index=False)
The following algorithm file reads the preceding CSV file from the Object Store and uses its contents to form the daily universe and place trades:
public class CustomUniverseExampleAlgorithm : QCAlgorithm { // Create a dictionary to hold the target weight of each asset. private Dictionary<Symbol, decimal> _weightBySymbol = new(); private Universe _universe; public override void Initialize() { SetStartDate(2015, 1, 1); // Add a universe that reads from the Object Store. _universe = AddUniverse<CustomUniverseData>("CustomUniverse", Resolution.Daily, (altCoarse) => { // Select the symbols that have a significant weight in the custom universe data to avoid // small-size trades that erode returns. Save the weight to use during the rebalance. _weightBySymbol = altCoarse .Select(d => d as CustomUniverseData) .Where(d => d.Weight > 0.05m) .ToDictionary(d => d.Symbol, d => d.Weight); return _weightBySymbol.Keys; }); // Add a Scheduled Event to rebalance the portfolio. var spy = QuantConnect.Symbol.Create("SPY", SecurityType.Equity, Market.USA); Schedule.On( DateRules.EveryDay(spy), TimeRules.AfterMarketOpen(spy, 1), () => SetHoldings( _universe.Selected.Select(symbol => new PortfolioTarget(symbol, _weightBySymbol[symbol])).ToList(), true ) ); } } public class CustomUniverseData : BaseData { public decimal Weight; public override DateTime EndTime { // Set the data period to 1 day. get { return Time + QuantConnect.Time.OneDay; } set { Time = value - QuantConnect.Time.OneDay; } } public override SubscriptionDataSource GetSource(SubscriptionDataConfig config, DateTime date, bool isLiveMode) { // Define the location and format of the data file. return new SubscriptionDataSource( "portfolioTargets.csv", SubscriptionTransportMedium.ObjectStore, FileFormat.Csv ); } public override BaseData Reader(SubscriptionDataConfig config, string line, DateTime date, bool isLiveMode) { // Skip the header row. if (!Char.IsDigit(line[0])) { return null; } // Split the line by each comma. var items = line.Split(","); // Parse the data from the CSV file. return new CustomUniverseData { EndTime = Parse.DateTimeExact(items[0], "yyyy-MM-dd"), Symbol = Symbol.Create(items[1], SecurityType.Equity, Market.USA), Weight = decimal.Parse(items[2], NumberStyles.Any, CultureInfo.InvariantCulture) }; } }
class CustomUniverseExampleAlgorithm(QCAlgorithm): def initialize(self) -> None: self.set_start_date(2015, 1, 1) # Add a universe that reads from the Object Store. self._universe = self.add_universe( CustomUniverseData, "CustomUniverse", Resolution.DAILY, self._selector_function ) # Add a Scheduled Event to rebalance the portfolio. spy = Symbol.create('SPY', SecurityType.EQUITY, Market.USA) self.schedule.on( self.date_rules.every_day(spy), self.time_rules.after_market_open(spy, 1), lambda: self.set_holdings( [PortfolioTarget(symbol, self._weight_by_symbol[symbol]) for symbol in self._universe.selected], True ) ) def _selector_function(self, alt_coarse: List[PythonData]) -> List[Symbol]: # Select the symbols that have a significant weight in the custom universe data to avoid # small-size trades that erode returns. Save the weight to use during the rebalance. self._weight_by_symbol = {d.symbol: d.weight for d in alt_coarse if d["weight"] > 0.05} return list(self._weight_by_symbol.keys()) class CustomUniverseData(PythonData): def get_source(self, config: SubscriptionDataConfig, date: datetime, is_live_mode: bool) -> SubscriptionDataSource: # Define the location and format of the data file. return SubscriptionDataSource( "portfolio-targets.csv", SubscriptionTransportMedium.OBJECT_STORE, FileFormat.CSV ) def reader(self, config: SubscriptionDataConfig, line: str, date: datetime, is_live_mode: bool) -> BaseData: # Skip the header row. if not line[0].isnumeric(): return None # Split the line by each comma. items = line.split(",") # Parse the data from the CSV file. data = CustomUniverseData() data.end_time = datetime.strptime(items[0], "%Y-%m-%d") data.time = data.end_time - timedelta(1) data.symbol = Symbol.create(items[1], SecurityType.EQUITY, Market.USA) data["weight"] = float(items[2]) return data
Other Examples
For more examples, see the following algorithms: