Overall Statistics
Total Trades
508
Average Win
2.32%
Average Loss
-2.03%
Compounding Annual Return
3.959%
Drawdown
12.200%
Expectancy
0.102
Net Profit
41.070%
Sharpe Ratio
0.465
Loss Rate
49%
Win Rate
51%
Profit-Loss Ratio
1.15
Alpha
0.033
Beta
0.48
Annual Standard Deviation
0.09
Annual Variance
0.008
Information Ratio
0.249
Tracking Error
0.09
Treynor Ratio
0.087
Total Fees
$3530.59
import numpy as np
from scipy import stats
from statsmodels.distributions.empirical_distribution import ECDF
from scipy.stats import kendalltau, pearsonr, spearmanr
from scipy.optimize import minimize
from scipy.integrate import quad
import sys

# Xin: 
# (1) Used Scheduled Events to replace OnData() to avoid MarketOrder being converted into MarketOpenOrder. This fixed the weired
# quantity returned from self.CalculateOrderQuantity
# (2) In self.CalculateOrderQuantity, algo should input signed percentage, e.g., -0.4 or 0.4
# (3) Added a constraint on the max order quantity to permanently avoid insufficient capital issue

class CopulaPairsTradingAlgorithm(QCAlgorithm):

    def Initialize(self):
        ## Initialise the data and resolution required, as well as the cash and start-end dates for your algorithm. All algorithms must initialized.
        
        self.SetStartDate(2010,1,1)
        self.SetEndDate(2018,11,11)
        self.SetCash(100000)
        
        self.numdays = 1000       # length of formation period which determine the copula we use
        self.lookbackdays = 250   # length of history data in trading period
        self.cap_CL = 0.95        # cap confidence level
        self.floor_CL = 0.05      # floor confidence level
        
        self.ticker = self._pair_selection()
        
        ## Pull history daily close data of past one year
        self.syl = []
        for i in self.ticker:
            equity = self.AddSecurity(SecurityType.Equity, i, Resolution.Daily)
            self.syl.append(equity.Symbol.Value)
    
        self.price_list = {}
        self.price_list[self.syl[0]] = []
        self.price_list[self.syl[1]] = []
        
        history = self.History(self.syl, self.numdays,Resolution.Daily)
        close = history['close'].unstack(level=0) 
        logreturn = (np.log(close) - np.log(close.shift(1))).dropna()
        x, y = logreturn[self.ticker[0]], logreturn[self.ticker[1]]        
        logreturn={} # A dictionary to store the history log return series of paired stocks 
        
        ''' Convert the two returns series to two uniform values u and v using the empirical distribution functions '''

        ecdf_x, ecdf_y  = ECDF(x), ECDF(y)
        u, v = [ecdf_x(a) for a in x], [ecdf_y(a) for a in y]

        # compute the Akaike Information Criterion (AIC) for different copulas and choose copula with minimum AIC
        
        self.family = ['clayton', 'frank', 'gumbel']
        tau = kendalltau(x, y)[0]  # estimate Kendall'rank correlation
        AIC ={}  # generate a dict with key being the copula family, value = [theta, AIC]
        for i in self.family:
            lpdf = [self._lpdf_copula(i, self._parameter(i,tau), x, y) for (x, y) in zip(u, v)]
            # Replace nan with zero and inf with finite numbers in lpdf list 
            lpdf = np.nan_to_num(lpdf) 
            loglikelihood = sum(lpdf)
            AIC[i] = [self._parameter(i,tau), -2*loglikelihood + 2]
        # choose the copula with the minimum AIC  
        self.copula = min(AIC.items(), key = lambda x: x[1][1])[0]  
        self.Log(str(self.copula))
        self.Log(str(AIC))
        
        # schedule an event to fire on first day each month for a security 
        # the action compute the mispricing indices to generate the trading signals 
        self.Schedule.On(self.DateRules.MonthStart(self.syl[0]), self.TimeRules.AfterMarketOpen(self.syl[0]), self._set_signal)

        # schedule an event to trade every day after market open
        self.AddEquity("SPY", Resolution.Daily)
        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", 30), self.EveryDayAfterMarketOpen)

    def OnData(self,data):
        self.data = data
        
    def EveryDayAfterMarketOpen(self):

        if self.data.Bars.Count < 2: return
        for i in self.syl:
            if self.data.Bars.ContainsKey(i):
                self.price_list[i].append(self.Portfolio[i].Price)
        
        # compute today's log return of 2 stocks
        if len(self.price_list[self.syl[0]]) < 2 or len(self.price_list[self.syl[1]]) < 2: return
        else: 
            return_x = np.log(self.price_list[self.syl[0]][-1]/self.price_list[self.syl[0]][-2])
            return_y = np.log(self.price_list[self.syl[1]][-1]/self.price_list[self.syl[1]][-2])
        
        # Convert the two returns to uniform values u and v using the empirical distribution functions
        u_value = self.ecdf_x(return_x)
        v_value = self.ecdf_y(return_y)
        
        # Compute the mispricing indices for u and v by using estimated copula
        self._misprice_index(self.copula, self.theta, u_value, v_value)

        # If v is overpriced: sell v and buy u
        if self.MI_u_v < self.floor_CL and self.MI_v_u > self.cap_CL:
            # If held long position in v and short position in u, liquidate and then place orders
            if self.Portfolio[self.syl[0]].Quantity < 0 and self.Portfolio[self.syl[1]].Quantity > 0:
                self.Liquidate()
                negQuantity = self.CalculateOrderQuantity(self.syl[1], -0.4) # return negative quantity for defined holding
                max_u = self.Portfolio.TotalPortfolioValue / self.Portfolio[self.syl[0]].Price # max quantity of u can be traded given available buying power
                max_v = max_u / self.coef # the corresponding max quantity of v can be traded
                quantity_v = min( -negQuantity, max_v )
                self.Sell(self.syl[1], 1 * quantity_v ) 
                self.Buy(self.syl[0], self.coef * quantity_v) 
            # Else if held short position in v and long position in u, then short more v and long more u
            else:
                negQuantity = self.CalculateOrderQuantity(self.syl[1], -0.4) # return net signed quantity needed
                # if v has already reached 0.4 percentage holding in short position, then return
                if negQuantity >= 0:
                    return
                # otherwise, short net v and long net u
                max_u = self.Portfolio.TotalPortfolioValue / self.Portfolio[self.syl[0]].Price # max quantity of u can be traded given available buying power
                max_v = max_u / self.coef # the corresponding max quantity of v can be traded
                quantity_v = min( -negQuantity, max_v - abs(self.Portfolio[self.syl[1]].Quantity) ) # max net quantity of v can be sold
                self.Sell(self.syl[1], 1 * quantity_v ) 
                self.Buy(self.syl[0], self.coef * quantity_v )    
                
        # Else if u is overpriced: sell u and buy v
        elif self.MI_u_v > self.cap_CL and self.MI_v_u < self.floor_CL:
            # If held long position in u and short position in v, liquidate and then place orders
            if self.Portfolio[self.syl[0]].Quantity > 0 and self.Portfolio[self.syl[1]].Quantity < 0:
                self.Liquidate()
                posQuantity = self.CalculateOrderQuantity(self.syl[1], 0.4) # return positive quantity for defined holding
                max_u = self.Portfolio.TotalPortfolioValue / self.Portfolio[self.syl[0]].Price # max quantity of u can be traded given available buying power
                max_v = max_u / self.coef # the corresponding max quantity of v can be traded
                quantity_v = min( posQuantity, max_v )
                self.Buy(self.syl[1], 1 * quantity_v ) 
                self.Sell(self.syl[0], self.coef * quantity_v )  
            # Else if held short position in u and long position in v, then short more u and long more v
            else:
                posQuantity = self.CalculateOrderQuantity(self.syl[1], 0.4) # return positive quantity for defined holding
                # if v has already reached 0.4 percentage holding in long position, then return
                if posQuantity <= 0:
                    return
                max_u = self.Portfolio.TotalPortfolioValue / self.Portfolio[self.syl[0]].Price # max quantity of u can be traded given available buying power
                max_v = max_u / self.coef # the corresponding max quantity of v can be traded
                quantity_v = min( posQuantity, max_v - self.Portfolio[self.syl[1]].Quantity )  # max net quantity can be bought           
                self.Buy(self.syl[1], 1 * quantity_v ) 
                self.Sell(self.syl[0], self.coef * quantity_v ) 

    
    def _set_signal(self):
        
        history = self.History(self.ticker, self.lookbackdays,Resolution.Daily)
       
        # generate the log return series of paired stocks
        close = history['close'].unstack(level=0) 
        logreturn = (np.log(close) - np.log(close.shift(1))).dropna()
        x, y = logreturn[self.ticker[0]], logreturn[self.ticker[1]]
 
        # estimate Kendall'rank correlation each trading day
        tau = kendalltau(x, y)[0] 
        
        # etstimate the copula parameter: theta
        self.theta = self._parameter(self.copula, tau)
        
        # simulate the empirical distribution function for returns of two paired stocks
        self.ecdf_x, self.ecdf_y  = ECDF(x), ECDF(y) 
        
        # run linear regression over the two history return series
        self.coef = stats.linregress(x,y).slope
      
       
    def _parameter(self, family, tau):
        
        ''' 
        estimate the parameters for three kinds of Archimedean copulas
        According to association between Archimedean copulas and the Kendall rank correlation measure
        '''
        
        if  family == 'clayton':
            return 2*tau/(1-tau)
        
        elif family == 'frank':
            
            '''
            debye = quad(integrand, sys.float_info.epsilon, theta)[0]/theta  is first order Debye function
            frank_fun is the squared difference
            Minimize the frank_fun would give the parameter theta for the frank copula 
            ''' 
            
            integrand = lambda t: t/(np.exp(t)-1)  # generate the integrand
            frank_fun = lambda theta: ((tau - 1)/4.0  - (quad(integrand, sys.float_info.epsilon, theta)[0]/theta - 1)/theta)**2
            
            return minimize(frank_fun, 4, method='BFGS', tol=1e-5).x 
        
        elif family == 'gumbel':
            return 1/(1-tau)
            
    
    def _lpdf_copula(self, family, theta, u, v):
        
        # Estimate the log probability density function of three kinds of Archimedean copulas
        
        if  family == 'clayton':
            pdf = (theta+1) * ((u**(-theta)+v**(-theta)-1)**(-2-1/theta)) * (u**(-theta-1)*v**(-theta-1))
            
        elif family == 'frank':
            num = -theta * (np.exp(-theta)-1) * (np.exp(-theta*(u+v)))
            denom = ((np.exp(-theta*u)-1) * (np.exp(-theta*v)-1) + (np.exp(-theta)-1))**2
            pdf = num/denom
            
        elif family == 'gumbel':
            A = (-np.log(u))**theta + (-np.log(v))**theta
            c = np.exp(-A**(1/theta))
            pdf = c * (u*v)**(-1) * (A**(-2+2/theta)) * ((np.log(u)*np.log(v))**(theta-1)) * (1+(theta-1)*A**(-1/theta))
            
        return np.log(pdf)
    
    def _misprice_index(self, family, theta, u, v):
    
        '''
        Calculate mispricing index for every day in the trading period by using estimated copula
        Mispricing indices are the conditional probability P(U < u | V = v) and P(V < v | U = u)
        '''
    
        if  family == 'clayton':
            self.MI_u_v = v**(-theta-1) * (u**(-theta)+v**(-theta)-1)**(-1/theta-1) # P(U<u|V=v)
            self.MI_v_u = u**(-theta-1) * (u**(-theta)+v**(-theta)-1)**(-1/theta-1) # P(V<v|U=u)
    
        elif family == 'frank':
            A = (np.exp(-theta*u)-1) * (np.exp(-theta*v)-1) + (np.exp(-theta*v)-1)
            B = (np.exp(-theta*u)-1) * (np.exp(-theta*v)-1) + (np.exp(-theta*u)-1)
            C = (np.exp(-theta*u)-1) * (np.exp(-theta*v)-1) + (np.exp(-theta)-1)
            self.MI_u_v = B/C
            self.MI_v_u = A/C
        
        elif family == 'gumbel':
            A = (-np.log(u))**theta + (-np.log(v))**theta
            C_uv = np.exp(-A**(1/theta))   # C_uv is gumbel copula function C(u,v)
            self.MI_u_v = C_uv * (A**((1-theta)/theta)) * (-np.log(v))**(theta-1) * (1.0/v)
            self.MI_v_u = C_uv * (A**((1-theta)/theta)) * (-np.log(u))**(theta-1) * (1.0/u)
            
    def _pair_selection(self): 
        
        tick_syl =  [["QQQ","XME","TNA","FAS","XLF","EWC","QLD"],
                     ["XLK","EWG","TLT","FAZ","XLU","EWA","QID"]] 

        logreturn={}
        
        for i in range(2):
            syl = [self.AddSecurity(SecurityType.Equity, x, Resolution.Daily).Symbol.Value for x in tick_syl[i]] # list of symbol
            history = self.History(syl, self.lookbackdays, Resolution.Daily)
            # generate the log return series of paired stocks
            close = history['close'].unstack(level=0) 
            df_logreturn = (np.log(close) - np.log(close.shift(1))).dropna()
            for j in tick_syl[i]:
                logreturn[j] = df_logreturn[j]
        # estimate coefficients of different correlation measures 
        tau_coef,pr_coef,sr_coef= [],[],[]
        for i in range(len(tick_syl[i])):
            tik_x, tik_y= logreturn[tick_syl[0][i]], logreturn[tick_syl[1][i]]
            tau_coef.append(kendalltau(tik_x, tik_y)[0]) 
            pr_coef.append(pearsonr(tik_x, tik_y)[0]) 
            sr_coef.append(spearmanr(tik_x, tik_y)[0]) 
        index_max = tau_coef.index(max(tau_coef))    
        pair = [tick_syl[0][index_max],tick_syl[1][index_max]]
        self.Log("tau" + str(tau_coef))
        self.Log("pr" + str(pr_coef))
        self.Log("sr" + str(sr_coef))
        self.Log(str(pair))
        return pair