Overall Statistics
Total Trades
83
Average Win
1.42%
Average Loss
-1.76%
Compounding Annual Return
-6.349%
Drawdown
19.100%
Expectancy
-0.114
Net Profit
-10.361%
Sharpe Ratio
-0.193
Probabilistic Sharpe Ratio
3.146%
Loss Rate
51%
Win Rate
49%
Profit-Loss Ratio
0.81
Alpha
-0.059
Beta
0.626
Annual Standard Deviation
0.163
Annual Variance
0.027
Information Ratio
-0.521
Tracking Error
0.145
Treynor Ratio
-0.05
Total Fees
$1161.63
Estimated Strategy Capacity
$24000000.00
Lowest Capacity Asset
IBM R735QTJ8XC9X
# region imports
from AlgorithmImports import *
from decimal import Decimal
from io import StringIO
from time import gmtime, strftime

# endregion
"""

outline:
    load the pair list at start-up and create per-pair state each morning
        every day compute the ratio spread and its Bollinger bands
        if the spread is below the lower band, buy the spread
            exit when the spread crosses back above its rolling mean
        if the spread is above the upper band, sell the spread
            exit when the spread crosses back below its rolling mean
        force an exit on any trade held longer than max_trade_duration days

new features:
- after a timeout exit, don't re-enter a spread until it has crossed its rolling mean (SMA) at least once
 |- how: store the trade and exit type, and flag the pair as not tradeable until spread > SMA for a long spread (and vice versa for a short)
 |- the spread is recalculated at every scheduled check, so a crossing after a trade timeout is detected
"""


def quantize(number, digits=-2):
    """Round ``number`` to the decimal place ``10 ** digits`` and return a float.

    ``digits=-2`` rounds to hundredths and ``digits=-1`` to tenths. The rounding
    is done by ``Decimal.quantize`` under the default decimal context, applied to
    the exact binary value of ``number``.
    """
    step = Decimal(10) ** digits
    return float(Decimal(number).quantize(step))


# Custom slippage implementation
class CustomSlippageModel:
    """Slippage estimate that grows with the logarithm of the order size.

    slippage = price * 0.0001 * log10(2 * |quantity|); every estimate is also
    written to the algorithm's debug log.
    """

    def __init__(self, algorithm):
        """Remember the owning algorithm so estimates can be logged through it."""
        self.algorithm = algorithm

    def GetSlippageApproximation(self, asset, order):
        """Return the slippage, in price units, for ``order`` filled on ``asset``."""
        size_factor = np.log10(2 * float(order.AbsoluteQuantity))
        estimate = asset.Price * 0.0001 * size_factor
        self.algorithm.Debug(f"CustomSlippageModel: {estimate:.2f}")
        return estimate


