Overall Statistics |
Total Orders 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Start Equity 100000 End Equity 100000 Net Profit 0% Sharpe Ratio 0 Sortino Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 0 Tracking Error 0 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset Portfolio Turnover 0% |
# region imports
from AlgorithmImports import *
from collections import defaultdict
# endregion


class ConsolidatorTestAlgorithm(QCAlgorithm):
    """Compare TradeBarConsolidator output with bars aggregated by hand.

    For every target resolution a ``TradeBarConsolidator`` is registered on
    SPY minute data. ``OnData`` collects the raw minute bars, and each time a
    consolidator fires the collected bars are folded into one OHLCV tuple and
    compared field by field against the consolidated bar. Per-field
    difference statistics and any expected-but-missing consolidation times
    are reported at the end of the run. No orders are placed.

    NOTE(review): the manual window is reset inside ``OnDataConsolidated``.
    If LEAN updates consolidators before calling ``OnData`` for the same time
    slice, the minute bar that completes a window is appended *after* the
    reset and is therefore counted in the next manual window - confirm
    against LEAN's event ordering before trusting the reported mismatches.
    """

    # Relative difference above which a single field mismatch is logged
    # immediately (in addition to being included in the final statistics).
    MISMATCH_LOG_THRESHOLD = 0.01

    def Initialize(self):
        """Configure the backtest, the data subscription and the consolidators."""
        self.SetStartDate(2020, 1, 1)
        self.SetEndDate(2020, 1, 2)
        self.SetCash(100000)

        # Choose a single symbol
        self.symbol = self.AddEquity("SPY", Resolution.Minute).Symbol

        # period string -> list of times at which a consolidated bar is
        # expected (filled in OnData) / was actually received (filled in
        # OnDataConsolidated).
        self.expected_consolidation_times = {}
        self.actual_consolidation_times = {}

        # period -> field name -> list of (abs_diff, rel_diff) tuples.
        self.mismatch_stats = defaultdict(lambda: defaultdict(list))

        # period -> list of raw minute bars (as dicts) collected since the
        # last consolidated bar for that period.
        self.manual_aggregation = {}

        # Set up consolidators
        self.target_resolutions = ['5m']  # , '1h']
        self.consolidators = {}
        # Parsed once here so OnData does not re-parse the string per bar.
        self.intervals = {}
        for res in self.target_resolutions:
            interval = self.str_to_timedelta(res)
            consolidator = TradeBarConsolidator(interval)
            consolidator.DataConsolidated += self.OnDataConsolidated
            self.SubscriptionManager.AddConsolidator(self.symbol, consolidator)
            self.consolidators[res] = consolidator
            self.intervals[res] = interval

    def OnData(self, data):
        """Record expected consolidation times and collect raw bars.

        Args:
            data: The slice delivered by the engine for the current time step.
        """
        current_time = self.Time
        exchange = self.Securities[self.symbol].Exchange
        market_open_time = exchange.Hours.GetNextMarketOpen(current_time.date(), False)
        market_close_time = exchange.Hours.GetNextMarketClose(current_time, False)

        if not (market_open_time <= current_time < market_close_time):
            return

        # Only read the bar when the slice actually carries data for the
        # symbol; indexing a slice without it would raise.
        bar = data[self.symbol] if data.ContainsKey(self.symbol) else None

        for period, interval in self.intervals.items():
            # A consolidated bar is expected whenever the time elapsed since
            # the open is a whole multiple of the consolidation interval.
            if (current_time - market_open_time) % interval == timedelta(0):
                self.expected_consolidation_times.setdefault(period, []).append(current_time)

            if bar is None:
                continue

            self.manual_aggregation.setdefault(period, []).append({
                'open': bar.Open,
                'high': bar.High,
                'low': bar.Low,
                'close': bar.Close,
                'volume': bar.Volume
            })

    def OnDataConsolidated(self, sender, bar):
        """Compare a freshly consolidated bar with the manually aggregated one.

        Args:
            sender: The consolidator that produced ``bar``.
            bar: The consolidated bar.

        Raises:
            StopIteration: If ``sender`` is not one of the consolidators
                registered in ``Initialize``.
        """
        # Identify the resolution of the consolidator
        period = next(key for key, value in self.consolidators.items() if value == sender)

        self.actual_consolidation_times.setdefault(period, []).append(bar.EndTime)

        # Aggregate data manually for comparison
        manual_data = self.AggregateManualData(period)
        if manual_data:
            open_price, high_price, low_price, close_price, volume = manual_data
            self.RecordMismatchStats('Open', period, bar.Open, open_price)
            self.RecordMismatchStats('Close', period, bar.Close, close_price)
            self.RecordMismatchStats('High', period, bar.High, high_price)
            self.RecordMismatchStats('Low', period, bar.Low, low_price)
            self.RecordMismatchStats('Volume', period, bar.Volume, volume)

        # Start a new manual window for this period.
        self.manual_aggregation[period] = []

    def AggregateManualData(self, period):
        """Fold the collected raw bars for ``period`` into one OHLCV tuple.

        Args:
            period: Period string used as key in ``manual_aggregation``.

        Returns:
            ``(open, high, low, close, volume)`` or ``None`` when no bars
            have been collected for ``period``.
        """
        bars = self.manual_aggregation.get(period)
        if not bars:
            return None

        open_price = bars[0]['open']
        close_price = bars[-1]['close']
        high_price = max(x['high'] for x in bars)
        low_price = min(x['low'] for x in bars)
        volume = sum(x['volume'] for x in bars)
        return open_price, high_price, low_price, close_price, volume

    def str_to_timedelta(self, time_str):
        """Convert a string such as ``'5m'``, ``'1h'`` or ``'2d'`` to a timedelta.

        Args:
            time_str: An integer followed by a unit suffix: ``m`` (minutes),
                ``h`` (hours) or ``d`` (days).

        Returns:
            The corresponding ``timedelta``.

        Raises:
            ValueError: If the string is empty, the unit is unknown or the
                numeric part is not an integer.
        """
        if not time_str:
            raise ValueError(f"Invalid time string {time_str}")

        unit = time_str[-1]
        value = int(time_str[:-1])
        if unit == 'm':
            return timedelta(minutes=value)
        if unit == 'h':
            return timedelta(hours=value)
        if unit == 'd':
            return timedelta(days=value)
        raise ValueError(f"Invalid time string {time_str}")

    def RecordMismatchStats(self, field_name, period, actual, expected):
        """Store the absolute and relative difference for one bar field.

        Differences larger than ``MISMATCH_LOG_THRESHOLD`` (relative to
        ``expected``) are also logged immediately.

        Args:
            field_name: Name of the compared field, e.g. ``'Open'``.
            period: Period string of the consolidator that produced the bar.
            actual: Value taken from the consolidated bar.
            expected: Value obtained from the manual aggregation.
        """
        abs_diff = abs(actual - expected)
        reference = abs(expected)

        if reference != 0:
            rel_diff = abs_diff / reference
        elif abs_diff == 0:
            # Both values are zero (e.g. a zero-volume bar): a perfect match,
            # not an infinite error that would poison the summary statistics.
            rel_diff = 0.0
        else:
            rel_diff = float('inf')

        self.mismatch_stats[period][field_name].append((abs_diff, rel_diff))

        if abs_diff > self.MISMATCH_LOG_THRESHOLD * reference:
            self.Log(f"{field_name} mismatch for {self.symbol} ({period}): {actual} vs {expected} (abs_diff={abs_diff}, rel_diff={rel_diff:.4%})")

    def OnEndOfAlgorithm(self):
        """Report missing consolidations and per-field difference statistics."""
        for period, expected_times in self.expected_consolidation_times.items():
            # Set membership keeps this linear in the number of bars.
            actual_times = set(self.actual_consolidation_times.get(period, []))
            missing_times = [t for t in expected_times if t not in actual_times]
            if missing_times:
                self.Debug(f"Missing consolidations for {self.symbol} ({period}): {missing_times}")

        # Calculate and print statistics
        for period, field_stats in self.mismatch_stats.items():
            for field_name, diffs in field_stats.items():
                if not diffs:
                    continue
                abs_diffs, rel_diffs = zip(*diffs)
                self.Log(f"{field_name} ({period}): "
                         f"Mean Abs Diff = {np.mean(abs_diffs):.4f}, "
                         f"Max Abs Diff = {np.max(abs_diffs):.4f}, "
                         f"Mean Rel Diff = {np.mean(rel_diffs):.4%}, "
                         f"Max Rel Diff = {np.max(rel_diffs):.4%}")