Overall Statistics
Total Trades
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Net Profit
0%
Sharpe Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
-0.91
Tracking Error
0.173
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")

from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
#from QuantConnect.Python import PythonQuandl

from datetime import timedelta
from math import floor
from decimal import Decimal
import numpy as np
import decimal as d

from sklearn import linear_model
import pandas as pd
from scipy import stats
from math import floor

from ArbAlphaModel import ArbAlphaModel

class CommoditiesIntradayArbitrageAlgorithm(QCAlgorithm):
    """
    Framework algorithm that feeds two grain futures tickers into ArbAlphaModel.

    Insights from the alpha model are sized by an equal-weighting portfolio
    construction model and executed immediately.
    """

    def Initialize(self):
        """
        Configures the backtest period, cash, data subscriptions and the framework models.
        """
        # Backtest window and starting capital
        self.SetStartDate(2010, 4, 1)
        self.SetEndDate(2021, 4, 1)
        self.SetCash(1_000_000)

        # Length of the training period; also used as the capacity of each rolling window below
        self.numdays = 1000

        # NOTE(review): these are plain string literals, not the Futures.Grains.* constants -- confirm
        # that the data feed actually resolves them.
        tickers = ["Futures.Grains.Corn", "Futures.Grains.Soybeans"]
        self.symbols = [
            Symbol.Create(name, SecurityType.Future, Market.Globex)
            for name in tickers
        ]

        # First subscription per ticker: a future with a timedelta(0)..timedelta(days=90) filter
        # (presumably the accepted expiry range -- TODO confirm).
        for name in tickers:
            self.AddFuture(name).SetFilter(timedelta(0), timedelta(days=90))

        self.threshold = 1.

        # Second subscription per ticker, at minute resolution; its Symbol is collected as well.
        # NOTE(review): self.symbols now holds four entries (two from Symbol.Create, two from
        # AddSecurity), while ArbAlphaModel reports an error unless it sees exactly two -- verify.
        for name in tickers:
            security = self.AddSecurity(SecurityType.Future, name, Resolution.Minute)
            self.symbols.append(security.Symbol)

        # Attach a rolling window of QuoteBars to every collected symbol
        for symbol in self.symbols:
            symbol.hist_window = RollingWindow[QuoteBar](self.numdays)

        # Framework wiring: universe -> alpha -> portfolio construction -> execution
        self.SetUniverseSelection(ManualUniverseSelectionModel(self.symbols))
        self.UniverseSettings.Resolution = Resolution.Minute

        self.AddAlpha(ArbAlphaModel())

        # The rebalancing function always returns None
        self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel(lambda time: None))
        self.SetExecution(ImmediateExecutionModel())

        self.Settings.RebalancePortfolioOnSecurityChanges = False
import pandas as pd
import numpy as np

