Writing Algorithms
Object Store
Introduction
The Object Store is a file system that you can use in your algorithms to save, read, and delete data. The Object Store is organization-specific, so you can save or read data from the same Object Store in all of your organization's projects. The Object Store works like a key-value storage system where you can store regular strings, JSON encoded strings, XML encoded strings, and bytes. You can access the data you store in the Object Store from backtests, the Research Environment, and live algorithms.
Get All Stored Data
To get all of the keys and values in the Object Store, iterate through the object_store property.
for kvp in self.object_store:
    key = kvp.key
    value = kvp.value
To iterate through just the keys in the Object Store, iterate through the keys property.
for key in self.object_store.keys:
    continue
Save Data
The Object Store saves objects under a key-value system. If you save objects in backtests, you can access them from the Research Environment. To avoid slowing down your backtests, save data once in the on_end_of_algorithm event handler. In live trading, you can save data more frequently, such as at the end of a train method or after universe selection.
If you run algorithms in QuantConnect Cloud, you need storage create permissions to save data in the Object Store.
If you don't have data to store, create some sample data.
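The save snippets in this section refer to variables named string_sample and bytes_sample. A minimal sketch of what such sample data could look like follows; the values are placeholders chosen for illustration, not anything the Object Store requires.

```python
# Hypothetical sample data; any str or bytes values would work here.
string_sample = "My string"

# The same text encoded as UTF-8 bytes.
bytes_sample = string_sample.encode("utf-8")
```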
You can save Bytes and string objects in the Object Store.
To store data, you need to provide a key. If you provide a key that is already in the Object Store, it will overwrite the data at that location. To avoid overwriting objects from other projects in your organization, prefix the key with your project ID. You can find the project ID in the URL of your browser when you open a project. For example, the ID of the project at quantconnect.com/project/12345 is 12345.
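The prefixing convention can be wrapped in a small helper so every key is built the same way. This is only a sketch: project_key is a hypothetical name, and 12345 is the example project ID from the paragraph above.

```python
def project_key(project_id: int, name: str) -> str:
    """Build an Object Store key namespaced by project ID (hypothetical helper)."""
    return f"{project_id}/{name}"


# Inside an algorithm you would pass self.project_id instead of a literal.
key = project_key(12345, "string_key")
```

Keeping the prefix logic in one place also makes it easier to find or remove all of a project's keys later.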
Strings
To save a string object, call the save or save_string method.
save_successful = self.object_store.save(f"{self.project_id}/string_key", string_sample)
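Because the Object Store accepts strings, one way to persist structured state is to encode it as a JSON string first. The sketch below uses Python's standard json module; the state dictionary and its fields are made up for illustration, and json_sample is the string you would pass to the save method.

```python
import json

# Hypothetical algorithm state to persist between deployments.
state = {"last_rebalance": "2023-04-01", "weights": {"SPY": 0.6, "TLT": 0.4}}

# Encode to a JSON string; sort_keys keeps the output stable across runs.
json_sample = json.dumps(state, sort_keys=True)

# Decoding the string restores an equal dictionary.
restored = json.loads(json_sample)
```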
Bytes
To save a Bytes object (for example, zipped data), call the save_bytes method.
save_successful = self.object_store.save_bytes(f"{self.project_id}/bytes_key", bytes_sample)
zipped_data_sample = Compression.zip_bytes(bytes(string_sample, "utf-8"), "data")
zip_save_successful = self.object_store.save_bytes(f"{self.project_id}/bytesKey.zip", zipped_data_sample)
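Any bytes payload can be stored this way, including serialized Python objects. As a sketch, the standard pickle module turns an object into bytes and back; the model dictionary below is a stand-in for a trained model, not a real one, and model_bytes is what you would pass to save_bytes.

```python
import pickle

# Stand-in for a trained model or other Python object (hypothetical).
model = {"coefficients": [0.25, -1.5], "intercept": 0.1}

# Serialize the object to bytes.
model_bytes = pickle.dumps(model)

# Bytes read back later can be restored with pickle.loads.
restored_model = pickle.loads(model_bytes)
```

Depending on what the read call returns in your environment, you may need to convert the result to a Python bytes object before unpickling. Only unpickle data that you wrote yourself, since pickle can execute arbitrary code on load.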
Read Data
Read data from the Object Store to import algorithm variables between deployments, import data from the Research Environment, or load trained machine learning models. To read data from the Object Store, you need to provide the key you used to store the object.
You can load Bytes and string objects from the Object Store.
Before you read data from the Object Store, check if the key exists.
if self.object_store.contains_key(key):
    # Read data
    pass
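The existence check and the read can be folded into one small helper. The sketch below is an illustrative pattern rather than part of the Object Store API: InMemoryStore is a hypothetical stand-in that only mimics contains_key, save, and read so the example runs anywhere, and read_or_default is a made-up name.

```python
class InMemoryStore:
    """Hypothetical stand-in that mimics contains_key, save, and read."""

    def __init__(self):
        self._data = {}

    def contains_key(self, key):
        return key in self._data

    def save(self, key, value):
        self._data[key] = value
        return True

    def read(self, key):
        return self._data[key]


def read_or_default(store, key, default=None):
    """Return the stored value if the key exists, otherwise the default."""
    if store.contains_key(key):
        return store.read(key)
    return default


store = InMemoryStore()
store.save("12345/string_key", "hello")

found = read_or_default(store, "12345/string_key")
missing = read_or_default(store, "12345/absent", default="")
```

In an algorithm, you would pass self.object_store as the store argument.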
Strings
To read a string object, call the read or read_string method.
string_data = self.object_store.read(f"{self.project_id}/string_key")
Bytes
To read a Bytes object, call the read_bytes method.
byte_data = self.object_store.read_bytes(f"{self.project_id}/bytes_key")
Delete Data
Delete objects in the Object Store to remove objects that you no longer need. If you run algorithms in QuantConnect Cloud, you need storage delete permissions to delete data from the Object Store.
To delete objects from the Object Store, call the delete method. Before you delete data, check if the key exists. If you try to delete an object with a key that doesn't exist in the Object Store, the method raises an exception.
if self.object_store.contains_key(key):
    self.object_store.delete(key)
To delete all of the content in the Object Store, iterate through all the stored data.
for kvp in self.object_store:
    self.object_store.delete(kvp.key)
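Because the Object Store is shared across your organization, deleting everything also removes data that other projects saved. A narrower variant is to delete only the keys that start with your project prefix. The sketch below assumes keys follow the project-ID prefix convention; InMemoryStore is a hypothetical stand-in that mimics only the keys property and the delete method, and delete_with_prefix is an illustrative name.

```python
class InMemoryStore:
    """Hypothetical stand-in that mimics the keys property and delete."""

    def __init__(self, data):
        self._data = dict(data)

    @property
    def keys(self):
        return list(self._data.keys())

    def delete(self, key):
        del self._data[key]
        return True


def delete_with_prefix(store, prefix):
    """Delete every key that starts with prefix and return the deleted keys."""
    # Snapshot the matching keys first so deleting does not disturb iteration.
    to_delete = [key for key in store.keys if key.startswith(prefix)]
    for key in to_delete:
        store.delete(key)
    return to_delete


store = InMemoryStore({"12345/a": "x", "12345/b": "y", "67890/a": "z"})
removed = delete_with_prefix(store, "12345/")
remaining = store.keys
```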
Cache Data
When you write to or read from the Object Store, the algorithm caches the data. If you read the same key again, the Object Store returns the cached data instead of downloading it again, which speeds up execution. However, the cache can cause problems if you are trying to share data between two nodes under the same Object Store key. For example, consider the following scenario:
- You open project A and save data under the key 123.
- You open project B and save new data under the same key 123.
- In project A, you read the Object Store data under the key 123, expecting the data from project B, but you get the original data you saved in step #1 instead.
You get the data from step 1 instead of step 2 because the cache contains the data from step 1.
To clear the cache, call the clear method.
self.object_store.clear()
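The stale-read scenario can be reproduced with a toy model. The classes below are hypothetical and only imitate the behavior described in this section (a shared backend plus a per-node cache); they are not how the Object Store is implemented.

```python
class SharedBackend:
    """Toy model of the organization-wide storage (hypothetical)."""

    def __init__(self):
        self.data = {}


class CachedNode:
    """Toy model of one node with a local cache (hypothetical)."""

    def __init__(self, backend):
        self._backend = backend
        self._cache = {}

    def save(self, key, value):
        # Writes go to shared storage and also populate the local cache.
        self._backend.data[key] = value
        self._cache[key] = value

    def read(self, key):
        # Reads hit shared storage only when the key is not cached locally.
        if key not in self._cache:
            self._cache[key] = self._backend.data[key]
        return self._cache[key]

    def clear(self):
        self._cache.clear()


backend = SharedBackend()
project_a = CachedNode(backend)
project_b = CachedNode(backend)

project_a.save("123", "data from A")
project_b.save("123", "data from B")

stale = project_a.read("123")   # Served from project A's cache.
project_a.clear()
fresh = project_a.read("123")   # Fetched from shared storage again.
```

In this model, clearing the cache forces the next read to fetch from shared storage, which is why the second read sees the data from project B.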
Storage Quotas
If you run algorithms locally, you can store as much data as your hardware will allow. If you run algorithms in QuantConnect Cloud, you must stay within your storage quota. If you need more storage space, edit your storage plan.
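To get a feel for how much space a payload would take before you save it, you can measure its size in bytes. This is a rough sketch: both helper names are hypothetical, the budget is a number you supply yourself, and neither function queries your actual quota.

```python
def payload_size_bytes(content) -> int:
    """Size in bytes of a str (encoded as UTF-8) or a bytes payload."""
    if isinstance(content, str):
        return len(content.encode("utf-8"))
    return len(content)


def fits_in_budget(content, budget_bytes: int) -> bool:
    """Check a payload against a self-chosen size budget (not a real quota lookup)."""
    return payload_size_bytes(content) <= budget_bytes


size = payload_size_bytes("2023-01-03,382.5\n")
```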
Example for DataFrames
Follow these steps to create a DataFrame, save it into the Object Store, and load it from the Object Store:
- Get some historical data.
- Get the file path for a specific key in the Object Store.
- Call the to_csv method to save the DataFrame in the Object Store as a CSV file.
- Call the read_csv method to load the CSV file from the Object Store.
spy = self.add_equity("SPY").symbol
df = self.history(self.securities.keys, 360, Resolution.DAILY)
file_path = self.object_store.get_file_path("df_to_csv")
df.to_csv(file_path) # File size: 32721 bytes
reread = pd.read_csv(file_path)
pandas supports saving and loading DataFrame objects in the following additional formats:
- XML
- JSON
- Parquet
- Pickle
file_path = self.object_store.get_file_path("df_to_xml")
df.to_xml(file_path)    # File size: 87816 bytes
reread = pd.read_xml(file_path)

file_path = self.object_store.get_file_path("df_to_json")
df.to_json(file_path)    # File size: 125250 bytes
reread = pd.read_json(file_path)

file_path = self.object_store.get_file_path("df_to_parquet")
df.to_parquet(file_path)    # File size: 23996 bytes
reread = pd.read_parquet(file_path)

file_path = self.object_store.get_file_path("df_to_pickle")
df.to_pickle(file_path)    # File size: 19868 bytes
reread = pd.read_pickle(file_path)
Example for Plotting
You can use the Object Store to plot data from your backtests and live algorithm in the Research Environment. The following example demonstrates how to plot a Simple Moving Average indicator that's generated during a backtest.
- Create an algorithm, add a data subscription, and add a simple moving average indicator.
- Save the indicator data as a string in self.content.
- In the on_end_of_algorithm method, save the indicator data to the Object Store.
- Open the Research Environment and create a QuantBook.
- Read the indicator data from the Object Store.
- Convert the data to a pandas object and create a chart.
class ObjectStoreChartingAlgorithm(QCAlgorithm):
    def initialize(self):
        self.add_equity("SPY")
        self.content = ''
        self._sma = self.sma("SPY", 22)
The algorithm will save self.content to the Object Store.
def on_data(self, data: Slice):
    self.plot('SMA', 'Value', self._sma.current.value)
    self.content += f'{self._sma.current.end_time},{self._sma.current.value}\n'
def on_end_of_algorithm(self):
    self.object_store.save('sma_values_python', self.content)
qb = QuantBook()
content = qb.object_store.read("sma_values_python")
The key you provide must be the same key you used to save the object.
data = {}
for line in content.split('\n'):
    csv = line.split(',')
    if len(csv) > 1:
        data[csv[0]] = float(csv[1])

series = pd.Series(data, index=data.keys())
series.plot()
class ObjectStoreChartingAlgorithm(QCAlgorithm):

    def initialize(self) -> None:
        self.set_start_date(2023, 1, 1)  # Set Start Date
        self.set_cash(100000)  # Set Strategy Cash
        self.add_equity("SPY", Resolution.MINUTE)

        self.content = ''
        # Create SMA indicator for referencing.
        # Store it as self._sma so it does not shadow the self.sma helper method.
        self._sma = self.sma("SPY", 22)

    def on_data(self, data: Slice) -> None:
        # Cache the indicator data point to save it.
        self.content += f'{self._sma.current.end_time},{self._sma.current.value}\n'

    def on_end_of_algorithm(self) -> None:
        # Save the indicator values to object store for logging.
        self.object_store.save('sma_values_python', self.content)
Example of Custom Data
Follow these steps to use the Object Store as a data source for custom data:
- Create a custom data class that defines a storage key and implements the get_source method.
- Create an algorithm that downloads data from an external source and saves it to the Object Store.
- Call the add_data method to subscribe to the custom type.
- Implement the reader method for the custom data class.
- To confirm your algorithm is receiving the custom data, request some historical data, then log and plot the data from the Slice object.
class Bitstamp(PythonData):
    KEY = 'bitstampusd.csv'

    def get_source(self, config, date, isLiveMode):
        return SubscriptionDataSource(Bitstamp.KEY, SubscriptionTransportMedium.OBJECT_STORE)

class ObjectStoreCustomDataAlgorithm(QCAlgorithm):
    def initialize(self):
        if not self.object_store.contains_key(Bitstamp.KEY):
            url = "https://raw.githubusercontent.com/QuantConnect/Documentation/master/Resources/datasets/custom-data/bitstampusd.csv"
            content = self.download(url)
            self.object_store.save(Bitstamp.KEY, content)

class ObjectStoreCustomDataAlgorithm(QCAlgorithm):
    def initialize(self):
        self.custom_data_symbol = self.add_data(Bitstamp, "BTC").symbol

class Bitstamp(PythonData):
    def reader(self, config, line, date, isLiveMode):
        # Example Line Format:
        # Date Open High Low Close Volume (BTC) Volume (Currency) Weighted Price
        # 2011-09-13 5.8 6.0 5.65 5.97 58.37138238, 346.0973893944 5.929230648356
        if not line[0].isdigit():
            return None

        coin = Bitstamp()
        coin.symbol = config.symbol
        data = line.split(',')

        # If value is zero, return None
        coin.value = float(data[4])
        if coin.value == 0:
            return None

        coin.time = datetime.strptime(data[0], "%Y-%m-%d")
        coin.end_time = coin.time + timedelta(1)
        coin["Open"] = float(data[1])
        coin["High"] = float(data[2])
        coin["Low"] = float(data[3])
        coin["Close"] = coin.value
        coin["VolumeBTC"] = float(data[5])
        coin["VolumeUSD"] = float(data[6])
        coin["WeightedPrice"] = float(data[7])
        return coin

class ObjectStoreCustomDataAlgorithm(QCAlgorithm):
    def initialize(self):
        history = self.history(Bitstamp, self.custom_data_symbol, 200, Resolution.DAILY)
        self.debug(f"We got {len(history)} items from historical data request of {self.custom_data_symbol}.")

    def on_data(self, slice):
        data = slice.get(Bitstamp).get(self.custom_data_symbol)
        if not data:
            return
        self.log(f'{data.end_time}: Close: {data.close}')
        self.plot(self.custom_data_symbol, 'Price', data.close)
The following algorithm provides a full example of sourcing custom data from the Object Store:
class ObjectStoreCustomDataAlgorithm(QCAlgorithm):

    def initialize(self) -> None:
        self.set_start_date(2012, 9, 13)
        self.set_end_date(2021, 6, 20)
        self.set_cash(100000)

        # Load the custom data from ObjectStore.
        if not self.object_store.contains_key(Bitstamp.KEY):
            url = "https://raw.githubusercontent.com/QuantConnect/Documentation/master/Resources/datasets/custom-data/bitstampusd.csv"
            content = self.download(url)
            self.object_store.save(Bitstamp.KEY, content)

        # Request custom data for plotting.
        # Define the symbol and "type" of our generic data.
        self.custom_data_symbol = self.add_data(Bitstamp, "BTC").symbol

        # Example historical call.
        history = self.history(Bitstamp, self.custom_data_symbol, 200, Resolution.DAILY)
        self.debug(f"We got {len(history)} items from historical data request of {self.custom_data_symbol}.")

    def on_data(self, slice: Slice) -> None:
        # Plot on updated data.
        data = slice.get(Bitstamp).get(self.custom_data_symbol)
        if not data:
            return
        self.log(f'{data.end_time}: Close: {data.close}')
        self.plot(self.custom_data_symbol, 'Price', data.close)


class Bitstamp(PythonData):
    KEY = 'bitstampusd.csv'

    def get_source(self, config, date, isLiveMode):
        # Load data from object store.
        return SubscriptionDataSource(Bitstamp.KEY, SubscriptionTransportMedium.OBJECT_STORE)

    def reader(self, config, line, date, isLiveMode):
        # Example Line Format:
        # Date Open High Low Close Volume (BTC) Volume (Currency) Weighted Price
        # 2011-09-13 5.8 6.0 5.65 5.97 58.37138238, 346.0973893944 5.929230648356
        if not line[0].isdigit():
            return None

        coin = Bitstamp()
        coin.symbol = config.symbol
        data = line.split(',')

        # If value is zero, return None
        coin.value = float(data[4])
        if coin.value == 0:
            return None

        coin.time = datetime.strptime(data[0], "%Y-%m-%d")
        coin.end_time = coin.time + timedelta(1)
        coin["Open"] = float(data[1])
        coin["High"] = float(data[2])
        coin["Low"] = float(data[3])
        coin["Close"] = coin.value
        coin["VolumeBTC"] = float(data[5])
        coin["VolumeUSD"] = float(data[6])
        coin["WeightedPrice"] = float(data[7])
        return coin
Preserve Insights Between Deployments
Follow these steps to use the Object Store to preserve the algorithm state across live deployments:
- Create an algorithm that defines a storage key and adds insights to the Insight Manager.
- At the top of the algorithm file, add the following imports:
- In the on_end_of_algorithm event handler of the algorithm, get the Insight objects and save them in the Object Store as a JSON object.
- At the bottom of the initialize method, read the Insight objects from the Object Store and add them to the Insight Manager.
class ObjectStoreChartingAlgorithm(QCAlgorithm):
    def initialize(self):
        self.insight_key = f"{self.project_id}/insights"
        self.set_universe_selection(ManualUniverseSelectionModel([
            Symbol.create("SPY", SecurityType.EQUITY, Market.USA)
        ]))
        self.set_alpha(ConstantAlphaModel(InsightType.PRICE, InsightDirection.UP, timedelta(5), 0.025, None))
from Newtonsoft.Json import JsonConvert
from System.Collections.Generic import List
Insight objects are C# objects, so you need the preceding C# libraries to serialize and deserialize them.
def on_end_of_algorithm(self):
    insights = self.insights.get_insights(lambda x: x.is_active(self.utc_time))
    content = ','.join([JsonConvert.SerializeObject(x) for x in insights])
    self.object_store.save(self.insight_key, f'[{content}]')
if self.object_store.contains_key(self.insight_key):
    insights = self.object_store.read_json[List[Insight]](self.insight_key)
    self.insights.add_range(insights)
The following algorithm provides a full example of preserving the Insight state between deployments:
from Newtonsoft.Json import JsonConvert
from System.Collections.Generic import List

class ObjectStoreInsightsAlgorithm(QCAlgorithm):

    def initialize(self) -> None:
        self.universe_settings.resolution = Resolution.DAILY
        self.set_start_date(2023, 4, 1)
        self.set_end_date(2023, 4, 11)
        self.set_cash(100000)

        self.insights_key = f"{self.project_id}/insights"

        # Read the file with the insights
        if self.object_store.contains_key(self.insights_key):
            # Load cached JSON key value pair from object store, then add to algorithm insights.
            insights = self.object_store.read_json[List[Insight]](self.insights_key)
            self.log(f"Read {len(insights)} insight(s) from the Object Store")
            self.insights.add_range(insights)

            # Delete the key to reuse it
            self.object_store.delete(self.insights_key)

        self.set_universe_selection(ManualUniverseSelectionModel([
            Symbol.create("SPY", SecurityType.EQUITY, Market.USA)
        ]))
        self.set_alpha(ConstantAlphaModel(InsightType.PRICE, InsightDirection.UP, timedelta(5), 0.025, None))
        self.set_portfolio_construction(EqualWeightingPortfolioConstructionModel(Resolution.DAILY))

    def on_end_of_algorithm(self) -> None:
        # Get all active insights.
        insights = self.insights.get_insights(lambda x: x.is_active(self.utc_time))
        # If we want to save all insights (expired and active), we can use
        # insights = self.insights.get_insights(lambda x: True)

        self.log(f"Save {len(insights)} insight(s) to the Object Store.")
        content = ','.join([JsonConvert.SerializeObject(x) for x in insights])
        self.object_store.save(self.insights_key, f'[{content}]')
Live Trading Considerations
If you update the Object Store from outside of your live trading node and then try to read the new content from inside your live trading algorithm, you may not get the new content because the Object Store caches data to speed up execution. To ensure you get the latest value for a specific key in the Object Store, clear the cache and then read the data.
self.object_store.clear()
if self.object_store.contains_key(key):
    value = self.object_store.read(key)