Hello,
The code below uses a symbol universe and multiple timeframes with quote consolidators.
I am having some trouble pumping in historical data into the consolidator Update method. It's not clear what format this data should be in. I get the following error: TypeError : No method matches given arguments for Update
Also any guidnace or feedback on the overall code structure would also be greatly appretiated.
class Sample(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2020, 1, 1)
self.SetCash(100000)
self.AddUniverse(self.selection_coarse)
self.strategies = {}
def selection_coarse(self, coarse):
'''Coarse symbol universe filter'''
selected = [ x for x in coarse if x.HasFundamentalData and (x.Price >= 20) ]
dollar_volume = sorted(selected, key=lambda x: x.DollarVolume, reverse=True)
top = dollar_volume[:5]
symbols = [x.Symbol for x in top]
return symbols
def OnSecuritiesChanged(self, changes):
'''Manage Strategy object for each symbol in filtered universe'''
for security in changes.AddedSecurities:
symbol = security.Symbol
if symbol not in self.strategies:
self.strategies[symbol] = Strategy(self, symbol)
for security in changes.RemovedSecurities:
symbol = security.Symbol
if symbol in self.strategies:
strategy = self.strategies.pop(symbol, None)
self.SubscriptionManager.RemoveConsolidator(symbol, strategy.consolidator_intraday)
def OnData(self, data):
pass
class Strategy:
def __init__(self, qc, symbol):
self.qc = qc
self.symbol = symbol
# Create consolidator
self.consolidator_intraday = TradeBarConsolidator(timedelta(5))
# Attach event handler to consolidator
self.consolidator_intraday.DataConsolidated += self.handler_intraday
# Subscribe consolidator to data
self.qc.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator_intraday)
# Create indicaotr
self._ema_fast_intraday = ExponentialMovingAverage(8)
# Create rolling window
self.ema_fast_intraday = RollingWindow[ExponentialMovingAverage](5)
# Pump historical data into consolidator
self.bars_required = 200
self.history = self.qc.History(self.symbol, self.bars_required)
for bar in self.history.itertuples():
self.consolidator_intraday.Update(bar)
def handler_intraday(self, sender, bar):
'''Event hander for intraday timeframe'''
# Update indicator
self._ema_fast_intraday.Update(bar.EndTime, bar.Close)
# Update rolling window
self.ema_fast_intraday.Add(self._ema_fast_intraday)
KV1214
In other words, what is the correct way to manually warmup consolidators with histoircal data in python?
Here is my attempt, which does not work:
# Pump historical data into consolidator self.history = self.qc.History(self.symbol, 200) for bar in self.history.itertuples(): self.consolidator_intraday.Update(bar)
Shile Wen
Hi KV1214,
I've answered your question in this thread. For future reference, please do not create duplicate threads for the same issue.
Best,
Shile Wen
KV1214
The material on this website is provided for informational purposes only and does not constitute an offer to sell, a solicitation to buy, or a recommendation or endorsement for any security or strategy, nor does it constitute an offer to provide investment advisory services by QuantConnect. In addition, the material offers no opinion with respect to the suitability of any security or specific investment. QuantConnect makes no guarantees as to the accuracy or completeness of the views expressed in the website. The views are subject to change, and may have become unreliable for various reasons, including changes in market conditions or economic circumstances. All investments involve risk, including loss of principal. You should consult with an investment professional before making any investment decisions.
To unlock posting to the community forums please complete at least 30% of Boot Camp.
You can continue your Boot Camp training progress from the terminal. We hope to see you in the community soon!