Overall Statistics
Total Orders
9613
Average Win
0.39%
Average Loss
-0.36%
Compounding Annual Return
4.623%
Drawdown
15.800%
Expectancy
0.042
Start Equity
100000
End Equity
194197.98
Net Profit
94.198%
Sharpe Ratio
0.272
Sortino Ratio
0.336
Probabilistic Sharpe Ratio
1.195%
Loss Rate
50%
Win Rate
50%
Profit-Loss Ratio
1.10
Alpha
0.013
Beta
0.051
Annual Standard Deviation
0.064
Annual Variance
0.004
Information Ratio
-0.463
Tracking Error
0.15
Treynor Ratio
0.347
Total Fees
$12793.59
Estimated Strategy Capacity
$4600000.00
Lowest Capacity Asset
DGX R735QTJ8XC9X
Portfolio Turnover
35.87%
# https://quantpedia.com/strategies/pairs-trading-with-stocks/
#
# The investment universe consists of stocks from NYSE, AMEX, and NASDAQ, while illiquid stocks are removed from the investment universe. Cumulative
# total return index is then created for each stock (dividends included), and the starting price during the formation period is set to $1 (price normalization). 
# Pairs are formed over twelve months (formation period) and are then traded in the next six-month period (trading period). The matching partner for each stock
# is found by looking for the security that minimizes the sum of squared deviations between two normalized price series. Top 20 pairs with the smallest historical 
# distance measure are then traded, and a long-short position is opened when pair prices have diverged by two standard deviations, and the position is closed
# when prices revert.
#
# QC implementation changes:
#   - Universe consists of top 500 most liquid US stocks with price > 5$.
#   - Maximum number of pairs traded at one time is set to 5.

#region imports
from AlgorithmImports import *
import numpy as np
import itertools as it
from pandas.core.frame import DataFrame
#endregion

