Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -0.618 Tracking Error 0.306 Treynor Ratio 0 Total Fees $0.00 |
import dateutil import json from missingdatatracker import * import parameters class ParticleVentralContainmentField(QCAlgorithm): ''' Select a universe of securities then see how many data points are missing for them at hourly resolution. ''' def Initialize(self): self.delisted = set() self.missingDataTrackers = {} self.universeSelected = False self.Debug(parameters.final) startDate = datetime.strptime(parameters.start, "%Y-%m-%d") if parameters.start else datetime(2020, 1, 1) endDate = datetime.strptime(parameters.end, "%Y-%m-%d") if parameters.end else datetime(2020, 12, 31) self.SetStartDate(startDate) self.SetEndDate(endDate) self.SetCash(100000) # Set Strategy Cash self.UniverseSettings.Resolution = Resolution.Hour self.UniverseSettings.FillForward = False self.coarseUniverse = self.AddUniverse(self.CoarseFilter) def CoarseFilter(self, coarse): if self.universeSelected: return Universe.Unchanged self.universeSelected = True usEquities = [c for c in coarse if c.Symbol.ID.Market.lower() == "usa" and c.Symbol.SecurityType == SecurityType.Equity] usEquities.sort(key=lambda c: c.DollarVolume, reverse=True) universe = [c.Symbol for c in usEquities[:500]] self.Debug(f"{[s.Value for s in universe]}") for symbol in universe: if symbol not in self.missingDataTrackers: mdt = MissingDataTracker(self, symbol) self.missingDataTrackers[symbol] = mdt return universe def OnData(self, slice): for kvp in slice.Delistings: if kvp.Value.Type == DelistingType.Delisted: if kvp.Key in self.missingDataTrackers: self.Debug(f"{kvp.Key.Value} delisted") self.missingDataTrackers[kvp.Key].TrackRun() self.delisted.add(kvp.Key) if self.Time.hour >= 9 and self.Time.hour <= 16 and self.Time.minute == 0: for symbol in self.coarseUniverse.Securities.Keys: if symbol in self.delisted: continue mdt = self.missingDataTrackers[symbol] if mdt.lastTime != self.Time: if slice.Bars.ContainsKey(symbol) and slice.Bars[symbol].Volume > 0: mdt.DataReceived(self.Time) else: mdt.DataMissing(self.Time) self.SetRuntimeStatistic("received", sum(mdt.receivedCnt for mdt in self.missingDataTrackers.values())) self.SetRuntimeStatistic("missing", sum(mdt.missingCnt for mdt in self.missingDataTrackers.values())) self.SetRuntimeStatistic("symbolsWithMissing", sum(mdt.missingCnt > 0 for mdt in self.missingDataTrackers.values())) def OnEndOfAlgorithm(self): # Output in a format that makes it easy to verify in research environment # dict mapping security identifier to array of arrays obj = {} totalReceived = 0 totalMissing = 0 symbolsWithMissing = 0 for (symbol,mdt) in self.missingDataTrackers.items(): mdt.TrackRun() # Check correctness before we dump a bunch of logs for i in range(0, len(mdt.runs), 2): if i+1 < len(mdt.runs) and mdt.runs[i][1] >= mdt.runs[i+1][0]: self.Debug("Runs overlap") self.Debug(f"{symbol.Value} rcvd={mdt.receivedCnt} missing={mdt.missingCnt} runs={[(a.isoformat(), b.isoformat()) for (a,b) in mdt.runs]}") return totalReceived += mdt.receivedCnt totalMissing += mdt.missingCnt if mdt.runs: symbolsWithMissing += 1 obj[str(symbol.ID)] = [[start.isoformat(), end.isoformat()] for (start, end) in mdt.runs] self.Debug(f"Missing rate = {totalMissing}/{totalMissing+totalReceived} = {totalMissing/(totalMissing+totalReceived)}") self.Debug(f"Symbols with missing data = {symbolsWithMissing}/{self.coarseUniverse.Securities.Count} = {symbolsWithMissing/self.coarseUniverse.Securities.Count}") self.Debug(json.dumps(obj))
class MissingDataTracker: def __init__(self, algorithm, symbol): self.algorithm = algorithm self.symbol = symbol self.runStart = None self.lastTime = algorithm.Time self.runs = [] self.receivedCnt = 0 self.missingCnt = 0 def DataReceived(self, time): #self.algorithm.Debug(f"{self.symbol.Value} DataReceived {time} missing={self.missing} lastDataPoint={self.lastDataPoint}") self.TrackRun() self.receivedCnt += 1 self.lastTime = time def DataMissing(self, time): #self.algorithm.Debug(f"{self.symbol.Value} DataMissing {time} missing={self.missing} lastDataPoint={self.lastDataPoint}") if not self.runStart: self.runStart = time self.missingCnt += 1 self.lastTime = time def TrackRun(self): if self.runStart: #self.algorithm.Debug(f"{self.symbol.Value} missing={self.missing} run={(self.lastDataPoint, time)}") self.runs.append((self.runStart, self.lastTime)) self.runStart = None
import json # The following line is what gets replaced by script _parametersJson = '{"start": "2020-01-01", "end": "2020-12-31"}' _defaults = { ################################################################################################################ # These are parameters that are passed to the backtesting framework ################################################################################################################ "start": None, "end": None, } _parsedJson = json.loads(_parametersJson) final = {k: _parsedJson[k] if k in _parsedJson else _defaults[k] for k in _defaults} start = final["start"] end = final["end"]