Overall Statistics
Total Trades
10394
Average Win
0.03%
Average Loss
-0.03%
Compounding Annual Return
10.070%
Drawdown
2.500%
Expectancy
0.067
Net Profit
9.648%
Sharpe Ratio
2.336
Probabilistic Sharpe Ratio
97.351%
Loss Rate
48%
Win Rate
52%
Profit-Loss Ratio
1.04
Alpha
0.066
Beta
0.023
Annual Standard Deviation
0.03
Annual Variance
0.001
Information Ratio
-0.335
Tracking Error
0.279
Treynor Ratio
3.063
Total Fees
$13519.59
Estimated Strategy Capacity
$3600000.00
Lowest Capacity Asset
NGG ROOA6XQS4GH1
# region imports
import numpy as np
from AlgorithmImports import *
from decimal import Decimal
from io import StringIO
from collections import deque
from time import gmtime, strftime

# endregion
"""

outline:
    get pairs monthly
        every day compute the ratio spread and its Bollinger bands
        if spread is below the lower band, buy spread
            exit the position when the spread crosses back above the rolling mean
        if spread is above the upper band, sell spread
            exit the position when the spread crosses back below the rolling mean

new features:
- don't reenter spreads from timeout until it has crossed the sma at least once
 |- how to do it? store trades, exit type, and flag as not tradeable until currpx > sma for long and vice versa    
 |- updated logic so that spread is calculated on every interval to check spread crossing after trade timeout   
"""


class StockDataSource(PythonData):
    """Custom universe data source that reads ticker lists from a remote CSV.

    A usable CSV row starts with a digit; in backtests the first cell is a
    ``YYYY-MM-DD`` date. Every following non-blank cell is treated as a US
    equity ticker.
    """

    def GetSource(self, config, date, isLiveMode):
        """Return the remote CSV location for live mode or for backtests."""
        # Previously used sheets, kept for reference:
        # "https://www.dropbox.com/s/2l73mu97gcehmh7/daily-stock-picker-live.csv?dl=1" # Brian's
        # "https://www.dropbox.com/s/t4nafyooof485h7/pair_csv.csv?dl=1" # Richard's
        # "https://www.dropbox.com/s/v2ipmdc5ykziwfm/symbols%20-%20Sheet1.csv?dl=1" # Richard's 2
        # "https://docs.google.com/spreadsheets/d/e/2PACX-1vRi02Q7O9jJG6Nl04PnpXBld_HhYyRBZchvQocMHnopjCN7jIQz6i1JYRKLPpvu3D5WPOakfEeNxZ-_/pub?gid=0&single=true&output=csv" # Brian's
        live_url = "https://docs.google.com/spreadsheets/d/1jtMR6fAewQzDb08cg5qsCzXNliq5zpTvWCm7goVdDMc/export?format=csv"  # Richard's
        backtest_url = "https://docs.google.com/spreadsheets/d/1dl314lKv-ZGCOExmgNgcmHfc_cchmgDr7jHzcHWnfQk/export?format=csv"  # Richard's
        url = live_url if isLiveMode else backtest_url

        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config, line, date, isLiveMode):
        """Parse one CSV line into a data point carrying a ``Symbols`` list.

        Returns None for blank lines and for lines that do not start with a
        digit (for example a header row).
        """
        if not (line.strip() and line[0].isdigit()):
            return None

        stocks = StockDataSource()
        stocks.Symbol = config.Symbol

        def get_point_in_time_symbol(ticker):
            # Resolve the ticker as of `date` so renamed tickers map correctly.
            sid = SecurityIdentifier.GenerateEquity(
                ticker, Market.USA, mappingResolveDate=date
            )
            return Symbol(sid, ticker)

        csv = line.split(",")
        # Strip stray whitespace/newlines and skip blank cells (e.g. trailing
        # commas in a sheet export) so they never turn into bogus symbols.
        tickers = [cell.strip() for cell in csv[1:]]
        tickers = [ticker for ticker in tickers if ticker]

        if isLiveMode:
            # Live rows are stamped with the current time in the exchange zone.
            stocks.EndTime = Extensions.ConvertFromUtc(
                datetime.utcnow(), config.ExchangeTimeZone
            )
        else:
            stocks.Time = datetime.strptime(csv[0].strip(), "%Y-%m-%d")

        stocks["Symbols"] = [get_point_in_time_symbol(t) for t in tickers]
        return stocks


