Overall Statistics
Total Orders
3162
Average Win
0.07%
Average Loss
-0.06%
Compounding Annual Return
-1.911%
Drawdown
16.900%
Expectancy
-0.100
Start Equity
50000
End Equity
45394.40
Net Profit
-9.211%
Sharpe Ratio
-0.713
Sortino Ratio
-0.201
Probabilistic Sharpe Ratio
0.199%
Loss Rate
61%
Win Rate
39%
Profit-Loss Ratio
1.28
Alpha
-0.03
Beta
-0.001
Annual Standard Deviation
0.042
Annual Variance
0.002
Information Ratio
-0.656
Tracking Error
0.164
Treynor Ratio
42.116
Total Fees
$3193.95
Estimated Strategy Capacity
$3300000.00
Lowest Capacity Asset
IVV RUTTRZ1RC7L1
Portfolio Turnover
91.50%
#region imports
from AlgorithmImports import *
#endregion


class ArbitrageAlphaModel(AlphaModel):
    """
    This class monitors the intraday bid and ask prices of two correlated ETFs. When the bid price of ETF A (B) diverts 
    high enough away from the ask price of ETF B (A) such that the profit_pct_threshold is reached, we start a timer. If 
    the arbitrage opportunity is still present after the specified timesteps, we enter the arbitrage trade by going long 
    ETF B (A) and short ETF A (B). When the spread reverts back to where the bid of ETF B (A) >= the ask of ETF A (B) for 
    the same number of timesteps, we exit the trade. To address a trending historical spread between the two ETFs, we 
    adjust the spread by removing the mean spread over a lookback window.
    """
    _symbols = [] # IVV, SPY
    _entry_timer = [0, 0]
    _exit_timer = [0, 0]
    _spread_adjusters = [0, 0]
    _long_side = -1
    _consolidators = {}
    _history = {}
    
    def __init__(self, order_delay = 3, profit_pct_threshold = 0.02, window_size = 400):
        """
        Input:
         - order_delay
            The number of timesteps to wait while an arbitrage opportunity is present before emitting insights
            (>= 0)
         - profit_pct_threshold
            The amount of adjusted profit there must be in an arbitrage opportunity to signal a potential entry
            (> 0)
         - window_size
            The length of the lookback window used to calculate the spread adjusters (to address the trending spread b/w ETFs)
            (> 0)
        """
        self._order_delay = order_delay
        self._pct_threshold = profit_pct_threshold / 100
        self._window_size = window_size
        self._consolidated_update = 0
    
    def update(self, algorithm, data):
        """
        Called each time our alpha model receives a new data slice.
        
        Input:
         - algorithm
            Algorithm instance running the backtest
         - data
            Data for the current time step in the backtest
            
        Returns a list of Insights to the portfolio construction model.
        """
        if algorithm.is_warming_up:
            return []
        
        quotebars = self._get_quotebars(data)
        if not quotebars:
            return []

        # Ensure we are not within 5 minutes of either the open or close
        exchange = algorithm.securities['SPY'].exchange
        if (not exchange.date_time_is_open(algorithm.time - timedelta(minutes=5)) and 
            exchange.date_time_is_open(algorithm.time + timedelta(minutes=5))):
            return []
        
        # Search for entries
        for i in range(2):
            if quotebars[abs(i-1)].bid.close / quotebars[i].ask.close - self._spread_adjusters[abs(i-1)] >= self._pct_threshold:
                self._entry_timer[i] += 1
                if self._entry_timer[i] == self._order_delay:
                    self._exit_timer = [0, 0]
                    if self._long_side == i:
                        return []
                    self._long_side = i
                    return [Insight.price(self._symbols[i], timedelta(days=9999), InsightDirection.UP),
                            Insight.price(self._symbols[abs(i-1)], timedelta(days=9999), InsightDirection.DOWN)]
                else:
                    return []
            self._entry_timer[i] = 0
            
        # Search for an exit
        if self._long_side >= 0: # In a position
            if quotebars[self._long_side].bid.close / quotebars[abs(self._long_side-1)].ask.close - self._spread_adjusters[self._long_side] >= 0: # Exit signal
                self._exit_timer[self._long_side] += 1
                if self._exit_timer[self._long_side] == self._order_delay: # Exit signal lasted long enough
                    self._exit_timer[self._long_side] = 0
                    i = self._long_side
                    self._long_side = -1
                    return [Insight.price(self._symbols[i], timedelta(days=9999), InsightDirection.FLAT),
                            Insight.price(self._symbols[abs(i-1)], timedelta(days=9999), InsightDirection.FLAT)]
                else:
                    return []
        return []
        
    def on_securities_changed(self, algorithm, changes):
        """
        Called each time our universe has changed.
        
        Inputs:
         - algorithm
            Algorithm instance running the backtest
         - changes
            The additions and subtractions to the algorithm's security subscriptions
        """
        added_symbols = [security.symbol for security in changes.added_securities]
        if len(added_symbols) > 0:
            self._symbols.extend(added_symbols)
            
            if len(self._symbols) != 2:
                algorithm.error(f"ArbitrageAlphaModel must have 2 symbols to trade")
                algorithm.quit()
                return
            
            history = algorithm.history(self._symbols, self._window_size, Resolution.SECOND)[['bidclose', 'askclose']]
            starting_row_count = min([history.loc[symbol].shape[0] for symbol in self._symbols])

            for symbol in self._symbols:
                self._history[symbol] = {'bids': history.loc[symbol].bidclose.to_numpy()[-starting_row_count:],
                                        'asks': history.loc[symbol].askclose.to_numpy()[-starting_row_count:]}

            self._update_spread_adjusters()
            
            # Setup daily consolidators to update spread_adjusters
            for symbol in self._symbols:
                self._consolidators[symbol] = QuoteBarConsolidator(1)
                self._consolidators[symbol].data_consolidated += self._custom_daily_handler
                algorithm.subscription_manager.add_consolidator(symbol, self._consolidators[symbol])
            
        for removed in changes.removed_securities:
            algorithm.subscription_manager.remove_consolidator(removed.symbol, self._consolidators[removed.symbol])
    
    def _custom_daily_handler(self, sender, consolidated):
        """
        Updates the rolling lookback window with the latest data.
        
        Inputs
         - sender
            Function calling the consolidator
         - consolidated
            Tradebar representing the latest completed trading day
        """
        # Add new data point to history while removing expired history
        self._history[consolidated.symbol]['bids'] = np.append(self._history[consolidated.symbol]['bids'][-self._window_size:], consolidated.bid.close)
        self._history[consolidated.symbol]['asks'] = np.append(self._history[consolidated.symbol]['asks'][-self._window_size:], consolidated.ask.close)
        
        # After updating the history of both symbols, update the spread adjusters
        self._consolidated_update += 1
        if self._consolidated_update == 2:
            self._consolidated_update = 0
            self._update_spread_adjusters()

    def _get_quotebars(self, data):
        """
        Extracts the QuoteBars from the given slice.
        
        Inputs
         - data
            Latest slice object the algorithm has received
            
        Returns the QuoteBars for the symbols we are trading.
        """
        if not all([data.quote_bars.contains_key(symbol) for symbol in self._symbols]):
            return []
            
        quotebars = [data.quote_bars[self._symbols[i]] for i in range(2)]
        
        if not all([q is not None for q in quotebars]):
            return []
            
        # Ensure ask > bid for each ETF
        if not all([q.ask.close > q.bid.close for q in quotebars]):
            return []
            
        return quotebars
    
    
    def _update_spread_adjusters(self):
        """
        Updates the spread adjuster by finding the mean of the trailing spread ratio b/w ETFs.
        """
        for i in range(2):
            numerator_history = self._history[self._symbols[i]]['bids']
            denominator_history = self._history[self._symbols[abs(i-1)]]['asks']
            self._spread_adjusters[i] = (numerator_history / denominator_history).mean()
        
#region imports
from AlgorithmImports import *

from alpha import ArbitrageAlphaModel
#endregion


class IntradayIndexETFArbitrageAlgorithm(QCAlgorithm):

    def initialize(self):
        self.set_start_date(2015, 8, 11)
        self.set_end_date(2020, 8, 11)
        
        self.set_cash(50000)
        
        tickers = ['IVV', 'SPY']
        symbols = [ Symbol.create(t, SecurityType.EQUITY, Market.USA) for t in tickers ]
        self.set_universe_selection(ManualUniverseSelectionModel(symbols))
        self.universe_settings.resolution = Resolution.SECOND
        
        self.add_alpha(ArbitrageAlphaModel())
        
        self.set_portfolio_construction(EqualWeightingPortfolioConstructionModel(lambda _: None))
        self.settings.rebalance_portfolio_on_security_changes = False
        
        self.set_execution(ImmediateExecutionModel())