class PairsTradingwithStocks(QCAlgorithm):
    
    def Initialize(self):
        self.SetStartDate(2010, 1, 1)
        self.SetCash(100_000)
       
        self.market: Symbol = self.AddEquity('SPY', Resolution.Daily).Symbol

        # Daily price data.
        self.history_price: Dict[Symbol, RollingWindow] = {}
        self.period: int = 12 * 21
        self.leverage: int = 5
        self.min_share_price: float = 5.
        self.selection_month: int = 6
        
        # Equally weighted brackets.
        self.max_traded_pairs: int = 5
        self.traded_pairs:List = []
        self.traded_quantity: Dict = {}
        
        self.sorted_pairs: List = []
        
        self.fundamental_count: int = 500
        self.fundamental_sorting_key = lambda x: x.DollarVolume

        self.month: int = 6
        self.selection_flag: bool = True
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.FundamentalSelectionFunction)
        self.Settings.MinimumOrderMarginPortfolioPercentage = 0.
        self.Schedule.On(self.DateRules.MonthStart(self.market), self.TimeRules.AfterMarketOpen(self.market), self.Selection)

    def OnSecuritiesChanged(self, changes: SecurityChanges) -> None:
        for security in changes.AddedSecurities:
            security.SetFeeModel(CustomFeeModel())
            security.SetLeverage(self.leverage)
            
        for security in changes.RemovedSecurities:
            symbol: Symbol = security.Symbol
            if symbol in self.history_price:
                del self.history_price[symbol]
            
        symbols: List[Symbol] = [x for x in self.history_price.keys() if x != self.market]
        self.symbol_pairs = list(it.combinations(symbols, 2))
        
        # minimize the sum of squared deviations
        distances: Dict = {}
        for pair in self.symbol_pairs:
            if self.history_price[pair[0]].IsReady and self.history_price[pair[1]].IsReady:
                distances[pair] = self.Distance(self.history_price[pair[0]], self.history_price[pair[1]])
        
        if len(distances) != 0:
            self.sorted_pairs = [x[0] for x in sorted(distances.items(), key = lambda x: x[1])[:20]]
        
        self.Liquidate()
        self.traded_pairs.clear()
        self.traded_quantity.clear()

    def FundamentalSelectionFunction(self, fundamental: List[Fundamental]) -> List[Symbol]:
        # Update the rolling window every day.
        for stock in fundamental:
            symbol: Symbol = stock.Symbol

            if symbol in self.history_price:
                self.history_price[symbol].Add(stock.AdjustedPrice)

        if not self.selection_flag:
            return Universe.Unchanged
        self.selection_flag = False
        
        selected: List[Fundamental] = [
            x for x in fundamental if x.HasFundamentalData and x.Price > self.min_share_price and x.Market == 'usa'
        ]
        if len(selected) > self.fundamental_count:
            selected = [x for x in sorted(selected, key=self.fundamental_sorting_key, reverse=True)[:self.fundamental_count]]
            
        # Warmup price rolling windows.
        for stock in selected:
            symbol: Symbol = stock.Symbol
            
            if symbol in self.history_price:
                continue
            
            self.history_price[symbol] = RollingWindow[float](self.period)
            history: DataFrame = self.History(symbol, self.period, Resolution.Daily)
            if history.empty:
                self.Log(f"Not enough data for {symbol} yet")
                continue
            closes: Series = history.loc[symbol].close
            for time, close in closes.items():
                self.history_price[symbol].Add(close)
                
        return [x.Symbol for x in selected if self.history_price[x.Symbol].IsReady]
    
    def OnData(self, data: Slice) -> None:
        if self.sorted_pairs is None: return
        
        pairs_to_remove:List = []
        
        for pair in self.sorted_pairs:
            # Calculate the spread of two price series.
            price_a: List[float] = list(self.history_price[pair[0]])
            price_b: List[float] = list(self.history_price[pair[1]])
            norm_a: np.ndarray = np.array(price_a) / price_a[-1]
            norm_b: np.ndarray = np.array(price_b) / price_b[-1]
            
            spread: np.ndarray = norm_a - norm_b
            mean: float = np.mean(spread)
            std: float = np.std(spread)
            actual_spread: float = spread[0]
            
            # Long-short position is opened when pair prices have diverged by two standard deviations.
            traded_portfolio_value: float = self.Portfolio.TotalPortfolioValue / self.max_traded_pairs
            if actual_spread > mean + 2*std or actual_spread < mean - 2*std:
                if pair not in self.traded_pairs:
                    # open new position for pair, if there's place for it.
                    if len(self.traded_pairs) < self.max_traded_pairs:
                        symbol_a: Symbol = pair[0]
                        symbol_b: Symbol = pair[1]
                        a_price_norm: float = norm_a[0]
                        b_price_norm: float = norm_b[0]
                        a_price: float = price_a[0]
                        b_price: float = price_b[0]
                        
                        # a stock's price > b stock's price
                        if a_price_norm > b_price_norm:
                            if b_price != 0 and a_price != 0:
                                long_q: float = traded_portfolio_value / b_price    # long b stock
                                short_q: float = -traded_portfolio_value / a_price  # short a stock
                                if self.Securities.ContainsKey(symbol_a) and self.Securities.ContainsKey(symbol_b) and \
                                    self.Securities[symbol_a].Price != 0 and self.Securities[symbol_a].IsTradable and \
                                    self.Securities[symbol_b].Price != 0 and self.Securities[symbol_b].IsTradable:
                                    self.MarketOrder(symbol_a, short_q)
                                    self.MarketOrder(symbol_b, long_q)
                                    
                                    self.traded_quantity[pair] = (short_q, long_q)
                                    self.traded_pairs.append(pair)
                        # b stock's price > a stock's price
                        else:
                            if b_price != 0 and a_price != 0:
                                long_q: float = traded_portfolio_value / a_price
                                short_q: float = -traded_portfolio_value / b_price
                                if self.Securities.ContainsKey(symbol_a) and self.Securities.ContainsKey(symbol_b) and \
                                    self.Securities[symbol_a].Price != 0 and self.Securities[symbol_a].IsTradable and \
                                    self.Securities[symbol_b].Price != 0 and self.Securities[symbol_b].IsTradable:
                                    self.MarketOrder(symbol_a, long_q)
                                    self.MarketOrder(symbol_b, short_q)

                                    self.traded_quantity[pair] = (long_q, short_q)
                                    self.traded_pairs.append(pair)
            # The position is closed when prices revert back.
            else:
                if pair in self.traded_pairs and pair in self.traded_quantity:
                    # make opposite order to opened position
                    self.MarketOrder(pair[0], -self.traded_quantity[pair][0])
                    self.MarketOrder(pair[1], -self.traded_quantity[pair][1])
                    pairs_to_remove.append(pair)
            
        for pair in pairs_to_remove:
            self.traded_pairs.remove(pair)
            del self.traded_quantity[pair]
            
    def Distance(self, price_a, price_b) -> float:
        # Calculate the sum of squared deviations between two normalized price series.
        price_a: List = list(price_a)
        price_b: List = list(price_b)
        
        norm_a: np.ndarray = np.array(price_a) / price_a[-1]
        norm_b: np.ndarray = np.array(price_b) / price_b[-1]
        return sum((norm_a - norm_b)**2)
        
    def Selection(self) -> None:
        if self.month == self.selection_month:
            self.selection_flag = True
            
        self.month += 1
        if self.month > 12:
            self.month = 1

# Custom fee model.
class CustomFeeModel(FeeModel):
    def GetOrderFee(self, parameters):
        fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
        return OrderFee(CashAmount(fee, "USD"))