Suppose we have a ScheduledEvent that fires hourly, and a minute-level data feed that is consolidated into hourly bars.

By default, an algorithm like the following ends up using stale prices from the previous hour inside the scheduled event:

  from datetime import timedelta
  from AlgorithmImports import *  # QuantConnect/LEAN API (QCAlgorithm, Resolution, ...)

  class MyAlgo(QCAlgorithm):

      def Initialize(self):
          self.SetStartDate(2015, 1, 5)
          self.SetEndDate(2015, 1, 6)
          self.SetCash(1)

          # Subscribe to minute data
          self.symbols = ['EURUSD', 'GBPUSD', 'USDJPY']
          for symbol in self.symbols:
              self.AddForex(symbol, Resolution.Minute, Market.Oanda)

          # Rolling window of 4 hourly bars for each symbol
          self.symbolConsolidatedData = {symbol: RollingWindow[QuoteBar](4) for symbol in self.symbols}
          for symbol in self.symbols:
              consolidator = QuoteBarConsolidator(timedelta(hours=1))
              consolidator.DataConsolidated += self.OnDataConsolidated
              self.SubscriptionManager.AddConsolidator(self.Symbol(symbol), consolidator)

          # Hourly scheduled event
          self.Schedule.On(self.DateRules.EveryDay('EURUSD'),
                           self.TimeRules.Every(timedelta(hours=1)),
                           self.CheckDataWindows)

      def OnData(self, data):
          pass

      def OnDataConsolidated(self, sender, bar):
          # Store each completed hourly bar in that symbol's rolling window
          self.symbolConsolidatedData[bar.Symbol.Value].Add(bar)

      def CheckDataWindows(self):
          # Number of bars in each window
          count = {symbol: window.Count for symbol, window in self.symbolConsolidatedData.items()}
          self.Log(str(count))

          # End time of the most recent bar, once every window has at least one bar
          if all(c >= 1 for c in count.values()):
              time = {symbol: window[0].EndTime.isoformat() for symbol, window in self.symbolConsolidatedData.items()}
              self.Log(f'Most Recent Data: {time}')
              self.Log(f'Algorithm Time: {self.Time}')

Logs:

  2015-01-05 01:00:00 : {'EURUSD': 1, 'GBPUSD': 1, 'USDJPY': 1}
  2015-01-05 01:00:00 : Most Recent Data: {'EURUSD': '2015-01-05T00:00:00', 'GBPUSD': '2015-01-05T00:00:00', 'USDJPY': '2015-01-05T00:00:00'}
  2015-01-05 01:00:00 : Algorithm Time: 2015-01-05 01:00:00
  2015-01-05 02:00:00 : {'EURUSD': 2, 'GBPUSD': 2, 'USDJPY': 2}
  2015-01-05 02:00:00 : Most Recent Data: {'EURUSD': '2015-01-05T01:00:00', 'GBPUSD': '2015-01-05T01:00:00', 'USDJPY': '2015-01-05T01:00:00'}
  2015-01-05 02:00:00 : Algorithm Time: 2015-01-05 02:00:00
  2015-01-05 03:00:00 : {'EURUSD': 3, 'GBPUSD': 3, 'USDJPY': 3}
  2015-01-05 03:00:00 : Most Recent Data: {'EURUSD': '2015-01-05T02:00:00', 'GBPUSD': '2015-01-05T02:00:00', 'USDJPY': '2015-01-05T02:00:00'}
  2015-01-05 03:00:00 : Algorithm Time: 2015-01-05 03:00:00

For instance, at 2015-01-05 01:00:00 the goal is to use a ScheduledEvent to run some analysis on the hourly bar that closes at 2015-01-05 01:00:00 and generate predictions for 2015-01-05 02:00:00. However, because the ScheduledEvent fires at the same timestamp at which the hourly bar consolidates, it sees the (stale) previous hourly bar, the one closing at 2015-01-05 00:00:00, instead.

Any suggestions on "delaying" the ScheduledEvent until the hourly bar is ready? E.g. the hourly bar consolidates at 2015-01-05 01:00:00 and the ScheduledEvent is called at 2015-01-05 01:00:01.
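
To make the ordering concrete, here is a toy event loop in plain Python (not LEAN code). It assumes, as the logs above suggest, that a scheduled event at time t runs before the bar ending at t is delivered. The function name `latest_bar_seen` and its arguments are made up for illustration only.

```python
from datetime import datetime, timedelta

def latest_bar_seen(bar_ends, event_times):
    """Toy model: return (event_time, end_time_of_latest_bar) pairs.

    Assumption: when an event and a bar share a timestamp, the event
    is processed first (priority 0 sorts before priority 1).
    """
    timeline = [(t, 0, "event") for t in event_times]
    timeline += [(t, 1, "bar") for t in bar_ends]
    timeline.sort()

    latest_bar_end = None
    seen = []
    for t, _, kind in timeline:
        if kind == "bar":
            latest_bar_end = t                 # bar becomes available
        else:
            seen.append((t, latest_bar_end))   # what the event can see
    return seen

day = datetime(2015, 1, 5)
bar_ends = [day + timedelta(hours=h) for h in range(0, 4)]     # 00:00 .. 03:00
on_the_hour = [day + timedelta(hours=h) for h in range(1, 4)]  # 01:00 .. 03:00
delayed = [t + timedelta(seconds=1) for t in on_the_hour]      # 01:00:01 ..

stale = latest_bar_seen(bar_ends, on_the_hour)
fresh = latest_bar_seen(bar_ends, delayed)

for event_time, bar_end in stale:
    print("on the hour:", event_time, "sees bar ending", bar_end)
for event_time, bar_end in fresh:
    print("delayed 1s: ", event_time, "sees bar ending", bar_end)
```

In this toy model the on-the-hour events always see a bar that ended an hour earlier, while events shifted by one second see the bar that just closed. Whether a one-second shift can be expressed with the scheduling API is exactly the open question here.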

Author

Adam W

June 2021