Overall Statistics |
Total Orders 26 Average Win 1.02% Average Loss -0.61% Compounding Annual Return 13.440% Drawdown 1.800% Expectancy 0.561 Start Equity 1000000.00 End Equity 1061150.86 Net Profit 6.115% Sharpe Ratio 0.801 Sortino Ratio 0.867 Probabilistic Sharpe Ratio 76.132% Loss Rate 42% Win Rate 58% Profit-Loss Ratio 1.68 Alpha 0.037 Beta 0.006 Annual Standard Deviation 0.047 Annual Variance 0.002 Information Ratio -1.503 Tracking Error 0.102 Treynor Ratio 6.806 Total Fees $0.00 Estimated Strategy Capacity $13000000.00 Lowest Capacity Asset LTCUSD 2XR Portfolio Turnover 1.18% |
import numpy as np import pandas as pd import statsmodels.api as sm from datetime import datetime from AlgorithmImports import * class CointegrationAlgorithm(QCAlgorithm): def Initialize(self): self.SetStartDate(2024, 1, 1) # Set Start Date self.SetEndDate(2024, 6, 19) # Set End Date self.SetCash(1000000) # Set Strategy Cash self.set_brokerage_model(BrokerageName.QuantConnectBrokerage, AccountType.MARGIN) self.model = False self.model_date = None self.coef = None self.position = False self.upper_bound = None self.lower_bound = None self.pos_type = None self.sigma = 1.75 self.spread_val = None self.stop_loss_spread_val = None self.stop_loss_constant = 0.05 self.lev = 1 # Add Crypto assets self.symbol1 = "LTCUSD" self.symbol2 = "SOLUSD" self.a1 = self.AddCrypto(self.symbol1, Resolution.Daily, leverage=1.0).Symbol self.a2 = self.AddCrypto(self.symbol2, Resolution.Daily, leverage=1.0).Symbol # Warm-up period to collect enough historical data self.SetWarmUp(65) # Schedule a daily event to run the trading logic self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(12, 0), self.Rebalance) def Rebalance(self): if self.IsWarmingUp: return # Get historical data history = self.History([self.a1, self.a2], 65, Resolution.Daily) #get n days of data if history.empty: return # Prepare data for cointegration test prices = history.close.unstack(level=0) a1_prices = prices[self.a1] a2_prices = prices[self.a2] if self.model == True: self.calc_spread(a1_prices, a2_prices) self.debug(f"spread val: {self.spread_val}, lower: {self.lower_bound}, upper:{self.upper_bound}") #check if in any positions if self.position == False: # Perform cointegration test if existing model is not built if self.model == False: coint_result = sm.tsa.stattools.coint(a1_prices.diff().dropna(), a2_prices.diff().dropna()) if coint_result[1] < 0.05 and self.model == False: self.debug(f"{self.symbol1} and {self.symbol2} are cointegrated") self.TradePair(a1_prices, a2_prices, self.a1, self.a2) #return if self.model == True and self.position == False: self.calc_spread(a1_prices, a2_prices) self.check_spread(self.a1, self.a2) elif self.position == True: self.calc_spread(a1_prices, a2_prices) self.check_spread(self.a1, self.a2) if self.position == True: self.check_stop() #close out, been in trade too long elif self.model == True and self.position == True: n_days = 90 if (self.Time - self.model_date).days > n_days: self.SetHoldings(self.symbol1, 0) self.SetHoldings(self.symbol2, 0) self.debug(f"*****REBUILD AND Close pos past {n_days}: {self.spread_val}") self.clear_vars() #spread is too far out from current model, must rebuild elif self.model == True and self.position_opened == False: #only rebuild if not in a position if abs(self.spread_val) > abs(1.10*self.upper_bound) or abs(self.spread_val) < abs((1-.10)*self.lower_bound): self.debug('#####REBUILD THE MODEL spread is to farout from model') self.clear_vars() return def check_stop(self): if self.spread_val < self.stop_loss_spread_val and self.pos_type == "LONG": self.SetHoldings(self.symbol1, 0) self.SetHoldings(self.symbol2, 0) self.debug(f"Long pos stopped out: {self.spread_val}") self.clear_vars() elif self.spread_val > self.stop_loss_spread_val and self.pos_type == "SHORT": self.SetHoldings(self.symbol1, 0) self.SetHoldings(self.symbol2, 0) self.debug(f"Short pos stopped out: {self.spread_val}") self.clear_vars() return def check_spread(self, symbol1, symbol2): price1 = self.Securities[symbol1].Price #asset 1 price price2 = self.Securities[symbol2].Price #asset 2 price if self.position == True: #close out existing once target hit if self.pos_type == "LONG": if self.spread_val > self.upper_bound: self.SetHoldings(self.symbol1, 0) self.SetHoldings(self.symbol2, 0) self.debug(f"{self.symbol1}: {price1}, {self.symbol2}: {price2}") self.debug(f"Closing Long spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, {self.symbol2}:{self.coef}") self.clear_vars() return elif self.pos_type == "SHORT": if self.spread_val < self.lower_bound: self.SetHoldings(self.symbol1, 0) self.SetHoldings(self.symbol2, 0) self.debug(f"{self.symbol1}: {price1}, {self.symbol2}: {price2}") self.debug(f"Closing Short spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, {self.symbol2}:{self.coef}") self.clear_vars() return else: #enter new position if self.spread_val > self.upper_bound: self.calc_qty(price1, price2) self.market_order(self.symbol1, -1*self.lev) self.market_order(self.symbol2, self.coef*self.lev) self.debug(f"{self.symbol1}: {price1}, {self.symbol2}: {price2}, QTY Spread: {self.lev}, Cash avail:{self.portfolio.cash}") self.debug(f"Short spread spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, buy {self.symbol2}:{self.coef}") self.position = True self.pos_type = "SHORT" self.stop_loss_spread_val = self.upper_bound + self.upper_bound * self.stop_loss_constant if self.spread_val > 0 else self.upper_bound - self.upper_bound * self.stop_loss_constant elif self.spread_val < self.lower_bound: self.calc_qty(price1, price2) # If spread is below mean - 1 standard deviation, buy the spread (buy symbol1, sell symbol2) self.market_order(self.symbol1, 1*self.lev) self.market_order(self.symbol2, -self.coef*self.lev) self.debug(f"{self.symbol1}: {price1}, {self.symbol2}: {price2}, QTY Spread: {self.lev}, Cash avail:{self.portfolio.cash}") self.debug(f"Long spread spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, buy {self.symbol2}:{self.coef}") self.position = True self.pos_type = "LONG" self.stop_loss_spread_val = self.lower_bound - self.lower_bound * self.stop_loss_constant if self.spread_val > 0 else self.lower_bound + self.lower_bound * self.stop_loss_constant return def calc_qty(self, price1, price2): avail_cash = self.portfolio.cash pct_portfolio = 0.15 self.lev = (pct_portfolio * avail_cash)/(price1 + abs(self.coef)*price2) return def clear_vars(self): self.model = False self.coef = None self.position = False self.upper_bound = None self.lower_bound = None self.pos_type = None self.spread_val = None self.stop_loss_spread_val = None return def calc_spread(self, prices1, prices2): spread = prices1 - self.coef * prices2 self.spread_val = spread[-1] return def TradePair(self, prices1, prices2, symbol1, symbol2): price1 = self.Securities[symbol1].Price price2 = self.Securities[symbol2].Price # Calculate the hedge ratio using OLS model = sm.OLS(prices1, sm.add_constant(prices2)).fit() self.coef = model.params[1] self.model = True self.model_date = self.Time # Calculate the spread spread = prices1 - self.coef * prices2 spread_mean = spread.mean() spread_std = spread.std() self.upper_bound = spread_mean + self.sigma*spread_std self.lower_bound = spread_mean - self.sigma*spread_std # Get the latest spread value self.spread_val = spread[-1] def OnData(self, data): pass