Overall Statistics
Total Orders
88
Average Win
11.29%
Average Loss
-2.87%
Compounding Annual Return
13.670%
Drawdown
37.700%
Expectancy
1.095
Start Equity
100000
End Equity
222551.13
Net Profit
122.551%
Sharpe Ratio
0.41
Sortino Ratio
0.506
Probabilistic Sharpe Ratio
5.348%
Loss Rate
58%
Win Rate
42%
Profit-Loss Ratio
3.93
Alpha
0.098
Beta
0.105
Annual Standard Deviation
0.261
Annual Variance
0.068
Information Ratio
0.083
Tracking Error
0.3
Treynor Ratio
1.021
Total Fees
$167.60
Estimated Strategy Capacity
$860000000.00
Lowest Capacity Asset
AAPL R735QTJ8XC9X
Portfolio Turnover
0.57%
#region imports
from AlgorithmImports import *
#endregion
#https://www.quantconnect.com/tutorials/strategy-library/pairs-trading-copula-vs-cointegration
import numpy as np
from scipy import stats
from statsmodels.distributions.empirical_distribution import ECDF
from scipy.stats import kendalltau, pearsonr, spearmanr
from scipy.optimize import minimize
from scipy.integrate import quad
import sys
from collections import deque



class OptimizedMultidimensionalReplicator(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2018, 1, 1)
        self.SetEndDate(2024,3,31)
        self.SetCash(100000)
        self.AddEquity("AAPL", Resolution.Daily)
        self.AddEquity("TSLA", Resolution.Daily)
        
        self.numdays = 50       
        self.lookbackdays = 100   
        self.cap_CL = 0.95        
        self.floor_CL = 0.05      
        self.weight_v = 0.5       
        self.coef = 0             
        self.day = 0              
        self.month = 0 

        self.window_1 = RollingWindow[TradeBar](3)
        self.window_2 = RollingWindow[TradeBar](3)
        
        self.pair = ["AAPL", "TSLA"]
        self.copula = 'clayton'


    
    def OnData(self, slice):
        self.SetSignal(slice)
        self.Plot("COEFICIENTS", "Slopes", self.coef) 
        self.Plot("KENDALL CORRELATION", "Theta", self.theta) 
        self.Debug(f"OptimalCopula:{self.copula}") 
        
        self._misprice_index(slice)
              
        
    
    def _get_historical_returns(self, symbol, period):
        history = self.History(symbol, period, Resolution.Daily)
        history = history.close.unstack(level=0)
        return (np.log(history) - np.log(history.shift(1))).dropna() 
        
        
    def _parameter(self, family, tau):
        if  family == 'clayton':
            return 2 * tau / (1 - tau)
        elif family == 'frank':
            integrand = lambda t: t / (np.exp(t) - 1)
            frank_fun = lambda theta: ((tau - 1) / 4.0  - (quad(integrand, sys.float_info.epsilon, theta)[0] / theta - 1) / theta) ** 2
            return minimize(frank_fun, 4, method='BFGS', tol=1e-5).x 
        elif family == 'gumbel':
            return 1 / (1 - tau)
            
    def _lpdf_copula(self, family, theta, u, v):
        if  family == 'clayton':
            pdf = (theta + 1) * ((u ** (-theta) + v ** (-theta) - 1) ** (-2 - 1 / theta)) * (u ** (-theta - 1) * v ** (-theta - 1))
        elif family == 'frank':
            num = -theta * (np.exp(-theta) - 1) * (np.exp(-theta * (u + v)))
            denom = ((np.exp(-theta * u) - 1) * (np.exp(-theta * v) - 1) + (np.exp(-theta) - 1)) ** 2
            pdf = num / denom
        elif family == 'gumbel':
            A = (-np.log(u)) ** theta + (-np.log(v)) ** theta
            c = np.exp(-A ** (1 / theta))
            pdf = c * (u * v) ** (-1) * (A ** (-2 + 2 / theta)) * ((np.log(u) * np.log(v)) ** (theta - 1)) * (1 + (theta - 1) * A ** (-1 / theta))
        return np.log(pdf)
        
        
    def _misprice_index(self, slice):
        
        if self.CurrentSlice["AAPL"] is not None:
            self.window_1.Add(self.CurrentSlice["AAPL"])
        if self.CurrentSlice["TSLA"] is not None:
            self.window_2.Add(self.CurrentSlice["TSLA"])
        
        if not self.window_1.IsReady and not self.window_2.IsReady:
            return
        
        return_x =np.log(self.window_1[0].Close / self.window_1[1].Close)
        return_y =np.log(self.window_2[0].Close / self.window_2[1].Close)
        
        u = self.ecdf_x(return_x)
        v = self.ecdf_y(return_y)
        
        if self.copula == 'clayton':
            MI_u_v = v ** (-self.theta - 1) * (u ** (-self.theta) + v ** (-self.theta) - 1) ** (-1 / self.theta - 1) 
            MI_v_u = u ** (-self.theta - 1) * (u ** (-self.theta) + v ** (-self.theta) - 1) ** (-1 / self.theta - 1) 
        elif self.copula == 'frank':
            A = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta * v) - 1)
            B = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta * u) - 1)
            C = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta) - 1)
            MI_u_v = B / C
            MI_v_u = A / C
        elif self.copula == 'gumbel':
            A = (-np.log(u)) ** self.theta + (-np.log(v)) ** self.theta
            C_uv = np.exp(-A ** (1 / self.theta))   
            MI_u_v = C_uv * (A ** ((1 - self.theta) / self.theta)) * (-np.log(v)) ** (self.theta - 1) * (1.0 / v)
            MI_v_u = C_uv * (A ** ((1 - self.theta) / self.theta)) * (-np.log(u)) ** (self.theta - 1) * (1.0 / u)
        
        
        self.Debug(f" First Misprice :{MI_v_u}")
        self.Debug(f" Second Misprice :{MI_u_v}")
        
        if MI_u_v < self.floor_CL and MI_v_u > self.cap_CL:
            self.SetHoldings("AAPL", -self.weight_v, False, f'Coef: {self.coef}')
            self.SetHoldings("TSLA", self.weight_v * self.coef * self.Portfolio["AAPL"].Price / self.Portfolio["TSLA"].Price)
        elif MI_u_v > self.cap_CL and MI_v_u < self.floor_CL:
            self.SetHoldings("TSLA", self.weight_v, False, f'Coef: {self.coef}')
            self.SetHoldings("AAPL", -self.weight_v * self.coef * self.Portfolio["AAPL"].Price / self.Portfolio["TSLA"].Price)
        self.day = self.Time.day
        
        
        
    def SetSignal(self, slice):
        if self.Time.month == self.month:
            return
        logreturns = self._get_historical_returns(self.pair, self.numdays)
        x, y = logreturns[self.pair[0]], logreturns[self.pair[1]]
        ecdf_x, ecdf_y  = ECDF(x), ECDF(y)
        u, v = [ecdf_x(a) for a in x], [ecdf_y(a) for a in y]
        tau = kendalltau(x, y)[0] 
        AIC ={}  
        for i in ['clayton', 'frank', 'gumbel']:
            param = self._parameter(i, tau)
            lpdf = [self._lpdf_copula(i, param, x, y) for (x, y) in zip(u, v)]
            lpdf = np.nan_to_num(lpdf) 
            loglikelihood = sum(lpdf)
            AIC[i] = [param, -2 * loglikelihood + 2]
            
        self.copula = min(AIC.items(), key = lambda x: x[1][1])[0]
        
        logreturns = logreturns.tail(self.lookbackdays)
        x, y = logreturns[self.pair[0]], logreturns[self.pair[1]]
        tau = kendalltau(x, y)[0] 
        
        self.theta = self._parameter(self.copula, tau)
        self.ecdf_x, self.ecdf_y  = ECDF(x), ECDF(y) 
        self.coef = stats.linregress(x,y).slope
        self.month = self.Time.month