def quantize(number, digits=-2):
    """
    Round ``number`` to a fixed decimal precision and return it as a float.

    ``digits`` is the power-of-ten exponent of the smallest place kept, so
    the default of -2 keeps two decimal places.
    """
    step = Decimal(10) ** digits
    return float(Decimal(number).quantize(step))


# Custom slippage implementation
class CustomSlippageModel:
    """Slippage estimate that grows with price and the log of the order size."""

    def __init__(self, algorithm):
        # The algorithm is only used to write the estimate to the debug log.
        self.algorithm = algorithm

    def GetSlippageApproximation(self, asset, order):
        """Return price * 0.0001 * log10(2 * |quantity|) and debug-log it."""
        order_size = float(order.AbsoluteQuantity)
        size_factor = np.log10(2 * order_size)
        estimate = asset.Price * 0.0001 * size_factor
        self.algorithm.Debug(f"CustomSlippageModel: {estimate:.2f}")
        return estimate


class symbolData:
    def __init__(
        self,
        pair,
        bb_multiple,
        window,
        algorithm,
    ):
        self.pair = pair
        self.symbol1 = self.pair[0]
        self.symbol2 = self.pair[1]
        self.bb_multiple = bb_multiple
        self.window = window
        self.spread = None
        self.intraday_spread = None
        self.roll_mean = None
        self.roll_std = None
        self.bb_upper = None
        self.bb_lower = None
        self.long_spread = False
        self.short_spread = False
        self.in_trade = False
        self.trade_entry_timestamp = None
        self.algo = algorithm
        self.trade_data = None
        self.trade_timeout = False

    def reset_trade_flags(self):
        self.long_spread = False
        self.short_spread = False
        self.in_trade = False
        self.trade_entry_timestamp = None
        self.trade_data = None
        self.trade_timeout = False
        return

    def spread_crossed_since_trade(self):
        """this should be used only if trade is timed out"""

        # time diff
        ts = self.trade_data["timestamp"].iloc[0]
        tmp_spread = self.spread.loc[ts:]
        tmp_roll_mean = self.spread.rolling(self.window).mean().loc[ts:]

        if self.long_spread:
            if any(tmp_spread > tmp_roll_mean):
                self.reset_trade_flags()
        elif self.short_spread:
            if any(tmp_spread < tmp_roll_mean):
                self.reset_trade_flags()

    def check_trade_timeout(self):
        if (self.in_trade) and (self.trade_entry_timestamp is not None):
            # numpy.busday_count(start, end)
            trade_duration = np.busday_count(
                self.trade_entry_timestamp.strftime("%Y-%m-%d"),
                self.algo.Time.strftime("%Y-%m-%d"),
            )
            if trade_duration >= self.algo.max_trade_duration:

                for symbol in self.pair:
                    # how many shares do we hold of the symbol in this pair trade
                    # liquidate only the amount for that pair trade
                    held_quantity = self.trade_data.loc[symbol, "quantity"]
                    if self.long_spread:
                        self.algo.MarketOnCloseOrder(
                            symbol,
                            -1 * held_quantity,
                            tag=f"TRADE TIMEOUT LONG SPREAD::{str(self.pair[0])}-{str(self.pair[1])}",
                        )

                        # need to track over time to confirm that spread crosses sma before anymore long positions
                        # cannot reset all flags before this happens
                        self.trade_timeout = True

                    elif self.short_spread:
                        self.algo.MarketOnCloseOrder(
                            symbol,
                            -1 * held_quantity,
                            tag=f"TRADE TIMEOUT SHORT SPREAD::{str(self.pair[0])}-{str(self.pair[1])}",
                        )

                        # need to track over time to confirm that spread crosses sma before anymore short positions
                        # cannot reset all flags before this happens
                        self.trade_timeout = True

                # put the check after both symbols have been processed so as not to reset the trade flags prematurely
                self.spread_crossed_since_trade()
        return

    def compute_spread(self, prices, intraday_prices):

        # use ratio spread
        self.spread = prices[self.symbol1] / prices[self.symbol2]
        # intraday spread for entering positions
        for symbol in self.pair:
            if symbol not in intraday_prices.columns:
                self.algo.Debug(f"{self.algo.Time} {symbol} not found in prices")
                return

        self.intraday_spread = (
            intraday_prices[self.symbol1].iloc[-1]
            / intraday_prices[self.symbol2].iloc[-1]
        )

        self.roll_mean = self.spread.rolling(self.window).mean()
        self.roll_std = self.spread.rolling(self.window).std()

        bb_upper = self.roll_mean + self.bb_multiple * self.roll_std
        bb_lower = self.roll_mean - self.bb_multiple * self.roll_std

        self.bb_upper = bb_upper
        self.bb_lower = bb_lower
        return

    def is_short_trigger(self):
        cond = self.intraday_spread > self.bb_upper.iloc[-1]
        if cond:
            return True
        return False

    def is_long_trigger(self):
        cond = self.intraday_spread < self.bb_lower.iloc[-1]
        if cond:
            return True
        return False

    ##################
    def pair_can_trade(self):
        """
        make sure the pair is tradeable:
        1) by ensuring the security price is populated
        2) that trade has not been timed out waiting for another crossover
        """

        for symbol in self.pair:
            if not self.algo.Securities[symbol].Close > 0:
                self.algo.Debug(f"{self.algo.Time} {symbol} is missing price data")
                return False
        if self.trade_timeout:
            if not self.spread_crossed_since_trade():
                self.algo.Log(
                    f"{self.algo.Time} {[str(p) for p in self.pair]} has trade timed out, waiting for crossover"
                )
                return False
        return True

    ##################
    def buy_spread(self, allocation=None):
        """
        function to buy spread
        """
        if allocation is None:
            allocation = self.algo.allocation
        wt1 = allocation
        wt2 = allocation * -1

        pair0_shares = int(wt1 / self.algo.Securities[self.pair[0]].Close)
        pair1_shares = int(wt2 / self.algo.Securities[self.pair[1]].Close)

        self.algo.Debug(
            f"LONG spread : {str(self.pair[0])} dv={wt1:.2f} shares={pair0_shares} vs {str(self.pair[1])} dv={wt2:.2f} shares={pair1_shares}"
        )
        # send market on close orders instead
        mkto0 = self.algo.MarketOnCloseOrder(
            self.pair[0],
            pair0_shares,
            tag=f"TRADE ENTRY LONG SPREAD::{str(self.pair[0])}-{str(self.pair[1])}",
        )
        mkto1 = self.algo.MarketOnCloseOrder(
            self.pair[1],
            pair1_shares,
            tag=f"TRADE ENTRY LONG SPREAD::{str(self.pair[0])}-{str(self.pair[1])}",
        )

        # save trade data
        self.trade_data = pd.DataFrame(
            index=[self.pair[0], self.pair[1]],
            columns=["timestamp", "market_order", "quantity"],
        )
        self.trade_data.loc[self.pair[0], :] = (mkto0.Time, mkto0, mkto0.Quantity)
        self.trade_data.loc[self.pair[1], :] = (mkto1.Time, mkto1, mkto1.Quantity)

        # set trade flags
        self.in_trade = True
        self.long_spread = True
        self.trade_entry_timestamp = self.algo.Time
        return

    ##################
    def sell_spread(self, allocation=None):
        """
        function to short spread
        """
        if allocation is None:
            allocation = self.algo.allocation
        wt1 = allocation * -1
        wt2 = allocation

        pair0_shares = int(wt1 / self.algo.Securities[self.pair[0]].Close)
        pair1_shares = int(wt2 / self.algo.Securities[self.pair[1]].Close)

        self.algo.Debug(
            f"SHORT spread : {str(self.pair[0])} dv={wt1:.2f} shares={pair0_shares} vs {str(self.pair[1])} dv={wt2:.2f} shares={pair1_shares}"
        )
        # send market on close orders instead
        mkto0 = self.algo.MarketOnCloseOrder(
            self.pair[0],
            pair0_shares,
            tag=f"TRADE ENTRY SHORT SPREAD::{str(self.pair[0])}-{str(self.pair[1])}",
        )
        mkto1 = self.algo.MarketOnCloseOrder(
            self.pair[1],
            pair1_shares,
            tag=f"TRADE ENTRY SHORT SPREAD::{str(self.pair[0])}-{str(self.pair[1])}",
        )

        # save trade data
        self.trade_data = pd.DataFrame(
            index=[self.pair[0], self.pair[1]],
            columns=["timestamp", "market_order", "quantity"],
        )
        self.trade_data.loc[self.pair[0], :] = (mkto0.Time, mkto0, mkto0.Quantity)
        self.trade_data.loc[self.pair[1], :] = (mkto1.Time, mkto1, mkto1.Quantity)

        # set trade flags
        self.in_trade = True
        self.short_spread = True
        self.trade_entry_timestamp = self.algo.Time
        return


