Overall Statistics
Total Orders
10
Average Win
1.26%
Average Loss
-0.72%
Compounding Annual Return
7.230%
Drawdown
1.900%
Expectancy
0.379
Start Equity
1000000
End Equity
1013597.73
Net Profit
1.360%
Sharpe Ratio
-0.088
Sortino Ratio
-0.107
Probabilistic Sharpe Ratio
51.555%
Loss Rate
50%
Win Rate
50%
Profit-Loss Ratio
1.76
Alpha
0
Beta
0
Annual Standard Deviation
0.046
Annual Variance
0.002
Information Ratio
1.102
Tracking Error
0.046
Treynor Ratio
0
Total Fees
$86.96
Estimated Strategy Capacity
$17000000.00
Lowest Capacity Asset
XLK RGRPZX100F39
Portfolio Turnover
7.09%
#region imports
from AlgorithmImports import *

from collections import deque
from statsmodels.distributions.empirical_distribution import ECDF
from scipy.stats import kendalltau
from scipy.optimize import minimize
from scipy.integrate import quad
from scipy import stats
import sys
#endregion

class CopulaPairsTradingAlphaModel(AlphaModel):

    _window = {} # stores historical price used to calculate trading day's stock return
    _coef = 0    # to be calculated: requested ratio of quantity_u / quantity_v
    _day = 0     # keep track of current day for daily rebalance
    _month = 0   # keep track of current month for monthly recalculation of optimal trading pair
    _pair = []   # stores the selected trading pair
    _duration = timedelta(90)

    def __init__(self, lookback_days, num_days, cap_CL, floor_CL, weight_v):
        self._lookback_days = lookback_days   # length of history data in trading period
        self._num_days = num_days             # length of formation period which determine the copula we use
        self._cap__c_l = cap_CL                 # cap confidence level
        self._floor__c_l = floor_CL             # floor confidence level
        self._weight_v = weight_v             # desired holding weight of asset v in the portfolio, adjusted to avoid insufficient buying power

    def update(self, algorithm: QCAlgorithm, slice: Slice) -> List[Insight]:
        insights = []

        self.set_signal(algorithm, slice)     # only executed at first day of each month

        # Daily rebalance
        if algorithm.time.day == self._day or slice.quote_bars.count == 0:
            return []
        
        long, short = self._pair[0], self._pair[1]

        if len(self._window[long]) < 2 or len(self._window[short]) < 2:
            return []
        
        # Compute the mispricing indices for u and v by using estimated copula
        m_i_u_v, m_i_v_u = self._misprice_index()

        # Placing orders: if long is relatively underpriced, buy the pair
        if m_i_u_v < self._floor__c_l and m_i_v_u > self._cap__c_l:
            
            insights.extend([
                Insight.price(short, self._duration, InsightDirection.DOWN, weight=self._weight_v),
                Insight.price(long, self._duration, InsightDirection.UP, weight=self._weight_v * self._coef * algorithm.portfolio[long].price / algorithm.portfolio[short].price),
            ])

        # Placing orders: if short is relatively underpriced, sell the pair
        elif m_i_u_v > self._cap__c_l and m_i_v_u < self._floor__c_l:

            insights.extend([
                Insight.price(short, self._duration, InsightDirection.UP, weight=self._weight_v),
                Insight.price(long, self._duration, InsightDirection.DOWN, weight=self._weight_v * self._coef * algorithm.portfolio[long].price / algorithm.portfolio[short].price),
            ])
        
        self._day = algorithm.time.day
        return insights

    def set_signal(self, algorithm, slice):
        '''Computes the mispricing indices to generate the trading signals.
        It's called on first day of each month'''

        if algorithm.time.month == self._month:
            return
        
        ## Compute the best copula
        
        # Pull historical log returns used to determine copula
        history = algorithm.history(self._pair, self._num_days, Resolution.DAILY, data_normalization_mode=DataNormalizationMode.SCALED_RAW)
        if history.empty:
            return
        history = history.close.unstack(level=0)
        logreturns = (np.log(history) - np.log(history.shift(1))).dropna()

        x, y = logreturns[str(self._pair[0])], logreturns[str(self._pair[1])]

        # Convert the two returns series to two uniform values u and v using the empirical distribution functions
        ecdf_x, ecdf_y  = ECDF(x), ECDF(y)
        u, v = [ecdf_x(a) for a in x], [ecdf_y(a) for a in y]
        
        # Compute the Akaike Information Criterion (AIC) for different copulas and choose copula with minimum AIC
        tau = kendalltau(x, y)[0]  # estimate Kendall'rank correlation
        AIC ={}  # generate a dict with key being the copula family, value = [theta, AIC]
        
        for i in ['clayton', 'frank', 'gumbel']:
            param = self._parameter(i, tau)
            lpdf = [self._lpdf_copula(i, param, x, y) for (x, y) in zip(u, v)]
            # Replace nan with zero and inf with finite numbers in lpdf list
            lpdf = np.nan_to_num(lpdf) 
            loglikelihood = sum(lpdf)
            AIC[i] = [param, -2 * loglikelihood + 2]
            
        # Choose the copula with the minimum AIC
        self.copula = min(AIC.items(), key = lambda x: x[1][1])[0]
        
        ## Compute the signals
        
        # Generate the log return series of the selected trading pair
        logreturns = logreturns.tail(self._lookback_days)
        x, y = logreturns[str(self._pair[0])], logreturns[str(self._pair[1])]
        
        # Estimate Kendall'rank correlation
        tau = kendalltau(x, y)[0] 
        
        # Estimate the copula parameter: theta
        self.theta = self._parameter(self.copula, tau)
        
        # Simulate the empirical distribution function for returns of selected trading pair
        self.ecdf_x, self.ecdf_y  = ECDF(x), ECDF(y) 
        
        # Run linear regression over the two history return series and return the desired trading size ratio
        self._coef = stats.linregress(x,y).slope
        
        self._month = algorithm.time.month
        
    def _parameter(self, family, tau):    
        ''' Estimate the parameters for three kinds of Archimedean copulas
        according to association between Archimedean copulas and the Kendall rank correlation measure
        '''

        if  family == 'clayton':
            return 2 * tau / (1 - tau)
        
        elif family == 'frank':
            
            integrand = lambda t: t / (np.exp(t) - 1)  # generate the integrand
            frank_fun = lambda theta: ((tau - 1) / 4.0  - (quad(integrand, sys.float_info.epsilon, theta)[0] / theta - 1) / theta) ** 2
            
            return minimize(frank_fun, 4, method='BFGS', tol=1e-5).x 
        
        elif family == 'gumbel':
            return 1 / (1 - tau)
            
    def _lpdf_copula(self, family, theta, u, v):       
        '''Estimate the log probability density function of three kinds of Archimedean copulas
        ''' 
        if  family == 'clayton':
            pdf = (theta + 1) * ((u ** (-theta) + v ** (-theta) - 1) ** (-2 - 1 / theta)) * (u ** (-theta - 1) * v ** (-theta - 1))
            
        elif family == 'frank':
            num = -theta * (np.exp(-theta) - 1) * (np.exp(-theta * (u + v)))
            denom = ((np.exp(-theta * u) - 1) * (np.exp(-theta * v) - 1) + (np.exp(-theta) - 1)) ** 2
            pdf = num / denom
            
        elif family == 'gumbel':
            A = (-np.log(u)) ** theta + (-np.log(v)) ** theta
            c = np.exp(-A ** (1 / theta))
            pdf = c * (u * v) ** (-1) * (A ** (-2 + 2 / theta)) * ((np.log(u) * np.log(v)) ** (theta - 1)) * (1 + (theta - 1) * A ** (-1 / theta))
            
        return np.log(pdf)

    def _misprice_index(self):    
        '''Calculate mispricing index for every day in the trading period by using estimated copula
        Mispricing indices are the conditional probability P(U < u | V = v) and P(V < v | U = u)''' 
        return_x = np.log(self._window[self._pair[0]][-1] / self._window[self._pair[0]][-2])
        return_y = np.log(self._window[self._pair[1]][-1] / self._window[self._pair[1]][-2])
        
        # Convert the two returns to uniform values u and v using the empirical distribution functions
        u = self.ecdf_x(return_x)
        v = self.ecdf_y(return_y)
        
        if self.copula == 'clayton':
            m_i_u_v = v ** (-self.theta - 1) * (u ** (-self.theta) + v ** (-self.theta) - 1) ** (-1 / self.theta - 1) # P(U<u|V=v)
            m_i_v_u = u ** (-self.theta - 1) * (u ** (-self.theta) + v ** (-self.theta) - 1) ** (-1 / self.theta - 1) # P(V<v|U=u)
    
        elif self.copula == 'frank':
            A = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta * v) - 1)
            B = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta * u) - 1)
            C = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta) - 1)
            m_i_u_v = B / C
            m_i_v_u = A / C
        
        elif self.copula == 'gumbel':
            A = (-np.log(u)) ** self.theta + (-np.log(v)) ** self.theta
            c_uv = np.exp(-A ** (1 / self.theta))   # c_uv is gumbel copula function C(u,v)
            m_i_u_v = c_uv * (A ** ((1 - self.theta) / self.theta)) * (-np.log(v)) ** (self.theta - 1) * (1.0 / v)
            m_i_v_u = c_uv * (A ** ((1 - self.theta) / self.theta)) * (-np.log(u)) ** (self.theta - 1) * (1.0 / u)
            
        return m_i_u_v, m_i_v_u

    def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        for security in changes.removed_securities:
            symbol = security.symbol
            self._window.pop(symbol)
            if security.invested:
                algorithm.liquidate(symbol, "Removed from Universe")
            algorithm.subscription_manager.remove_consolidator(security.symbol, security.consolidator)
        
        for security in changes.added_securities:
            self._window[security.symbol] = deque(maxlen = 2)
            
            security.consolidator = TradeBarConsolidator(timedelta(1))
            security.consolidator.data_consolidated += lambda _, consolidated_bar: self._window[consolidated_bar.symbol].append(consolidated_bar.close)
            algorithm.subscription_manager.add_consolidator(security.symbol, security.consolidator)
        
        self._pair = list(self._window.keys())
        
        # Warm up historical prices
        history = algorithm.history(self._pair, 2, Resolution.DAILY, data_normalization_mode=DataNormalizationMode.SCALED_RAW)
        history = history.close.unstack(level=0)
        for symbol in self._window:
            for i in range(2):
                self._window[symbol].append(history[str(symbol)][i])
