Overall Statistics

- Total Orders: 59
- Average Win: 0.02%
- Average Loss: -0.02%
- Compounding Annual Return: 1.017%
- Drawdown: 0.100%
- Expectancy: 0.309
- Start Equity: 1000000
- End Equity: 1001295.4
- Net Profit: 0.130%
- Sharpe Ratio: 1.876
- Sortino Ratio: 2.041
- Probabilistic Sharpe Ratio: 80.962%
- Loss Rate: 32%
- Win Rate: 68%
- Profit-Loss Ratio: 0.91
- Alpha: 0
- Beta: 0
- Annual Standard Deviation: 0.002
- Annual Variance: 0
- Information Ratio: 2.891
- Tracking Error: 0.002
- Treynor Ratio: 0
- Total Fees: $54.60
- Estimated Strategy Capacity: $70000.00
- Lowest Capacity Asset: SPXW XWIOBRFBVCEM|SPX 31
- Portfolio Turnover: 0.02%
#region imports
from AlgorithmImports import *
#endregion

from Underlying import Underlying
import operator


class DataHandler:
    """Adds an underlying and its option chain to an algorithm and retrieves contracts.

    The handler wraps an algorithm instance (``context``), the ticker of the
    underlying and a strategy object. From the strategy it reads ``dte``,
    ``dteWindow``, ``nStrikesLeft``, ``nStrikesRight``, ``underlyingSymbol``
    and ``optionSymbol``.
    """

    # The supported cash indices by QC https://www.quantconnect.com/docs/v2/writing-algorithms/datasets/tickdata/us-cash-indices#05-Supported-Indices
    # These need to be added using AddIndex instead of AddEquity
    CashIndices = ['VIX','SPX','NDX']

    def __init__(self, context, ticker, strategy):
        """Store the algorithm, the underlying ticker and the strategy parameters."""
        self.ticker = ticker
        self.context = context
        self.strategy = strategy

    # Method to add the ticker[String] data to the context.
    # @param resolution [Resolution]
    # @return the security object returned by AddIndex / AddEquity
    def AddUnderlying(self, resolution=Resolution.Minute):
        if self.__CashTicker():
            return self.context.AddIndex(self.ticker, resolution=resolution)
        return self.context.AddEquity(self.ticker, resolution=resolution)

    # SECTION BELOW IS FOR ADDING THE OPTION CHAIN
    # Method to add the option chain data to the context.
    # @param underlying the object returned by AddUnderlying (its Symbol is used)
    # @param resolution [Resolution]
    def AddOptionsChain(self, underlying, resolution=Resolution.Minute):
        if self.ticker == "SPX":
            # Underlying is SPX. We'll load and use SPXW and ignore SPX options (these close in the AM)
            return self.context.AddIndexOption(underlying.Symbol, "SPXW", resolution)
        if self.__CashTicker():
            # Underlying is an index
            return self.context.AddIndexOption(underlying.Symbol, resolution)
        # Underlying is an equity
        return self.context.AddOption(underlying.Symbol, resolution)

    # Should be called on an option object like this: option.SetFilter(self.OptionFilter)
    # !This method is called every minute if the algorithm resolution is set to minute
    def SetOptionFilter(self, universe):
        """Restrict the option universe to the strategy's strike and expiry window.

        Keeps nStrikesLeft / nStrikesRight strikes around the ATM strike,
        contracts expiring in the range (DTE - dteWindow, DTE) with both bounds
        floored at zero, and includes weekly contracts.
        """
        minDte = max(0, self.strategy.dte - self.strategy.dteWindow)
        maxDte = max(0, self.strategy.dte)
        return (
            universe.Strikes(-self.strategy.nStrikesLeft, self.strategy.nStrikesRight)
            .Expiration(minDte, maxDte)
            .IncludeWeeklys()
        )

    # SECTION BELOW HANDLES OPTION CHAIN PROVIDER METHODS
    def optionChainProviderFilter(self, symbols, min_strike_rank, max_strike_rank, minDte, maxDte):
        """Build OptionContract objects from raw option symbols.

        @param symbols option Symbols to choose from
        @param min_strike_rank offset (normally negative) of the lowest strike relative to ATM
        @param max_strike_rank offset of the highest strike relative to ATM
        @param minDte, maxDte inclusive days-to-expiry range
        @return list of contracts with BidPrice, AskPrice and UnderlyingLastPrice set,
                or None when no symbol qualifies
        """
        # Check if we got any symbols to process
        if len(symbols) == 0:
            return None

        # Filter the symbols based on the expiry range
        today = self.context.Time.date()
        filteredSymbols = [
            symbol for symbol in symbols
            if minDte <= (symbol.ID.Date.date() - today).days <= maxDte
        ]
        # Exit if there are no symbols for the selected expiry range
        if not filteredSymbols:
            return None

        # If this is not a cash ticker, filter out any non-tradable symbols.
        # to escape the error `Backtest Handled Error: The security with symbol 'SPY 220216P00425000' is marked as non-tradable.`
        if not self.__CashTicker():
            filteredSymbols = [
                x for x in filteredSymbols
                if self.context.Securities[x.ID.Symbol].IsTradable
            ]
            # The tradability filter may have removed everything; without this
            # guard the ATM lookup below would fail on an empty sequence.
            if not filteredSymbols:
                return None

        # Get the latest price of the underlying (read once and reused below)
        underlying = Underlying(self.context, self.strategy.underlyingSymbol)
        underlyingLastPrice = underlying.Price()

        # Find the ATM strike: the strike closest to the underlying price
        atm_strike = min(
            filteredSymbols,
            key=lambda x: abs(x.ID.StrikePrice - underlyingLastPrice)
        ).ID.StrikePrice

        # Sorted list of the distinct strikes and the position of the ATM strike in it
        strike_list = sorted({i.ID.StrikePrice for i in filteredSymbols})
        atm_strike_rank = strike_list.index(atm_strike)

        # Min and Max strike based on the specified number of strikes. Both
        # indices are clamped into the list so that a small rank near either
        # end cannot wrap around (negative index) or run past the end.
        last_rank = len(strike_list) - 1
        min_rank = min(max(0, atm_strike_rank + min_strike_rank + 1), last_rank)
        max_rank = max(0, min(atm_strike_rank + max_strike_rank - 1, last_rank))
        min_strike = strike_list[min_rank]
        max_strike = strike_list[max_rank]

        # Get the list of symbols within the selected strike range
        selectedSymbols = [
            symbol for symbol in filteredSymbols
            if min_strike <= symbol.ID.StrikePrice <= max_strike
        ]

        # Use the algorithm's resolution when it defines one, minute otherwise
        resolution = getattr(self.context, "timeResolution", Resolution.Minute)
        # Quotes live on the algorithm (context); this handler has no Securities of its own
        securities = self.context.Securities

        # Loop through all Symbols and create a list of OptionContract objects
        contracts = []
        for symbol in selectedSymbols:
            # Create the OptionContract and subscribe to its data
            contract = OptionContract(symbol, symbol.Underlying)
            self.AddOptionContracts([contract], resolution = resolution)

            security = securities[contract.Symbol]
            contract.BidPrice = security.BidPrice
            contract.AskPrice = security.AskPrice
            contract.UnderlyingLastPrice = underlyingLastPrice

            contracts.append(contract)

        return contracts

    def getOptionContracts(self, slice):
        """Return the contracts of the strategy's option chain inside the DTE window.

        Contracts are taken from slice.OptionChains. When no matching,
        non-empty chain is present and the strategy's dte is below 3, the
        OptionChainProvider is queried instead.

        @return list of contracts (possibly empty), or None when nothing was found
        """
        contracts = None

        # Set the DTE range (make sure values are not negative)
        minDte = max(0, self.strategy.dte - self.strategy.dteWindow)
        maxDte = max(0, self.strategy.dte)
        today = self.context.Time.date()

        # Loop through all chains
        for chain in slice.OptionChains:
            # Look for the specified optionSymbol
            if chain.Key != self.strategy.optionSymbol:
                continue
            # Make sure there are any contracts in this chain
            if chain.Value.Contracts.Count != 0:
                # Put the contracts into a list so we can cache the Greeks across multiple strategies
                contracts = [
                    contract for contract in chain.Value
                    if minDte <= (contract.Expiry.date() - today).days <= maxDte
                ]

        # If no chains were found, use OptionChainProvider to see if we can find any contracts
        # Only do this for short term expiration contracts (DTE < 3) where slice.OptionChains usually fails to retrieve any chains
        # We don't want to do this all the times for performance reasons
        if contracts is None and self.strategy.dte < 3:
            # Get the list of available option Symbols
            symbols = self.context.OptionChainProvider.GetOptionContractList(self.ticker, self.context.Time)
            # Get the contracts
            contracts = self.optionChainProviderFilter(
                symbols,
                -self.strategy.nStrikesLeft,
                self.strategy.nStrikesRight,
                minDte,
                maxDte
            )

        return contracts

    # Method to add option contracts data to the context.
    # @param contracts [Array] objects exposing a .Symbol attribute
    # @param resolution [Resolution]
    # Returns nothing; contracts are subscribed on the context as a side effect.
    def AddOptionContracts(self, contracts, resolution = Resolution.Minute):
        # Symbols the algorithm reports as already subscribed. When the algorithm
        # does not keep such a collection, every contract is treated as new.
        subscribed = getattr(self.context, "optionContractsSubscriptions", ())

        # Index options and equity options are added through different methods
        if self.__CashTicker():
            add_contract = self.context.AddIndexOptionContract
        else:
            add_contract = self.context.AddOptionContract

        # Add each contract to the data subscription so we can retrieve the Bid/Ask price.
        # The subscription methods take the contract's Symbol, not the contract object.
        for contract in contracts:
            if contract.Symbol not in subscribed:
                add_contract(contract.Symbol, resolution)

    # PRIVATE METHODS

    # Internal method to determine if we are using a cashticker to add the data.
    # @returns [Boolean]
    def __CashTicker(self):
        return self.ticker in self.CashIndices
