Overall Statistics |
Total Orders 88 Average Win 11.29% Average Loss -2.87% Compounding Annual Return 13.670% Drawdown 37.700% Expectancy 1.095 Start Equity 100000 End Equity 222551.13 Net Profit 122.551% Sharpe Ratio 0.41 Sortino Ratio 0.506 Probabilistic Sharpe Ratio 5.348% Loss Rate 58% Win Rate 42% Profit-Loss Ratio 3.93 Alpha 0.098 Beta 0.105 Annual Standard Deviation 0.261 Annual Variance 0.068 Information Ratio 0.083 Tracking Error 0.3 Treynor Ratio 1.021 Total Fees $167.60 Estimated Strategy Capacity $860000000.00 Lowest Capacity Asset AAPL R735QTJ8XC9X Portfolio Turnover 0.57% |
#region imports from AlgorithmImports import * #endregion #https://www.quantconnect.com/tutorials/strategy-library/pairs-trading-copula-vs-cointegration import numpy as np from scipy import stats from statsmodels.distributions.empirical_distribution import ECDF from scipy.stats import kendalltau, pearsonr, spearmanr from scipy.optimize import minimize from scipy.integrate import quad import sys from collections import deque class OptimizedMultidimensionalReplicator(QCAlgorithm): def Initialize(self): self.SetStartDate(2018, 1, 1) self.SetEndDate(2024,3,31) self.SetCash(100000) self.AddEquity("AAPL", Resolution.Daily) self.AddEquity("TSLA", Resolution.Daily) self.numdays = 50 self.lookbackdays = 100 self.cap_CL = 0.95 self.floor_CL = 0.05 self.weight_v = 0.5 self.coef = 0 self.day = 0 self.month = 0 self.window_1 = RollingWindow[TradeBar](3) self.window_2 = RollingWindow[TradeBar](3) self.pair = ["AAPL", "TSLA"] self.copula = 'clayton' def OnData(self, slice): self.SetSignal(slice) self.Plot("COEFICIENTS", "Slopes", self.coef) self.Plot("KENDALL CORRELATION", "Theta", self.theta) self.Debug(f"OptimalCopula:{self.copula}") self._misprice_index(slice) def _get_historical_returns(self, symbol, period): history = self.History(symbol, period, Resolution.Daily) history = history.close.unstack(level=0) return (np.log(history) - np.log(history.shift(1))).dropna() def _parameter(self, family, tau): if family == 'clayton': return 2 * tau / (1 - tau) elif family == 'frank': integrand = lambda t: t / (np.exp(t) - 1) frank_fun = lambda theta: ((tau - 1) / 4.0 - (quad(integrand, sys.float_info.epsilon, theta)[0] / theta - 1) / theta) ** 2 return minimize(frank_fun, 4, method='BFGS', tol=1e-5).x elif family == 'gumbel': return 1 / (1 - tau) def _lpdf_copula(self, family, theta, u, v): if family == 'clayton': pdf = (theta + 1) * ((u ** (-theta) + v ** (-theta) - 1) ** (-2 - 1 / theta)) * (u ** (-theta - 1) * v ** (-theta - 1)) elif family == 'frank': num = -theta * (np.exp(-theta) - 1) * (np.exp(-theta * (u + v))) denom = ((np.exp(-theta * u) - 1) * (np.exp(-theta * v) - 1) + (np.exp(-theta) - 1)) ** 2 pdf = num / denom elif family == 'gumbel': A = (-np.log(u)) ** theta + (-np.log(v)) ** theta c = np.exp(-A ** (1 / theta)) pdf = c * (u * v) ** (-1) * (A ** (-2 + 2 / theta)) * ((np.log(u) * np.log(v)) ** (theta - 1)) * (1 + (theta - 1) * A ** (-1 / theta)) return np.log(pdf) def _misprice_index(self, slice): if self.CurrentSlice["AAPL"] is not None: self.window_1.Add(self.CurrentSlice["AAPL"]) if self.CurrentSlice["TSLA"] is not None: self.window_2.Add(self.CurrentSlice["TSLA"]) if not self.window_1.IsReady and not self.window_2.IsReady: return return_x =np.log(self.window_1[0].Close / self.window_1[1].Close) return_y =np.log(self.window_2[0].Close / self.window_2[1].Close) u = self.ecdf_x(return_x) v = self.ecdf_y(return_y) if self.copula == 'clayton': MI_u_v = v ** (-self.theta - 1) * (u ** (-self.theta) + v ** (-self.theta) - 1) ** (-1 / self.theta - 1) MI_v_u = u ** (-self.theta - 1) * (u ** (-self.theta) + v ** (-self.theta) - 1) ** (-1 / self.theta - 1) elif self.copula == 'frank': A = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta * v) - 1) B = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta * u) - 1) C = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta) - 1) MI_u_v = B / C MI_v_u = A / C elif self.copula == 'gumbel': A = (-np.log(u)) ** self.theta + (-np.log(v)) ** self.theta C_uv = np.exp(-A ** (1 / self.theta)) MI_u_v = C_uv * (A ** ((1 - self.theta) / self.theta)) * (-np.log(v)) ** (self.theta - 1) * (1.0 / v) MI_v_u = C_uv * (A ** ((1 - self.theta) / self.theta)) * (-np.log(u)) ** (self.theta - 1) * (1.0 / u) self.Debug(f" First Misprice :{MI_v_u}") self.Debug(f" Second Misprice :{MI_u_v}") if MI_u_v < self.floor_CL and MI_v_u > self.cap_CL: self.SetHoldings("AAPL", -self.weight_v, False, f'Coef: {self.coef}') self.SetHoldings("TSLA", self.weight_v * self.coef * self.Portfolio["AAPL"].Price / self.Portfolio["TSLA"].Price) elif MI_u_v > self.cap_CL and MI_v_u < self.floor_CL: self.SetHoldings("TSLA", self.weight_v, False, f'Coef: {self.coef}') self.SetHoldings("AAPL", -self.weight_v * self.coef * self.Portfolio["AAPL"].Price / self.Portfolio["TSLA"].Price) self.day = self.Time.day def SetSignal(self, slice): if self.Time.month == self.month: return logreturns = self._get_historical_returns(self.pair, self.numdays) x, y = logreturns[self.pair[0]], logreturns[self.pair[1]] ecdf_x, ecdf_y = ECDF(x), ECDF(y) u, v = [ecdf_x(a) for a in x], [ecdf_y(a) for a in y] tau = kendalltau(x, y)[0] AIC ={} for i in ['clayton', 'frank', 'gumbel']: param = self._parameter(i, tau) lpdf = [self._lpdf_copula(i, param, x, y) for (x, y) in zip(u, v)] lpdf = np.nan_to_num(lpdf) loglikelihood = sum(lpdf) AIC[i] = [param, -2 * loglikelihood + 2] self.copula = min(AIC.items(), key = lambda x: x[1][1])[0] logreturns = logreturns.tail(self.lookbackdays) x, y = logreturns[self.pair[0]], logreturns[self.pair[1]] tau = kendalltau(x, y)[0] self.theta = self._parameter(self.copula, tau) self.ecdf_x, self.ecdf_y = ECDF(x), ECDF(y) self.coef = stats.linregress(x,y).slope self.month = self.Time.month