Overall Statistics |
Total Orders 145 Average Win 0.63% Average Loss -0.62% Compounding Annual Return -10.147% Drawdown 14.600% Expectancy -0.124 Start Equity 1000000 End Equity 898002.18 Net Profit -10.200% Sharpe Ratio -1.434 Sortino Ratio -1.299 Probabilistic Sharpe Ratio 1.002% Loss Rate 57% Win Rate 43% Profit-Loss Ratio 1.02 Alpha 0 Beta 0 Annual Standard Deviation 0.086 Annual Variance 0.007 Information Ratio -0.815 Tracking Error 0.086 Treynor Ratio 0 Total Fees $2316.07 Estimated Strategy Capacity $1500000.00 Lowest Capacity Asset PX R735QTJ8XC9X Portfolio Turnover 29.22% |
# region imports
from AlgorithmImports import *
from universe import SectorETFUniverseSelectionModel
from portfolio import CointegratedVectorPortfolioConstructionModel
# endregion


class ETFPairsTrading(QCAlgorithm):
    """Pairs-trading algorithm built from the framework models wired up in `initialize`.

    The universe comes from `SectorETFUniverseSelectionModel`, signals from
    `PearsonCorrelationPairsTradingAlphaModel`, and position sizing from
    `CointegratedVectorPortfolioConstructionModel`.
    """

    def initialize(self):
        """Configure the backtest window, brokerage, universe and framework models."""
        # Backtest period and starting capital.
        self.set_start_date(2023, 3, 1)
        self.set_end_date(2024, 3, 1)
        self.set_cash(1000000)

        # Lookback window used for both the correlation and the cointegration estimates.
        lookback_window = self.get_parameter("lookback", 60)
        # We want at least 2+% expected profit margin to cover fees.
        profit_threshold = self.get_parameter("threshold", 2)

        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)

        # This should be an intra-day strategy, so every security gets the
        # pattern-day-trading margin model.
        def apply_pattern_day_trading_margin(security):
            return security.set_margin_model(PatternDayTradingMarginModel())

        self.set_security_initializer(apply_pattern_day_trading_margin)

        settings = self.universe_settings
        settings.resolution = Resolution.MINUTE
        settings.data_normalization_mode = DataNormalizationMode.RAW
        self.set_universe_selection(SectorETFUniverseSelectionModel(settings))

        # This alpha model helps to pick the most correlated pair and emits a signal
        # when they have a mispricing that stays active for a predicted period.
        # https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/alpha/supported-models#09-Pearson-Correlation-Pairs-Trading-Model
        pairs_alpha = PearsonCorrelationPairsTradingAlphaModel(
            lookback_window, Resolution.DAILY, threshold=profit_threshold)
        self.add_alpha(pairs_alpha)

        # We try to use the cointegrating vector to decide the relative movement
        # magnitude of the paired assets.
        self.pcm = CointegratedVectorPortfolioConstructionModel(self, lookback_window, Resolution.DAILY)
        self.pcm.rebalance_portfolio_on_security_changes = False
        self.set_portfolio_construction(self.pcm)

        # Avoid catastrophic loss at portfolio level by the pair trading strategy (currently disabled).
        #self.add_risk_management(MaximumDrawdownPercentPortfolio(0.08))

        self.set_warm_up(timedelta(90))

    def on_data(self, slice):
        """Forward slices carrying splits or dividends to the portfolio construction model."""
        if not (slice.splits or slice.dividends):
            return
        self.pcm.handle_corporate_actions(self, slice)
#region imports
from AlgorithmImports import *
from Portfolio.EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel
from arch.unitroot.cointegration import engle_granger
from utils import reset_and_warm_up
#endregion


class CointegratedVectorPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
    """Portfolio construction model that sizes positions from an Engle-Granger cointegrating vector.

    For every security added to the algorithm the model keeps a rolling window of
    log returns (stored on the security object under the keys 'window', 'logr'
    and 'consolidator'). Target weights are the absolute components of the
    cointegrating vector, normalized to sum to one and signed by the insight direction.
    """

    def __init__(self, algorithm, lookback = 252, resolution = Resolution.MINUTE, rebalance = Expiry.END_OF_WEEK) -> None:
        """Store the configuration and initialize the base model with a long-short bias.

        Args:
            algorithm: The algorithm instance; used to read securities and portfolio holdings.
            lookback: Size of the rolling log-return window and of the warm-up history request.
            resolution: Resolution of the history request used to warm up the indicators.
            rebalance: Rebalancing rule forwarded to the base class.
        """
        super().__init__(rebalance, PortfolioBias.LONG_SHORT)
        self.algorithm = algorithm
        self.lookback = lookback
        self.resolution = resolution

    def should_create_target_for_insight(self, insight: Insight) -> bool:
        """Return whether a target should be created for the insight."""
        # Ignore insights if the asset has an open position in the same direction
        return self.should_create_new_target(insight.symbol, insight.direction)

    def determine_target_percent(self, active_insights: List[Insight]) -> Dict[Insight, float]:
        """Compute the target weight of every active insight.

        Returns a dictionary holding an entry for each active insight. All weights
        are zero when fewer than two insights / usable return series are available
        or when the Engle-Granger test is not significant at the 5% level. Otherwise
        each insight that took part in the regression gets its normalized absolute
        cointegrating-vector component, signed by its direction; insights that did
        not take part (no matching security, or an all-NaN return series) get zero.
        """
        # If less than 2 active insights, no valid pair trading can be resulted
        if len(active_insights) < 2:
            self.live_log(self.algorithm, f'PortfolioContructionModel: Less then 2 insights. Create zero-quantity targets')
            return {insight: 0 for insight in active_insights}

        securities = self.algorithm.securities

        # Pair each regression column with the insight it belongs to. The columns
        # keep the order of the securities collection, which is not necessarily
        # the order of active_insights, so the pairing must be tracked explicitly.
        column_insights = []
        for symbol in securities.keys():
            matching = next((insight for insight in active_insights if insight.symbol == symbol), None)
            if matching is not None:
                column_insights.append((symbol, matching))

        # Get log return for cointegrating vector regression
        logr = pd.DataFrame({symbol: self.returns(securities[symbol]) for symbol, _ in column_insights})

        # fill nans with mean, if the whole column is nan, drop it
        logr = logr.fillna(logr.mean())
        # Remember which columns survive the drop so weights map back to the right insights.
        kept_columns = logr.notna().all().tolist()
        logr = logr.dropna(axis=1)
        regression_insights = [insight for (_, insight), kept in zip(column_insights, kept_columns) if kept]

        # make sure we have at least 2 columns
        if logr.shape[1] < 2:
            self.live_log(self.algorithm, f'PortfolioContructionModel: Less then 2 insights. Create zero-quantity targets.')
            return {insight: 0 for insight in active_insights}

        # Obtain the cointegrating vector of all signaled assets for statistical arbitrage
        model = engle_granger(logr.iloc[:, 0], logr.iloc[:, 1:], trend='n', lags=0)

        # If result not significant, return
        if model.pvalue > 0.05:
            return {insight: 0 for insight in active_insights}

        # Normalization for budget constraint
        coint_vector = model.cointegrating_vector
        total_weight = sum(abs(coint_vector))

        # Every insight gets an entry; those outside the regression stay at zero.
        result = {insight: 0 for insight in active_insights}
        # The vector components follow the column order of logr (dependent variable first).
        for insight, weight in zip(regression_insights, coint_vector):
            # we can assume any paired assets' 2 dimensions in coint_vector are in opposite sign
            result[insight] = abs(weight) / total_weight * insight.direction

        return result

    def on_securities_changed(self, algorithm, changes):
        """Set up per-security data for added securities and tear it down for removed ones."""
        self.live_log(algorithm, f'PortfolioContructionModel.on_securities_changed: Changes: {changes}')
        super().on_securities_changed(algorithm, changes)
        for added in changes.added_securities:
            self.init_security_data(algorithm, added)

        for removed in changes.removed_securities:
            self.dispose_security_data(algorithm, removed)

    def handle_corporate_actions(self, algorithm, slice):
        """Reset and re-warm the indicators of every symbol with a split or dividend in the slice."""
        symbols = set(slice.dividends.keys())
        symbols.update(slice.splits.keys())

        for symbol in symbols:
            self.warm_up_indicator(algorithm.securities[symbol])

    def live_log(self, algorithm, message):
        """Log the message only when the algorithm runs in live mode."""
        if algorithm.live_mode:
            algorithm.log(message)

    def init_security_data(self, algorithm, security):
        """Attach the log-return indicator, its rolling window and consolidator to the security."""
        # To store the historical daily log return
        security['window'] = RollingWindow[IndicatorDataPoint](self.lookback)

        # Use daily log return to predict cointegrating vector
        security['logr'] = LogReturn(1)
        # Push every indicator update into the rolling window.
        security['logr'].updated += lambda _, updated: security['window'].add(IndicatorDataPoint(updated.end_time, updated.value))
        security['consolidator'] = TradeBarConsolidator(timedelta(1))

        # Subscribe the consolidator and indicator to data for automatic update
        algorithm.register_indicator(security.symbol, security['logr'], security['consolidator'])
        algorithm.subscription_manager.add_consolidator(security.symbol, security['consolidator'])

        self.warm_up_indicator(security)

    def warm_up_indicator(self, security):
        """Clear the security's indicator state and rebuild it from history."""
        self.reset(security)
        # reset_and_warm_up returns a consolidator; store it as the security's current one.
        security['consolidator'] = reset_and_warm_up(self.algorithm, security, self.resolution, self.lookback)

    def reset(self, security):
        """Reset the log-return indicator and its rolling window."""
        security['logr'].reset()
        security['window'].reset()

    def dispose_security_data(self, algorithm, security):
        """Reset the security's indicator state and unsubscribe its consolidator."""
        self.reset(security)
        algorithm.subscription_manager.remove_consolidator(security.symbol, security['consolidator'])

    def should_create_new_target(self, symbol, direction):
        """Return True when the symbol has no position or a position opposite to `direction`."""
        quantity = self.algorithm.portfolio[symbol].quantity
        return quantity == 0 or direction != int(np.sign(quantity))

    def returns(self, security):
        """Return the security's windowed log returns as a Series in reversed window order."""
        return pd.Series(
            data = [x.value for x in security['window']],
            index = [x.end_time for x in security['window']])[::-1]