#region imports
from AlgorithmImports import *

from universe import MaximumKendallTauUniverseSelectionModel
from alpha import CopulaPairsTradingAlphaModel
#endregion


class CopulaPairsTradingAlgorithm(QCAlgorithm):

    _undesired_symbols_from_previous_deployment = []
    _checked_symbols_from_previous_deployment = False
    _previous_expiry_time = None
    
    def initialize(self):
        self.set_start_date(2024, 1, 1)
        self.set_end_date(2024, 6, 1)
        self.set_cash(1_000_000)

        self.settings.minimum_order_margin_portfolio_percentage = 0
        
        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)

        lookback_days = self.get_parameter("lookback_days", 250) # length of history data in trading period

        self.universe_settings.data_normalization_mode = DataNormalizationMode.RAW
        self.universe_settings.schedule.on(self.date_rules.month_start())
        # Select optimal trading pair into the universe
        self.add_universe_selection(MaximumKendallTauUniverseSelectionModel(self, lookback_days))

        self.add_alpha(CopulaPairsTradingAlphaModel(
            lookback_days,
            self.get_parameter("num_days", 1_000),
            self.get_parameter("cap_CL", 0.95),
            self.get_parameter("floor_CL", 0.05),
            self.get_parameter("weight_v", 0.5)
        ))
        
        self.settings.rebalance_portfolio_on_security_changes = False
        self.settings.rebalance_portfolio_on_insight_changes = False
        self.set_portfolio_construction(InsightWeightingPortfolioConstructionModel(self._rebalance_func))
        
        self.add_risk_management(NullRiskManagementModel())
        
        self.set_execution(ImmediateExecutionModel())

        self.set_warm_up(timedelta(90))

    def _rebalance_func(self, time):
        # Rebalance when all of the following are true:
        # - There are new insights or old insights have been cancelled
        # - The algorithm isn't warming up
        # - There is QuoteBar data in the current slice
        latest_expiry_time = sorted([insight.close_time_utc for insight in self.insights], reverse=True)[0] if self.insights.count else None
        if self._previous_expiry_time != latest_expiry_time and not self.is_warming_up and self.current_slice.quote_bars.count > 0:
            self._previous_expiry_time = latest_expiry_time
            return time
        return None

    def on_data(self, data):
        # Exit positions that aren't backed by existing insights.
        # If you don't want this behavior, delete this method definition.
        if not self.is_warming_up and not self._checked_symbols_from_previous_deployment:
            for security_holding in self.portfolio.values():
                if not security_holding.invested:
                    continue
                symbol = security_holding.symbol
                if not self.insights.has_active_insights(symbol, self.utc_time):
                    self._undesired_symbols_from_previous_deployment.append(symbol)
            self._checked_symbols_from_previous_deployment = True
        
        for symbol in self._undesired_symbols_from_previous_deployment:
            if self.is_market_open(symbol):
                self.liquidate(symbol, tag="Holding from previous deployment that's no longer desired")
                self._undesired_symbols_from_previous_deployment.remove(symbol)
