Overall Statistics |
Total Orders 48 Average Win 0.48% Average Loss -0.29% Compounding Annual Return 118.963% Drawdown 1.400% Expectancy 0.440 Start Equity 1000000.00 End Equity 1030517.38 Net Profit 3.052% Sharpe Ratio 6.616 Sortino Ratio 12.001 Probabilistic Sharpe Ratio 88.200% Loss Rate 46% Win Rate 54% Profit-Loss Ratio 1.66 Alpha 0.643 Beta 0.149 Annual Standard Deviation 0.113 Annual Variance 0.013 Information Ratio 0.412 Tracking Error 0.123 Treynor Ratio 5.006 Total Fees $0.00 Estimated Strategy Capacity $210000.00 Lowest Capacity Asset LTCUSD 2XR Portfolio Turnover 136.48% |
import numpy as np import pandas as pd import statsmodels.api as sm from datetime import datetime from AlgorithmImports import * from arch.unitroot import ADF from arch.unitroot import KPSS from arch.unitroot import PhillipsPerron from sklearn.linear_model import LinearRegression ###Minute Scale class CointegrationAlgorithm(QCAlgorithm): def Initialize(self): self.SetStartDate(2024, 6, 1) # Set Start Date self.SetEndDate(2024, 6, 14) # Set End Date self.SetCash(1000000) # Set Strategy Cash self.set_brokerage_model(BrokerageName.QuantConnectBrokerage, AccountType.MARGIN) self.formulation_hours = 21 #Hours to look back, optimized parameter self.model = False self.cointegrated = False self.model_date = None self.coef = None self.position = False self.upper_bound = None self.lower_bound = None self.mean = None self.pos_type = None self.sigma = 3.5 #upper/lower boundaries, optimized parameter self.spread_val = None self.stop_loss_spread_val = None self.stop_loss_constant = 0.05 self.lev = 1 self.trailing_spread_val = None self.trailing_constant = 0.02 # Add Crypto assets self.symbol1 = "LTCUSD" self.symbol2 = "SOLUSD" #self.symbol3 = "SOLUSD" self.symbols = ['LTCUSD','SOLUSD'] self.amount_assets = len(self.symbols) self.a1 = self.AddCrypto(self.symbol1, Resolution.Minute, leverage=1.0).Symbol self.a2 = self.AddCrypto(self.symbol2, Resolution.Minute, leverage=1.0).Symbol #self.a3 = self.AddCrypto(self.symbol3, Resolution.Minute, leverage=1.0).Symbol self.SetWarmUp(self.formulation_hours) def OnData(self, data): #goes through each minute #checks for warm-up period if self.is_warming_up: return current_time = self.Time current_hour = current_time.hour if current_hour >= 20 or current_hour <= 13: #if the hour of day is less than 18 #see if there is a model built if self.model == False and self.cointegrated == False: self.stationarity_tests() if self.model == False and self.cointegrated == True: self.create_model() else: #we have a model built and we check to see if our spread is at a boundary self.calculate_spread_value() #self.debug(f"spread val:{self.spread_val}, lower_bound:{self.lower_bound}, upper_bound:{self.upper_bound}") self.check_spread() #check to enter/exit trades if self.position == True: self.check_stop() pass else: #liquidate all positions and clear vars self.clear_vars() self.close_all_positions() self.debug(f"clearing vars and liquidating") pass def close_all_positions(self): for i in range(len(self.symbols)): self.SetHoldings(self.symbols[i], 0) return def check_spread(self): prices = [self.securities[symbol].Price for symbol in self.symbols] if self.position == True: #close out existing once target hit if self.pos_type == "LONG": if self.spread_val > self.upper_bound: for i in range(len(self.symbols)): self.SetHoldings(self.symbols[i], 0) self.debug(f"{self.symbol1}: {prices[0]}, {self.symbol2}: {prices[1]}, {self.symbol2}: {prices[1]}") self.debug(f"Closing Long spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, coefs:{self.coef}") self.clear_vars_exit() return elif self.pos_type == "SHORT": if self.spread_val < self.lower_bound: for i in range(len(self.symbols)): self.SetHoldings(self.symbols[i], 0) self.debug(f"{self.symbol1}: {prices[0]}, {self.symbol2}: {prices[1]}, {self.symbol2}: {prices[1]}") self.debug(f"Closing Short spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, coefs:{self.coef}") self.clear_vars_exit() return else: #enter new position if self.spread_val > self.upper_bound: self.calc_qty() for i in range(len(self.symbols)): self.market_order(self.symbols[i], -1*self.lev*self.coef[i]) self.debug(f"{self.symbol1}: {prices[0]}, {self.symbol2}: {prices[1]}, {self.symbol2}: {prices[1]} QTY Spread: {self.lev}, Cash avail:{self.portfolio.cash}") self.debug(f"Short spread spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, buy {self.symbol2}:{self.coef}") self.position = True self.pos_type = "SHORT" self.stop_loss_spread_val = self.upper_bound + self.upper_bound * self.stop_loss_constant if self.spread_val > 0 else self.upper_bound - self.upper_bound * self.stop_loss_constant self.trailing_spread_val = self.stop_loss_spread_val elif self.spread_val < self.lower_bound: #we buy the spread self.calc_qty() for i in range(len(self.symbols)): self.market_order(self.symbols[i], self.lev*self.coef[i]) self.debug(f"{self.symbol1}: {prices[0]}, {self.symbol2}: {prices[1]}, {self.symbol2}: {prices[1]} QTY Spread: {self.lev}, Cash avail:{self.portfolio.cash}") self.debug(f"Long spread spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, buy {self.symbol2}:{self.coef}") self.position = True self.pos_type = "LONG" self.stop_loss_spread_val = self.lower_bound - self.lower_bound * self.stop_loss_constant if self.spread_val > 0 else self.lower_bound + self.lower_bound * self.stop_loss_constant self.trailing_spread_val = self.stop_loss_spread_val return def check_stop(self): if self.spread_val < self.stop_loss_spread_val and self.pos_type == "LONG": for i in range(len(self.symbols)): self.SetHoldings(self.symbols[i], 0) self.debug(f"Long pos stopped out: {self.spread_val}") self.clear_vars() elif self.spread_val > self.stop_loss_spread_val and self.pos_type == "SHORT": for i in range(len(self.symbols)): self.SetHoldings(self.symbols[i], 0) self.debug(f"Short pos stopped out: {self.spread_val}") self.clear_vars() elif self.spread_val < self.trailing_spread_val and self.pos_type == "LONG": for i in range(len(self.symbols)): self.SetHoldings(self.symbols[i], 0) self.debug(f"stopped out trailing, spread_val: {self.spread_val}") self.clear_vars() elif self.spread_val > self.trailing_spread_val and self.pos_type == "SHORT": for i in range(len(self.symbols)): self.SetHoldings(self.symbols[i], 0) self.debug(f"stopped out trailing, spread_val: {self.spread_val}") self.clear_vars() #update trailing stop if self.pos_type == "LONG": self.trailing_spread_val = self.spread_val - self.spread_val * self.trailing_constant if self.spread_val > 0 else self.spread_val + self.spread_val * self.trailing_constant if self.pos_type == "SHORT": self.trailing_spread_val = self.spread_val + self.spread_val * self.trailing_constant if self.spread_val > 0 else self.spread_val - self.spread_val * self.trailing_constant return def clear_vars_exit(self): self.position = False self.pos_type = None self.stop_loss_spread_val = None return def calc_qty(self): avail_cash = self.portfolio.cash prices = [self.Securities[symbol].Price for symbol in self.symbols] total_value = 0 for i in range(len(prices)): total_value += abs(self.coef[i]*prices[i]) pct_portfolio = 0.80 self.lev = (pct_portfolio * avail_cash)/(total_value) #self.lev = 1 return def clear_vars(self): self.model = False self.cointegrated = False self.model_date = None self.coef = None self.position = False self.upper_bound = None self.lower_bound = None self.mean = None self.pos_type = None self.spread_val = None self.stop_loss_spread_val = None return def calculate_spread_value(self): assets = {} for symbol, security in self.securities.items(): history = self.History([symbol], 5 , Resolution.Minute) if len(history) > 0: assets[str(symbol)] = history.loc[symbol]['close'] else: return df_spread = pd.DataFrame.from_dict(assets) last_row = df_spread.iloc[-1].values self.spread_val = np.dot(last_row, np.array(self.coef)) return def stationarity_tests(self): for symbol, security in self.securities.items(): history = self.History([symbol], timedelta(hours=self.formulation_hours), Resolution.Minute) close_prices = history.loc[symbol]['close'].diff().dropna() adf = ADF(close_prices) pp = PhillipsPerron(close_prices) kpss = KPSS(close_prices) if adf.pvalue < 0.05 and pp.pvalue < 0.05 and kpss.pvalue > 0.10: #check weather pass all the tests continue else: return #make it through all of the tickers then it is cointegrated self.cointegrated = True return def create_model(self): lin_model = LinearRegression() assets = {} for symbol, security in self.securities.items(): #self.debug(f"symbol is:{symbol}") history = self.History([symbol], timedelta(hours=self.formulation_hours), Resolution.Minute) assets[str(symbol)] = history.loc[symbol]['close'] if self.amount_assets == 3: X = np.column_stack((assets[self.symbol2].values, assets[self.symbol3].values)) Y = assets[self.symbol1].values lin_model.fit(X,Y) spread = assets[self.symbol1].values - (lin_model.coef_[0]*assets[self.symbol2].values) - (lin_model.coef_[1]*assets[self.symbol3].values) else: X = np.array(assets[self.symbol2].values).reshape(-1, 1) Y = assets[self.symbol1].values lin_model.fit(X,Y) spread = assets[self.symbol1].values - (lin_model.coef_[0]*assets[self.symbol2].values) self.mean = np.mean(spread) self.upper_bound = self.mean + self.sigma * np.std(spread) self.lower_bound = self.mean - self.sigma * np.std(spread) self.coef = [1] + list(-1*lin_model.coef_) #self.debug(f"coef:{self.coef}") df_spread = pd.DataFrame.from_dict(assets) last_row = df_spread.iloc[-1].values self.spread_val = np.dot(last_row, np.array(self.coef)) self.model_built = True self.model_date = self.Time self.debug(f"time:{self.Time}, spread val:{self.spread_val}, lower_bound:{self.lower_bound}, upper_bound:{self.upper_bound}") return