###################################################################################################
# algo class
###################################################################################################
class pairsTrader(QCAlgorithm):
    def Initialize(self):
        """Configure the backtest window, universe, charts, parameters and daily schedule."""
        self.cash = 1_000_000
        self.SetStartDate(2020, 1, 1)  # Set Start Date
        self.SetEndDate(2022, 4, 1)
        self.SetCash(self.cash)  # Set Strategy Cash

        self.UniverseSettings.ExtendedMarketHours = True
        self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Adjusted

        # Both are filled in by the universe selector (stockDataSource); the
        # scheduled functions return early while self.symbols is still None.
        self.pairs = None
        self.symbols = None

        self.AddUniverse(StockDataSource, "my-stock-data-source", self.stockDataSource)

        # Chart of the spread and its bands; only written to when self.PLOT is True.
        self.splotName = "Spread"
        sPlot = Chart(self.splotName)
        sPlot.AddSeries(Series("spread", SeriesType.Line, 0))
        sPlot.AddSeries(Series("rollmean", SeriesType.Line, 0))
        sPlot.AddSeries(Series("bb upper", SeriesType.Line, 0))
        sPlot.AddSeries(Series("bb lower", SeriesType.Line, 0))
        self.AddChart(sPlot)
        self.PLOT = False

        # Chart of how many pairs are in a trade, written by plot_num_positions.
        self.pPlotName = "Concurrent Positions"
        pPlot = Chart(self.pPlotName)
        pPlot.AddSeries(Series("num positions", SeriesType.Line, 0))
        self.AddChart(pPlot)

        #####################################
        # algo parameters

        # self.allocation = int(0.5 * self.cash / 2)  # len(self.pairs))
        self.allocation = 5000  # for a simple dollar amount
        self.use_default_allocation = True  # flag that tells whether or not to use default allocation on every trade
        self.Debug(f"default allocation to each pair trade: {self.allocation}")
        # NOTE(review): presumably lets market-on-close orders be submitted up
        # to 10 minutes before the close - confirm against the LEAN docs.
        MarketOnCloseOrder.SubmissionTimeBuffer = timedelta(minutes=10)

        self.bb_multiple = 2.0  # for spread bollinger bands
        self.window = 20  # for spread bollinger bands
        self.lookback = int(60)  # for price data
        self.intraday_lookback = 100  # for intraday history call
        self.portfolio_equity_history = list()  # for anl vol of port
        self.portfolio_anl_vol_lookback = 60  # for anl vol of port
        self.history_resolution = Resolution.Daily
        self.max_trade_duration = 7  # compared against a business-day count in check_trade_timeout
        self.target_vol = 0.10  # only used for sizing when use_default_allocation is False
        self.same_day = False  # True once the universe selector has run; cleared by end_of_day

        # SPY is the benchmark and the security the daily schedules are keyed on.
        self.spy = self.AddEquity("SPY").Symbol
        self.SetBenchmark(self.spy)

        # maps each pair tuple to its symbolData state object
        self.symbol_data = dict()

        #####################################
        # schedule functions
        # Times as scheduled: 09:25 get_pairs, 09:30 plot_num_positions,
        # 15:30 check_trade_exit, 15:45 trade, 15:59 end_of_day.

        self.Schedule.On(
            self.DateRules.EveryDay(self.spy),
            self.TimeRules.At(9, 30),
            self.plot_num_positions,
        )

        self.Schedule.On(
            self.DateRules.EveryDay(self.spy),
            self.TimeRules.At(9, 25),
            self.get_pairs
            # self.DateRules.EveryDay(self.spy), self.TimeRules.At(10, 8), self.get_pairs,
        )

        self.Schedule.On(
            self.DateRules.EveryDay(self.spy), self.TimeRules.At(15, 45), self.trade
        )

        self.Schedule.On(
            self.DateRules.EveryDay(self.spy),
            self.TimeRules.At(15, 30),
            self.check_trade_exit,
        )

        self.Schedule.On(
            self.DateRules.EveryDay(self.spy),
            self.TimeRules.At(15, 59),
            self.end_of_day,
        )

    #####################################
    # algo functions

    def end_of_day(self):
        """Clear the once-per-day guard so the universe selector can run again."""
        self.same_day = False

    def stockDataSource(self, data):
        """Universe selector: flatten the incoming symbols and group them into pairs.

        Consecutive symbols form a pair (1st with 2nd, 3rd with 4th, ...); a
        trailing unpaired symbol is left out of ``self.pairs``. Runs at most
        once until ``end_of_day`` clears ``self.same_day``.
        """
        if self.same_day:
            return Universe.Unchanged

        ######## DEBUG
        self.Log("in def stockDataSource")
        ######## DEBUG

        self.symbols = [symbol for item in data for symbol in item["Symbols"]]

        # step through the flat list two at a time; the upper bound stops
        # before a lone trailing symbol
        self.pairs = [
            tuple(self.symbols[start : start + 2])
            for start in range(0, len(self.symbols) - 1, 2)
        ]

        self.same_day = True
        self.Debug(
            f"{self.Time} pairs: {[[str(p) for p in tup] for tup in self.pairs]}"
        )
        return self.symbols

    def get_daily_prices(self):
        """
        Get daily close history as a wide frame (one column per symbol) and
        discard symbols with fewer than ``self.window`` observations.

        Tracked pairs in ``self.symbol_data`` that contain such a symbol are
        untracked, and both of their symbols are removed from the returned
        prices and from ``self.symbols``.

        Returns the empty history object unchanged when no data came back,
        otherwise the price frame with every row containing a NaN dropped.
        """
        prices = self.get_history(
            self.symbols, self.lookback, Resolution.Daily, "close", "close prices"
        )
        if prices.empty:
            return prices

        prices = prices["close"].unstack(level=0)
        # ensure enough data is returned
        symbols_with_insufficient_data = prices.loc[
            :, prices.count() < self.window
        ].columns
        if len(symbols_with_insufficient_data) > 0:
            self.Debug(
                f"{self.Time}::symbols with insufficient price data: {symbols_with_insufficient_data}"
            )
            # Collect the affected pairs first: deleting from the dict while
            # iterating over it raises RuntimeError.
            # NOTE(review): a pair that is currently in a trade is untracked
            # here without closing its position - confirm this is intended.
            stale_pairs = [
                pair
                for pair in list(self.symbol_data)
                if any(symbol in pair for symbol in symbols_with_insufficient_data)
            ]
            for pair in stale_pairs:
                del self.symbol_data[pair]

            removed_symbols = [symbol for pair in stale_pairs for symbol in pair]

            # Drop the short-history columns as well as both legs of every
            # removed pair. Short-history columns are dropped even when no
            # tracked pair uses them, otherwise the final dropna() would cut
            # every other symbol down to their few rows.
            unusable = list(symbols_with_insufficient_data) + removed_symbols
            keep = [column not in unusable for column in prices.columns]
            prices = prices.loc[:, keep]

            # remove the pair from symbols
            self.symbols = [x for x in self.symbols if x not in removed_symbols]
        return prices.dropna()

    def get_intraday_prices(self):
        """
        Get minute close history for all symbols as a wide frame (one column
        per symbol) with every row containing a NaN dropped.

        Returns the empty history object unchanged when no data came back.
        """
        history = self.get_history(
            self.symbols,
            self.intraday_lookback,
            Resolution.Minute,
            "close",
            "close prices",
        )
        if history.empty:
            return history

        closes = history["close"]
        wide = closes.unstack(level=0)
        return wide.dropna()

    ##################
    def get_history(self, symbol, lookback, resolution, column, data_label):
        """
        - download security history and check that the column we need exists
            in the dataframe. if it is missing the request is retried twice
            (three attempts in total).
        - returns an empty series if the column is still missing
        - ``data_label`` is accepted but not used here
        """
        attempts = 3
        for attempt in range(1, attempts + 1):
            hist = self.History(symbol, lookback, resolution)
            if column in hist.columns:
                return hist
            # the final failure gets its own message after the loop
            if attempt < attempts:
                self.Debug(f"{self.Time} {[str(s) for s in symbol]} data missing")

        self.Debug(
            f"{self.Time} {[str(s) for s in symbol]} data missing no trades today"
        )
        return pd.Series()

    ##################
    def plot_num_positions(self):
        """Plot how many tracked pairs are currently in a trade; no-op when none are tracked."""
        if not self.symbol_data:
            return

        open_count = sum(1 for sd in self.symbol_data.values() if sd.in_trade)
        self.Plot(
            self.pPlotName,
            "num positions",
            open_count if np.isfinite(open_count) else 0,
        )

        ##################

    def manage_pair_universe(self):
        """
        Stop tracking pairs that have left the universe, first sending
        market-on-close orders to flatten any trade they still hold.
        """
        # iterate over a snapshot because entries are deleted along the way
        for pair, sd in list(self.symbol_data.items()):
            if pair in self.pairs:
                continue

            if sd.in_trade:
                for symbol in sd.pair:
                    qty = sd.trade_data.loc[symbol, "quantity"]
                    self.MarketOnCloseOrder(
                        symbol, -1 * qty, tag="symbol no longer in universe"
                    )
            del self.symbol_data[pair]

    ##################
    def get_pairs(self):
        """
        Sync the tracked pairs with the current universe: drop pairs that
        left, then create state objects for pairs that are new.

        Does nothing until the universe selector has provided symbols, and
        adds no new pairs when the daily price history comes back empty.
        """
        if self.symbols is None:
            self.Debug(f"[{self.Time}] missing symbols inside get_pairs")
            return

        # manage change in pair universe
        self.manage_pair_universe()

        if self.get_daily_prices().empty:
            return

        for pair in self.pairs:
            if pair in self.symbol_data:
                continue
            self.symbol_data[pair] = symbolData(
                pair,
                self.bb_multiple,
                self.window,
                self,
            )

        ##################

    def calc_and_log_portfolio_anl_vol(self):
        """Record the current portfolio value and log the annualized std of its returns.

        Nothing is logged until at least ``portfolio_anl_vol_lookback``
        observations have been collected; the std is taken over every
        observation recorded so far.
        """
        snapshot = (self.Time, self.Portfolio.TotalPortfolioValue)
        self.portfolio_equity_history.append(snapshot)

        # column 0 holds the timestamps, column 1 the equity values
        frame = pd.DataFrame(self.portfolio_equity_history)
        eq_s = frame.set_index(0)[1]
        if len(eq_s) < self.portfolio_anl_vol_lookback:
            return

        anl_vol = eq_s.pct_change().dropna().std() * np.sqrt(252)
        self.Log(f"{self.Time} PORTFOLIO rolling annualized std: {anl_vol:.2%}")
        ##################

    def calc_and_log_pair_anl_vol(self, pair, symbol_data):
        """Log and return the annualized std of the pair's rolling-mean changes.

        Returns 0 (after logging a notice) when the rolling mean has fewer
        than three non-NaN values. Otherwise also logs the latest spread,
        rolling mean and band values, with non-finite values shown as 0.
        """
        if len(symbol_data.roll_mean.dropna()) < 3:
            self.Log(
                f"{self.Time}::{[str(p) for p in pair]} is missing sufficient data"
            )
            return 0

        anl_vol = symbol_data.roll_mean.pct_change().dropna().std() * np.sqrt(252)
        self.Log(
            f"{self.Time} {[str(p) for p in pair]} rolling annualized std: {anl_vol:.2%}"
        )

        labelled_series = (
            ("spread", symbol_data.spread),
            ("rollmean", symbol_data.roll_mean),
            ("bb upper", symbol_data.bb_upper),
            ("bb lower", symbol_data.bb_lower),
        )
        for label, series in labelled_series:
            latest = series.iloc[-1]
            self.Log(f"{label} {latest if np.isfinite(latest) else 0}")
        return anl_vol
        ##################

    def trade(self):
        """
        Scheduled entry pass: refresh every tracked pair's spread statistics
        and open a long or short spread when its intraday spread is outside
        the Bollinger bands and the pair is not already in a trade.
        """
        # accumulate portfolio equity to log rolling annualized vol of portfolio
        self.calc_and_log_portfolio_anl_vol()

        if self.symbols is None:
            self.Debug(f"[{self.Time}] missing symbols inside trade")
            return

        prices = self.get_daily_prices()
        if prices.empty:
            return

        # intraday prices drive trade entry
        intraday_prices = self.get_intraday_prices()
        if intraday_prices.empty:
            return

        def latest_or_zero(series):
            # charts cannot take NaN/inf, so plot 0 in their place
            latest = series.iloc[-1]
            return latest if np.isfinite(latest) else 0

        # iterate over a snapshot of the tracked pairs
        for pair, sd in list(self.symbol_data.items()):

            # an open trade that has not already timed out may hit the maximum duration
            if sd.in_trade and not sd.trade_timeout:
                sd.check_trade_timeout()

            # compute spread and boundaries
            sd.compute_spread(prices, intraday_prices)

            # log annualized vol for pair, floored at a minimum level
            pair_anl_vol = self.calc_and_log_pair_anl_vol(pair, sd)
            if pair_anl_vol < 0.1:
                pair_anl_vol = 0.1

            if not sd.pair_can_trade():
                continue

            # plot spread data for debugging individual pairs
            # NOTE: only works for a single pair set
            if self.PLOT:
                chart_series = (
                    ("spread", sd.spread),
                    ("rollmean", sd.roll_mean),
                    ("bb upper", sd.bb_upper),
                    ("bb lower", sd.bb_lower),
                )
                for series_name, series in chart_series:
                    self.Plot(self.splotName, series_name, latest_or_zero(series))

            # entries are only considered for pairs without an open trade
            if sd.in_trade:
                continue

            if self.use_default_allocation:
                allocation = self.allocation
            else:
                # vol-targeted sizing, split across all symbols
                allocation = quantize(
                    self.Portfolio.TotalPortfolioValue
                    * self.target_vol
                    / pair_anl_vol
                    / len(self.symbols)
                )

            # the long trigger is checked first, then the short trigger
            if sd.is_long_trigger():
                enter_spread = sd.buy_spread
            elif sd.is_short_trigger():
                enter_spread = sd.sell_spread
            else:
                continue

            self.Debug(
                f"{self.Time}::{[str(p) for p in pair]} {pair_anl_vol:.2%} ${allocation:,}"
            )
            enter_spread(allocation)

    def check_trade_exit(self):
        """
        Scheduled exit pass: close any open pair trade whose intraday spread
        has reached the rolling mean (at or above it for a long spread, at or
        below it for a short spread), using market-on-close orders for the
        quantities recorded at entry.
        """
        if self.symbols is None:
            self.Debug(f"[{self.Time}] missing symbols inside check_trade_exit")
            return

        prices = self.get_daily_prices()
        if prices.empty:
            return

        # intraday prices drive the exit comparison
        intraday_prices = self.get_intraday_prices()
        if intraday_prices.empty:
            return

        # no trades after 4pm RTH
        if self.Time.hour >= 16:
            return

        # iterate over a snapshot of the tracked pairs
        for pair, sd in list(self.symbol_data.items()):

            # an open trade that has not already timed out may hit the maximum duration
            if sd.in_trade and not sd.trade_timeout:
                sd.check_trade_timeout()

            # compute spread and boundaries
            sd.compute_spread(prices, intraday_prices)

            if not sd.pair_can_trade():
                continue
            if not sd.in_trade:
                continue

            # take profit once the spread is back at the rolling mean
            if sd.long_spread:
                side = "long"
                reverted = sd.intraday_spread >= sd.roll_mean.iloc[-1]
            elif sd.short_spread:
                side = "short"
                reverted = sd.intraday_spread <= sd.roll_mean.iloc[-1]
            else:
                continue

            if not reverted:
                continue

            levels = f"sprd: {sd.intraday_spread:.2f} rollmean: {sd.roll_mean.iloc[-1]:.2f}"
            for symbol in sd.pair:
                # exit only the quantity of shares involved in that trade
                qty = sd.trade_data.loc[symbol, "quantity"]
                self.MarketOnCloseOrder(
                    symbol,
                    -1 * qty,
                    tag=f"TP {side.upper()} SPREAD::{str(sd.pair[0])}-{str(sd.pair[1])} | {levels}",
                )

            sd.reset_trade_flags()
            self.Debug(
                f"tp liquidating {side} spread: {str(sd.pair[0])} vs {str(sd.pair[1])} | {levels}"
            )
                        ##################

    def OnSecuritiesChanged(self, changes):
        """Store the latest universe changes and give every added security the pattern-day-trading margin model."""
        self._changes = changes
        for added in changes.AddedSecurities:
            added.MarginModel = PatternDayTradingMarginModel()
        self.Debug(self.Time)

    def OnData(self, data):
        """OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.

        All trading here is driven by scheduled functions, so this handler
        performs no work of its own.

        Arguments:
            data: Slice object keyed by symbol containing the stock data
        """
        # _changes is only assigned in OnSecuritiesChanged; read it with a
        # default so a slice arriving before the first universe change does
        # not raise AttributeError.
        if getattr(self, "_changes", None) is None:
            return

    def OnOrderEvent(self, orderEvent):
        """
        Called automatically on every order event; writes the event's string
        form to the algorithm log.
        """
        event_text = str(orderEvent)
        self.Log(event_text)