Overall Statistics |
Total Orders 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Start Equity 100000 End Equity 100000 Net Profit 0% Sharpe Ratio 0 Sortino Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 0 Tracking Error 0 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset Portfolio Turnover 0% |
# region imports
from AlgorithmImports import *
from collections import defaultdict
# endregion

"""
This algorithm is designed to identify and quantify discrepancies between
manually aggregated minute-level market data and the automatically
consolidated data produced by QuantConnect's TradeBarConsolidator.

By consolidating data manually for multiple resolutions (e.g., 5 minutes,
30 minutes) and comparing it to the consolidator output, the algorithm
highlights any differences in open, high, low, close, and volume values,
providing statistical summaries of the deviations.

Note: WorkingData is also affected; in fact, incorrect-looking WorkingData was
how the problem was first identified.

Implementation note: the comparison is made over the consolidated bar's own
time window (bar.Time, bar.EndTime] instead of "whatever minute bars OnData has
buffered so far". LEAN's documented event flow runs consolidation handlers
before OnData within the same time step, so comparing directly inside the
consolidation handler would omit the window's final minute bar and leak it into
the next window, producing mismatches that are artifacts of the test itself.
"""


class ConsolidatorTestAlgorithm(QCAlgorithm):
    """Compare TradeBarConsolidator output with a manual minute-bar roll-up.

    State kept on the instance:
        manual_aggregation: period -> list of minute-bar dicts (with
            'end_time', 'open', 'high', 'low', 'close', 'volume') that have
            not yet been consumed by a comparison.
        pending_consolidated: period -> consolidated bars waiting for all of
            their minute bars to be observed by OnData.
        mismatch_stats: period -> field name -> list of (abs_diff, rel_diff).
        expected_consolidation_times / actual_consolidation_times:
            period -> list of timestamps used for the missing-bar report.
    """

    # Maps the unit suffix of a resolution string to a timedelta keyword.
    _UNIT_KEYWORDS = {'m': 'minutes', 'h': 'hours', 'd': 'days'}

    def Initialize(self):
        """Configure the one-day backtest, the symbol and the consolidators."""
        self.SetStartDate(2020, 1, 6)
        self.SetEndDate(self.start_date)
        self.SetCash(100000)

        # Mismatch severity display threshold: only display mismatches whose
        # relative difference is above this value (set to 0 to see all diffs).
        self.diff_display_thresh = 0.05

        # Choose a single symbol
        self.ext_hrs = True
        self.symbol = self.AddEquity(
            "AAPL", Resolution.Minute, extendedMarketHours=self.ext_hrs
        ).Symbol

        # Expected and actual consolidation times
        self.expected_consolidation_times = defaultdict(list)
        self.actual_consolidation_times = defaultdict(list)

        # Mismatch statistics: period -> field -> [(abs_diff, rel_diff), ...]
        self.mismatch_stats = defaultdict(lambda: defaultdict(list))

        # Minute bars awaiting aggregation, and consolidated bars awaiting
        # their manual counterpart.
        self.manual_aggregation = defaultdict(list)
        self.pending_consolidated = defaultdict(list)

        # Set up consolidators
        # self.target_resolutions = ['5m', '30m']
        self.target_resolutions = ['30m']
        self.consolidators = {}
        for res in self.target_resolutions:
            interval = self.str_to_timedelta(res)
            consolidator = TradeBarConsolidator(interval)
            consolidator.DataConsolidated += self.OnDataConsolidated
            self.SubscriptionManager.AddConsolidator(self.symbol, consolidator)
            self.consolidators[res] = consolidator

        self.log("(a) Mismatches: Consolidator vs Manual Aggregation")

    def OnData(self, data):
        """Buffer the current minute bar and run any comparison now possible.

        Args:
            data: the Slice for the current time step.
        """
        # A time step may carry no trade bar for the symbol (e.g. a quiet
        # extended-hours minute); indexing the slice blindly would fail.
        if data.Bars.ContainsKey(self.symbol):
            self._record_minute_bar(data.Bars[self.symbol])

        # The minute bar just buffered may complete a consolidated window
        # that was announced earlier in this time step.
        for period in self.consolidators:
            self._compare_pending(period)

    def _record_minute_bar(self, minute_bar):
        """Store one minute bar for every period and track expected ends."""
        current_time = self.Time

        # Market open/close times
        exchange = self.Securities[self.symbol].Exchange
        market_open_time = exchange.Hours.GetNextMarketOpen(
            current_time.date(), self.ext_hrs
        )
        market_close_time = exchange.Hours.GetNextMarketClose(
            current_time, self.ext_hrs
        )

        # Operate within market hours
        if not (market_open_time <= current_time < market_close_time):
            return

        for period in self.consolidators:
            self.manual_aggregation[period].append({
                'end_time': minute_bar.EndTime,
                'open': minute_bar.Open,
                'high': minute_bar.High,
                'low': minute_bar.Low,
                'close': minute_bar.Close,
                'volume': minute_bar.Volume,
            })

            # Check if we're at the end of a consolidation period
            interval = self.str_to_timedelta(period)
            if (current_time - market_open_time) % interval == timedelta(0):
                self.expected_consolidation_times[period].append(current_time)

    def OnDataConsolidated(self, sender, bar):
        """Queue a consolidated bar for comparison with the manual roll-up.

        Args:
            sender: the consolidator that produced the bar.
            bar: the consolidated TradeBar.
        """
        # Identify the resolution of the consolidator
        period = next(
            (key for key, value in self.consolidators.items() if value == sender),
            None,
        )
        if period is None:
            return

        self.actual_consolidation_times[period].append(bar.EndTime)

        # Do not compare yet: the last minute bar of this window may not have
        # reached OnData. _compare_pending runs the comparison as soon as the
        # window is fully observed, whichever handler fires first.
        self.pending_consolidated[period].append(bar)
        self._compare_pending(period)

    def _compare_pending(self, period, final=False):
        """Compare queued consolidated bars whose window is fully observed.

        Args:
            period: resolution key such as '30m'.
            final: when True, compare every queued bar with whatever minute
                bars are available instead of waiting for more data.
        """
        pending = self.pending_consolidated[period]
        if not pending:
            return

        still_waiting = []
        for bar in pending:
            minute_bars = self.manual_aggregation[period]
            # Data arrives chronologically, so once a minute bar ending at or
            # after the window end has been seen the window cannot grow.
            window_complete = (
                bool(minute_bars)
                and minute_bars[-1]['end_time'] >= bar.EndTime
            )
            if not (window_complete or final):
                still_waiting.append(bar)
                continue

            manual_data = self.AggregateManualData(period, bar.Time, bar.EndTime)
            if manual_data:
                open_price, high_price, low_price, close_price, volume = manual_data
                # Record every diff; only those above the display threshold
                # are logged.
                self.RecordMismatchStats('Open', period, bar.EndTime, bar.Open, open_price)
                self.RecordMismatchStats('Close', period, bar.EndTime, bar.Close, close_price)
                self.RecordMismatchStats('High', period, bar.EndTime, bar.High, high_price)
                self.RecordMismatchStats('Low', period, bar.EndTime, bar.Low, low_price)
                self.RecordMismatchStats('Volume', period, bar.EndTime, bar.Volume, volume)

            # Drop minute bars that belong to this window or precede it.
            self.manual_aggregation[period] = [
                row for row in minute_bars if row['end_time'] > bar.EndTime
            ]

        self.pending_consolidated[period] = still_waiting

    def AggregateManualData(self, period, start_time=None, end_time=None):
        """Roll buffered minute bars up into a single OHLCV tuple.

        Args:
            period: resolution key such as '30m'.
            start_time: optional exclusive lower bound on a minute bar's end
                time; None means no lower bound.
            end_time: optional inclusive upper bound on a minute bar's end
                time; None means no upper bound.

        Returns:
            (open, high, low, close, volume), or None when no buffered minute
            bar falls inside the requested window.
        """
        rows = self.manual_aggregation.get(period)
        if not rows:
            return None

        if start_time is not None:
            rows = [row for row in rows if row['end_time'] > start_time]
        if end_time is not None:
            rows = [row for row in rows if row['end_time'] <= end_time]
        if not rows:
            return None

        open_price = rows[0]['open']
        close_price = rows[-1]['close']
        high_price = max(row['high'] for row in rows)
        low_price = min(row['low'] for row in rows)
        volume = sum(row['volume'] for row in rows)
        return open_price, high_price, low_price, close_price, volume

    def str_to_timedelta(self, time_str):
        """Convert a resolution string such as '5m', '1h' or '1d'.

        Args:
            time_str: an integer followed by one of the units m, h or d.

        Returns:
            The matching timedelta.

        Raises:
            ValueError: if the unit is unknown or the value is not an integer.
        """
        keyword = self._UNIT_KEYWORDS.get(time_str[-1:])
        if keyword is None:
            raise ValueError(f"Invalid time string {time_str}")
        return timedelta(**{keyword: int(time_str[:-1])})

    def RecordMismatchStats(self, field_name, period, end_time, actual, expected):
        """Store the diff between consolidator and manual values; log big ones.

        Args:
            field_name: 'Open', 'High', 'Low', 'Close' or 'Volume'.
            period: resolution key such as '30m'.
            end_time: end time of the consolidated bar (currently unused, kept
                for interface compatibility).
            actual: value reported by the consolidator.
            expected: value produced by the manual aggregation.
        """
        abs_diff = abs(actual - expected)
        if abs_diff == 0:
            # An exact match (including 0 vs 0, e.g. zero-volume bars) is a
            # 0% difference, not an infinite one.
            rel_diff = 0.0
        elif expected != 0:
            rel_diff = abs_diff / abs(expected)
        else:
            rel_diff = float('inf')

        self.mismatch_stats[period][field_name].append((abs_diff, rel_diff))

        if rel_diff > self.diff_display_thresh:
            self.log(f"{field_name} mismatch for {self.symbol} ({period}): {actual} vs {expected} (abs_diff={abs_diff}, rel_diff={rel_diff:.4%})")

    def OnEndOfAlgorithm(self):
        """Flush outstanding comparisons, then report gaps and statistics."""
        # Consolidated bars still queued will receive no more minute bars.
        for period in self.consolidators:
            self._compare_pending(period, final=True)

        for period, expected_times in self.expected_consolidation_times.items():
            actual_times = set(self.actual_consolidation_times.get(period, []))
            missing_times = [time for time in expected_times if time not in actual_times]
            if missing_times:
                self.Debug(f"Missing consolidations for {self.symbol} ({period}): {missing_times}")

        # Calculate and print statistics
        self.log("(b) Summary Statistics:")
        for period, field_stats in self.mismatch_stats.items():
            for field_name, diffs in field_stats.items():
                if not diffs:
                    continue
                abs_diffs, rel_diffs = zip(*diffs)
                self.log(f"{field_name} ({period}): "
                         f"Mean Abs Diff = {np.mean(abs_diffs):.4f}, "
                         f"Max Abs Diff = {np.max(abs_diffs):.4f}, "
                         f"Mean Rel Diff = {np.mean(rel_diffs):.4%}, "
                         f"Max Rel Diff = {np.max(rel_diffs):.4%}")