Overall Statistics
Total Orders
2708
Average Win
0.04%
Average Loss
-0.01%
Compounding Annual Return
-3.349%
Drawdown
8.600%
Expectancy
-0.426
Start Equity
50000
End Equity
46192.44
Net Profit
-7.615%
Sharpe Ratio
-2.129
Sortino Ratio
-0.886
Probabilistic Sharpe Ratio
0.000%
Loss Rate
84%
Win Rate
16%
Profit-Loss Ratio
2.63
Alpha
-0.059
Beta
0.002
Annual Standard Deviation
0.028
Annual Variance
0.001
Information Ratio
-0.59
Tracking Error
0.157
Treynor Ratio
-30.433
Total Fees
$2708.00
Estimated Strategy Capacity
$600000.00
Lowest Capacity Asset
DGRO VRBX895UDV1H
Portfolio Turnover
31.71%
#region imports
from AlgorithmImports import *
import statsmodels.formula.api as sm
from statsmodels.tsa.stattools import coint, adfuller
#endregion

class Pairs(object):

    def __init__(self, a, b):
        self.a = a
        self.b = b
        self.Name = f'{a.Symbol.Value}:{b.Symbol.Value}'

        self.Model = None
        self.MeanError = 0
        self.StandardDeviation = 0
        self.Epsilon = 0

    @property
    def DataFrame(self):
        df = pd.concat([self.a.DataFrame.droplevel([0]), self.b.DataFrame.droplevel([0])], axis=1).dropna()
        df.columns = [self.a.Symbol.Value, self.b.Symbol.Value]
        return df
    
    @property
    def Correlation(self):
        return self.DataFrame.corr().iloc[0][1]

    def cointegration_test(self):
        coint_test = coint(self.a.Series.values.flatten(), self.b.Series.values.flatten(), trend="n", maxlag=0)

        # Return if not cointegrated
        if coint_test[1] >= 0.05:
            return False

        self.Model = sm.ols(formula = f'{self.a.Symbol.Value} ~ {self.b.Symbol.Value}', data=self.DataFrame).fit()
        self.StationaryP = adfuller(self.Model.resid, autolag = 'BIC')[1]
        self.MeanError = np.mean(self.Model.resid)
        self.Epsilon = np.std(self.Model.resid)
        
        return True
#region imports
from AlgorithmImports import *
#endregion