class ArbAlphaModel(AlphaModel):
    """
    This class monitors the intraday bid and ask prices of two correlated instruments.
    When the bid price of Instrument A (B) diverges far enough from the ask price of Instrument B (A) such that the
    profit_pct_threshold is reached, we start a timer. If the arbitrage opportunity is still present after the
    specified timesteps, we enter the arbitrage trade by going long Instrument B (A) and short Instrument A (B).
    When the spread reverts back to where the bid of Instrument B (A) >= the ask of Instrument A (B) for the same
    number of timesteps, we exit the trade. To address a trending historical spread between the two Instruments,
    we adjust the spread by removing the mean spread over a lookback window.

    All trading state is kept per instance (it is created in __init__), so two models never share timers,
    symbols or history.
    """

    def __init__(self, order_delay = 3, profit_pct_threshold = 0.15, window_size = 3):
        """
        Input:
         - order_delay
            The number of timesteps to wait while an arbitrage opportunity is present before emitting insights.
            Timers are incremented before they are compared with this value, so 0 never fires
            (use >= 1)
         - profit_pct_threshold
            The amount of adjusted profit, in percent, there must be in an arbitrage opportunity to signal a
            potential entry
            (> 0)
         - window_size
            The length of the lookback window used to calculate the spread adjusters (to address the trending
            spread b/w Instruments)
            (> 0)
        """
        self.order_delay = order_delay
        self.pct_threshold = profit_pct_threshold / 100  # percent -> fraction
        self.window_size = window_size
        self.consolidated_update = 0  # consolidated bars received since the last adjuster refresh

        # Mutable state lives on the instance: as class attributes these lists/dicts were mutated in place
        # and therefore shared by every ArbAlphaModel object.
        self.symbols = []               # the two traded symbols, in the order they were added
        self.entry_timer = [0, 0]       # consecutive steps the entry signal for "long symbols[i]" was present
        self.exit_timer = [0, 0]        # steps the exit signal was present while long symbols[i]
        self.spread_adjusters = [0, 0]  # [i] = mean of bid(symbols[i]) / ask(other symbol) over the lookback
        self.long_side = -1             # index of the symbol currently held long; -1 when flat
        self.consolidators = {}         # symbol -> QuoteBarConsolidator registered for it
        self.history = {}               # symbol -> {'bids': ndarray, 'asks': ndarray}


    def Update(self, algorithm, data):
        """
        Called each time our alpha model receives a new data slice.

        Input:
         - algorithm
            Algorithm instance running the backtest
         - data
            Data for the current time step in the backtest

        Returns a list of Insights to the portfolio construction model.
        """
        if algorithm.IsWarmingUp:
            return []

        quotebars = self.get_quotebars(data)
        if not quotebars:
            return []

        # Ensure we are not within 5 minutes of either the open or close.
        # The traded symbols' own exchanges are checked rather than a hard-coded ticker.
        buffer = timedelta(minutes=5)
        for symbol in self.symbols[:2]:
            exchange = algorithm.Securities[symbol].Exchange
            if not (exchange.DateTimeIsOpen(algorithm.Time - buffer) and
                    exchange.DateTimeIsOpen(algorithm.Time + buffer)):
                return []

        # Search for entries
        for i in range(2):
            other = 1 - i
            adjusted_spread = quotebars[other].Bid.Close / quotebars[i].Ask.Close - self.spread_adjusters[other]
            if adjusted_spread >= self.pct_threshold:
                self.entry_timer[i] += 1
                if self.entry_timer[i] != self.order_delay:
                    # Signal present, but not (or no longer) exactly at the delay: nothing to emit this step
                    return []
                self.exit_timer = [0, 0]
                if self.long_side == i:
                    return []  # already positioned on this side
                self.long_side = i
                return [Insight.Price(self.symbols[i], timedelta(days=60), InsightDirection.Up),
                        Insight.Price(self.symbols[other], timedelta(days=60), InsightDirection.Down)]
            self.entry_timer[i] = 0

        # Search for an exit
        if self.long_side >= 0: # In a position
            side = self.long_side
            other = 1 - side
            adjusted_spread = quotebars[side].Bid.Close / quotebars[other].Ask.Close - self.spread_adjusters[side]
            if adjusted_spread >= 0: # Exit signal
                # NOTE(review): unlike the entry timers, this timer is not reset when the exit signal
                # disappears, so the counted steps need not be consecutive -- confirm this is intended.
                self.exit_timer[side] += 1
                if self.exit_timer[side] == self.order_delay: # Exit signal lasted long enough
                    self.exit_timer[side] = 0
                    self.long_side = -1
                    return [Insight.Price(self.symbols[side], timedelta(days=60), InsightDirection.Flat),
                            Insight.Price(self.symbols[other], timedelta(days=60), InsightDirection.Flat)]
        return []


    def OnSecuritiesChanged(self, algorithm, changes):
        """
        Called each time our universe has changed.

        Inputs:
         - algorithm
            Algorithm instance running the backtest
         - changes
            The additions and subtractions to the algorithm's security subscriptions
        """
        added_symbols = [security.Symbol for security in changes.AddedSecurities]
        if len(added_symbols) > 0:
            self.symbols.extend(added_symbols)

            if len(self.symbols) != 2:
                algorithm.Error("ArbitrageAlphaModel must have 2 symbols to trade")
                algorithm.Quit()
                return

            # Seed the lookback window with the latest minute quotes; both symbols are cut to the same length
            history = algorithm.History(self.symbols, self.window_size, Resolution.Minute)[['bidclose', 'askclose']]
            starting_row_count = min([history.loc[symbol].shape[0] for symbol in self.symbols])

            for symbol in self.symbols:
                self.history[symbol] = {'bids': history.loc[symbol].bidclose.to_numpy()[-starting_row_count:],
                                        'asks': history.loc[symbol].askclose.to_numpy()[-starting_row_count:]}

            self.update_spread_adjusters()

            # Setup consolidators (one per symbol, constructed with an argument of 1) that feed
            # CustomDailyHandler so the spread_adjusters stay current
            for symbol in self.symbols:
                self.consolidators[symbol] = QuoteBarConsolidator(1)
                self.consolidators[symbol].DataConsolidated += self.CustomDailyHandler
                algorithm.SubscriptionManager.AddConsolidator(symbol, self.consolidators[symbol])

        for removed in changes.RemovedSecurities:
            # A removed security may never have received a consolidator (e.g. the model bailed out above)
            consolidator = self.consolidators.pop(removed.Symbol, None)
            if consolidator is not None:
                algorithm.SubscriptionManager.RemoveConsolidator(removed.Symbol, consolidator)


    def CustomDailyHandler(self, sender, consolidated):
        """
        Updates the rolling lookback window with the latest data.

        Despite its name, this handler is attached to QuoteBarConsolidator(1) instances and the lookback
        was seeded with minute-resolution history.

        Inputs
         - sender
            Consolidator that raised the event
         - consolidated
            QuoteBar holding the latest consolidated bid/ask data
        """
        record = self.history.get(consolidated.Symbol)
        if record is None or consolidated.Bid is None or consolidated.Ask is None:
            return  # unknown symbol or one-sided quote: nothing usable to store

        # Add new data point to history while removing expired history.
        # Append first, then trim, so exactly window_size points are retained.
        record['bids'] = np.append(record['bids'], consolidated.Bid.Close)[-self.window_size:]
        record['asks'] = np.append(record['asks'], consolidated.Ask.Close)[-self.window_size:]

        # After two consolidated bars have arrived (normally one per symbol), update the spread adjusters
        self.consolidated_update += 1
        if self.consolidated_update == 2:
            self.consolidated_update = 0
            self.update_spread_adjusters()



    def get_quotebars(self, data):
        """
        Extracts the QuoteBars from the given slice.

        Inputs
         - data
            Latest slice object the algorithm has received

        Returns the QuoteBars for the two symbols we are trading, or an empty list when the pair is not
        fully registered, a bar is missing or one-sided, or ask <= bid for either Instrument.
        """
        # Without this guard an empty symbol list passes the all() check below and indexing fails
        if len(self.symbols) < 2:
            return []

        if not all([data.QuoteBars.ContainsKey(symbol) for symbol in self.symbols]):
            return []

        quotebars = [data.QuoteBars[self.symbols[i]] for i in range(2)]

        if not all([q is not None for q in quotebars]):
            return []

        # Both sides of the quote are needed for the comparisons below
        if not all([q.Bid is not None and q.Ask is not None for q in quotebars]):
            return []

        # Ensure ask > bid for each Instrument
        if not all([q.Ask.Close > q.Bid.Close for q in quotebars]):
            return []

        return quotebars


    def update_spread_adjusters(self):
        """
        Updates the spread adjuster by finding the mean of the trailing spread ratio b/w Instruments.

        spread_adjusters[i] becomes the mean of bid(symbols[i]) / ask(other symbol) over the points both
        histories have in common (counted from the most recent). An adjuster keeps its previous value
        when no such points exist.
        """
        if len(self.symbols) < 2:
            return

        for i in range(2):
            numerator_history = self.history[self.symbols[i]]['bids']
            denominator_history = self.history[self.symbols[1 - i]]['asks']

            # The two windows can differ in length when one symbol received more bars than the other
            count = min(len(numerator_history), len(denominator_history))
            if count == 0:
                continue

            self.spread_adjusters[i] = (numerator_history[-count:] / denominator_history[-count:]).mean()