Overall Statistics |
Total Orders 10 Average Win 1.26% Average Loss -0.72% Compounding Annual Return 7.230% Drawdown 1.900% Expectancy 0.379 Start Equity 1000000 End Equity 1013597.73 Net Profit 1.360% Sharpe Ratio -0.088 Sortino Ratio -0.107 Probabilistic Sharpe Ratio 51.555% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 1.76 Alpha 0 Beta 0 Annual Standard Deviation 0.046 Annual Variance 0.002 Information Ratio 1.102 Tracking Error 0.046 Treynor Ratio 0 Total Fees $86.96 Estimated Strategy Capacity $17000000.00 Lowest Capacity Asset XLK RGRPZX100F39 Portfolio Turnover 7.09% |
#region imports from AlgorithmImports import * from collections import deque from statsmodels.distributions.empirical_distribution import ECDF from scipy.stats import kendalltau from scipy.optimize import minimize from scipy.integrate import quad from scipy import stats import sys #endregion class CopulaPairsTradingAlphaModel(AlphaModel): _window = {} # stores historical price used to calculate trading day's stock return _coef = 0 # to be calculated: requested ratio of quantity_u / quantity_v _day = 0 # keep track of current day for daily rebalance _month = 0 # keep track of current month for monthly recalculation of optimal trading pair _pair = [] # stores the selected trading pair _duration = timedelta(90) def __init__(self, lookback_days, num_days, cap_CL, floor_CL, weight_v): self._lookback_days = lookback_days # length of history data in trading period self._num_days = num_days # length of formation period which determine the copula we use self._cap__c_l = cap_CL # cap confidence level self._floor__c_l = floor_CL # floor confidence level self._weight_v = weight_v # desired holding weight of asset v in the portfolio, adjusted to avoid insufficient buying power def update(self, algorithm: QCAlgorithm, slice: Slice) -> List[Insight]: insights = [] self.set_signal(algorithm, slice) # only executed at first day of each month # Daily rebalance if algorithm.time.day == self._day or slice.quote_bars.count == 0: return [] long, short = self._pair[0], self._pair[1] if len(self._window[long]) < 2 or len(self._window[short]) < 2: return [] # Compute the mispricing indices for u and v by using estimated copula m_i_u_v, m_i_v_u = self._misprice_index() # Placing orders: if long is relatively underpriced, buy the pair if m_i_u_v < self._floor__c_l and m_i_v_u > self._cap__c_l: insights.extend([ Insight.price(short, self._duration, InsightDirection.DOWN, weight=self._weight_v), Insight.price(long, self._duration, InsightDirection.UP, weight=self._weight_v * self._coef * algorithm.portfolio[long].price / algorithm.portfolio[short].price), ]) # Placing orders: if short is relatively underpriced, sell the pair elif m_i_u_v > self._cap__c_l and m_i_v_u < self._floor__c_l: insights.extend([ Insight.price(short, self._duration, InsightDirection.UP, weight=self._weight_v), Insight.price(long, self._duration, InsightDirection.DOWN, weight=self._weight_v * self._coef * algorithm.portfolio[long].price / algorithm.portfolio[short].price), ]) self._day = algorithm.time.day return insights def set_signal(self, algorithm, slice): '''Computes the mispricing indices to generate the trading signals. It's called on first day of each month''' if algorithm.time.month == self._month: return ## Compute the best copula # Pull historical log returns used to determine copula history = algorithm.history(self._pair, self._num_days, Resolution.DAILY, data_normalization_mode=DataNormalizationMode.SCALED_RAW) if history.empty: return history = history.close.unstack(level=0) logreturns = (np.log(history) - np.log(history.shift(1))).dropna() x, y = logreturns[str(self._pair[0])], logreturns[str(self._pair[1])] # Convert the two returns series to two uniform values u and v using the empirical distribution functions ecdf_x, ecdf_y = ECDF(x), ECDF(y) u, v = [ecdf_x(a) for a in x], [ecdf_y(a) for a in y] # Compute the Akaike Information Criterion (AIC) for different copulas and choose copula with minimum AIC tau = kendalltau(x, y)[0] # estimate Kendall'rank correlation AIC ={} # generate a dict with key being the copula family, value = [theta, AIC] for i in ['clayton', 'frank', 'gumbel']: param = self._parameter(i, tau) lpdf = [self._lpdf_copula(i, param, x, y) for (x, y) in zip(u, v)] # Replace nan with zero and inf with finite numbers in lpdf list lpdf = np.nan_to_num(lpdf) loglikelihood = sum(lpdf) AIC[i] = [param, -2 * loglikelihood + 2] # Choose the copula with the minimum AIC self.copula = min(AIC.items(), key = lambda x: x[1][1])[0] ## Compute the signals # Generate the log return series of the selected trading pair logreturns = logreturns.tail(self._lookback_days) x, y = logreturns[str(self._pair[0])], logreturns[str(self._pair[1])] # Estimate Kendall'rank correlation tau = kendalltau(x, y)[0] # Estimate the copula parameter: theta self.theta = self._parameter(self.copula, tau) # Simulate the empirical distribution function for returns of selected trading pair self.ecdf_x, self.ecdf_y = ECDF(x), ECDF(y) # Run linear regression over the two history return series and return the desired trading size ratio self._coef = stats.linregress(x,y).slope self._month = algorithm.time.month def _parameter(self, family, tau): ''' Estimate the parameters for three kinds of Archimedean copulas according to association between Archimedean copulas and the Kendall rank correlation measure ''' if family == 'clayton': return 2 * tau / (1 - tau) elif family == 'frank': integrand = lambda t: t / (np.exp(t) - 1) # generate the integrand frank_fun = lambda theta: ((tau - 1) / 4.0 - (quad(integrand, sys.float_info.epsilon, theta)[0] / theta - 1) / theta) ** 2 return minimize(frank_fun, 4, method='BFGS', tol=1e-5).x elif family == 'gumbel': return 1 / (1 - tau) def _lpdf_copula(self, family, theta, u, v): '''Estimate the log probability density function of three kinds of Archimedean copulas ''' if family == 'clayton': pdf = (theta + 1) * ((u ** (-theta) + v ** (-theta) - 1) ** (-2 - 1 / theta)) * (u ** (-theta - 1) * v ** (-theta - 1)) elif family == 'frank': num = -theta * (np.exp(-theta) - 1) * (np.exp(-theta * (u + v))) denom = ((np.exp(-theta * u) - 1) * (np.exp(-theta * v) - 1) + (np.exp(-theta) - 1)) ** 2 pdf = num / denom elif family == 'gumbel': A = (-np.log(u)) ** theta + (-np.log(v)) ** theta c = np.exp(-A ** (1 / theta)) pdf = c * (u * v) ** (-1) * (A ** (-2 + 2 / theta)) * ((np.log(u) * np.log(v)) ** (theta - 1)) * (1 + (theta - 1) * A ** (-1 / theta)) return np.log(pdf) def _misprice_index(self): '''Calculate mispricing index for every day in the trading period by using estimated copula Mispricing indices are the conditional probability P(U < u | V = v) and P(V < v | U = u)''' return_x = np.log(self._window[self._pair[0]][-1] / self._window[self._pair[0]][-2]) return_y = np.log(self._window[self._pair[1]][-1] / self._window[self._pair[1]][-2]) # Convert the two returns to uniform values u and v using the empirical distribution functions u = self.ecdf_x(return_x) v = self.ecdf_y(return_y) if self.copula == 'clayton': m_i_u_v = v ** (-self.theta - 1) * (u ** (-self.theta) + v ** (-self.theta) - 1) ** (-1 / self.theta - 1) # P(U<u|V=v) m_i_v_u = u ** (-self.theta - 1) * (u ** (-self.theta) + v ** (-self.theta) - 1) ** (-1 / self.theta - 1) # P(V<v|U=u) elif self.copula == 'frank': A = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta * v) - 1) B = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta * u) - 1) C = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta) - 1) m_i_u_v = B / C m_i_v_u = A / C elif self.copula == 'gumbel': A = (-np.log(u)) ** self.theta + (-np.log(v)) ** self.theta c_uv = np.exp(-A ** (1 / self.theta)) # c_uv is gumbel copula function C(u,v) m_i_u_v = c_uv * (A ** ((1 - self.theta) / self.theta)) * (-np.log(v)) ** (self.theta - 1) * (1.0 / v) m_i_v_u = c_uv * (A ** ((1 - self.theta) / self.theta)) * (-np.log(u)) ** (self.theta - 1) * (1.0 / u) return m_i_u_v, m_i_v_u def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: for security in changes.removed_securities: symbol = security.symbol self._window.pop(symbol) if security.invested: algorithm.liquidate(symbol, "Removed from Universe") algorithm.subscription_manager.remove_consolidator(security.symbol, security.consolidator) for security in changes.added_securities: self._window[security.symbol] = deque(maxlen = 2) security.consolidator = TradeBarConsolidator(timedelta(1)) security.consolidator.data_consolidated += lambda _, consolidated_bar: self._window[consolidated_bar.symbol].append(consolidated_bar.close) algorithm.subscription_manager.add_consolidator(security.symbol, security.consolidator) self._pair = list(self._window.keys()) # Warm up historical prices history = algorithm.history(self._pair, 2, Resolution.DAILY, data_normalization_mode=DataNormalizationMode.SCALED_RAW) history = history.close.unstack(level=0) for symbol in self._window: for i in range(2): self._window[symbol].append(history[str(symbol)][i])
#region imports from AlgorithmImports import * from universe import MaximumKendallTauUniverseSelectionModel from alpha import CopulaPairsTradingAlphaModel #endregion class CopulaPairsTradingAlgorithm(QCAlgorithm): _undesired_symbols_from_previous_deployment = [] _checked_symbols_from_previous_deployment = False _previous_expiry_time = None def initialize(self): self.set_start_date(2024, 1, 1) self.set_end_date(2024, 6, 1) self.set_cash(1_000_000) self.settings.minimum_order_margin_portfolio_percentage = 0 self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN) lookback_days = self.get_parameter("lookback_days", 250) # length of history data in trading period self.universe_settings.data_normalization_mode = DataNormalizationMode.RAW self.universe_settings.schedule.on(self.date_rules.month_start()) # Select optimal trading pair into the universe self.add_universe_selection(MaximumKendallTauUniverseSelectionModel(self, lookback_days)) self.add_alpha(CopulaPairsTradingAlphaModel( lookback_days, self.get_parameter("num_days", 1_000), self.get_parameter("cap_CL", 0.95), self.get_parameter("floor_CL", 0.05), self.get_parameter("weight_v", 0.5) )) self.settings.rebalance_portfolio_on_security_changes = False self.settings.rebalance_portfolio_on_insight_changes = False self.set_portfolio_construction(InsightWeightingPortfolioConstructionModel(self._rebalance_func)) self.add_risk_management(NullRiskManagementModel()) self.set_execution(ImmediateExecutionModel()) self.set_warm_up(timedelta(90)) def _rebalance_func(self, time): # Rebalance when all of the following are true: # - There are new insights or old insights have been cancelled # - The algorithm isn't warming up # - There is QuoteBar data in the current slice latest_expiry_time = sorted([insight.close_time_utc for insight in self.insights], reverse=True)[0] if self.insights.count else None if self._previous_expiry_time != latest_expiry_time and not self.is_warming_up and self.current_slice.quote_bars.count > 0: self._previous_expiry_time = latest_expiry_time return time return None def on_data(self, data): # Exit positions that aren't backed by existing insights. # If you don't want this behavior, delete this method definition. if not self.is_warming_up and not self._checked_symbols_from_previous_deployment: for security_holding in self.portfolio.values(): if not security_holding.invested: continue symbol = security_holding.symbol if not self.insights.has_active_insights(symbol, self.utc_time): self._undesired_symbols_from_previous_deployment.append(symbol) self._checked_symbols_from_previous_deployment = True for symbol in self._undesired_symbols_from_previous_deployment: if self.is_market_open(symbol): self.liquidate(symbol, tag="Holding from previous deployment that's no longer desired") self._undesired_symbols_from_previous_deployment.remove(symbol)
#region imports from AlgorithmImports import * from scipy.stats import kendalltau #endregion class MaximumKendallTauUniverseSelectionModel(ScheduledUniverseSelectionModel): _tickers = [ "QQQ", "XLK", "XME", "EWG", "TNA", "TLT", "FAS", "FAZ", "XLF", "XLU", "EWC", "EWA", "QLD", "QID" ] def __init__(self, algorithm, lookback_days: int = 250) -> None: super().__init__(algorithm.date_rules.month_start(), algorithm.time_rules.midnight, self._select_universe) self._algorithm = algorithm self._lookback_days = lookback_days self._symbols = [Symbol.create(ticker, SecurityType.EQUITY, Market.USA) for ticker in self._tickers] def _select_universe(self, date_time: datetime) -> List[Symbol]: history = self._algorithm.history(self._symbols, self._lookback_days, Resolution.DAILY, data_normalization_mode=DataNormalizationMode.SCALED_RAW) if history.empty: return Universe.UNCHANGED history = history.close.unstack(level=0) log_returns = (np.log(history) - np.log(history.shift(1))).dropna() tau = 0 for i in range(0, len(self._symbols), 2): x = log_returns[self._symbols[i]] y = log_returns[self._symbols[i+1]] # Estimate Kendall rank correlation for each pair tau_ = kendalltau(x, y)[0] if tau > tau_: continue tau = tau_ pair = self._symbols[i:i+2] return pair