Created with Highcharts 12.1.2EquityMar 2023Mar…Apr 2023May 2023Jun 2023Jul 2023Aug 2023Sep 2023Oct 2023Nov 2023Dec 2023Jan 2024Feb 2024Mar 2024Apr 2024800k900k1,000k1,100k-20-10005-202050M100M050M100M02550
Overall Statistics
Total Orders
145
Average Win
0.63%
Average Loss
-0.62%
Compounding Annual Return
-10.147%
Drawdown
14.600%
Expectancy
-0.124
Start Equity
1000000
End Equity
898002.18
Net Profit
-10.200%
Sharpe Ratio
-1.434
Sortino Ratio
-1.299
Probabilistic Sharpe Ratio
1.002%
Loss Rate
57%
Win Rate
43%
Profit-Loss Ratio
1.02
Alpha
0
Beta
0
Annual Standard Deviation
0.086
Annual Variance
0.007
Information Ratio
-0.815
Tracking Error
0.086
Treynor Ratio
0
Total Fees
$2316.07
Estimated Strategy Capacity
$1500000.00
Lowest Capacity Asset
PX R735QTJ8XC9X
Portfolio Turnover
29.22%
# region imports
from AlgorithmImports import *
from universe import SectorETFUniverseSelectionModel
from portfolio import CointegratedVectorPortfolioConstructionModel
# endregion

class ETFPairsTrading(QCAlgorithm):
    """Pairs-trading algorithm assembled from algorithm-framework models.

    The universe comes from SectorETFUniverseSelectionModel, signals from
    PearsonCorrelationPairsTradingAlphaModel, and position sizing from
    CointegratedVectorPortfolioConstructionModel.
    """

    def initialize(self):
        """Configure the backtest window, brokerage, universe and framework models."""
        # Tunable parameters, with the defaults used when none are supplied
        lookback_window = self.get_parameter("lookback", 60)    # lookback window for correlation & cointegration
        profit_threshold = self.get_parameter("threshold", 2)   # we want at least 2+% expected profit margin to cover fees

        # Backtest period and starting capital
        self.set_start_date(2023, 3, 1)
        self.set_end_date(2024, 3, 1)
        self.set_cash(1000000)

        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)
        # This should be an intra-day strategy, so every security gets the
        # pattern-day-trading margin model
        self.set_security_initializer(
            lambda security: security.set_margin_model(PatternDayTradingMarginModel()))

        # Minute-resolution, raw (unadjusted) prices for every universe member
        self.universe_settings.resolution = Resolution.MINUTE
        self.universe_settings.data_normalization_mode = DataNormalizationMode.RAW
        self.set_universe_selection(SectorETFUniverseSelectionModel(self.universe_settings))

        # The alpha model picks the most correlated pair and emits a signal when
        # the pair shows a mispricing that stays active for a predicted period
        # https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/alpha/supported-models#09-Pearson-Correlation-Pairs-Trading-Model
        pairs_alpha = PearsonCorrelationPairsTradingAlphaModel(
            lookback_window, Resolution.DAILY, threshold=profit_threshold)
        self.add_alpha(pairs_alpha)

        # The cointegrating vector decides the relative movement magnitude
        # (and hence the relative sizing) of the paired assets
        self.pcm = CointegratedVectorPortfolioConstructionModel(self, lookback_window, Resolution.DAILY)
        self.pcm.rebalance_portfolio_on_security_changes = False
        self.set_portfolio_construction(self.pcm)

        # Portfolio-level guard against catastrophic loss (currently disabled)
        #self.add_risk_management(MaximumDrawdownPercentPortfolio(0.08))

        self.set_warm_up(timedelta(90))

    def on_data(self, slice):
        """Forward slices carrying splits or dividends to the portfolio construction model."""
        if not (slice.splits or slice.dividends):
            return
        self.pcm.handle_corporate_actions(self, slice)