class SymbolData(object):

    def __init__(self, algorithm, symbol, lookback, interval):
        lookback = int(lookback)
        self.Symbol = symbol
        self.Prices = RollingWindow[TradeBar](lookback // interval)
        self.Series = None
        self.DataFrame = None

        self._algorithm = algorithm
        self._consolidator = TradeBarConsolidator(timedelta(minutes=interval))
        self._consolidator.DataConsolidated += self.OnDataConsolidated

        history = algorithm.History(symbol, lookback, Resolution.Minute)
        for bar in history.itertuples():
            trade_bar = TradeBar(bar.Index[1], symbol, bar.open, bar.high, bar.low, bar.close, bar.volume)
            self.Update(trade_bar)

    @property
    def IsReady(self):
        return self.Prices.IsReady
    
    def Update(self, trade_bar):
        self._consolidator.Update(trade_bar)
    
    def OnDataConsolidated(self, sender, consolidated):
        self.Prices.Add(consolidated)
        if self.IsReady:
            self.Series = self._algorithm.PandasConverter.GetDataFrame[TradeBar](self.Prices)['close']
            self.DataFrame = self.Series.to_frame()
#region imports
from AlgorithmImports import *
#endregion

class TradingPair(object):

    def __init__(self, ticket_a, ticket_b, intercept, slope, mean_error, epsilon):
        self.ticket_a = ticket_a
        self.ticket_b = ticket_b

        self.model_intercept = intercept
        self.model_slope = slope

        self.mean_error = mean_error
        self.epsilon = epsilon
#region imports
from AlgorithmImports import *
from Pair import *
from SymbolData import *
from TradingPair import *

from itertools import combinations
import statsmodels.tsa.stattools as ts
#endregion

class PairsTrading(QCAlgorithm):     

    symbols = ['IVV', 'USMV', 'QUAL', 'DGRO' , 'DVY', 'VLUE', 'MTUM']



    num_bar = 390*21*3
    interval = 10
    pair_num = 10
    leverage = 1
    min_corr_threshold = 0.9
    open_size = 2.32
    close_size = 0.5
    stop_loss_size = 6

    def Initialize(self):
        self.SetStartDate(2021, 12, 1)
        self.SetEndDate(2024, 3, 31)
        self.SetCash(50000)
        
        self.symbol_data = {}
        self.pair_list = []
        self.selected_pair = []
        self.trading_pairs = {}
        self.regenerate_time = datetime.min

        for ticker in self.symbols:
            symbol = self.AddEquity(ticker, Resolution.Minute).Symbol
            self.symbol_data[symbol] = SymbolData(self, symbol, self.num_bar, self.interval)
        
        for pair in combinations(self.symbol_data.items(), 2):
            if pair[0][1].IsReady and pair[1][1].IsReady:
                self.pair_list.append(Pairs(pair[0][1], pair[1][1]))

    def GeneratePairs(self):
        selected_pair = []
        for pair in self.pair_list:
            # correlation selection 
            if pair.Correlation < self.min_corr_threshold:
                continue

            # cointegration selection 
            coint = pair.cointegration_test()
            if coint and pair.StationaryP < 0.05:
                selected_pair.append(pair)

        if len(selected_pair) == 0:
            self.Debug('No selected pair')
            return []

        selected_pair.sort(key = lambda x: x.Correlation, reverse = True)
        if len(selected_pair) > self.pair_num:
            selected_pair = selected_pair[:self.pair_num]
        selected_pair.sort(key = lambda x: x.StationaryP)

        return selected_pair

    def OnData(self, data):
        for symbol, symbolData in self.symbol_data.items():
            if data.Bars.ContainsKey(symbol):
                symbolData.Update(data.Bars[symbol])

        # generate pairs with correlation and cointegration selection 
        if self.regenerate_time < self.Time:
            self.selected_pair = self.GeneratePairs()
            self.regenerate_time = self.Time + timedelta(days=5)

        # closing existing position
        for pair, trading_pair in self.trading_pairs.copy().items():
            # close: if not correlated nor cointegrated anymore
            if pair not in self.selected_pair:
                self.MarketOrder(pair.a.Symbol, -trading_pair.ticket_a.Quantity)
                self.MarketOrder(pair.b.Symbol, -trading_pair.ticket_b.Quantity)
                self.trading_pairs.pop(pair)
                self.Debug(f'Close {pair.Name}')
                continue

            # get current cointegrated series deviation from mean
            error = pair.a.Prices[0].Close - (trading_pair.model_intercept + trading_pair.model_slope * pair.b.Prices[0].Close)
            
            # close: when the cointegrated series is deviated less than 0.5 SD from its mean
            if trading_pair.ticket_a.Quantity > 0 \
            and (error > trading_pair.mean_error - self.close_size * trading_pair.epsilon \
            or error < trading_pair.mean_error - self.stop_loss_size * trading_pair.epsilon):
                self.MarketOrder(pair.a.Symbol, -trading_pair.ticket_a.Quantity)
                self.MarketOrder(pair.b.Symbol, -trading_pair.ticket_b.Quantity)
                self.trading_pairs.pop(pair)
                self.Debug(f'Close {pair.Name}')

            elif trading_pair.ticket_a.Quantity < 0 \
            and (error < trading_pair.mean_error + self.close_size * trading_pair.epsilon \
            or error > trading_pair.mean_error + self.stop_loss_size * trading_pair.epsilon):
                self.MarketOrder(pair.a.Symbol, -trading_pair.ticket_a.Quantity)
                self.MarketOrder(pair.b.Symbol, -trading_pair.ticket_b.Quantity)
                self.trading_pairs.pop(pair)
                self.Debug(f'Close {pair.Name}')

        # entry: when the cointegrated series is deviated by more than 2.32 SD from its mean
        for pair in self.selected_pair:
            # get current cointegrated series deviation from mean
            price_a = pair.a.Prices[0].Close
            price_b = pair.b.Prices[0].Close
            error = price_a - (pair.Model.params[0] + pair.Model.params[1] * price_b)

            if pair not in self.trading_pairs:
                if error < pair.MeanError - self.open_size * pair.Epsilon:
                    qty_a = self.CalculateOrderQuantity(symbol, self.leverage/self.pair_num / 2)
                    qty_b = self.CalculateOrderQuantity(symbol, -self.leverage/self.pair_num / 2)
                    ticket_a = self.MarketOrder(pair.a.Symbol, qty_a)
                    ticket_b = self.MarketOrder(pair.b.Symbol, qty_b)
                    
                    self.trading_pairs[pair] = TradingPair(ticket_a, ticket_b, pair.Model.params[0], pair.Model.params[1], pair.MeanError, pair.Epsilon)
                    self.Debug(f'Long {qty_a} {pair.a.Symbol.Value} and short {qty_b} {pair.b.Symbol.Value}')

                elif error > pair.MeanError + self.open_size * pair.Epsilon:
                    qty_a = self.CalculateOrderQuantity(symbol, -self.leverage/self.pair_num / 2)
                    qty_b = self.CalculateOrderQuantity(symbol, self.leverage/self.pair_num / 2)
                    ticket_a = self.MarketOrder(pair.a.Symbol, qty_a)
                    ticket_b = self.MarketOrder(pair.b.Symbol, qty_b)
                    
                    self.trading_pairs[pair] = TradingPair(ticket_a, ticket_b, pair.Model.params[0], pair.Model.params[1], pair.MeanError, pair.Epsilon)
                    self.Debug(f'Long {qty_b} {pair.b.Symbol.Value} and short {qty_a} {pair.a.Symbol.Value}')