class symbolData:
    """Per-pair state for the ratio-spread Bollinger-band strategy.

    Holds the pair's symbols and band parameters, the most recently computed
    spread series and bands, and the flags describing an open trade.
    """

    def __init__(
        self,
        pair,
        price_multiple,
        bb_multiple,
        window,
        algorithm,
        spread_smoothing_window=None,
    ):
        """
        Args:
            pair: (symbol1, symbol2); the spread is ``symbol1 / symbol2``.
            price_multiple: informational only; not used by the spread calculation.
            bb_multiple: number of rolling standard deviations for the bands.
            window: rolling window (in bars) for the spread mean and std.
            algorithm: owning algorithm; supplies ``Time``, ``max_trade_duration``
                and ``MarketOnCloseOrder`` for timeout exits.
            spread_smoothing_window: optional rolling-mean window applied to the
                spread before it is compared with the bands.
        """
        self.pair = pair
        self.symbol1 = self.pair[0]
        self.symbol2 = self.pair[1]
        self.price_multiple = price_multiple
        self.bb_multiple = bb_multiple
        self.window = window
        self.spread_smoothing_window = spread_smoothing_window
        # pandas Series populated by compute_spread()
        self.spread = None
        self.roll_mean = None
        self.roll_std = None
        self.bb_upper = None
        self.bb_lower = None
        # open-trade state
        self.long_spread = False
        self.short_spread = False
        self.in_trade = False
        self.trade_entry_timestamp = None
        self.algo = algorithm
        # per-leg entry record set by the algorithm
        # (index = symbol; columns "timestamp", "market_order", "quantity")
        self.trade_data = None
        # True after a timeout exit until the spread crosses its rolling mean
        self.trade_timeout = False

    def reset_trade_flags(self):
        """Clear all open-trade state so the pair is flat and tradeable again."""
        self.long_spread = False
        self.short_spread = False
        self.in_trade = False
        self.trade_entry_timestamp = None
        self.trade_data = None
        self.trade_timeout = False

    def spread_crossed_since_trade(self):
        """Check whether the spread has reverted through its rolling mean since entry.

        Intended for timed-out trades. A long spread must have been above the
        rolling mean, and a short spread below it, on some bar at or after the
        entry timestamp. When that has happened, all trade flags are reset.

        Returns:
            bool: True if the crossover happened (flags were reset), else False.
        """
        # NOTE(review): the timestamp comes from the entry order ticket; confirm
        # it is in the same time zone as the price-history index of the spread.
        ts = self.trade_data["timestamp"].iloc[0]
        spread_since = self.spread.loc[ts:]
        mean_since = self.roll_mean.loc[ts:]

        if self.long_spread:
            crossed = bool((spread_since > mean_since).any())
        elif self.short_spread:
            crossed = bool((spread_since < mean_since).any())
        else:
            crossed = False

        if crossed:
            self.reset_trade_flags()
        return crossed

    def check_trade_timeout(self):
        """Force-exit an open trade once it has been held ``max_trade_duration`` days.

        Both legs are closed with market-on-close orders for the quantities
        recorded at entry. The pair is then flagged ``trade_timeout`` so it is
        not re-entered until the spread crosses its rolling mean. That crossing
        check runs only after both exit orders are submitted, because it may
        clear ``trade_data``. A trade that has already timed out is not
        liquidated again.
        """
        if not self.in_trade or self.trade_entry_timestamp is None or self.trade_timeout:
            return
        trade_duration = (self.algo.Time - self.trade_entry_timestamp).days
        if trade_duration < self.algo.max_trade_duration:
            return

        if self.long_spread:
            side = "LONG"
        elif self.short_spread:
            side = "SHORT"
        else:
            return

        # liquidate only the shares that belong to this pair trade
        for symbol in self.pair:
            held_quantity = self.trade_data.loc[symbol, "quantity"]
            self.algo.MarketOnCloseOrder(
                symbol,
                -1 * held_quantity,
                tag=f"TRADE TIMEOUT {side} SPREAD::{self.pair[0]}-{self.pair[1]}",
            )

        # block re-entry until the spread crosses its mean (may reset immediately)
        self.trade_timeout = True
        self.spread_crossed_since_trade()

    def compute_spread(self, prices):
        """Compute the ratio spread and its Bollinger bands from ``prices``.

        Args:
            prices: DataFrame of closes with one column per symbol, indexed by time.

        Sets ``spread`` (smoothed when ``spread_smoothing_window`` is set),
        ``roll_mean``, ``roll_std``, ``bb_upper`` and ``bb_lower``. The rolling
        statistics and bands are computed on the raw (unsmoothed) ratio.
        """
        raw_spread = prices[self.symbol1] / prices[self.symbol2]
        if self.spread_smoothing_window is not None:
            self.spread = raw_spread.rolling(self.spread_smoothing_window).mean()
        else:
            self.spread = raw_spread

        self.roll_mean = raw_spread.rolling(self.window).mean()
        self.roll_std = raw_spread.rolling(self.window).std()

        band_width = self.bb_multiple * self.roll_std
        self.bb_upper = self.roll_mean + band_width
        self.bb_lower = self.roll_mean - band_width

    def is_short_trigger(self):
        """True when the latest spread is above the upper band (NaN compares False)."""
        return bool(self.spread.iloc[-1] > self.bb_upper.iloc[-1])

    def is_long_trigger(self):
        """True when the latest spread is below the lower band (NaN compares False)."""
        return bool(self.spread.iloc[-1] < self.bb_lower.iloc[-1])


