Overall Statistics
Total Orders
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Start Equity
100000
End Equity
100000
Net Profit
0%
Sharpe Ratio
0
Sortino Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
0
Tracking Error
0
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
Portfolio Turnover
0%
# region imports
from AlgorithmImports import *
from collections import defaultdict
# endregion

"""
This algorithm is designed to identify and quantify discrepancies between manually aggregated minute-level market data 
and the automatically consolidated data produced by QuantConnects TradeBarConsolidator. 

By consolidating data manually for multiple resolutions (e.g., 5 minutes, 30 minutes) and comparing it to the consolidator 
output, the algorithm highlights any differences in open, high, low, close, and volume values, providing statistical 
summaries of the deviations. 
"""

class ConsolidatorTestAlgorithm(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2020, 1, 6)
        self.SetEndDate(self.start_date)
        self.SetCash(100000)

        """ Mismatch Severity Display Threshold: Only display mismatches with percentage differences above this threshold (set to 0 to see all diffs)""" 
        self.diff_display_thresh = 0.05

        # Choose a single symbol
        self.ext_hrs = True
        self.symbol = self.AddEquity("AAPL", Resolution.Minute, extendedMarketHours=self.ext_hrs).Symbol

        # Set up expected and actual consolidation times
        self.expected_consolidation_times = {}
        self.actual_consolidation_times = {}

        # Dict to store stats
        self.mismatch_stats = defaultdict(lambda: defaultdict(list))

        # Set up manual aggregation dictionary
        self.manual_aggregation = {}

        # Set up consolidators
        # self.target_resolutions = ['5m', '30m']
        self.target_resolutions = ['30m']
        self.consolidators = {}
        for res in self.target_resolutions:
            interval = self.str_to_timedelta(res)
            consolidator = TradeBarConsolidator(interval)
            consolidator.DataConsolidated += self.OnDataConsolidated
            self.SubscriptionManager.AddConsolidator(self.symbol, consolidator)
            self.consolidators[res] = consolidator
            
    def OnData(self, data):

        # Current time
        current_time = self.Time

        # Market open/close times
        exchange = self.Securities[self.symbol].Exchange
        market_open_time = exchange.Hours.GetNextMarketOpen(current_time.date(), self.ext_hrs)
        market_close_time = exchange.Hours.GetNextMarketClose(current_time, self.ext_hrs)
        
        # Operate within market hours
        if current_time >= market_open_time and current_time < market_close_time:

            # Loop through consolitors
            for period, consolidator in self.consolidators.items():
                
                # Add 1-minute bar to log - to be aggregated
                if period not in self.manual_aggregation:
                    self.manual_aggregation[period] = []
                self.manual_aggregation[period].append({
                    'open': data[self.symbol].Open,
                    'high': data[self.symbol].High,
                    'low': data[self.symbol].Low,
                    'close': data[self.symbol].Close,
                    'volume': data[self.symbol].Volume
                })

                # Get consolidator interval / resolution
                interval = self.str_to_timedelta(period)

                # Check if we're at the end of a consolidation period
                if (current_time - market_open_time) % interval == timedelta(0):
                    if period not in self.expected_consolidation_times:
                        self.expected_consolidation_times[period] = []
                    self.expected_consolidation_times[period].append(current_time)

    def OnDataConsolidated(self, sender, bar):
        # Identify the resolution of the consolidator
        period = next(key for key, value in self.consolidators.items() if value == sender)
        
        if period not in self.actual_consolidation_times:
            self.actual_consolidation_times[period] = []
        self.actual_consolidation_times[period].append(bar.EndTime)

        # Aggregate data manually for comparison
        manual_data = self.AggregateManualData(period)
        
        if manual_data:
            open_price, high_price, low_price, close_price, volume = manual_data

            # Log mismatches above 2% difference threshold
            self.RecordMismatchStats('Open', period, bar.EndTime, bar.Open, open_price)
            self.RecordMismatchStats('Close', period, bar.EndTime, bar.Close, close_price)
            self.RecordMismatchStats('High', period, bar.EndTime, bar.High, high_price)
            self.RecordMismatchStats('Low', period, bar.EndTime, bar.Low, low_price)
            self.RecordMismatchStats('Volume', period, bar.EndTime, bar.Volume, volume)

            # Print all mismatches
            # if bar.Open != open_price:
            #     self.log(f"Open mismatch for {self.symbol} ({period}): {bar.Open} vs {open_price}")
            # if bar.Close != close_price:
            #     self.log(f"Close mismatch for {self.symbol} ({period}): {bar.Close} vs {close_price}")
            # if bar.High != high_price:
            #     self.log(f"High mismatch for {self.symbol} ({period}): {bar.High} vs {high_price}")
            # if bar.Low != low_price:
            #     self.log(f"Low mismatch for {self.symbol} ({period}): {bar.Low} vs {low_price}")
            # if bar.Volume != volume:
            #     self.log(f"Volume mismatch for {self.symbol} ({period}): {bar.Volume} vs {volume}")

        self.manual_aggregation[period] = []

    def AggregateManualData(self, period):
        if period in self.manual_aggregation and self.manual_aggregation[period]:
            data = self.manual_aggregation[period]
            open_price = data[0]['open']
            close_price = data[-1]['close']
            high_price = max(x['high'] for x in data)
            low_price = min(x['low'] for x in data)
            volume = sum(x['volume'] for x in data)
            return open_price, high_price, low_price, close_price, volume
        return None

    def str_to_timedelta(self, time_str):
        unit = time_str[-1]
        value = int(time_str[:-1])
        if unit == 'm':
            return timedelta(minutes=value)
        elif unit == 'h':
            return timedelta(hours=value)
        elif unit == 'd':
            return timedelta(days=value)
        raise ValueError(f"Invalid time string {time_str}")

    def RecordMismatchStats(self, field_name, period, end_time, actual, expected):
        abs_diff = abs(actual - expected)
        rel_diff = abs_diff / expected if expected != 0 else float('inf')
        
        self.mismatch_stats[period][field_name].append((abs_diff, rel_diff))
        
        if abs_diff > self.diff_display_thresh * expected:  # You can adjust this threshold as needed
            self.log(f"{end_time} | {field_name} mismatch for {self.symbol} ({period}): {actual} vs {expected} (abs_diff={abs_diff}, rel_diff={rel_diff:.4%})")


    def OnEndOfAlgorithm(self):
        for period, expected_times in self.expected_consolidation_times.items():
            actual_times = self.actual_consolidation_times.get(period, [])
            missing_times = [time for time in expected_times if time not in actual_times]
            if missing_times:
                self.Debug(f"Missing consolidations for {self.symbol} ({period}): {missing_times}")

        # Calculate and print statistics
        self.log("Summary Statistics:")
        for period, field_stats in self.mismatch_stats.items():
            for field_name, diffs in field_stats.items():
                abs_diffs, rel_diffs = zip(*diffs)
                self.log(f"{field_name} ({period}): "
                        f"Mean Abs Diff = {np.mean(abs_diffs):.4f}, "
                        f"Max Abs Diff = {np.max(abs_diffs):.4f}, "
                        f"Mean Rel Diff = {np.mean(rel_diffs):.4%}, "
                        f"Max Rel Diff = {np.max(rel_diffs):.4%}")