#region imports
from AlgorithmImports import *
#endregion


class SectorETFUniverseSelectionModel(ETFConstituentsUniverseSelectionModel):
    """Universe made of the ten heaviest constituents of the IYM ETF."""

    def __init__(self, universe_settings: UniverseSettings = None) -> None:
        """Build the universe around the IYM ETF using `etf_constituents_filter`."""
        # Select the constituents of a single sector ETF to get correlated assets.
        # NOTE(review): the original comment called this the tech sector ETF; IYM
        # looks like a basic-materials ETF - confirm which sector is intended.
        etf_symbol = Symbol.create("IYM", SecurityType.EQUITY, Market.USA)
        super().__init__(etf_symbol, universe_settings, self.etf_constituents_filter)

    def etf_constituents_filter(self, constituents: List[ETFConstituentData]) -> List[Symbol]:
        """Return the symbols of the ten constituents with the largest weight.

        Constituents with a falsy weight (missing or zero) are ignored.
        """
        # Keep only the 10 largest weights in the index to reduce slippage
        # and keep the algorithm fast.
        weighted = [member for member in constituents if member.weight]
        weighted.sort(key=lambda member: member.weight, reverse=True)
        top_ten = weighted[:10]
        return [member.symbol for member in top_ten]
#region imports
from AlgorithmImports import *
#endregion


def reset_and_warm_up(algorithm, security, resolution, lookback = None):
    """Reset the security's 'logr' indicator and warm it up again from history.

    The consolidator stored under security['consolidator'] is unsubscribed and
    replaced by a new one-day TradeBarConsolidator that is registered with the
    indicator and fed the historical bars.

    Args:
        algorithm: Algorithm used for the history request and the registrations.
        security: Security carrying the 'logr' indicator and 'consolidator' entries.
        resolution: Resolution of the history request.
        lookback: Number of history bars to request; a falsy value falls back to
            the indicator's warm-up period.

    Returns:
        The new consolidator. The caller is responsible for storing it.
    """
    indicator = security['logr']
    stale_consolidator = security['consolidator']

    lookback = lookback or indicator.warm_up_period

    # historical request to update the consolidator that will warm up the indicator
    history = algorithm.history[stale_consolidator.input_type](
        security.symbol,
        lookback,
        resolution,
        data_normalization_mode = DataNormalizationMode.SCALED_RAW)

    indicator.reset()

    # Replace the consolidator, since we cannot reset it.
    # Not ideal since we don't know the consolidator type and period.
    algorithm.subscription_manager.remove_consolidator(security.symbol, stale_consolidator)
    fresh_consolidator = TradeBarConsolidator(timedelta(1))
    algorithm.register_indicator(security.symbol, indicator, fresh_consolidator)

    # The most recent historical bar is deliberately left out.
    # NOTE(review): presumably it arrives again through the regular subscription - confirm.
    bars = list(history)
    for bar in bars[:-1]:
        fresh_consolidator.update(bar)

    return fresh_consolidator


'''
# In main.py, OnData and call HandleCorporateActions for framework models (if necessary)
    def on_data(self, slice):
        if slice.splits or slice.dividends:
            self.alpha.handle_corporate_actions(self, slice)
            self.pcm.handle_corporate_actions(self, slice)
            self.risk.handle_corporate_actions(self, slice)
            self.execution.handle_corporate_actions(self, slice)

# In the framework models, add
from utils import ResetAndWarmUp
and implement HandleCorporateActions. E.g.:
    def handle_corporate_actions(self, algorithm, slice):
        for security.symbol, data in self.security.symbol_data.items():
            if slice.splits.contains_key(security.symbol) or slice.dividends.contains_key(security.symbol):
                data.warm_up_indicator()
where WarmUpIndicator will call ResetAndWarmUp for each indicator/consolidator pair
'''