class pairsTrader(QCAlgorithm):
    """Mean-reversion pairs trader on the price ratio of each configured pair.

    Every trading day (scheduled against SPY's calendar) the algorithm:
      * 09:25 - creates ``symbolData`` state for any pair not yet tracked;
      * 15:00 and 15:30 - takes profit on open spreads that have reverted to
        their rolling mean, and times out trades held too long;
      * 15:45 - enters new spreads when the ratio is outside its Bollinger
        bands, sized by a volatility target.
    """

    def Initialize(self):
        """Set dates, cash, universe, chart, strategy parameters and schedules."""
        self.cash = 1_000_000
        self.SetStartDate(2021, 1, 1)  # Set Start Date
        self.SetEndDate(2022, 9, 1)
        self.SetCash(self.cash)  # Set Strategy Cash

        # Pairs come from a published Google Sheet: a header-less CSV whose
        # first two columns hold the tickers of each pair.
        url = "https://docs.google.com/spreadsheets/d/e/2PACX-1vRi02Q7O9jJG6Nl04PnpXBld_HhYyRBZchvQocMHnopjCN7jIQz6i1JYRKLPpvu3D5WPOakfEeNxZ-_/pub?gid=0&single=true&output=csv"
        file = self.Download(url)
        symbol_df = pd.read_csv(StringIO(file), header=None)
        self.pairs = list(zip(symbol_df.iloc[:, 0], symbol_df.iloc[:, 1]))

        # flat list of every ticker across all pairs (a ticker in two pairs appears twice)
        self.symbols = [ticker for pair in self.pairs for ticker in pair]
        for pair in self.pairs:
            for ticker in pair:
                # minute data; positional args: fill forward, leverage 0, extended hours
                security = self.AddEquity(
                    ticker, Resolution.Minute, Market.USA, True, 0, True
                )
                # the buying-power (margin) model is configured on the Security,
                # not on its Symbol
                security.SetBuyingPowerModel(PatternDayTradingMarginModel())
                security.SetDataNormalizationMode(DataNormalizationMode.Raw)
                # optional: security.SetSlippageModel(CustomSlippageModel(self))

        self.splotName = "Spread"
        sPlot = Chart(self.splotName)
        for series_name in ("spread", "rollmean", "bb upper", "bb lower"):
            sPlot.AddSeries(Series(series_name, SeriesType.Line, 0))
        self.AddChart(sPlot)
        self.PLOT = False  # per-pair spread chart; only meaningful for a single pair

        #####################################
        # algo parameters

        self.allocation = int(0.5 * self.cash / len(self.pairs))
        self.Debug(f"default allocation to each pair trade: {self.allocation}")
        MarketOnCloseOrder.SubmissionTimeBuffer = timedelta(minutes=10)

        self.bb_multiple = 2.0  # for spread bollinger bands
        self.window = 20  # for spread bollinger bands
        self.spread_smoothing_window = None  # e.g. 2 to smooth the spread
        self.lookback = 60  # bars of price history
        self.history_resolution = Resolution.Daily
        self.max_trade_duration = 7  # calendar days before a forced exit
        self.target_vol = 0.10  # volatility target used to size entries

        self.spy = self.AddEquity("SPY").Symbol
        self.SetBenchmark(self.spy)

        # pair tuple -> symbolData
        self.symbol_data = dict()

        #####################################
        # schedule functions (registered in this order)
        for hour, minute, handler in (
            (9, 25, self.get_pairs),
            (15, 45, self.trade),
            (15, 30, self.check_trade_exit),
            (15, 0, self.check_trade_exit),
        ):
            self.Schedule.On(
                self.DateRules.EveryDay(self.spy),
                self.TimeRules.At(hour, minute),
                handler,
            )

    #####################################
    # algo functions

    def get_prices(self):
        """Return recent closes, one column per symbol.

        Fetches ``self.lookback`` bars at ``self.history_resolution`` and drops
        any row where a symbol has no close. Returns an empty object when the
        history never contained a ``close`` column.
        """
        return self._get_close_prices(self.lookback, self.history_resolution)

    def get_my_prices(self):
        """Return the last two minute-bar closes, one column per symbol."""
        return self._get_close_prices(2, Resolution.Minute)

    def _get_close_prices(self, bar_count, resolution):
        """Download close history for all symbols and pivot it to time x symbol."""
        prices = self.get_history(
            self.symbols, bar_count, resolution, "close", "close prices"
        )
        if prices.empty:
            return prices
        # history rows are keyed by (symbol, time); move symbols into the columns
        return prices["close"].unstack(level=0).dropna()

    ##################
    def get_history(self, symbol, lookback, resolution, column, data_label, attempts=3):
        """Request history, retrying until ``column`` is present.

        Args:
            symbol: symbol or list of symbols passed to ``self.History``.
            lookback: number of bars to request.
            resolution: bar resolution.
            column: column that must be present (e.g. "close").
            data_label: label used in the debug messages.
            attempts: total number of requests before giving up (at least 1).

        Returns:
            The history DataFrame, or an empty float Series if ``column`` never
            appears.
        """
        attempts = max(1, attempts)
        for attempt in range(1, attempts + 1):
            hist = self.History(symbol, lookback, resolution)
            if column in hist.columns:
                return hist
            if attempt < attempts:
                self.Debug(f"{self.Time} {data_label} {column} data missing")
            else:
                self.Debug(
                    f"{self.Time} {data_label} {column} data missing no trades today"
                )
        return pd.Series(dtype=float)

    ##################
    def get_pairs(self):
        """Create ``symbolData`` state for every configured pair not yet tracked.

        ``price_multiple`` is the inverse of the pair's price ratio on the first
        bar of the lookback window, rounded to one decimal place.
        """
        if not self.symbols:
            self.Debug(f"[{self.Time}] missing symbols")
            return

        prices = self.get_prices()
        if prices.empty:
            return

        for pair in self.pairs:
            if pair in self.symbol_data:
                continue  # keep existing state, including any open-trade flags
            first_ratio = prices[pair[0]].iloc[0] / prices[pair[1]].iloc[0]
            price_multiple = quantize(1.0 / first_ratio, -1)
            self.symbol_data[pair] = symbolData(
                pair,
                price_multiple,
                self.bb_multiple,
                self.window,
                self,
                self.spread_smoothing_window,
            )

    ##################
    def pair_can_trade(self, pair):
        """Return True if ``pair`` may be traded now.

        A pair is blocked when either leg has no positive close price, or when
        its last trade timed out and ``spread_crossed_since_trade`` reports no
        crossover of the rolling mean yet. That call resets the pair's trade
        flags once the crossover has happened.
        """
        sd = self.symbol_data[pair]

        for symbol in pair:
            if not self.Securities[symbol].Close > 0:
                self.Debug(f"{self.Time} {symbol} is missing price data")
                return False
        if sd.trade_timeout and not sd.spread_crossed_since_trade():
            self.Log(f"{self.Time} {pair} has trade timed out, waiting for crossover")
            return False
        return True

    ##################
    def buy_spread(self, sd, pair, allocation=None):
        """Buy the spread: long ``pair[0]`` and short ``pair[1]``.

        Each leg is sized at ``allocation`` dollars (``self.allocation`` when None).
        """
        self._enter_spread(sd, pair, allocation, is_long=True)

    ##################
    def short_spread(self, sd, pair, allocation=None):
        """Sell the spread: short ``pair[0]`` and long ``pair[1]``.

        Each leg is sized at ``allocation`` dollars (``self.allocation`` when None).
        """
        self._enter_spread(sd, pair, allocation, is_long=False)

    def _enter_spread(self, sd, pair, allocation, is_long):
        """Submit market-on-close orders for both legs and record the trade on ``sd``.

        Share counts are ``int(dollars / last close)``, which truncates toward zero.
        """
        if allocation is None:
            allocation = self.allocation
        direction = 1 if is_long else -1
        wt1 = allocation * direction
        wt2 = allocation * -direction

        pair0_shares = int(wt1 / self.Securities[pair[0]].Close)
        pair1_shares = int(wt2 / self.Securities[pair[1]].Close)

        label = "LONG" if is_long else "SHORT"
        self.Debug(
            f"{label} spread {sd.price_multiple}: {sd.pair[0]} dv={wt1:.2f} shares={pair0_shares} vs {sd.pair[1]} dv={wt2:.2f} shares={pair1_shares}"
        )
        ticket0 = self.MarketOnCloseOrder(pair[0], pair0_shares)
        ticket1 = self.MarketOnCloseOrder(pair[1], pair1_shares)

        # the recorded quantities are what the exits later unwind
        sd.trade_data = pd.DataFrame(
            index=[pair[0], pair[1]], columns=["timestamp", "market_order", "quantity"]
        )
        for symbol, ticket in ((pair[0], ticket0), (pair[1], ticket1)):
            sd.trade_data.loc[symbol, :] = (ticket.Time, ticket, ticket.Quantity)

        sd.in_trade = True
        if is_long:
            sd.long_spread = True
        else:
            sd.short_spread = True
        sd.trade_entry_timestamp = self.Time

    ##################
    def trade(self):
        """Enter new spread trades (scheduled at 15:45).

        For each tracked pair:
          * time out stale trades;
          * recompute and log the spread and bands;
          * when flat, enter a long (short) spread if the spread is below
            (above) its lower (upper) band.

        Per-leg size is
        ``TotalPortfolioValue * target_vol / annualized_vol / len(symbols)``.
        """
        prices = self.get_prices()
        if prices.empty:
            return

        for pair in list(self.symbol_data):
            sd = self.symbol_data[pair]

            # if already in trade check to see if our maximum trade duration is breached
            if sd.in_trade and not sd.trade_timeout:
                sd.check_trade_timeout()

            sd.compute_spread(prices)
            anl_vol = self._log_spread_stats(pair, sd)

            if not self.pair_can_trade(pair):
                continue

            if self.PLOT:
                self._plot_spread(sd)

            if sd.in_trade:
                continue

            if sd.is_long_trigger():
                enter = self.buy_spread
            elif sd.is_short_trigger():
                enter = self.short_spread
            else:
                continue

            # the vol is NaN with too little history and may be zero; never
            # size a trade from it (and never from another pair's value)
            if not (np.isfinite(anl_vol) and anl_vol > 0):
                self.Log(f"{self.Time} {pair} annualized vol unavailable, skipping entry")
                continue

            allocation = quantize(
                self.Portfolio.TotalPortfolioValue
                * self.target_vol
                / anl_vol
                / len(self.symbols)
            )
            self.Debug(f"{self.Time}::{pair} {anl_vol:.2%} ${allocation:,}")
            enter(sd, pair, allocation)

    def _log_spread_stats(self, pair, sd):
        """Log the latest spread/bands and return the rolling mean's annualized vol.

        The vol is the std of the rolling mean's daily percentage changes
        scaled by sqrt(252). Returns NaN (and logs nothing) when there are
        three or fewer rolling-mean values.
        """
        if len(sd.roll_mean.dropna()) <= 3:
            return float("nan")
        anl_vol = sd.roll_mean.pct_change().dropna().std() * np.sqrt(252)
        self.Log(f"{self.Time} {pair} rolling annualized std: {anl_vol:.2%}")
        self.Log(f"spread {self._last_finite(sd.spread)}")
        self.Log(f"rollmean {self._last_finite(sd.roll_mean)}")
        self.Log(f"bb upper {self._last_finite(sd.bb_upper)}")
        self.Log(f"bb lower {self._last_finite(sd.bb_lower)}")
        return anl_vol

    def _plot_spread(self, sd):
        """Plot the latest spread, rolling mean and bands (one pair only)."""
        for series_name, values in (
            ("spread", sd.spread),
            ("rollmean", sd.roll_mean),
            ("bb upper", sd.bb_upper),
            ("bb lower", sd.bb_lower),
        ):
            self.Plot(self.splotName, series_name, self._last_finite(values))

    def _last_finite(self, values):
        """Return the last element of ``values``, or 0 when it is NaN/inf."""
        last = values.iloc[-1]
        return last if np.isfinite(last) else 0

    def check_trade_exit(self):
        """Take profit when an open spread reverts to its rolling mean.

        Scheduled at 15:00 and 15:30. A long spread exits when the spread is
        >= the rolling mean, and a short spread when it is <= the mean. Each leg
        is unwound with a market order for the quantity recorded at entry.
        """
        prices = self.get_prices()
        if prices.empty:
            return

        # no trades after 4pm RTH
        if self.Time.hour >= 16:
            return

        for pair in list(self.symbol_data):
            sd = self.symbol_data[pair]

            # if already in trade check to see if our maximum trade duration is breached
            if sd.in_trade and not sd.trade_timeout:
                sd.check_trade_timeout()

            sd.compute_spread(prices)

            if not self.pair_can_trade(pair):
                continue
            if not sd.in_trade:
                continue

            last_spread = sd.spread.iloc[-1]
            last_mean = sd.roll_mean.iloc[-1]
            if sd.long_spread:
                side, reverted = "LONG", last_spread >= last_mean
            elif sd.short_spread:
                side, reverted = "SHORT", last_spread <= last_mean
            else:
                continue
            if not reverted:
                continue

            levels = f"sprd: {last_spread:.2f} rollmean: {last_mean:.2f}"
            for symbol in sd.pair:
                # exit only the quantity of shares involved in that trade
                qty = sd.trade_data.loc[symbol, "quantity"]
                self.MarketOrder(
                    symbol,
                    -1 * qty,
                    tag=f"TP {side} SPREAD::{sd.pair[0]}-{sd.pair[1]} | {levels}",
                )
            sd.reset_trade_flags()
            self.Debug(
                f"tp liquidating {side.lower()} spread: {sd.pair[0]} vs {sd.pair[1]} | {levels}"
            )

    def OnData(self, data):
        """Receive each data slice; unused, since all trading runs from scheduled events.

        Arguments:
            data: Slice object keyed by symbol containing the stock data
        """
        pass