#region imports
from AlgorithmImports import *
#endregion


class Underlying:
    """
    Underlying class for the Options Strategy Framework.
    Gives access to the security of the underlying symbol and to its prices.

    Example:
        self.underlying = Underlying(self, self.underlyingSymbol)
        self.underlyingPrice = self.underlying.Price()
    """

    def __init__(self, context, underlyingSymbol):
        # Keep the symbol and the algorithm whose Securities collection is queried.
        self.underlyingSymbol = underlyingSymbol
        self.context = context

    # Returns the entry of the algorithm's Securities collection for the underlying symbol.
    def Security(self):
        securities = self.context.Securities
        return securities[self.underlyingSymbol]

    # Returns the Price attribute of the underlying security.
    def Price(self):
        security = self.Security()
        return security.Price

    # Returns the Close attribute of the underlying security.
    def Close(self):
        security = self.Security()
        return security.Close
from AlgorithmImports import *
from DataHandler import DataHandler


class LiveSPXTest(QCAlgorithm):
    """Sells same-day-expiry SPXW calls at least 1% above the SPX price.

    At most one position is tracked at a time. Each filled entry is followed
    by a buy stop-limit order, entries that stay unfilled for 15 minutes are
    cancelled, and from 14:00 (algorithm time) onwards pending entries are
    cancelled and no new ones are placed.
    """

    # Contracts sold per entry and bought back by the protective stop
    ORDER_QUANTITY = 3
    # Bid premium the selected call should be closest to
    TARGET_PREMIUM = 0.15
    # An entry is only placed when the bid is above this premium
    MIN_PREMIUM = 0.10
    # Calls must be struck at or above underlying price * OTM_FACTOR
    OTM_FACTOR = 1.01
    # Stop and limit prices of the protective order, as multiples of TARGET_PREMIUM
    STOP_MULTIPLE = 12.5
    STOP_LIMIT_MULTIPLE = 13
    # Minutes an entry order may stay unfilled before it is cancelled
    ORDER_TIMEOUT_MINUTES = 15
    # From this hour on, pending entries are cancelled and none are placed
    LAST_ENTRY_HOUR = 14

    def initialize(self) -> None:
        """Configure the backtest, subscribe to SPX / SPXW data and reset tracking state."""
        self.set_start_date(2022, 1, 20)
        self.set_end_date(2022, 3, 7)
        self.set_cash(1000000)
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)

        # DataHandler reads these two attributes from its context when it has to
        # fall back to the option chain provider; define them so that path does
        # not fail on a missing attribute.
        self.timeResolution = Resolution.MINUTE
        self.optionContractsSubscriptions = []

        self.strategy = Strategy()
        self.dataHandler = DataHandler(self, "SPX", self.strategy)
        self.underlying = self.dataHandler.AddUnderlying(Resolution.MINUTE)
        self.option = self.dataHandler.AddOptionsChain(self.underlying, Resolution.MINUTE)
        self.underlying.SetDataNormalizationMode(DataNormalizationMode.Raw)
        self.option.SetFilter(self.dataHandler.SetOptionFilter)

        self.spx = self.underlying.Symbol
        self.strategy.underlyingSymbol = self.spx
        self.spx_option = self.option.Symbol
        self.strategy.optionSymbol = self.spx_option

        # The fast average uses the shorter period, the slow one the longer period
        self.ema_fast = self.ema(self.spx, 80)
        self.ema_slow = self.ema(self.spx, 200)

        self.openOrders = {}     # entry order id -> time the order was placed
        self.stopOrders = {}     # stop order id -> time the stop was placed
        self.openPositions = {}  # Dictionary to track open positions (entry order id -> fill time)

    def OnData(self, slice: Slice) -> None:
        """Per-slice entry logic: housekeeping first, then at most one new call sale."""
        if self.Time.hour >= self.LAST_ENTRY_HOUR:
            self.CancelOpenOrders()
            return

        if self.checkOpenOrders():
            return

        self.checkExpiredPositions()

        # Exit if there are open positions
        if self.openPositions:
            return

        chain = self.dataHandler.getOptionContracts(slice)
        self.placeCallOrder(chain)

    def OnOrderEvent(self, orderEvent: OrderEvent) -> None:
        """Keep openOrders / stopOrders / openPositions in sync with order events."""
        order_id = orderEvent.OrderId
        status = orderEvent.Status

        if status == OrderStatus.Filled:
            if order_id in self.stopOrders:
                # The protective stop itself filled: stop tracking it so that later
                # cleanup does not try to cancel an order that is already filled.
                # The openPositions entry is left untouched, so no new entry is
                # placed until checkExpiredPositions clears it on a later date.
                del self.stopOrders[order_id]

            if order_id in self.openPositions:
                del self.openPositions[order_id]
                self.Log(f"Order closed: {order_id}")
                # Remove stop orders as the position has been closed
                for stop_id in list(self.stopOrders.keys()):
                    self.Transactions.CancelOrder(stop_id)
                    del self.stopOrders[stop_id]

            if order_id in self.openOrders:
                del self.openOrders[order_id]
                self.Log(f"Order filled: {order_id}")
                self.openPositions[order_id] = self.Time  # Track the open position

                # Add stop limit order at 2-3 weeks worth of premium
                stop_price = self.TARGET_PREMIUM * self.STOP_MULTIPLE
                limit_price = self.TARGET_PREMIUM * self.STOP_LIMIT_MULTIPLE
                tag = f"Stop Limit Order for {order_id}"
                ticket = self.StopLimitOrder(
                    orderEvent.Symbol, self.ORDER_QUANTITY, stop_price, limit_price, tag
                )
                self.stopOrders[ticket.OrderId] = self.Time
                self.Log(f"Stop Limit Order placed: {ticket.OrderId}")

        elif status == OrderStatus.Submitted:
            self.Log(f"Limit Order submitted: {order_id}")

        elif status == OrderStatus.Canceled:
            if order_id in self.openOrders:
                del self.openOrders[order_id]
                self.Log(f"Order canceled: {order_id}")

        else:
            # Any other status on an order we track as a position
            if order_id in self.openPositions:
                self.Log(f"Order closed via exercise?!: {order_id}")
                del self.openPositions[order_id]

    def CancelOpenOrders(self):
        """Request cancellation of every pending entry order."""
        # Iterate over a copy: entries are deleted in OnOrderEvent when the
        # cancellation is reported, not here.
        for order_id in list(self.openOrders.keys()):
            self.Log(f"Cancelling order: {order_id}")
            self.Transactions.CancelOrder(order_id)

    def checkOpenOrders(self):
        """Cancel stale entry orders and report whether a recent one is still pending.

        An order is stale once it has been open for ORDER_TIMEOUT_MINUTES or
        more. The boundary is inclusive: OnData runs on minute bars, so an
        order is seen at exactly the timeout, and leaving it neither pending
        nor cancelled would allow a second entry to be placed next to it.

        @return True when at least one entry order is younger than the timeout
        """
        pending = False
        # Copy the items: cancelling may remove entries through OnOrderEvent
        for order_id, order_time in list(self.openOrders.items()):
            age_minutes = (self.Time - order_time).total_seconds() / 60
            if age_minutes < self.ORDER_TIMEOUT_MINUTES:
                pending = True
            else:
                self.Log(f"Order has been open for at least {self.ORDER_TIMEOUT_MINUTES} minutes: {order_id}")
                self.Transactions.CancelOrder(order_id)
        return pending

    def placeCallOrder(self, chain):
        """Sell ORDER_QUANTITY calls expiring today with a bid closest to TARGET_PREMIUM.

        @param chain list of option contracts, or None when no data is available
        """
        if chain is None:
            return

        chain = self.filterByExpiry(chain, self.Time.date())

        underlying_price = self.Securities[self.spx].Price
        target_strike = underlying_price * self.OTM_FACTOR

        otm_calls = [
            x for x in chain
            if x.Right == OptionRight.Call and x.Strike >= target_strike
        ]
        if not otm_calls:
            self.Log("No OTM calls found")
            return

        # Pick the call whose bid is closest to the target premium
        otm_call = min(otm_calls, key=lambda x: abs(x.BidPrice - self.TARGET_PREMIUM))

        # Only sell when the bid is above the minimum premium
        if otm_call.BidPrice > self.MIN_PREMIUM:
            # Sell the selected call option with a limit order at the bid
            ticket = self.LimitOrder(otm_call.Symbol, -self.ORDER_QUANTITY, otm_call.BidPrice)
            self.openOrders[ticket.OrderId] = self.Time
            self.Log(f"Limit Order placed: {otm_call.Symbol} at {otm_call.BidPrice}")

    def checkExpiredPositions(self):
        """Drop positions and stop orders that were recorded on an earlier date."""
        today = self.Time.date()

        expired_positions = [
            order_id for order_id, position_time in self.openPositions.items()
            if position_time.date() < today
        ]
        for order_id in expired_positions:
            del self.openPositions[order_id]
            self.Log(f"Expired position removed: {order_id}")

        # Cancel and forget stop orders placed on an earlier date
        expired_stop_orders = [
            order_id for order_id, position_time in self.stopOrders.items()
            if position_time.date() < today
        ]
        for order_id in expired_stop_orders:
            self.Transactions.CancelOrder(order_id)
            del self.stopOrders[order_id]
            self.Log(f"Expired stop order removed: {order_id}")

    def filterByExpiry(self, chain, expiry=None):
        """Return the contracts expiring on `expiry`, or the chain unchanged when expiry is None."""
        if expiry is None:
            return chain
        return [contract for contract in chain if contract.Expiry.date() == expiry]

    def on_end_of_algorithm(self) -> None:
        """Fail the run if any volume was ever traded in the index itself."""
        if self.portfolio[self.spx].total_sale_volume > 0:
            raise Exception("Index is not tradable.")


class Strategy:
    """Parameter holder read by DataHandler."""

    def __init__(self):
        # Strikes to keep below / above the ATM strike
        self.nStrikesLeft = 20
        self.nStrikesRight = 20
        # Target days to expiry and the width of the window below it
        self.dte = 0
        self.dteWindow = 0
        # Filled in by the algorithm once the securities have been added
        self.underlyingSymbol = None
        self.optionSymbol = None