Overall Statistics
Total Orders
48
Average Win
0.48%
Average Loss
-0.29%
Compounding Annual Return
118.963%
Drawdown
1.400%
Expectancy
0.440
Start Equity
1000000.00
End Equity
1030517.38
Net Profit
3.052%
Sharpe Ratio
6.616
Sortino Ratio
12.001
Probabilistic Sharpe Ratio
88.200%
Loss Rate
46%
Win Rate
54%
Profit-Loss Ratio
1.66
Alpha
0.643
Beta
0.149
Annual Standard Deviation
0.113
Annual Variance
0.013
Information Ratio
0.412
Tracking Error
0.123
Treynor Ratio
5.006
Total Fees
$0.00
Estimated Strategy Capacity
$210000.00
Lowest Capacity Asset
LTCUSD 2XR
Portfolio Turnover
136.48%
import numpy as np
import pandas as pd
import statsmodels.api as sm
from datetime import datetime
from AlgorithmImports import *
from arch.unitroot import ADF
from arch.unitroot import KPSS
from arch.unitroot import PhillipsPerron
from sklearn.linear_model import LinearRegression

###Minute Scale
class CointegrationAlgorithm(QCAlgorithm):
    """Minute-resolution spread-trading algorithm on LTCUSD and SOLUSD.

    During the trading window (hours 20-23 and 0-13 of algorithm time) the
    algorithm:

    1. runs ADF / Phillips-Perron / KPSS tests on the differenced closes of
       every subscribed security (``stationarity_tests``);
    2. if all pass, regresses the first symbol on the remaining ones over the
       last ``formulation_hours`` hours and derives a spread with bounds at
       ``mean +/- sigma * std`` (``create_model``);
    3. every following minute recomputes the spread, enters when it leaves
       the band, and exits at the opposite bound or at a stop.

    Outside the window all state is reset and every position is closed.
    """

    def Initialize(self):
        """Set the backtest window, cash, strategy parameters and subscriptions."""
        self.SetStartDate(2024, 6, 1)  # Set Start Date
        self.SetEndDate(2024, 6, 14)    # Set End Date
        self.SetCash(1000000)            # Set Strategy Cash
        self.set_brokerage_model(BrokerageName.QuantConnectBrokerage, AccountType.MARGIN)

        # Tunable parameters.
        self.formulation_hours = 21  # hours to look back, optimized parameter
        self.sigma = 3.5  # band half-width in spread standard deviations, optimized parameter
        self.stop_loss_constant = 0.05  # stop distance as a fraction of |entry bound|
        self.trailing_constant = 0.02  # trailing distance as a fraction of |spread|

        # Model state, (re)built by create_model and wiped by clear_vars.
        self.model = False
        self.cointegrated = False
        self.model_date = None
        self.coef = None
        self.upper_bound = None
        self.lower_bound = None
        self.mean = None
        self.spread_val = None

        # Position state.
        self.position = False
        self.pos_type = None  # "LONG", "SHORT" or None
        self.stop_loss_spread_val = None
        self.trailing_spread_val = None
        self.lev = 1  # number of spread units per trade, set by calc_qty

        # Add Crypto assets. The first symbol is the regression target; its
        # spread coefficient is fixed at 1.
        self.symbol1 = "LTCUSD"
        self.symbol2 = "SOLUSD"
        self.symbols = [self.symbol1, self.symbol2]
        self.amount_assets = len(self.symbols)
        self.a1 = self.AddCrypto(self.symbol1, Resolution.Minute, leverage=1.0).Symbol
        self.a2 = self.AddCrypto(self.symbol2, Resolution.Minute, leverage=1.0).Symbol

        # NOTE(review): an integer argument is presumably a bar count, so this
        # warms up 21 bars rather than 21 hours. The model itself is fitted
        # from History() calls, so this may be intentional - confirm.
        self.SetWarmUp(self.formulation_hours)

    def OnData(self, data):
        """Drive the strategy once per data slice (minute resolution subscriptions).

        Inside the trading window either try to build a model or, when one
        exists, evaluate entries, exits and stops. Outside the window reset
        all state and flatten.
        """
        if self.is_warming_up:
            return

        current_hour = self.Time.hour
        in_trading_window = current_hour >= 20 or current_hour <= 13

        if not in_trading_window:
            # Hours 14-19: liquidate all positions and clear vars. Only log
            # when there was something to clear, so the message is not
            # emitted on every minute of the closed window.
            had_state = self.model or self.cointegrated or self.position
            self.clear_vars()
            self.close_all_positions()
            if had_state:
                self.debug("clearing vars and liquidating")
            return

        if not self.model and not self.cointegrated:
            # No model yet: test the series and fit one if the tests pass.
            self.stationarity_tests()
            if not self.model and self.cointegrated:
                self.create_model()
            return

        # A model exists: check whether the spread is at a boundary.
        self.calculate_spread_value()
        self.check_spread()  # check to enter/exit trades
        if self.position:
            self.check_stop()

    def close_all_positions(self):
        """Set the holdings of every traded symbol to zero."""
        for ticker in self.symbols:
            self.SetHoldings(ticker, 0)

    def check_spread(self):
        """Exit an open position at its target bound, or enter a new one.

        An open LONG is closed once the spread exceeds ``upper_bound``; an
        open SHORT once it drops below ``lower_bound``. With no open position
        the spread is shorted above ``upper_bound`` and bought below
        ``lower_bound``.
        """
        if self.position:
            target_hit = (
                (self.pos_type == "LONG" and self.spread_val > self.upper_bound)
                or (self.pos_type == "SHORT" and self.spread_val < self.lower_bound)
            )
            if target_hit:
                self._exit_at_target()
            return

        if self.spread_val > self.upper_bound:
            self._enter("SHORT")
        elif self.spread_val < self.lower_bound:  # we buy the spread
            self._enter("LONG")

    def _exit_at_target(self):
        """Close the open position after its target bound was reached."""
        prices = [self.securities[symbol].Price for symbol in self.symbols]
        label = "Long" if self.pos_type == "LONG" else "Short"
        self.close_all_positions()
        self.debug(f"{self.symbol1}: {prices[0]}, {self.symbol2}: {prices[1]}, {self.symbol2}: {prices[1]}")
        self.debug(f"Closing {label} spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, coefs:{self.coef}")
        self.clear_vars_exit()

    def _enter(self, side):
        """Open a "LONG" or "SHORT" spread position and arm its stops."""
        if not self.calc_qty():
            return  # basket could not be priced; try again on the next bar

        prices = [self.securities[symbol].Price for symbol in self.symbols]
        direction = 1 if side == "LONG" else -1
        for ticker, weight in zip(self.symbols, self.coef):
            self.market_order(ticker, direction * self.lev * weight)

        label = "Long" if side == "LONG" else "Short"
        self.debug(f"{self.symbol1}: {prices[0]}, {self.symbol2}: {prices[1]}, {self.symbol2}: {prices[1]} QTY Spread: {self.lev}, Cash avail:{self.portfolio.cash}")
        self.debug(f"{label} spread spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, buy {self.symbol2}:{self.coef}")

        self.position = True
        self.pos_type = side
        # Place the stop a fixed fraction beyond the bound that triggered the
        # entry. abs() keeps it on the losing side of the bound even when the
        # bound and the current spread have opposite signs.
        if side == "SHORT":
            self.stop_loss_spread_val = self.upper_bound + abs(self.upper_bound) * self.stop_loss_constant
        else:
            self.stop_loss_spread_val = self.lower_bound - abs(self.lower_bound) * self.stop_loss_constant
        self.trailing_spread_val = self.stop_loss_spread_val

    def check_stop(self):
        """Close the position if the fixed or the trailing stop is breached.

        After a stop-out the whole model is discarded via ``clear_vars``.
        Otherwise the trailing level is re-anchored to the current spread.

        NOTE(review): the trailing level is recomputed from the current
        spread on every call and never ratchets, so it effectively limits the
        adverse move between two consecutive calls - confirm this is intended.
        """
        spread = self.spread_val
        if self.pos_type == "LONG":
            fixed_hit = spread < self.stop_loss_spread_val
            trailing_hit = spread < self.trailing_spread_val
            fixed_message = f"Long pos stopped out: {spread}"
        elif self.pos_type == "SHORT":
            fixed_hit = spread > self.stop_loss_spread_val
            trailing_hit = spread > self.trailing_spread_val
            fixed_message = f"Short pos stopped out: {spread}"
        else:
            return

        if fixed_hit or trailing_hit:
            self.close_all_positions()
            # The fixed stop takes precedence in the log when both are hit.
            self.debug(fixed_message if fixed_hit else f"stopped out trailing, spread_val: {spread}")
            self.clear_vars()
            return

        # update trailing stop
        offset = abs(spread) * self.trailing_constant
        if self.pos_type == "LONG":
            self.trailing_spread_val = spread - offset
        else:
            self.trailing_spread_val = spread + offset

    def clear_vars_exit(self):
        """Reset position state after a target exit; the fitted model is kept."""
        self.position = False
        self.pos_type = None
        self.stop_loss_spread_val = None
        self.trailing_spread_val = None

    def calc_qty(self):
        """Size the trade so one basket uses 80% of available cash.

        Stores the number of spread units in ``self.lev``.

        Returns:
            True when ``self.lev`` was updated, False when the basket value is
            not positive (for example prices still 0) and sizing is impossible.
        """
        avail_cash = self.portfolio.cash
        prices = [self.Securities[symbol].Price for symbol in self.symbols]
        # Cost of one spread unit: |coefficient| * price summed over all legs.
        total_value = sum(abs(weight * price) for weight, price in zip(self.coef, prices))
        if not total_value > 0:
            return False
        pct_portfolio = 0.80
        self.lev = (pct_portfolio * avail_cash) / total_value
        return True

    def clear_vars(self):
        """Discard the model and all position state."""
        self.model = False
        self.model_built = False  # legacy mirror of self.model
        self.cointegrated = False
        self.model_date = None
        self.coef = None
        self.position = False
        self.upper_bound = None
        self.lower_bound = None
        self.mean = None
        self.pos_type = None
        self.spread_val = None
        self.stop_loss_spread_val = None
        self.trailing_spread_val = None

    def calculate_spread_value(self):
        """Recompute ``self.spread_val`` from the latest minute closes.

        Leaves ``self.spread_val`` untouched when any security returns no
        history or a traded symbol is missing from the result.
        """
        closes = {}
        for symbol, _ in self.securities.items():
            history = self.History([symbol], 5, Resolution.Minute)
            if len(history) == 0:
                return
            closes[str(symbol)] = history.loc[symbol]['close']

        df_spread = pd.DataFrame.from_dict(closes)
        if any(ticker not in df_spread.columns for ticker in self.symbols):
            return
        # Select columns in self.symbols order so each price meets its own
        # coefficient regardless of the securities iteration order.
        last_row = df_spread[self.symbols].iloc[-1].values
        self.spread_val = np.dot(last_row, np.array(self.coef))

    def stationarity_tests(self):
        """Flag ``self.cointegrated`` when every differenced series looks stationary.

        For each subscribed security the first difference of the minute closes
        over the look-back must reject a unit root under ADF and
        Phillips-Perron (p < 0.05) and must not reject stationarity under
        KPSS (p > 0.10). The flag is left unchanged as soon as one series
        fails or has no history.

        NOTE(review): this checks the differenced prices of each asset, not
        the regression spread, so it is a precondition rather than a
        cointegration test in itself - confirm this is the intended check.
        """
        for symbol, _ in self.securities.items():
            history = self.History([symbol], timedelta(hours=self.formulation_hours), Resolution.Minute)
            if len(history) == 0:
                return
            close_prices = history.loc[symbol]['close'].diff().dropna()
            adf = ADF(close_prices)
            pp = PhillipsPerron(close_prices)
            kpss = KPSS(close_prices)
            # check whether the series passes all the tests
            if not (adf.pvalue < 0.05 and pp.pvalue < 0.05 and kpss.pvalue > 0.10):
                return
        # made it through all of the tickers
        self.cointegrated = True

    def create_model(self):
        """Fit the hedge regression and derive the spread, its bounds and coefficients.

        The first entry of ``self.symbols`` is regressed on the remaining ones
        using closes aligned on their timestamps. On success ``self.model`` is
        set; when the data is insufficient ``self.cointegrated`` is reset so
        the next bar starts again from the stationarity tests.
        """
        closes = {}
        for symbol, _ in self.securities.items():
            history = self.History([symbol], timedelta(hours=self.formulation_hours), Resolution.Minute)
            if len(history) == 0:
                self.cointegrated = False
                return
            closes[str(symbol)] = history.loc[symbol]['close']

        frame = pd.DataFrame.from_dict(closes)
        if any(ticker not in frame.columns for ticker in self.symbols):
            self.cointegrated = False
            return
        # Keep only timestamps where every asset has a close, in symbol order,
        # so the regression never pairs prices from different minutes.
        frame = frame[self.symbols].dropna()
        if len(frame) < 2:
            self.cointegrated = False
            return

        Y = frame[self.symbols[0]].values
        X = frame[self.symbols[1:]].values  # one column per hedge asset
        lin_model = LinearRegression()
        lin_model.fit(X, Y)
        # The intercept is deliberately left in the spread; it is absorbed by
        # the mean the bounds are centred on.
        spread = Y - X @ lin_model.coef_

        spread_std = np.std(spread)
        self.mean = np.mean(spread)
        self.upper_bound = self.mean + self.sigma * spread_std
        self.lower_bound = self.mean - self.sigma * spread_std
        self.coef = [1] + list(-1 * lin_model.coef_)
        self.spread_val = np.dot(frame.iloc[-1].values, np.array(self.coef))
        self.model = True
        self.model_built = True  # legacy name, kept in sync with self.model
        self.model_date = self.Time
        self.debug(f"time:{self.Time}, spread val:{self.spread_val}, lower_bound:{self.lower_bound}, upper_bound:{self.upper_bound}")