#region imports
from AlgorithmImports import *

from scipy.stats import kendalltau
#endregion


class MaximumKendallTauUniverseSelectionModel(ScheduledUniverseSelectionModel):
    
    _tickers = [  
        "QQQ", "XLK",
        "XME", "EWG", 
        "TNA", "TLT",
        "FAS", "FAZ",
        "XLF", "XLU",
        "EWC", "EWA",
        "QLD", "QID"
    ]

    def __init__(self, algorithm, lookback_days: int = 250) -> None:
        super().__init__(algorithm.date_rules.month_start(), algorithm.time_rules.midnight, self._select_universe)
        self._algorithm = algorithm
        self._lookback_days = lookback_days
        self._symbols = [Symbol.create(ticker, SecurityType.EQUITY, Market.USA) for ticker in self._tickers]

    def _select_universe(self,  date_time: datetime) -> List[Symbol]:
        history = self._algorithm.history(self._symbols, self._lookback_days, Resolution.DAILY, data_normalization_mode=DataNormalizationMode.SCALED_RAW)
        if history.empty:
            return Universe.UNCHANGED
        history = history.close.unstack(level=0)
        log_returns = (np.log(history) - np.log(history.shift(1))).dropna()
        
        tau = 0
        for i in range(0, len(self._symbols), 2):
            
            x = log_returns[self._symbols[i]]
            y = log_returns[self._symbols[i+1]]
            
            # Estimate Kendall rank correlation for each pair
            tau_ = kendalltau(x, y)[0]
            
            if tau > tau_:
                continue

            tau = tau_
            pair = self._symbols[i:i+2]
        
        return pair