Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -0.91 Tracking Error 0.173 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 |
from clr import AddReference AddReference("System") AddReference("QuantConnect.Algorithm") AddReference("QuantConnect.Common") from System import * from QuantConnect import * from QuantConnect.Algorithm import * #from QuantConnect.Python import PythonQuandl from datetime import timedelta from math import floor from decimal import Decimal import numpy as np import decimal as d from sklearn import linear_model import pandas as pd from scipy import stats from math import floor from ArbAlphaModel import ArbAlphaModel class CommoditiesIntradayArbitrageAlgorithm(QCAlgorithm): def Initialize(self): self.SetStartDate(2010, 4, 1) self.SetEndDate(2021, 4, 1) self.SetCash(1000000) self.numdays = 1000 # set the length of training period tickers = ["Futures.Grains.Corn", "Futures.Grains.Soybeans"] self.symbols = [ Symbol.Create(t, SecurityType.Future, Market.Globex) for t in tickers ] #self.Data = {} for ticker in tickers: #data = self.AddData(self.symbols, Resolution.Daily) future = self.AddFuture(ticker) future.SetFilter(timedelta(0), timedelta(days = 90)) self.threshold = 1. for i in tickers: self.symbols.append(self.AddSecurity(SecurityType.Future, i, Resolution.Minute).Symbol) for i in self.symbols: i.hist_window = RollingWindow[QuoteBar](self.numdays) self.SetUniverseSelection(ManualUniverseSelectionModel(self.symbols) ) self.UniverseSettings.Resolution = Resolution.Minute self.AddAlpha(ArbAlphaModel()) self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel(lambda time: None)) self.SetExecution(ImmediateExecutionModel()) self.Settings.RebalancePortfolioOnSecurityChanges = False
import pandas as pd import numpy as np class ArbAlphaModel(AlphaModel): """ This class monitors the intraday bid and ask prices of two correlated instruments. When the bid price of Instrument A (B) diverts high enough away from the ask price of Instrument B (A) such that the profit_pct_threshold is reached, we start a timer. If the arbitrage opportunity is still present after the specified timesteps, we enter the arbitrage trade by going long Instrument B (A) and short Instrument A (B). When the spread reverts back to where the bid of Instrument B (A) >= the ask of Instrument A (B) for the same number of timesteps, we exit the trade. To address a trending historical spread between the two Instruments, we adjust the spread by removing the mean spread over a lookback window. """ symbols = [] entry_timer = [0, 0] exit_timer = [0, 0] spread_adjusters = [0, 0] long_side = -1 consolidators = {} history = {} def __init__(self, order_delay = 3, profit_pct_threshold = 0.15, window_size = 3): """ Input: - order_delay The number of timesteps to wait while an arbitrage opportunity is present before emitting insights (>= 0) - profit_pct_threshold The amount of adjusted profit there must be in an arbitrage opportunity to signal a potential entry (> 0) - window_size The length of the lookback window used to calculate the spread adjusters (to address the trending spread b/w Instruments) (> 0) """ self.order_delay = order_delay self.pct_threshold = profit_pct_threshold / 100 self.window_size = window_size self.consolidated_update = 0 def Update(self, algorithm, data): """ Called each time our alpha model receives a new data slice. Input: - algorithm Algorithm instance running the backtest - data Data for the current time step in the backtest Returns a list of Insights to the portfolio construction model. """ if algorithm.IsWarmingUp: return [] quotebars = self.get_quotebars(data) if not quotebars: return [] # Ensure we are not within 5 minutes of either the open or close exchange = algorithm.Securities["Futures.Grains.Corn"].Exchange if not (exchange.DateTimeIsOpen(algorithm.Time - timedelta(minutes=5)) and \ exchange.DateTimeIsOpen(algorithm.Time + timedelta(minutes=5))): return [] # Search for entries for i in range(2): if quotebars[abs(i-1)].Bid.Close / quotebars[i].Ask.Close - self.spread_adjusters[abs(i-1)] >= self.pct_threshold: self.entry_timer[i] += 1 if self.entry_timer[i] == self.order_delay: self.exit_timer = [0, 0] if self.long_side == i: return [] self.long_side = i return [Insight.Price(self.symbols[i], timedelta(days=60), InsightDirection.Up), #1, 1), Insight.Price(self.symbols[abs(i-1)], timedelta(days=60), InsightDirection.Down)] #, 1, 1)] else: return [] self.entry_timer[i] = 0 # Search for an exit if self.long_side >= 0: # In a position if quotebars[self.long_side].Bid.Close / quotebars[abs(self.long_side-1)].Ask.Close - self.spread_adjusters[self.long_side] >= 0: # Exit signal self.exit_timer[self.long_side] += 1 if self.exit_timer[self.long_side] == self.order_delay: # Exit signal lasted long enough self.exit_timer[self.long_side] = 0 i = self.long_side self.long_side = -1 return [Insight.Price(self.symbols[i], timedelta(days=60), InsightDirection.Flat), #, 1, 1), Insight.Price(self.symbols[abs(i-1)], timedelta(days=60), InsightDirection.Flat)] #, 1, 1)] else: return [] return [] def OnSecuritiesChanged(self, algorithm, changes): """ Called each time our universe has changed. Inputs: - algorithm Algorithm instance running the backtest - changes The additions and subtractions to the algorithm's security subscriptions """ added_symbols = [security.Symbol for security in changes.AddedSecurities] if len(added_symbols) > 0: self.symbols.extend(added_symbols) if len(self.symbols) != 2: algorithm.Error(f"ArbitrageAlphaModel must have 2 symbols to trade") algorithm.Quit() return history = algorithm.History(self.symbols, self.window_size, Resolution.Minute)[['bidclose', 'askclose']] starting_row_count = min([history.loc[symbol].shape[0] for symbol in self.symbols]) for symbol in self.symbols: self.history[symbol] = {'bids': history.loc[symbol].bidclose.to_numpy()[-starting_row_count:], 'asks': history.loc[symbol].askclose.to_numpy()[-starting_row_count:]} self.update_spread_adjusters() # Setup daily consolidators to update spread_adjusters for symbol in self.symbols: self.consolidators[symbol] = QuoteBarConsolidator(1) self.consolidators[symbol].DataConsolidated += self.CustomDailyHandler algorithm.SubscriptionManager.AddConsolidator(symbol, self.consolidators[symbol]) for removed in changes.RemovedSecurities: algorithm.SubscriptionManager.RemoveConsolidator(removed.Symbol, self.consolidators[removed.Symbol]) def CustomDailyHandler(self, sender, consolidated): """ Updates the rolling lookback window with the latest data. Inputs - sender Function calling the consolidator - consolidated Tradebar representing the latest completed trading day """ # Add new data point to history while removing expired history self.history[consolidated.Symbol]['bids'] = np.append(self.history[consolidated.Symbol]['bids'][-self.window_size:], consolidated.Bid.Close) self.history[consolidated.Symbol]['asks'] = np.append(self.history[consolidated.Symbol]['asks'][-self.window_size:], consolidated.Ask.Close) # After updating the history of both symbols, update the spread adjusters self.consolidated_update += 1 if self.consolidated_update == 2: self.consolidated_update = 0 self.update_spread_adjusters() def get_quotebars(self, data): """ Extracts the QuoteBars from the given slice. Inputs - data Latest slice object the algorithm has received Returns the QuoteBars for the symbols we are trading. """ if not all([data.QuoteBars.ContainsKey(symbol) for symbol in self.symbols]): return [] quotebars = [data.QuoteBars[self.symbols[i]] for i in range(2)] if not all([q is not None for q in quotebars]): return [] # Ensure ask > bid for each Instrument if not all([q.Ask.Close > q.Bid.Close for q in quotebars]): return [] return quotebars def update_spread_adjusters(self): """ Updates the spread adjuster by finding the mean of the trailing spread ratio b/w Instruments. """ for i in range(2): numerator_history = self.history[self.symbols[i]]['bids'] denominator_history = self.history[self.symbols[abs(i-1)]]['asks'] self.spread_adjusters[i] = (numerator_history / denominator_history).mean()