Overall Statistics
Total Orders
26
Average Win
1.02%
Average Loss
-0.61%
Compounding Annual Return
13.440%
Drawdown
1.800%
Expectancy
0.561
Start Equity
1000000.00
End Equity
1061150.86
Net Profit
6.115%
Sharpe Ratio
0.801
Sortino Ratio
0.867
Probabilistic Sharpe Ratio
76.132%
Loss Rate
42%
Win Rate
58%
Profit-Loss Ratio
1.68
Alpha
0.037
Beta
0.006
Annual Standard Deviation
0.047
Annual Variance
0.002
Information Ratio
-1.503
Tracking Error
0.102
Treynor Ratio
6.806
Total Fees
$0.00
Estimated Strategy Capacity
$13000000.00
Lowest Capacity Asset
LTCUSD 2XR
Portfolio Turnover
1.18%
import numpy as np
import pandas as pd
import statsmodels.api as sm
from datetime import datetime
from AlgorithmImports import *

class CointegrationAlgorithm(QCAlgorithm):
    """Daily pairs-trading algorithm on LTCUSD / SOLUSD.

    Once a day the algorithm pulls the last ``LOOKBACK_DAYS`` daily closes of
    both assets.  While no model exists it runs a cointegration test; when the
    p-value is below ``COINT_P_VALUE`` it fits an OLS hedge ratio and derives
    entry/exit bands at ``mean +/- sigma * std`` of the spread
    ``price1 - coef * price2``.  With a model in place it:

    * enters SHORT the spread above the upper band and LONG below the lower band,
    * exits a LONG above the upper band and a SHORT below the lower band,
    * stops out when the spread moves ``stop_loss_constant`` (as a fraction of
      the entry band) further against the position,
    * closes a position once the model is older than ``MAX_MODEL_AGE_DAYS``.

    Every exit discards the model, so the pair is re-tested before the next trade.
    """

    # Number of daily bars requested for warm-up, testing, and fitting.
    LOOKBACK_DAYS = 65
    # Fewest aligned (non-NaN) observations required before testing/fitting.
    MIN_OBSERVATIONS = 30
    # A position is force-closed once its model is older than this many days.
    MAX_MODEL_AGE_DAYS = 90
    # Significance threshold applied to the cointegration test p-value.
    COINT_P_VALUE = 0.05
    # Fraction of available cash committed to one spread position.
    PCT_PORTFOLIO = 0.15

    def Initialize(self):
        """Configure the backtest window, cash, brokerage, assets, and schedule."""
        self.SetStartDate(2024, 1, 1)  # Set Start Date
        self.SetEndDate(2024, 6, 19)    # Set End Date
        self.SetCash(1000000)            # Set Strategy Cash
        self.set_brokerage_model(BrokerageName.QuantConnectBrokerage, AccountType.MARGIN)

        # --- model state (reset by clear_vars, except model_date and lev) ---
        self.model = False                # True while a fitted hedge-ratio model is active
        self.model_date = None            # algorithm time at which the model was fitted
        self.coef = None                  # OLS slope (hedge ratio) of asset 1 on asset 2
        self.position = False             # True while a spread position is considered open
        self.upper_bound = None           # spread mean + sigma * std
        self.lower_bound = None           # spread mean - sigma * std
        self.pos_type = None              # "LONG" or "SHORT" while in a position
        self.sigma = 1.75                 # band width in spread standard deviations
        self.spread_val = None            # most recent spread value
        self.stop_loss_spread_val = None  # spread level that triggers the stop
        self.stop_loss_constant = 0.05    # stop distance as a fraction of the entry band
        self.lev = 1                      # number of spread units traded (set by calc_qty)

        # Add Crypto assets
        self.symbol1 = "LTCUSD"
        self.symbol2 = "SOLUSD"
        self.a1 = self.AddCrypto(self.symbol1, Resolution.Daily, leverage=1.0).Symbol
        self.a2 = self.AddCrypto(self.symbol2, Resolution.Daily, leverage=1.0).Symbol

        # Warm-up period to collect enough historical data
        self.SetWarmUp(self.LOOKBACK_DAYS)

        # Schedule a daily event to run the trading logic
        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(12, 0), self.Rebalance)

    def Rebalance(self):
        """Run one daily pass: refresh the spread, then build/enter or manage/exit."""
        if self.IsWarmingUp:
            return

        history = self.History([self.a1, self.a2], self.LOOKBACK_DAYS, Resolution.Daily)
        if history.empty:
            return

        # One column of closes per symbol.
        prices = history.close.unstack(level=0)
        try:
            a1_prices = prices[self.a1]
            a2_prices = prices[self.a2]
        except KeyError:
            # History came back without one of the two symbols; nothing to do today.
            return

        # Keep only the dates on which both assets have a close so the test,
        # the regression, and the spread all see aligned, NaN-free series.
        valid = a1_prices.notna() & a2_prices.notna()
        a1_prices = a1_prices[valid]
        a2_prices = a2_prices[valid]
        if len(a1_prices) < self.MIN_OBSERVATIONS:
            return

        if self.model:
            self.calc_spread(a1_prices, a2_prices)
            self.debug(f"spread val: {self.spread_val}, lower: {self.lower_bound}, upper:{self.upper_bound}")

        if not self.position:
            if not self.model:
                # NOTE(review): the test is run on first differences of the
                # prices, not on price levels; an Engle-Granger test is
                # normally applied to levels.  Left unchanged because switching
                # would alter which days a model gets built - confirm intent.
                coint_result = sm.tsa.stattools.coint(a1_prices.diff().dropna(), a2_prices.diff().dropna())
                if coint_result[1] < self.COINT_P_VALUE:
                    self.debug(f"{self.symbol1} and {self.symbol2} are cointegrated")
                    self.TradePair(a1_prices, a2_prices, self.a1, self.a2)
            if self.model:
                # spread_val is already current: set either by calc_spread
                # above or by TradePair just now.
                self.check_spread(self.a1, self.a2)
            # NOTE(review): the original file carried an unreachable
            # "rebuild the model when the spread is too far out" branch that
            # read an undefined attribute (position_opened) and whose
            # condition would have fired for almost any in-band spread.  It
            # never executed and is intentionally not activated here.
            return

        # In a position: take profit first, then the stop, then the age limit.
        self.check_spread(self.a1, self.a2)
        if self.position:
            self.check_stop()
        if self.position and self.model_date is not None:
            n_days = self.MAX_MODEL_AGE_DAYS
            if (self.Time - self.model_date).days > n_days:
                self._close_pair()
                self.debug(f"*****REBUILD AND Close pos past {n_days}: {self.spread_val}")
                self.clear_vars()

    def _close_pair(self):
        """Flatten both legs of the pair."""
        self.SetHoldings(self.a1, 0)
        self.SetHoldings(self.a2, 0)

    def check_stop(self):
        """Close the open position and drop the model if the stop level is breached.

        A LONG stops out when the spread falls below ``stop_loss_spread_val``;
        a SHORT stops out when it rises above it.
        """
        if self.pos_type == "LONG" and self.spread_val < self.stop_loss_spread_val:
            self._close_pair()
            self.debug(f"Long pos stopped out: {self.spread_val}")
            self.clear_vars()
        elif self.pos_type == "SHORT" and self.spread_val > self.stop_loss_spread_val:
            self._close_pair()
            self.debug(f"Short pos stopped out: {self.spread_val}")
            self.clear_vars()

    def check_spread(self, symbol1, symbol2):
        """Compare the current spread with the bands and trade accordingly.

        While in a position, closes it (and drops the model) once the spread
        reaches the opposite band.  While flat, opens a SHORT spread above the
        upper band or a LONG spread below the lower band and records the
        stop level.

        Args:
            symbol1: Symbol of the first leg (bought in a LONG spread).
            symbol2: Symbol of the second leg (hedge leg, scaled by ``coef``).
        """
        price1 = self.Securities[symbol1].Price  # asset 1 price
        price2 = self.Securities[symbol2].Price  # asset 2 price

        if self.position:
            is_long = self.pos_type == "LONG"
            is_short = self.pos_type == "SHORT"
            target_hit = (
                (is_long and self.spread_val > self.upper_bound)
                or (is_short and self.spread_val < self.lower_bound)
            )
            if target_hit:
                self._close_pair()
                label = "Long" if is_long else "Short"
                self.debug(f"{self.symbol1}: {price1}, {self.symbol2}: {price2}")
                self.debug(f"Closing {label} spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, {self.symbol2}:{self.coef}")
                self.clear_vars()
            return

        go_short = self.spread_val > self.upper_bound
        go_long = self.spread_val < self.lower_bound
        if not (go_short or go_long):
            return

        self.calc_qty(price1, price2)
        if not self.lev > 0:
            # No usable size (e.g. prices not populated yet or no cash):
            # skip rather than send zero/inverted orders and flag a position.
            self.debug(f"Skipping entry, invalid spread quantity: {self.lev}")
            return

        if go_short:
            # Spread is rich: sell asset 1, buy coef units of asset 2.
            self.market_order(symbol1, -1 * self.lev)
            self.market_order(symbol2, self.coef * self.lev)
            self.debug(f"{self.symbol1}: {price1}, {self.symbol2}: {price2}, QTY Spread: {self.lev}, Cash avail:{self.portfolio.cash}")
            self.debug(f"Short spread spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, buy {self.symbol2}:{self.coef}")
            self.position = True
            self.pos_type = "SHORT"
            # Stop sits above the upper band regardless of the band's sign.
            self.stop_loss_spread_val = self.upper_bound + abs(self.upper_bound) * self.stop_loss_constant
        else:
            # Spread is cheap: buy asset 1, sell coef units of asset 2.
            self.market_order(symbol1, 1 * self.lev)
            self.market_order(symbol2, -self.coef * self.lev)
            self.debug(f"{self.symbol1}: {price1}, {self.symbol2}: {price2}, QTY Spread: {self.lev}, Cash avail:{self.portfolio.cash}")
            self.debug(f"Long spread spread_value {self.spread_val} lower bound:{self.lower_bound}, upper bound: {self.upper_bound}, buy {self.symbol2}:{self.coef}")
            self.position = True
            self.pos_type = "LONG"
            # Stop sits below the lower band regardless of the band's sign.
            self.stop_loss_spread_val = self.lower_bound - abs(self.lower_bound) * self.stop_loss_constant

    def calc_qty(self, price1, price2):
        """Set ``self.lev`` to the number of spread units to trade.

        One unit costs ``price1 + |coef| * price2``; ``PCT_PORTFOLIO`` of the
        available cash is spent on it.  ``self.lev`` is set to 0 when that
        unit cost is not a positive number.

        Args:
            price1: Current price of the first leg.
            price2: Current price of the second leg.
        """
        avail_cash = self.portfolio.cash
        unit_cost = price1 + abs(self.coef) * price2
        if not unit_cost > 0:
            self.lev = 0
            return
        self.lev = (self.PCT_PORTFOLIO * avail_cash) / unit_cost

    def clear_vars(self):
        """Discard the current model and position state.

        ``model_date`` and ``lev`` are left as they are; both are overwritten
        the next time a model is built or a quantity is computed.
        """
        self.model = False
        self.coef = None
        self.position = False
        self.upper_bound = None
        self.lower_bound = None
        self.pos_type = None
        self.spread_val = None
        self.stop_loss_spread_val = None

    def calc_spread(self, prices1, prices2):
        """Store the latest value of ``prices1 - coef * prices2`` in ``spread_val``.

        Args:
            prices1: Series of closes for the first leg.
            prices2: Series of closes for the second leg (same index).
        """
        spread = prices1 - self.coef * prices2
        # .iloc: the series is indexed by labels, so [-1] is not a safe
        # positional lookup.
        self.spread_val = spread.iloc[-1]

    def TradePair(self, prices1, prices2, symbol1, symbol2):
        """Fit the hedge ratio and derive the spread bands.

        Regresses ``prices1`` on ``prices2`` (with intercept), stores the slope
        as ``coef``, marks the model as active, and sets the bands at
        ``mean +/- sigma * std`` of the in-sample spread.

        Args:
            prices1: Series of closes for the first leg.
            prices2: Series of closes for the second leg (same index).
            symbol1: Unused; kept for call compatibility.
            symbol2: Unused; kept for call compatibility.
        """
        # Calculate the hedge ratio using OLS; params are [const, slope].
        fitted = sm.OLS(prices1, sm.add_constant(prices2)).fit()
        self.coef = fitted.params.iloc[1]
        self.model = True
        self.model_date = self.Time

        # Calculate the spread and its bands
        spread = prices1 - self.coef * prices2
        spread_mean = spread.mean()
        spread_std = spread.std()
        self.upper_bound = spread_mean + self.sigma * spread_std
        self.lower_bound = spread_mean - self.sigma * spread_std

        # Get the latest spread value
        self.spread_val = spread.iloc[-1]

    def OnData(self, data):
        """No per-bar logic; all trading happens in the scheduled Rebalance."""
        pass