#region imports
from AlgorithmImports import *
from Portfolio.EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel
from arch.unitroot.cointegration import engle_granger
from utils import reset_and_warm_up
#endregion

class CointegratedVectorPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
    """Portfolio construction model that sizes targets from an Engle-Granger cointegrating vector.

    For every security added to the algorithm the model keeps, on the security
    object itself, a daily ``LogReturn(1)`` indicator (``security['logr']``), a
    rolling window of its values (``security['window']``) and the consolidator
    feeding it (``security['consolidator']``). When targets are requested, the
    windows of the signalled securities are regressed with ``engle_granger``
    and the absolute values of the cointegrating vector, normalised to sum to
    one, become the target percentages (signed by the insight direction).
    """

    def __init__(self, algorithm, lookback = 252, resolution = Resolution.MINUTE, 
                 rebalance = Expiry.END_OF_WEEK) -> None:
        """Store the configuration and initialise the long/short base model.

        Args:
            algorithm: The algorithm instance; used for portfolio, securities and history access.
            lookback: Size of each rolling log-return window and of the warm-up history request.
            resolution: Resolution passed to the warm-up history request.
            rebalance: Rebalancing rule forwarded to the base class.
        """
        super().__init__(rebalance, PortfolioBias.LONG_SHORT)
        self.algorithm = algorithm
        self.lookback = lookback
        self.resolution = resolution

    def should_create_target_for_insight(self, insight: Insight) -> bool:
        """Return False for insights whose asset already has an open position in the same direction."""
        return self.should_create_new_target(insight.symbol, insight.direction)

    def determine_target_percent(self, active_insights: List[Insight]) -> Dict[Insight, float]:
        """Compute the target percentage for each active insight.

        Every insight receives a zero target when fewer than two insights (or
        fewer than two usable return series) are available, or when the
        Engle-Granger test is not significant at the 5% level. Otherwise each
        insight receives ``|w_symbol| / sum(|w|) * direction``, where ``w`` is
        the cointegrating vector. An insight whose return series had to be
        dropped receives a zero target.

        Args:
            active_insights: The insights to size.

        Returns:
            A dictionary mapping every given insight to its target percentage.
        """
        # If less than 2 active insights, no valid pair trading can be resulted
        if len(active_insights) < 2:
            self.live_log(self.algorithm, f'PortfolioContructionModel: Less then 2 insights. Create zero-quantity targets')
            return {insight: 0 for insight in active_insights}

        # Signalled symbols, listed in the algorithm's securities order. This list
        # fixes the column order of the regression input, so it is kept around
        # to map the regression output back to symbols.
        insight_symbols = [x.symbol for x in active_insights]
        ordered_symbols = [symbol for symbol in self.algorithm.securities.keys()
                           if symbol in insight_symbols]

        # Get log return for cointegrating vector regression
        logr = pd.DataFrame({symbol: self.returns(self.algorithm.securities[symbol])
                             for symbol in ordered_symbols})
        # fill nans with mean, if the whole column is nan, drop it
        logr = logr.fillna(logr.mean())
        # Positional mask of the columns without any remaining NaN (same columns
        # that dropna(axis=1) would keep); selecting by position lets us track
        # which symbols survive without relying on the column labels.
        complete = logr.notna().all(axis=0).to_numpy()
        logr = logr.iloc[:, complete]
        kept_symbols = [symbol for symbol, keep in zip(ordered_symbols, complete) if keep]

        # make sure we have at least 2 columns
        if logr.shape[1] < 2:
            self.live_log(self.algorithm, f'PortfolioContructionModel: Less then 2 insights. Create zero-quantity targets.')
            return {insight: 0 for insight in active_insights}
        # Obtain the cointegrating vector of all signaled assets for statistical arbitrage
        model = engle_granger(logr.iloc[:, 0], logr.iloc[:, 1:], trend='n', lags=0)
        
        # If result not significant, return
        if model.pvalue > 0.05:
            return {insight: 0 for insight in active_insights}
        
        # Normalization for budget constraint
        coint_vector = model.cointegrating_vector
        total_weight = sum(abs(coint_vector))

        # Map each coefficient to the symbol of the column it belongs to.
        # NOTE(review): assumes the vector is ordered as the dependent column
        # followed by the regressor columns, i.e. the column order of logr -
        # confirm against the arch documentation.
        weight_by_symbol = dict(zip(kept_symbols, coint_vector))

        result = {}
        for insight in active_insights:
            weight = weight_by_symbol.get(insight.symbol)
            if weight is None:
                # No usable return series for this asset: it cannot be sized
                result[insight] = 0
                continue
            # we can assume any paired assets' 2 dimensions in coint_vector are in opposite sign
            result[insight] = abs(weight) / total_weight * insight.direction
            
        return result
        
    def on_securities_changed(self, algorithm, changes):
        """Set up per-security data for added securities and tear it down for removed ones."""
        self.live_log(algorithm, f'PortfolioContructionModel.on_securities_changed: Changes: {changes}')
        super().on_securities_changed(algorithm, changes)
        for added in changes.added_securities:
            self.init_security_data(algorithm, added)
        
        for removed in changes.removed_securities:
            self.dispose_security_data(algorithm, removed)

    def handle_corporate_actions(self, algorithm, slice):
        """Re-warm the indicator of every symbol that has a dividend or split in ``slice``."""
        symbols = set(slice.dividends.keys())
        symbols.update(slice.splits.keys())

        for symbol in symbols:
            self.warm_up_indicator(algorithm.securities[symbol])

    def live_log(self, algorithm, message):
        """Log ``message`` only when the algorithm runs in live mode."""
        if algorithm.live_mode:
            algorithm.log(message)

    def init_security_data(self, algorithm, security):
        """Attach the log-return indicator, its rolling window and its consolidator to ``security``."""
        # To store the historical daily log return
        security['window'] = RollingWindow[IndicatorDataPoint](self.lookback)

        # Use daily log return to predict cointegrating vector
        security['logr'] = LogReturn(1)
        # Every indicator update is appended to the rolling window
        security['logr'].updated += lambda _, updated: security['window'].add(IndicatorDataPoint(updated.end_time, updated.value))
        security['consolidator'] = TradeBarConsolidator(timedelta(1))

        # Subscribe the consolidator and indicator to data for automatic update
        algorithm.register_indicator(security.symbol, security['logr'], security['consolidator'])
        # NOTE(review): register_indicator may already subscribe the consolidator,
        # which would make this call redundant - confirm before removing.
        algorithm.subscription_manager.add_consolidator(security.symbol, security['consolidator'])

        self.warm_up_indicator(security)

    def warm_up_indicator(self, security):
        """Clear the security's indicator and window, then refill them from history.

        ``reset_and_warm_up`` replaces the consolidator, so the new one is
        stored back on the security.
        """
        self.reset(security)
        security['consolidator'] = reset_and_warm_up(self.algorithm, security, self.resolution, self.lookback)

    def reset(self, security):
        """Reset the security's log-return indicator and its rolling window."""
        security['logr'].reset()
        security['window'].reset()

    def dispose_security_data(self, algorithm, security):
        """Reset the security's data and unsubscribe its consolidator."""
        self.reset(security)
        algorithm.subscription_manager.remove_consolidator(security.symbol, security['consolidator'])

    def should_create_new_target(self, symbol, direction):
        """Return True when ``symbol`` is not held or is held in the direction opposite to ``direction``."""
        quantity = self.algorithm.portfolio[symbol].quantity
        return quantity == 0 or direction != int(np.sign(quantity))

    def returns(self, security):
        """Return the security's windowed log returns as a Series indexed by end time.

        The window is iterated and the resulting series is then reversed.
        """
        return pd.Series(
            data = [x.value for x in security['window']],
            index = [x.end_time for x in security['window']])[::-1]
