Overall Statistics
Total Orders
482
Average Win
0.84%
Average Loss
-0.87%
Compounding Annual Return
0.997%
Drawdown
20.600%
Expectancy
0.023
Start Equity
1000000
End Equity
1063923.29
Net Profit
6.392%
Sharpe Ratio
-0.115
Sortino Ratio
-0.093
Probabilistic Sharpe Ratio
0.381%
Loss Rate
48%
Win Rate
52%
Profit-Loss Ratio
0.97
Alpha
0
Beta
0
Annual Standard Deviation
0.099
Annual Variance
0.01
Information Ratio
0.119
Tracking Error
0.099
Treynor Ratio
0
Total Fees
$17098.22
Estimated Strategy Capacity
$7500000.00
Lowest Capacity Asset
APD R735QTJ8XC9X
Portfolio Turnover
12.82%
# region imports
from AlgorithmImports import *
from universe import SectorETFUniverseSelectionModel
from portfolio import CointegratedVectorPortfolioConstructionModel
# endregion

class ETFPairsTrading(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2018, 1, 1)  # Set Start Date
        self.SetEndDate(2024, 3, 31)
        self.SetCash(1000000)  #  Strategy Cash

        lookback = self.GetParameter("lookback", 100)   # lookback window on correlation & coinetgration
        threshold = self.GetParameter("threshold", 2)   # we want at least 2% expected profit margin to cover fees
        
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        
        self.UniverseSettings.Resolution = Resolution.Daily
        self.SetUniverseSelection(SectorETFUniverseSelectionModel(self.UniverseSettings))

        # This alpha model helps to pick the most correlated pair
        # and emit signal when they have mispricing that stay active for a predicted period
        # https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/alpha/supported-models#09-Pearson-Correlation-Pairs-Trading-Model
        self.AddAlpha(PearsonCorrelationPairsTradingAlphaModel(lookback, Resolution.Daily, threshold=threshold))

        # We try to use cointegrating vector to decide the relative movement magnitude of the paired assets
        pcm = CointegratedVectorPortfolioConstructionModel(self, lookback, Resolution.Daily)
        pcm.RebalancePortfolioOnSecurityChanges = False

        self.AddRiskManagement(MaximumDrawdownPercentPortfolio(.20))
        self.SetPortfolioConstruction(pcm)

        self.SetWarmUp(timedelta(10))
#region imports
from AlgorithmImports import *
from Portfolio.EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel
from arch.unitroot.cointegration import engle_granger
#endregion

class CointegratedVectorPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):

    def __init__(self, algorithm, lookback = 252, resolution = Resolution.Minute, 
                 rebalance = Expiry.EndOfWeek, portfolioBias = PortfolioBias.LongShort) -> None:
        super().__init__(rebalance, portfolioBias)
        self.algorithm = algorithm
        self.lookback = lookback
        self.resolution = resolution
        self.symbol_data = {}

    def ShouldCreateTargetForInsight(self, insight: Insight) -> bool:
        # Ignore insights if the asset has open position in the same direction
        return self.symbol_data[insight.Symbol].ShouldCreateNewTarget(insight.Direction)

    def DetermineTargetPercent(self, activeInsights: List[Insight]) -> Dict[Insight, float]:
        result = {}

        # Reset indicators when corporate actions occur
        for symbol in self.algorithm.CurrentSlice.Splits.keys():
            if symbol in self.symbol_data:
                self.symbol_data[symbol].Reset()
                self.symbol_data[symbol].WarmUpIndicator()
        for symbol in self.algorithm.CurrentSlice.Dividends.keys():
            if symbol in self.symbol_data:
                self.symbol_data[symbol].Reset()
                self.symbol_data[symbol].WarmUpIndicator()

        # If less than 2 active insights, no valid pair trading can be resulted
        if len(activeInsights) < 2:
            self.LiveLog(self.algorithm, f'PortfolioContructionModel: Less then 2 insights. Create zero-quantity targets')
            return {insight: 0 for insight in activeInsights}

        # Get log return for cointegrating vector regression
        logr = pd.DataFrame({symbol: data.Return for symbol, data in self.symbol_data.items()
            if symbol in [x.Symbol for x in activeInsights]})
        # fill nans with mean, if the whole column is nan, drop it
        logr = logr.fillna(logr.mean()).dropna(axis=1)
        # make sure we have at least 2 columns
        if logr.shape[1] < 2:
            self.LiveLog(self.algorithm, f'PortfolioContructionModel: Less then 2 insights. Create zero-quantity targets.')
            return {insight: 0 for insight in activeInsights}
        # Obtain the cointegrating vector of all signaled assets for statistical arbitrage
        model = engle_granger(logr.iloc[:, 0], logr.iloc[:, 1:], trend='n', max_lags=1)

        # If result not significant, return
        if model.pvalue > 0.05:
            return {insight: 0 for insight in activeInsights}
        
        # Normalization for budget constraint
        coint_vector = model.cointegrating_vector
        total_weight = sum(abs(coint_vector))

        for insight, weight in zip(activeInsights, coint_vector):
            # we can assume any paired assets' 2 dimensions in coint_vector are in opposite sign
            result[insight] = abs(weight) / total_weight * insight.Direction
        
        return result
        
    def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        self.LiveLog(algorithm, f'PortfolioContructionModel.OnSecuritiesChanged: Changes: {changes}')
        super().OnSecuritiesChanged(algorithm, changes)
        for removed in changes.RemovedSecurities:
            symbolData = self.symbol_data.pop(removed.Symbol, None)
            if symbolData:
                symbolData.Dispose()

        for added in changes.AddedSecurities:
            symbol = added.Symbol
            if symbol not in self.symbol_data:
                symbolData = self.SymbolData(algorithm, symbol, self.lookback, self.resolution)
                self.symbol_data[symbol] = symbolData

    def LiveLog(self, algorithm, message):
        if algorithm.LiveMode:
            algorithm.Log(message)

    class SymbolData:

        def __init__(self, algorithm, symbol, lookback, resolution):
            self.algorithm = algorithm
            self.symbol = symbol
            self.lookback = lookback
            self.resolution = resolution

            # To store the historical daily log return
            self.windows = RollingWindow[IndicatorDataPoint](lookback)

            # Use daily log return to predict cointegrating vector
            self.logr = LogReturn(1)
            self.logr.Updated += self.OnUpdate
            self.consolidator = TradeBarConsolidator(timedelta(1))

            # Subscribe the consolidator and indicator to data for automatic update
            algorithm.RegisterIndicator(symbol, self.logr, self.consolidator)
            algorithm.SubscriptionManager.AddConsolidator(symbol, self.consolidator)

            self.WarmUpIndicator()

        def WarmUpIndicator(self):
            # historical warm-up on the log return indicator
            history = self.algorithm.History[TradeBar](self.symbol, self.lookback, self.resolution)
            for bar in list(history)[:-1]:
                self.logr.Update(bar.EndTime, bar.Close)

        def OnUpdate(self, sender, updated):
            self.windows.Add(IndicatorDataPoint(updated.EndTime, updated.Value))

        def Dispose(self):
            self.logr.Updated -= self.OnUpdate
            self.Reset()
            self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.consolidator)
        
        def Reset(self):
            self.logr.Reset()
            self.windows.Reset()

        def ShouldCreateNewTarget(self, direction):
            quantity = self.algorithm.Portfolio[self.symbol].Quantity
            return quantity == 0 or direction != np.sign(quantity)

        @property
        def Return(self):
            return pd.Series(
                data = [x.Value for x in self.windows],
                index = [x.EndTime.date() for x in self.windows])[::-1]
#region imports
from AlgorithmImports import *
#endregion

class SectorETFUniverseSelectionModel(ETFConstituentsUniverseSelectionModel):
    def __init__(self, universe_settings: UniverseSettings = None) -> None:
        # Select the tech sector ETF constituents to get correlated assets
        symbol = Symbol.Create("IYM", SecurityType.Equity, Market.USA)
        super().__init__(symbol, universe_settings, self.ETFConstituentsFilter)

    def ETFConstituentsFilter(self, constituents: List[ETFConstituentData]) -> List[Symbol]:
        # Get the 10 securities with the largest weight in the index to reduce slippage and keep speed of the algorithm
        selected = sorted([c for c in constituents if c.Weight], 
                          key=lambda c: c.Weight, reverse=True)
        return [c.Symbol for c in selected[:10]]