#region imports
from AlgorithmImports import *
#endregion

class SectorETFUniverseSelectionModel(ETFConstituentsUniverseSelectionModel):
    """Universe made of the ten heaviest constituents of the IYM ETF."""

    def __init__(self, universe_settings: UniverseSettings = None) -> None:
        """Build the universe on the IYM ETF with the top-weight filter below.

        Args:
            universe_settings: Settings forwarded to the base universe selection model.
        """
        # Constituents of a single sector ETF serve as the pool of correlated assets.
        # NOTE(review): the previous comment called this a tech-sector ETF; IYM is
        # believed to be a basic-materials sector ETF - confirm the intended sector.
        etf_symbol = Symbol.create("IYM", SecurityType.EQUITY, Market.USA)
        super().__init__(etf_symbol, universe_settings, self.etf_constituents_filter)

    def etf_constituents_filter(self, constituents: List[ETFConstituentData]) -> List[Symbol]:
        """Return the symbols of the (at most) ten constituents with the largest weight.

        Constituents with a missing or zero weight are ignored. Keeping only the
        largest weights is meant to reduce slippage and keep the algorithm fast.
        """
        weighted = [constituent for constituent in constituents if constituent.weight]
        weighted.sort(key=lambda constituent: constituent.weight, reverse=True)
        return [constituent.symbol for constituent in weighted[:10]]
#region imports
from AlgorithmImports import *
#endregion

def reset_and_warm_up(algorithm, security, resolution, lookback = None):
    """Reset a security's log-return indicator and refill it from history.

    The security's current consolidator is unsubscribed and replaced by a new
    one-day TradeBarConsolidator, which is registered with the indicator and
    then fed the requested history.

    Args:
        algorithm: Algorithm instance used for the history request and registrations.
        security: Security carrying the 'logr' indicator and 'consolidator' entries.
        resolution: Resolution of the history request.
        lookback: Number of bars to request; when falsy, the indicator's
            warm_up_period is used instead.

    Returns:
        The new consolidator that now feeds the indicator.
    """
    log_return = security['logr']
    stale_consolidator = security['consolidator']

    # Fall back to the indicator's own warm-up period when no lookback is given
    lookback = lookback or log_return.warm_up_period

    # Historical request (scaled-raw prices) whose bars will be pushed through
    # the consolidator to warm up the indicator
    bar_type = stale_consolidator.input_type
    history = algorithm.history[bar_type](
        security.symbol, lookback, resolution,
        data_normalization_mode = DataNormalizationMode.SCALED_RAW)

    log_return.reset()

    # Replace the consolidator, since we cannot reset it.
    # Not ideal: the replacement's type and period are hard-coded here because
    # we don't know the type and period of the consolidator being replaced.
    algorithm.subscription_manager.remove_consolidator(security.symbol, stale_consolidator)
    fresh_consolidator = TradeBarConsolidator(timedelta(1))
    algorithm.register_indicator(security.symbol, log_return, fresh_consolidator)

    # Feed every historical bar except the most recent one
    bars = list(history)
    for bar in bars[:-1]:
        fresh_consolidator.update(bar)

    return fresh_consolidator

'''
# In main.py, implement on_data and call handle_corporate_actions for each framework model (if necessary)
    def on_data(self, slice):
        if slice.splits or slice.dividends:
            self.alpha.handle_corporate_actions(self, slice)
            self.pcm.handle_corporate_actions(self, slice)
            self.risk.handle_corporate_actions(self, slice)
            self.execution.handle_corporate_actions(self, slice)

# In the framework models, add
from utils import reset_and_warm_up

and implement handle_corporate_actions. E.g.:
    def handle_corporate_actions(self, algorithm, slice):
        for symbol, data in self.symbol_data.items():
            if slice.splits.contains_key(symbol) or slice.dividends.contains_key(symbol):
                data.warm_up_indicator()

where warm_up_indicator will call reset_and_warm_up for each indicator/consolidator pair
'''