Overall Statistics |
Total Orders 1207 Average Win 0.12% Average Loss -0.09% Compounding Annual Return 10.771% Drawdown 5.400% Expectancy 0.101 Start Equity 10000 End Equity 10614 Net Profit 6.140% Sharpe Ratio 0.268 Sortino Ratio 0.128 Probabilistic Sharpe Ratio 43.965% Loss Rate 52% Win Rate 48% Profit-Loss Ratio 1.29 Alpha 0.008 Beta 0.228 Annual Standard Deviation 0.088 Annual Variance 0.008 Information Ratio -0.375 Tracking Error 0.118 Treynor Ratio 0.104 Total Fees $27.00 Estimated Strategy Capacity $2000.00 Lowest Capacity Asset SPY YBQ1C4XA1AAU|SPY R735QTJ8XC9X Portfolio Turnover 47.23% |
#region imports
# The helpers below only need the standard library. The LEAN star-import is
# kept for parity with the other project files, but guarded so this module can
# also be imported (and tested) outside the LEAN runtime.
try:
    from AlgorithmImports import *
except ImportError:
    pass
#endregion
from bisect import bisect_left
import inspect
import sys
import traceback


# ================================================================================================
# Assumes myList is sorted. Returns the INDEX of the closest value to myNumber.
#
# If two numbers are equally close, the index of the smallest number is returned.
#
# Credit from:
# https://stackoverflow.com/questions/12141150/from-list-of-integers-get-number-closest-to-a-given-value/12141511#12141511
# ================================================================================================
def GetIndexOfClosestMatchFromSortedList(myList, myNumber):
    """Return the index of the element of ``myList`` closest to ``myNumber``.

    Args:
        myList: Sequence sorted in ascending order.
        myNumber: Value to locate.

    Returns:
        Index into ``myList``. On a tie the lower index wins. For an empty
        list the result is 0, which is not a valid index -- callers must
        check for emptiness themselves.
    """
    pos = bisect_left(myList, myNumber)

    # Target sits at or before the first element.
    if pos == 0:
        return 0

    # Target sits after the last element.
    if pos == len(myList):
        return len(myList) - 1

    before = myList[pos - 1]
    after = myList[pos]

    # Strict '<' so that an exact tie resolves to the smaller neighbour.
    if after - myNumber < myNumber - before:
        return pos
    return pos - 1


# ============================================================
def HandleExceptionWithDebug(algo):
    """Write the exception currently being handled to the algorithm's debug log.

    Two messages are sent through ``algo.Debug``: a header naming the function
    that called this helper, then up to the first three entries produced by
    ``traceback.format_exception``.

    Args:
        algo: Object exposing ``Time`` and ``Debug(message)``.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    formatted_error = traceback.format_exception(exc_type, exc_value, exc_tb)

    # Frame 0 is this helper; frame 1 is whoever called it.
    calframe = inspect.getouterframes(inspect.currentframe(), 2)
    callerName = calframe[1][3]

    # This is a free function, so the timestamp comes from the 'algo' argument
    # (there is no 'self' in scope here).
    algo.Debug(f"{algo.Time} [Exception] In Method {callerName}, Called from [...])")

    # Slice instead of indexing [0], [1], [2]: the list can be shorter than
    # three entries, e.g. when no exception is active.
    algo.Debug("".join(formatted_error[:3]))
#region imports
from AlgorithmImports import *
#endregion
from StrategyPosition import *
from OptionsMoneyManager import *
from DataUtils import GetIndexOfClosestMatchFromSortedList
from DataUtils import HandleExceptionWithDebug
from QuantConnect.Securities.Option import OptionStrategies
from QuantConnect.Securities.Option import OptionPriceModels
from QuantConnect.Securities.Option import OptionStrategyPositionGroupBuyingPowerModel
import datetime
import inspect


class OptionsExecutionManager():
    """Selects option contracts, opens single-leg / bull-put-spread positions
    and manages their exits on behalf of a LEAN algorithm.

    Attributes:
        algo: The owning algorithm; used for data, orders and debug logging.
        managerConfig: Exit thresholds (see OptionsManagerConfiguration).
        moneyManager: An OptionsMoneyManager instance.
        OpenPositions: StrategyPosition objects opened by this manager.
    """

    def __init__(self, algo, optionMgrConfig):
        self.algo = algo
        self.managerConfig = optionMgrConfig
        self.moneyManager = OptionsMoneyManager()
        self.OpenPositions = []

        # When we subscribe to an option contract using AddOptionContract,
        # the asset price is initially 0 because data for that contract has
        # not yet been fed into the algorithm. We can work around this issue
        # by using SetSecurityInitializer with a custom security initializer.
        # -------------------------------------------------------------------
        algo.SetSecurityInitializer(lambda x: x.SetMarketPrice(algo.GetLastKnownPrice(x)))

    # ==============================================================
    # Get Target Strike Given Pct Distance
    # ==============================================================
    def GetTargetStrikeByPctDist(self, symbolArg, pctDist):
        """Return the current price of ``symbolArg`` scaled by ``1 + pctDist``."""
        theCurrPrice = self.algo.Securities[symbolArg].Price
        return theCurrPrice * (1 + pctDist)

    # ==============================================================
    # Get Target Expiry Given DTE Days
    # ==============================================================
    def GetTargetExpiryByDTE(self, dteDays):
        """Return the algorithm time advanced by ``dteDays`` days."""
        return self.algo.Time + timedelta(days=dteDays)

    # ==============================================================
    # Get Strike & Expiration, given pct distance and DTE
    # ==============================================================
    def GetStrikeAndExpiration(self, symbolArg, pctDist, dteDays):
        """Return ``(targetStrike, targetExpiry)`` using the two helpers above."""
        theStrikePrice = self.GetTargetStrikeByPctDist(symbolArg, pctDist)
        theExpiration = self.GetTargetExpiryByDTE(dteDays)
        return theStrikePrice, theExpiration

    # ==============================================================
    # Option chain lookup shared by the delta-based selectors
    # ==============================================================
    def _GetOptionChain(self, symbolArg):
        """Return the current slice's option chain for ``symbolArg``, or None
        when there is no current slice or it holds no chain for that option."""
        canonicalOption = self.algo.AddOption(symbolArg)
        currentSlice = self.algo.CurrentSlice
        if currentSlice is None:
            return None
        optionChains = currentSlice.OptionChains
        if not optionChains.ContainsKey(canonicalOption.Symbol):
            return None
        return optionChains[canonicalOption.Symbol]

    # ==============================================================
    # Buy Call or Put with Delta
    # ==============================================================
    def BuyCallOrPutWithDelta(self, symbolArg, optionDelta, expiryDTE, optionRightArg = OptionRight.Call ):
        """Buy one contract chosen by delta; delegates to BuyCallOrPut."""
        self.BuyCallOrPut(symbolArg, 0, expiryDTE, optionRightArg, optionDelta)

    # ==============================================================
    # Buy Call or Put
    # ==============================================================
    def BuyCallOrPut(self, symbolArg, optionStrike, expiryDTE, optionRightArg = OptionRight.Call, optionDelta=None ):
        """Market-buy one call or put on ``symbolArg``.

        The contract is chosen by strike (``optionDelta`` is None) or by delta.
        Nothing is ordered when no contract can be selected.
        """
        if optionDelta is None:
            contractSymbol = self.SelectContract(symbolArg, optionStrike, expiryDTE, optionRightArg)
        else:
            # The delta selector returns a chain contract, while
            # AddOptionContract / MarketOrder below need its Symbol.
            chainContract = self.SelectContractByDelta(symbolArg, optionDelta, expiryDTE, optionRightArg)
            contractSymbol = None if chainContract is None else chainContract.Symbol

        if contractSymbol is None:
            # self.algo.Debug(f"{self.algo.Time} [ERROR] Option Contract for {symbolArg} is 'None' ")
            return

        # subscribe to data for the contract
        # -----------------------------------------
        theOption = self.algo.AddOptionContract(contractSymbol, Resolution.Minute)

        # buy contract
        # -------------------
        underlyingPrice = self.algo.Securities[symbolArg].Price
        callOrPutString = "CALL" if (optionRightArg == OptionRight.Call) else "PUT"
        orderMsg = f"[BUY {callOrPutString}] {symbolArg}-{str(round(theOption.StrikePrice,0))} | Stock @ $ {str(underlyingPrice)}"
        self.algo.MarketOrder(contractSymbol, 1, False, orderMsg)

    # ==============================================================
    # Open PutCreditSpread with Delta
    # ==============================================================
    def SellBullPutSpreadWithDelta(self, symbolArg, longStrikeDelta, shortStrikeStepDistance=1, expiryDTE=120, signalMsg=""):
        """Open a bull put spread whose long leg is chosen by delta."""
        self.SellBullPutSpread(symbolArg, 0, shortStrikeStepDistance, expiryDTE, longStrikeDelta, signalMsg)

    # ==============================================================
    # Open PutCreditSpread
    # ==============================================================
    def SellBullPutSpread(self, symbolArg, longStrikePrice, shortStrikeStepDistance=1, expiryDTE=120, longStrikeDelta=0.0, signalMsg="" ):
        """Try to open a bull put (credit) spread on ``symbolArg``.

        Legs are chosen by delta when ``longStrikeDelta > 0``, otherwise by
        ``longStrikePrice``. The spread is skipped, with a debug message, when
        a leg cannot be selected, when the short bid does not exceed the long
        ask, when either leg is already held, or when OptionsMoneyManager
        rejects it as not trade-worthy.
        """
        useDelta = longStrikeDelta > 0

        # fetch contracts
        # ---------------
        if useDelta:
            longPutContract, shortPutContract = self.SelectVerticalSpreadContractsByDelta(
                OptionRight.Put, symbolArg, longStrikeDelta, shortStrikeStepDistance, expiryDTE)
        else:
            longPutContract, shortPutContract = self.SelectVerticalSpreadContracts(
                OptionRight.Put, symbolArg, longStrikePrice, shortStrikeStepDistance, expiryDTE)

        # Both selectors can return None for either leg.
        if (longPutContract is None) or (shortPutContract is None):
            self.algo.Debug(f"{self.algo.Time}SellBullPutSpread [ERROR] Option Contract for {symbolArg} is 'None' ")
            return

        if useDelta:
            # Chain contracts already carry quotes and greeks.
            longLeg, shortLeg = longPutContract, shortPutContract
            theShortStrike = shortLeg.Strike
            theLongStrike = longLeg.Strike
            theExpiry = shortLeg.Expiry
            positionDelta = shortLeg.Greeks.Delta
            deltaMsg = f"(Deltas: +{abs(round(longLeg.Greeks.Delta*100,2))} -{abs(round(shortLeg.Greeks.Delta*100,2))})"
        else:
            # The strike selector returns bare Symbols. Subscribe to them and
            # read quotes from the resulting securities; the Symbols themselves
            # carry no BidPrice / AskPrice.
            longLeg = self.algo.AddOptionContract(longPutContract, Resolution.Minute)
            shortLeg = self.algo.AddOptionContract(shortPutContract, Resolution.Minute)
            theShortStrike = shortLeg.StrikePrice
            theLongStrike = longLeg.StrikePrice
            theExpiry = shortLeg.Expiry
            # NOTE(review): no greeks are available on this path, so the
            # position delta is left unset -- confirm StrategyPosition copes.
            positionDelta = None
            deltaMsg = ""

        strategy = None
        failReasonMsg = ""

        # Check if there is a mismatch / price anomaly
        # Check if it's worth it to play the spread
        # -------------------------------
        if shortLeg.BidPrice <= longLeg.AskPrice:
            failReasonMsg = f"Anomaly: Short Bid < Long Ask | {round(shortLeg.BidPrice, 2)} < {round(longLeg.AskPrice, 2)} | {deltaMsg}"

        # Check if there is already a position held for either contract
        # -------------------------------
        elif (self.algo.Securities[shortLeg.Symbol].Invested or
              self.algo.Securities[longLeg.Symbol].Invested):
            alreadyInvestedMsg = f"SELL {shortLeg.Symbol}" \
                if self.algo.Securities[shortLeg.Symbol].Invested \
                else f"BUY {longLeg.Symbol}"
            failReasonMsg = f"Cannot {alreadyInvestedMsg} - position already exists"

        else:
            # todo: find out why we cant just use symbolArg
            #       (throws error. requires a 'canonical' symbol)
            # --------------------------------------------------
            optionSymbol = self.algo.AddOption(symbolArg)
            strategy = OptionStrategies.BullPutSpread(optionSymbol.Symbol, theShortStrike, theLongStrike, theExpiry)

            # Todo: Open the strategy using limit orders for each leg
            #       Opening at market prices might get us filled at
            #       a Risk/Reward ratio we dont want
            # --------------------------------------------------
            # https://www.quantconnect.com/forum/discussion/8399/optionstrategies-limit-orders

        if strategy is None:
            self.algo.Debug(f"{self.algo.Time} [SKIPPING] Did not open put credit spread for {symbolArg} - {failReasonMsg}")
            return

        # store a reference to the order
        # -------------------------------
        strategyID = f"{symbolArg}+{theLongStrike}P-{theShortStrike}P|{theExpiry}"
        strategyPosition = StrategyPosition(self.algo, strategyID, strategy, symbolArg)
        strategyPosition.SpreadType = OptionSpreadType.BULL_PUT_SPREAD
        strategyPosition.PositionDelta = positionDelta
        strategyPosition.ExpectedFees = 2 * self.algo.brokerOrderFee
        strategyPosition.ExpectedCost = 100 * (longLeg.AskPrice - shortLeg.BidPrice)

        # TODO: use order fees to set expected fees the right way

        if not OptionsMoneyManager.IsSpreadTradeWorthy(strategyPosition):
            self.algo.Debug(f"{self.algo.Time} [SKIPPING] Not Tradeworthy - Did not open BPS | {deltaMsg}")
            return

        # place the order
        # ---------------
        self.OpenPositions.append(strategyPosition)  # todo: do this *after* order's placed (confirm no side effects)

        # Reference:
        # https://www.quantconnect.com/forum/discussion/3646/understanding-buying-power-calculation-and-setholdings/p1
        marginRemaining = self.algo.Portfolio.MarginRemaining

        # TODO: size the order from the buying power effect, per QC forum.
        #       Until then exactly one spread is opened.
        qtyToPurchase = 1

        if qtyToPurchase > 0:
            orderMsg = f"{symbolArg} +{theLongStrike}P -{theShortStrike}P {deltaMsg}"
            orderMsg = f"[OPENING][{signalMsg}] {orderMsg} | Stock @ {self.algo.Securities[symbolArg].Price}"
            self.algo.Debug(f"{self.algo.Time} {orderMsg}")
            strategy.Name = orderMsg
            orderTickets = self.algo.Order(strategy, qtyToPurchase)
            strategyPosition.PendingOrders = orderTickets
        else:
            # The buying power effect is not computed yet, so it is not reported.
            self.algo.Debug(f"{self.algo.Time} [SKIPPING] Not enough $$ - ${marginRemaining} margin left ")

    # ======================================================================================================
    # Select VerticalSpread Contracts, given a symbol, expiration, delta and strike steps
    # ======================================================================================================
    def SelectVerticalSpreadContractsByDelta(self, optionRightArg, symbolArg, primaryStrikeDeltaArg, secondaryStrikeStepDistance, expiryDTE):
        """Pick two chain contracts of ``optionRightArg`` for a vertical spread.

        Contracts of the expiry closest to ``expiryDTE`` days out are ordered
        by ascending absolute delta. The primary is the one whose absolute
        delta is closest to ``primaryStrikeDeltaArg``; the secondary sits
        ``secondaryStrikeStepDistance`` positions further along that ordering.

        Returns:
            ``(primary, secondary)``. Both are None when no chain or fewer
            than two matching contracts exist; the secondary alone is None
            when the step lands outside the list.
        """
        chain = self._GetOptionChain(symbolArg)
        if chain is None:
            self.algo.Debug(f"{self.algo.Time} [Error] SelectVerticalSpreadContractsByDelta - Issue w/Option Chains")
            return None, None

        filteredContracts = [x for x in chain if x.Right == optionRightArg]
        if len(filteredContracts) < 2:
            self.algo.Debug(f"{self.algo.Time} [Error] SelectVerticalSpreadContractsByDelta - Less than 2 filtered contracts")
            return None, None

        # select expiration closest to desired expiration
        # --------------------------------------------
        expiryDate = self.GetTargetExpiryByDTE(expiryDTE)
        closestExpirationDate = min(filteredContracts, key=lambda p: abs(p.Expiry - expiryDate)).Expiry

        # get all contracts for selected expiration, ordered by |delta|.
        # For puts (negative deltas) this is the same order as sorting the raw
        # delta descending; using the absolute value keeps the list ascending
        # for calls too, which the bisect-based lookup below requires.
        # ------------------------------------------------
        contractsMatchingExpiryDTE = [c for c in filteredContracts if c.Expiry == closestExpirationDate]
        contractsSortedByDelta = sorted(contractsMatchingExpiryDTE, key=lambda p: abs(p.Greeks.Delta))

        deltaList = [abs(c.Greeks.Delta) for c in contractsSortedByDelta]
        closestDeltaIndex = GetIndexOfClosestMatchFromSortedList(deltaList, primaryStrikeDeltaArg)
        primaryContract = contractsSortedByDelta[closestDeltaIndex]

        # Explicit bounds check: a negative index would otherwise silently
        # wrap around to the other end of the list.
        secondaryIndex = closestDeltaIndex + secondaryStrikeStepDistance
        if 0 <= secondaryIndex < len(contractsSortedByDelta):
            secondaryContract = contractsSortedByDelta[secondaryIndex]
        else:
            self.algo.Debug(f"{self.algo.Time}[ERROR] SelectVerticalSpreadContractsByDelta - {symbolArg}- Cannot select secondary contract - IndexError : list index out of range - ")
            secondaryContract = None

        return primaryContract, secondaryContract

    # ======================================================================================================
    # Select VerticalSpread Contracts, given a symbol, expiration,
    # desired primary strike and strike steps
    #
    # eg to get adjacent put strikes, call:
    # SelectVerticalSpreadContracts( GOOG, 250, -1, 12/12/20) or ( GOOG, 250, +1, 12/12/20)
    # ======================================================================================================
    def SelectVerticalSpreadContracts(self, optionRightArg, symbolArg, primaryStrikePriceArg, secondaryStrikeStepDistance, expiryDTE):
        """Pick two contract Symbols of ``optionRightArg`` for a vertical spread.

        Contracts of the expiry closest to ``expiryDTE`` days out are ordered
        by ascending strike. The primary is the strike closest to
        ``primaryStrikePriceArg``; the secondary sits
        ``secondaryStrikeStepDistance`` strikes further along.

        Returns:
            ``(primary, secondary)`` Symbols. Both are None when fewer than
            two contracts are listed or none match the right; the secondary
            alone is None when the step lands outside the list.
        """
        contracts = self.algo.OptionChainProvider.GetOptionContractList(symbolArg, self.algo.Time)
        if len(contracts) < 2:
            return None, None

        # get all contracts that match type
        # ------------------------------------
        filteredContracts = [symbol for symbol in contracts if symbol.ID.OptionRight == optionRightArg]
        if not filteredContracts:
            return None, None

        # select expiration closest to desired expiration
        # --------------------------------------------
        expiryDate = self.GetTargetExpiryByDTE(expiryDTE)
        closestExpirationDate = min(filteredContracts, key=lambda p: abs(p.ID.Date - expiryDate)).ID.Date

        # get all contracts for selected expiration, ordered by strike
        # ------------------------------------------------
        contractsMatchingExpiryDTE = [c for c in filteredContracts if c.ID.Date == closestExpirationDate]
        contractsSortedByStrike = sorted(contractsMatchingExpiryDTE, key=lambda p: p.ID.StrikePrice)

        # select the strike closest to the desired one
        # -----------------------------------------------------------
        strikeList = [c.ID.StrikePrice for c in contractsSortedByStrike]
        closestStrikeIndex = GetIndexOfClosestMatchFromSortedList(strikeList, primaryStrikePriceArg)
        primaryContract = contractsSortedByStrike[closestStrikeIndex]

        # Explicit bounds check instead of a bare except: a negative index
        # would otherwise silently wrap around to the other end of the list.
        secondaryIndex = closestStrikeIndex + secondaryStrikeStepDistance
        if 0 <= secondaryIndex < len(contractsSortedByStrike):
            secondaryContract = contractsSortedByStrike[secondaryIndex]
        else:
            self.algo.Debug("investigate why secondaryContract fires this error -- IndexError : list index out of range")
            secondaryContract = None

        return primaryContract, secondaryContract

    # ==================================================================
    # Select a Contract, given a symbol, expiration, and delta
    # ==================================================================
    def SelectContractByDelta(self, symbolArg, strikeDeltaArg, expiryDTE, optionRightArg= OptionRight.Call):
        """Return the chain contract of ``optionRightArg`` whose expiry is
        closest to ``expiryDTE`` days out and whose absolute delta is closest
        to ``strikeDeltaArg``; None when no chain or no matching contract."""
        chain = self._GetOptionChain(symbolArg)
        if chain is None:
            return None

        # Filter the Call/Put options contracts
        # ----------------------------------
        filteredContracts = [x for x in chain if x.Right == optionRightArg]
        if not filteredContracts:
            return None

        # keep only the expiry closest to the desired one
        # -----------------------------------------------------------------
        expiryDate = self.GetTargetExpiryByDTE(expiryDTE)
        closestExpirationDate = min(filteredContracts, key=lambda p: abs(p.Expiry - expiryDate)).Expiry
        contractsMatchingExpiryDTE = [c for c in filteredContracts if c.Expiry == closestExpirationDate]

        # contract with the closest (absolute) delta
        return min(contractsMatchingExpiryDTE, key=lambda x: abs(abs(x.Greeks.Delta) - strikeDeltaArg))

    # ==================================================================
    # Select a Contract, given a symbol, desired strike and expiration
    # ==================================================================
    def SelectContract(self, symbolArg, strikePriceArg, expirationArg, optionRightArg):
        """Return the contract Symbol of ``optionRightArg`` closest to the
        requested expiration and then to ``strikePriceArg``.

        Args:
            expirationArg: Target expiration as a date/time, or a number of
                days from the current algorithm time.

        Returns:
            A Symbol, or None when no contract of that right is listed.
        """
        # BuyCallOrPut forwards a day count; convert it to a date so the
        # subtraction from each contract's expiry below is valid.
        if isinstance(expirationArg, (int, float)):
            expirationArg = self.GetTargetExpiryByDTE(expirationArg)

        contracts = self.algo.OptionChainProvider.GetOptionContractList(symbolArg, self.algo.Time)
        if len(contracts) == 0:
            return None

        # get all contracts that match type
        # ------------------------------------
        filteredContracts = [symbol for symbol in contracts if symbol.ID.OptionRight == optionRightArg]
        if not filteredContracts:
            return None

        # select expiration closest to desired expiration
        # --------------------------------------------
        closestExpirationDate = min(filteredContracts, key=lambda p: abs(p.ID.Date - expirationArg)).ID.Date
        contractsFilteredByExpiration = [c for c in filteredContracts if c.ID.Date == closestExpirationDate]

        # select the one closest to desired strike
        # -----------------------------------------------------------
        return min(contractsFilteredByExpiration, key=lambda p: abs(p.ID.StrikePrice - strikePriceArg))

    def OnOrderEvent(self, orderEvent):
        """Forward filled order events to UpdatePositionAfterFill."""
        if orderEvent.Status == OrderStatus.Filled:
            self.UpdatePositionAfterFill(orderEvent.Symbol, orderEvent)

    # ============================================================
    # After an order is filled, update the OpenPositions list
    # for each spread, store the fill price and time of fill
    #
    # https://lean-api-docs.netlify.app/classQuantConnect_1_1Orders_1_1OrderEvent.html#a6ca201759d0f83a785abb6c123e2a378
    # ============================================================
    def UpdatePositionAfterFill(self, symbol, orderEvent):
        """Attribute a fill on ``symbol`` to the open position that owns the leg.

        A leg matches on expiration, strike and right. Assignments are flagged
        and passed to the position. Fills on positions whose legs are all
        filled already are ignored (likely closing orders). Otherwise the leg
        is recorded on the position and the search stops.
        """
        for strategyPosition in self.OpenPositions:
            # only positions on the same underlying
            if symbol.Underlying != strategyPosition.Underlying:
                continue

            for optionLegData in strategyPosition.OptionLegs:
                # TODO: maybe just check ( optionLegData.Symbol == symbol ) instead
                isSameContract = (
                    (optionLegData.Expiration == symbol.ID.Date) and
                    (optionLegData.Strike == symbol.ID.StrikePrice) and
                    (optionLegData.Right == symbol.ID.OptionRight)
                )
                if not isSameContract:
                    continue

                if orderEvent.IsAssignment:
                    self.algo.Debug(f"{self.algo.Time} [ERROR] Assignment Occured")
                    strategyPosition.LegWasAssigned = True
                    strategyPosition.HandleAssignment(orderEvent.FillQuantity, orderEvent.Symbol)
                elif strategyPosition.AllLegsFilled:
                    # all legs were already filled, so this is likely a closing order.
                    # ignore
                    pass
                else:
                    # A leg was filled. Update strategy position accordingly.
                    # NOTE(review): returning here means one fill is credited
                    # to a single position only -- confirm this is intended.
                    strategyPosition.AddfilledLeg(symbol, optionLegData, orderEvent)
                    return

    # ===============================================
    def LiquidateOpenSpread(self, strategyPosition, liquidateMsg):
        """Liquidate ``strategyPosition`` and drop it from OpenPositions."""
        strategyPosition.LiquidateSpread(liquidateMsg)
        self.OpenPositions.remove(strategyPosition)

    # ============================================================
    # Roll or Close open spreads, if warranted.
    # Reference to calculate spread profit / loss
    # https://www.quantconnect.com/forum/discussion/5739/calculating-profit-loss-of-one-leg-or-two-leg-option-strategies/p1
    # ============================================================
    def ManageOpenOptionSpreads(self):
        """Liquidate fully-filled credit spreads that hit an exit rule.

        Rules are checked in this order, and the first match closes the spread:
        days in trade >= ``exitAfterDays`` (disabled when 0), days to expiry
        <= ``exitAtDTE`` (disabled when -1), share of max reward >=
        ``roiTargetInPct`` (disabled when not > 0). A non-credit spread is
        reported and the algorithm is asked to quit.
        """
        # Iterate over a copy: LiquidateOpenSpread removes from OpenPositions.
        for strategyPosition in self.OpenPositions[:]:

            # if the spread has *not* had all its legs filled, skip it.
            # ---------------------------------------------------------
            if not strategyPosition.AllLegsFilled:
                continue

            isCreditSpread = (
                (strategyPosition.SpreadType == OptionSpreadType.BULL_PUT_SPREAD) or
                (strategyPosition.SpreadType == OptionSpreadType.BEAR_CALL_SPREAD)
            )
            if not isCreditSpread:
                self.algo.Debug(f"{self.algo.Time} [ERROR] Strategy was not a Credit Spread")
                self.algo.Quit(f"{self.algo.Time} [ERROR] Strategy was not a Credit Spread")
                continue

            stockPrice = self.algo.Securities[strategyPosition.Underlying].Price
            spreadExpiry = strategyPosition.ExpirationDate
            spreadOpenDate = strategyPosition.timeFilled

            # if the position has been open for 'X' days or more, liquidate
            # ------------------------------------------------------
            if self.managerConfig.exitAfterDays != 0:
                daysInTrade = (self.algo.Time - spreadOpenDate).days
                if daysInTrade >= self.managerConfig.exitAfterDays:
                    pctOfMaxprofit = OptionsMoneyManager.GetUnrealizedPctOfMaxReward(strategyPosition)
                    liquidateMsg = f"[EXIT] {daysInTrade} DITs [{strategyPosition.Id}] @ {str(pctOfMaxprofit*100)}% Profit | Stock @ ${str(stockPrice)}"
                    self.LiquidateOpenSpread(strategyPosition, liquidateMsg)
                    continue

            # if the expiry date of this position is
            # 'X' days away or less, liquidate the whole position
            # ------------------------------------------------------
            if self.managerConfig.exitAtDTE != -1:
                daysTillExp = (spreadExpiry - self.algo.Time).days
                if daysTillExp <= self.managerConfig.exitAtDTE:
                    pctOfMaxprofit = OptionsMoneyManager.GetUnrealizedPctOfMaxReward(strategyPosition)
                    liquidateMsg = f"[EXIT] DTE [{strategyPosition.Id}]|Stock @ ${str(stockPrice)}|{str(pctOfMaxprofit*100)}% of MaxR "
                    self.LiquidateOpenSpread(strategyPosition, liquidateMsg)
                    continue

            # if spread has hit x% profit, liquidate
            # ---------------------------------------------------
            if self.managerConfig.roiTargetInPct > 0:
                pctOfMaxprofit = OptionsMoneyManager.GetUnrealizedPctOfMaxReward(strategyPosition)
                if pctOfMaxprofit >= self.managerConfig.roiTargetInPct:
                    liquidateMsg = f"[EXIT] TP [{strategyPosition.Id}] @ {str(pctOfMaxprofit*100)}% Profit | Stock @ ${str(stockPrice)}"
                    self.LiquidateOpenSpread(strategyPosition, liquidateMsg)
                    continue

            # For now, no stop losses for put credit spreads.
            # Makes sense, since option price movement is volatile.

    # ============================================================
    # Roll or Close open positions, if warranted.
    # ============================================================
    def ManageOpenOptionSingles(self):
        """Liquidate at most one held option per call when an exit rule fires.

        Every held option is checked against the DTE exit. Puts are then
        checked for in-the-money, profit-target and max-loss exits; calls
        have no further handling yet. The method returns right after the
        first liquidation.
        """
        for symbol in self.algo.Securities.Keys:
            portfolioPosition = self.algo.Securities[symbol]
            if not portfolioPosition.Invested:
                continue
            if portfolioPosition.Type != SecurityType.Option:
                continue

            # TODO: Make sure this is truly a 'single' contract and not part of a spread

            stockPrice = portfolioPosition.Underlying.Price
            profitPct = round(portfolioPosition.Holdings.UnrealizedProfitPercent, 2)
            daysTillExp = (portfolioPosition.Expiry - self.algo.Time).days
            callOrPutString = "CALL" if (portfolioPosition.Right == OptionRight.Call) else "PUT"

            # if the expiry date of this position
            # is less than 'X' days away, liquidate
            # ----------------------------------------
            if (self.managerConfig.exitAtDTE != -1) and (daysTillExp < self.managerConfig.exitAtDTE):
                orderMsg = f"[SELL] {callOrPutString} | {self.managerConfig.exitAtDTE} DTE Exit @ {str(profitPct*100) }% Profit | Stock @ ${str(stockPrice)} | {str(daysTillExp)} DTE"
                self.algo.Liquidate(symbol, orderMsg)
                return None

            # Handle call positions (eg: rolling, etc.) -- nothing yet.
            # ----------------------------------
            if portfolioPosition.Right != OptionRight.Put:
                continue

            # Handle Put positions
            # NOTE(review): the ITM / TP / SL checks are treated as one chain
            # that applies to puts only -- confirm against the intended rules.
            # ----------------------------------
            if stockPrice < portfolioPosition.StrikePrice:
                # Exit position if ITM
                orderMsg = f"[SELL] {callOrPutString} | ITM Exit @ {str(profitPct*100)}% Profit | Stock: ${str(stockPrice)}"
                self.algo.Liquidate(symbol, orderMsg)
                return None

            # if current contract has hit x% return, liquidate
            # ---------------------------------------------------
            elif (self.managerConfig.roiTargetInPct > 0) and (profitPct >= self.managerConfig.roiTargetInPct):
                orderMsg = f"[SELL] {callOrPutString} | TP Exit @ {str(profitPct*100)}% Profit | Stock @ ${str(stockPrice)}"
                self.algo.Liquidate(symbol, orderMsg)
                return None

            # if current contract has hit x% loss, liquidate
            # ---------------------------------------------------
            elif (self.managerConfig.maxLossInPct > 0) and (profitPct <= (-1 * self.managerConfig.maxLossInPct)):
                orderMsg = f"[SELL] {callOrPutString} | SL Exit @ {str(profitPct*100)}% Profit | Stock @ ${str(stockPrice)}"
                self.algo.Liquidate(symbol, orderMsg)
                return None

    # ==================================================================
    # Liquidate all positions of the given type, for the given symbol.
    # ==================================================================
    def LiquidateOptionsOfType(self, symbolArg, optionRightArg = OptionRight.Call, orderMsgArg="Liquidated"):
        """Liquidate every held option of ``optionRightArg`` on ``symbolArg``.

        Each order is tagged with ``orderMsgArg`` plus that position's own
        unrealized profit percentage.
        """
        for symbolKey in self.algo.Securities.Keys:
            portfolioPosition = self.algo.Securities[symbolKey]
            if not portfolioPosition.Invested:
                continue

            if (portfolioPosition.Type == SecurityType.Option) and \
               (portfolioPosition.Underlying.Symbol == symbolArg) and \
               (portfolioPosition.Right == optionRightArg):
                profitPct = round(portfolioPosition.Holdings.UnrealizedProfitPercent * 100, 2)
                # Build a fresh message per position; reassigning orderMsgArg
                # would pile up one "| Profit" suffix per liquidated position.
                positionMsg = f"{orderMsgArg} | Profit: {profitPct}%"
                self.algo.Debug(f"{self.algo.Time} - {positionMsg}")
                self.algo.Liquidate(symbolKey, positionMsg)

    # ==================================================================
    # Initialize Greeks
    # -------------------
    # Set the pricing model for the option subscription.
    # Subsequently, the options returned will have greeks.
    # ==================================================================
    def InitGreeksForOption(self, theOptionSubscription):
        """Apply this manager's chain filter and a pricing model to an option
        subscription."""
        # set our strike/expiry filter for this option chain
        theOptionSubscription.SetFilter(self.OptionsFilterFunction)

        # set the pricing model for Greeks and volatility
        # find more pricing models https://www.quantconnect.com/lean/documentation/topic27704.html
        # for greeks and pricer (needs some warmup) - https://github.com/QuantConnect/Lean/blob/21cd972e99f70f007ce689bdaeeafe3cb4ea9c77/Common/Securities/Option/OptionPriceModels.cs#L81
        theOptionSubscription.PriceModel = OptionPriceModels.CrankNicolsonFD()  # both European & American, automatically

    def OptionsFilterFunction(self, optionsContractsChain):
        """Universe filter: weeklys included, 100 strikes either side, and
        expirations between 10 and 90 days out."""
        strikeCount = 100   # no of strikes around ATM => for universe selection
        minExpiryDTE = 10   # min num of days to expiration => for uni selection
        maxExpiryDTE = 90   # max num of days to expiration => for uni selection

        return optionsContractsChain.IncludeWeeklys()\
                                    .Strikes(-strikeCount, strikeCount)\
                                    .Expiration(timedelta(minExpiryDTE), timedelta(maxExpiryDTE))


#######################################################
##### Class: Options Manager Configuration Object #####
#######################################################
class OptionsManagerConfiguration():
    """Plain settings holder read by OptionsExecutionManager.

    In the manager, ``exitAtDTE == -1`` and ``exitAfterDays == 0`` disable the
    respective exits, and the ROI / max-loss checks only run for values > 0.
    """

    def __init__(self, enterAtDTE=180, exitAtDTE=0, exitAfterDays=0,roiTargetInPct=0.0, maxLossInPct=0.0, rollPositions=True ):
        self.enterAtDTE = enterAtDTE
        self.exitAtDTE = exitAtDTE
        self.exitAfterDays = exitAfterDays
        self.roiTargetInPct = roiTargetInPct
        self.maxLossInPct = maxLossInPct
        self.rollPositions = rollPositions
#region imports
from AlgorithmImports import *
#endregion

#################################################################################
## Money management                                                             #
#################################################################################
#
# Guidance
# --------
# To determine the risk amount of a credit spread, take the width of the spread and
# subtract the credit amount. The potential reward on a credit spread is the amount
# of the credit received minus transaction costs. To illustrate, let's say you sold
# the XYZ 36-strike put and bought the XYZ 34-strike put (the "XYZ 36-34 put vertical")
# for a $0.52 credit. To calculate the risk per contract, you'd subtract the credit
# received ($0.52) from the width of the vertical ($2.00), which equals $1.48 or
# $148 per contract (plus transaction costs). Your potential reward would be your
# credit of $0.52 or $52 per contract (minus transaction costs).
#
# https://tickertape.tdameritrade.com/trading/calculate-risk-defined-risk-vertical-spread-16053
# =============================================================================================
from StrategyPosition import *
import sys
import traceback
import datetime
import inspect


class OptionsMoneyManager():
    """Stateless risk / reward calculations for option spread positions.

    All calculations are static methods taking a strategy position object.
    The attributes read from it are: ``AllLegsFilled``, ``SpreadType``,
    ``OptionLegs``, ``filledLegs``, ``netCost``, ``orderFees``,
    ``ExpectedCost``, ``ExpectedFees``, ``PositionDelta`` and ``algo``.
    """

    def __init__(self):
        return

    @staticmethod
    def _IsCreditSpread(strategyPosition):
        """Return True when the position is a bull put or bear call spread."""
        return strategyPosition.SpreadType in (
            OptionSpreadType.BULL_PUT_SPREAD,
            OptionSpreadType.BEAR_CALL_SPREAD,
        )

    # Applicable to a Credit spread. Check
    # the % of max possible profit earned so far (unrealized)
    # ==========================================================
    @staticmethod
    def GetUnrealizedPctOfMaxReward(strategyPosition):
        """Return unrealized profit as a fraction of the maximum reward.

        Returns:
            ``unrealizedProfit / maxReward`` for a fully filled credit spread,
            ``0`` when the max reward is not positive, and ``None`` when the
            legs are not all filled or the position is not a credit spread.
        """
        if not strategyPosition.AllLegsFilled:
            return None
        if not OptionsMoneyManager._IsCreditSpread(strategyPosition):
            return None

        unrealizedProfit = OptionsMoneyManager.GetUnrealizedProfit(strategyPosition)
        maxPossibleReward = OptionsMoneyManager.GetMaxReward(strategyPosition)

        if maxPossibleReward > 0:
            # A ratio above 1 is not expected, but it is returned as-is.
            return unrealizedProfit / maxPossibleReward

        # todo: a non-positive max reward should never happen; there is no
        # access to the algorithm logger here, so report "no progress".
        return 0

    @staticmethod
    def GetUnrealizedProfitPct(strategyPosition):
        """Return unrealized profit relative to the position's net cost.

        The divisor is ``-netCost`` (a credit spread has a negative net cost,
        so the divisor is positive).

        Returns:
            The ratio, or ``None`` when the legs are not all filled, when the
            unrealized profit is unavailable (non-credit spread), or when the
            net cost is zero.
        """
        if not strategyPosition.AllLegsFilled:
            return None

        # Can't rely on portfolio[contractSymbol].HoldingsCost, because that
        # contract may be held for multiple open strategy positions, not
        # just this one.
        cost = strategyPosition.netCost
        unrealizedProfit = OptionsMoneyManager.GetUnrealizedProfit(strategyPosition)

        # Guard the division: GetUnrealizedProfit yields None for non-credit
        # spreads, and a zero cost would raise ZeroDivisionError.
        if unrealizedProfit is None or cost == 0:
            return None
        return unrealizedProfit / -cost

    @staticmethod
    def GetUnrealizedProfit(strategyPosition):
        """Sum the portfolio's unrealized profit over the position's filled legs.

        Returns:
            The summed ``UnrealizedProfit`` for a fully filled credit spread,
            otherwise ``None``.
        """
        if not strategyPosition.AllLegsFilled:
            return None
        if not OptionsMoneyManager._IsCreditSpread(strategyPosition):
            return None

        folio = strategyPosition.algo.Portfolio
        return sum(folio[legSymbol].UnrealizedProfit for legSymbol in strategyPosition.filledLegs)

    # ==============================================
    @staticmethod
    def GetMaxRisk(strategyPosition, simulated=False):
        """Return the maximum possible loss of the spread.

        Args:
            strategyPosition: Position to evaluate.
            simulated: When True, use ``ExpectedCost`` instead of ``netCost``
                and do not require the legs to be filled.

        Returns:
            Credit spread: strike width minus ``abs(cost)``.
            Any other spread type: ``abs(cost)``.
            ``None`` when the legs are not all filled and ``simulated`` is False.
        """
        if not (strategyPosition.AllLegsFilled or simulated):
            return None

        spreadCost = strategyPosition.ExpectedCost if simulated else strategyPosition.netCost

        if OptionsMoneyManager._IsCreditSpread(strategyPosition):
            # spread width minus credit received
            legOneStrike = strategyPosition.OptionLegs[0].Strike
            legTwoStrike = strategyPosition.OptionLegs[1].Strike
            spreadWidth = abs(legOneStrike - legTwoStrike)
            creditReceived = abs(spreadCost)
            # NOTE(review): spreadWidth is in strike points; confirm that the
            # cost used here is expressed in the same per-share unit.
            return spreadWidth - creditReceived

        # Debit spread: the most that can be lost is what was paid.
        return abs(spreadCost)

    # ====================================================
    @staticmethod
    def GetMaxReward(strategyPosition, simulated=False):
        """Return the maximum possible reward of a credit spread.

        Args:
            strategyPosition: Position to evaluate.
            simulated: When True, use ``ExpectedCost`` / ``ExpectedFees``
                instead of ``netCost`` / ``orderFees``.

        Returns:
            ``abs(cost) - fees`` for a credit spread; ``None`` for any other
            spread type, or when the legs are not all filled and
            ``simulated`` is False.
        """
        if not (strategyPosition.AllLegsFilled or simulated):
            return None
        if not OptionsMoneyManager._IsCreditSpread(strategyPosition):
            return None

        # The potential reward on a credit spread is the amount
        # of the credit received minus transaction costs.
        #
        # Can't rely on portfolio[contractSymbol].HoldingsCost, because that
        # contract may be held for multiple open strategy positions.
        fees = strategyPosition.ExpectedFees if simulated else strategyPosition.orderFees
        cost = strategyPosition.ExpectedCost if simulated else strategyPosition.netCost
        return abs(cost) - fees

    # Risk-Reward (RR) is the ratio of credit (max gain)/risk (max loss).
    # Simply divide the credit by the amount risked (spread width minus credit).
    # =====================================================================
    @staticmethod
    def GetRiskRewardRatio(strategyPosition, simulated=False):
        """Return ``abs(maxReward / maxRisk)`` for the position.

        Returns:
            The ratio, or ``None`` when it cannot be computed: legs not filled
            and not simulated, no max reward (non-credit spread), or a zero
            max risk.
        """
        if not (strategyPosition.AllLegsFilled or simulated):
            return None

        maxRisk = OptionsMoneyManager.GetMaxRisk(strategyPosition, simulated)
        maxReward = OptionsMoneyManager.GetMaxReward(strategyPosition, simulated)

        # GetMaxReward is None for non-credit spreads; a zero risk would
        # raise ZeroDivisionError.
        if maxRisk is None or maxReward is None or maxRisk == 0:
            return None
        return abs(maxReward / maxRisk)

    # Buying Power Effect
    # Using logic from here:
    # https://support.tastyworks.com/support/solutions/articles/43000435260-short-credit-vertical-spread
    #
    # Additional reference in QC post:
    # https://www.quantconnect.com/forum/discussion/11552/examples-of-how-to-use-optionstrategypositiongroupbuyingpowermodel/p1
    # =====================================================================
    @staticmethod
    def GetBuyingPowerEffect (strategyPosition, simulated=False):
        """Return the buying-power effect of a credit spread.

        Computed as ``abs(cost) * 100 - spreadWidth * 100``.

        Returns:
            The buying-power effect, or ``None`` when the legs are not all
            filled and ``simulated`` is False.

        Raises:
            Exception: when the position is not a credit spread.
        """
        if not (strategyPosition.AllLegsFilled or simulated):
            return None

        if not OptionsMoneyManager._IsCreditSpread(strategyPosition):
            # can only handle credit spreads for now
            raise Exception('Wrong_Use_Of_GetBPEffect', 'Wrong_Use_Of_GetBPEffect')

        legOneStrike = strategyPosition.OptionLegs[0].Strike
        legTwoStrike = strategyPosition.OptionLegs[1].Strike
        spreadWidth = abs(legOneStrike - legTwoStrike)
        spreadCost = strategyPosition.ExpectedCost if simulated else strategyPosition.netCost

        marginReqPerSpead = spreadWidth * 1 * 100
        # NOTE(review): the cost is scaled by 100 here; confirm it is a
        # per-share figure and not already a contract-dollar amount.
        creditReceived = abs(spreadCost) * 100
        return creditReceived - marginReqPerSpead

    # ===========================================================================
    @staticmethod
    def GetAffordableQuantity(strategyPosition, marginRemaining, algo ):
        """Placeholder: log the spread legs and report an affordable quantity of 0.

        The margin-based sizing is not implemented yet (see the TODO below).

        Args:
            strategyPosition: Position whose legs are inspected.
            marginRemaining: Currently unused.
            algo: Algorithm instance used for ``Debug`` logging.

        Returns:
            Always ``0``.
        """
        if OptionsMoneyManager._IsCreditSpread(strategyPosition):
            for optionLegData in strategyPosition.OptionLegs:
                algo.Debug(str(optionLegData))

        # TODO: size by margin, along these lines, and return min(quantities):
        #   for contract in [theShortContract, theLongContract]:
        #       security = algo.Securities[contract.Symbol]
        #       marginParameters = InitialMarginParameters(security, 1)
        #       marginRequirement = security.BuyingPowerModel.GetInitialMarginRequirement(marginParameters).Value
        #       affordableQuantity = int(marginRemaining / abs(marginRequirement))
        #       quantities.append(affordableQuantity)
        return 0

    # =====================================================================
    # IsSpreadTradeWorthy
    # ---------------------
    # Check if the Risk Reward is Worth It.
    #
    # Note: This logic is taken from the bollinger bend author, and is probably
    # only good for bollinger bend. Need to vet the rationale:
    #
    #   "I don't recommend using these guidelines for all strategies at all deltas.
    #    I suspect that the relationship between RR/Delta as delta increases is not
    #    linear and the ideal ratio would not simply be [above 1] as you get into
    #    higher deltas. [this is pure speculation]"
    #
    # With this particular strategy, within the delta ranges of 15-25 and 20-30 on the
    # signals, the RR should fall within a range of roughly 0.2-0.6.
    #
    # RR/Delta Guideline:
    #   The RR/Delta calculation is done by dividing the RR by the Delta
    #   (expressed as a decimal value - for example, 30 delta is 0.3).
    #   So 0.3 RR / 0.3 Delta = 1. And so on.
    #   Aim to have RR/Delta average across hundreds of trades as 1 or higher.
    #   Meaning the RR is equal to or greater than the delta.
    #   ( generally: always get 1 or higher RR/Delta on each trade ).
    #   That is completely doable and many are doing it. But the guideline
    #   is actually just to average this across hundreds of trades, so if you
    #   fall below one on some trades don't worry about it.
    #
    # ========
    # A key factor in the strategy is the Risk Reward/Delta ratio.
    # This more than anything determines the Tharp expectancy
    # (ie expectancy of profit over hundreds of trades) and Sharpe ratio (measuring
    # how well you are using your money) of the strategy. Ideally, RR/delta should be 1 or
    # greater within the recommended delta ranges. Note: THIS RULE DOES NOT HOLD UP AS DELTA
    # GETS HIGHER than 30 AND IT SHOULD NOT BE TAKEN AS A GENERAL RULE FOR ALL CREDIT SPREADS
    # (outside of this strategy) AT ANY DELTA.
    #
    # Example 1: Signal 1 trade, 100/99 strikes,
    #            0.3 delta, 0.3 credit. RR = 0.3 / (1 - 0.3) = 0.43;
    #            RR/Delta = 0.43/0.3 = 1.43. (ok)
    #
    # =====================================================================
    @staticmethod
    def IsSpreadTradeWorthy(strategyPosition):
        """Return True when a bull put spread's simulated RR/Delta is at least 1.

        Returns:
            ``abs(RR / PositionDelta) >= 1`` for a bull put spread; ``False``
            for any other spread type or when the ratio cannot be computed
            (no risk/reward ratio, or a zero delta).
        """
        if strategyPosition.SpreadType != OptionSpreadType.BULL_PUT_SPREAD:
            return False

        riskRewardratio = OptionsMoneyManager.GetRiskRewardRatio(strategyPosition, True)
        positionDelta = strategyPosition.PositionDelta

        # A zero delta would raise ZeroDivisionError.
        if riskRewardratio is None or not positionDelta:
            return False

        rrDelta = abs(riskRewardratio / positionDelta)
        return rrDelta >= 1
#region imports
from AlgorithmImports import *
#endregion


class OptionSpreadType(Enum):
    """Spread kinds a StrategyPosition can be tagged with."""
    BULL_PUT_SPREAD = "Bull Put Spread"
    BEAR_PUT_SPREAD = "Bear Put Spread"
    BULL_CALL_SPREAD = "Bull call Spread"
    BEAR_CALL_SPREAD = "Bear Call Spread"
    IRON_CONDOR = "IRON CONDOR"


#######################################################
##### Class: StrategyPosition                     #####
#######################################################
class StrategyPosition:
    """Bookkeeping for one option strategy: its legs, fills, cost and fees.

    Args:
        algo: Algorithm instance (used for ``Time``, ``Securities``,
            ``Debug``, ``Quit`` and ``Liquidate``).
        strategyID: Identifier string for this position.
        optionStrategy: Strategy object exposing ``Underlying`` and
            ``OptionLegs``.
        underlyingSymbol: Accepted for interface compatibility; not stored.
    """

    def __init__(self, algo, strategyID, optionStrategy, underlyingSymbol):
        self.algo = algo
        self.timeSubmitted = algo.Time
        self.timeFilled = None
        self.timeLiquidated = None
        self.SpreadType = None      # initialised to None; not assigned again in this class
        self.strategy = optionStrategy
        self.Id = strategyID
        self.filledLegs = {}        # leg symbol -> option leg data
        self.PendingOrders = []
        self.netCost = 0.0          # if negative, this is a credit spread
        self.orderFees = 0.0
        self.LegWasAssigned = False
        self.PositionDelta = 0.0
        self.ExpectedFees = 0.0
        self.ExpectedCost = 0.0

    @property
    def Underlying(self):
        """Underlying of the wrapped strategy."""
        return self.strategy.Underlying

    @property
    def OptionLegs(self):
        """Option legs of the wrapped strategy."""
        return self.strategy.OptionLegs

    @property
    def AllLegsFilled(self):
        """True when every strategy leg is present among the filled legs.

        Returns False when the strategy has no legs or nothing is filled yet.
        """
        if len(self.OptionLegs) == 0 or len(self.filledLegs) == 0:
            return False

        # todo: find a better way to do this. Perhaps check portfolio data,
        #       like HoldingsCost or some other property. Be wary of
        #       HoldingsCost, since a symbol can be held for different legs.
        #
        # Note-to-self:
        # The net HoldingsCost for a spread leg, e.g. the short put of a credit
        # spread, can be 0.0 even when we have positions of that leg. This
        # happens when we have multiple spreads where that contract exists in
        # one spread as a SHORT and in another spread as a LONG.
        #
        #   eg: Imagine these two spreads, opened one after the other
        #       bullPutSpreadA = buy [nvda $305 put] / sell [nvda $310 put]
        #       bullPutSpreadB = buy [nvda $300 put] / sell [nvda $305 put]
        #
        #   In this case, the net HoldingsCost for [nvda $305 put] will be 0.0
        #   even though we have two positions for this contract (one short,
        #   one long). I believe this is expected behaviour, so a zero
        #   HoldingsCost is not treated as an error here.
        filledLegData = self.filledLegs.values()
        return all(optionLeg in filledLegData for optionLeg in self.OptionLegs)

    @property
    def ExpirationDate(self):
        """Earliest expiry among the filled legs, or None when no leg is filled."""
        if not self.filledLegs:
            # min() of an empty sequence raises ValueError, e.g. after
            # LiquidateSpread has emptied filledLegs.
            return None
        return min(self.algo.Securities[symbol].Expiry for symbol in self.filledLegs)

    # ================================================================
    def AddfilledLeg( self, legSymbol, optionLegData, orderEvent):
        """Record a leg fill and update net cost, fees and fill time.

        Args:
            legSymbol: Symbol of the filled contract.
            optionLegData: Leg data stored for that symbol.
            orderEvent: Fill event; ``FillPrice``, ``FillQuantity``,
                ``Direction`` and ``OrderFee`` are read.
        """
        self.filledLegs[legSymbol] = optionLegData

        # Buys keep the sign of price * quantity * 100; everything else is
        # forced negative, so a net credit yields a negative netCost.
        legFillCost = orderEvent.FillPrice * orderEvent.FillQuantity * 100
        if orderEvent.Direction != OrderDirection.Buy:
            legFillCost = -abs(legFillCost)

        self.netCost = self.netCost + legFillCost
        self.orderFees = self.orderFees + orderEvent.OrderFee.Value.Amount

        # Handle Error
        # ------------
        if legFillCost == 0.0:
            theDir = 'BUY' if (orderEvent.Direction == OrderDirection.Buy) else 'SELL'
            self.algo.Debug(f"{self.algo.Time} [ERROR] Zero Fill Cost for {theDir} {legSymbol}")
            self.algo.Quit()

        # If all legs filled, clear pending orders, update fill time
        # --------------------------------------------------------
        if self.AllLegsFilled:
            self.PendingOrders = []
            if self.timeFilled is None:
                self.timeFilled = self.algo.Time

    # ================================================
    def HandleAssignment(self, assignmentQuantity, symbol ):
        """React to an assignment by liquidating the whole spread.

        Both arguments are currently unused.
        """
        # Todo: Figure out why we can't sell assigned shares. Currently working
        #       around this in main.py, checking for assigned shares every morning
        # --------------------------------------------------------------------------
        # self.algo.Sell(symbol.Underlying, assignmentQuantity * 100 )
        # self.algo.MarketOnOpenOrder(symbol.Underlying, -assignmentQuantity * 100 )
        self.LiquidateSpread("[!][EXIT] Assignment triggered Liquidation")

    # ================================================
    def LiquidateSpread(self, orderMsg=""):
        """Liquidate every filled leg, forget it, and stamp the liquidation time.

        Args:
            orderMsg: Tag passed to each ``Liquidate`` call.
        """
        # Iterate over a snapshot because legs are removed while looping.
        for optionSymbol in list(self.filledLegs):
            # Todo: Need a more precise way to liquidate just this position.
            #       This liquidates ALL holdings of the leg symbol, but this
            #       symbol may be used in other spreads that we don't want
            #       to close yet.
            self.algo.Liquidate(optionSymbol, orderMsg)
            self.filledLegs.pop(optionSymbol)

        self.timeLiquidated = self.algo.Time

        # Cancel any pending orders as well
        # --------------------------------------
        # todo: Figure out why this errors out.
        #
        # for pendingOrder in self.PendingOrders[:]:
        #     pendingOrder.Cancel(f"Liquidated Spread - cancel pending orders - {orderMsg}")
        #     self.PendingOrders.remove(pendingOrder)

    # ================================================================
    # Method to select the most optimal spread, per max reward-to-risk
    # Select most optimal spread
    # Returns 2 contracts to trade
    # Inspired by this comment
    # https://www.quantconnect.com/forum/discussion/8712/internfund-begins-live-trading/p1/comment-24539
    # ==================================================================
    # TODO: Implement this.
    def SelectBestSpread(self, longContracts,shortContracts,OptionSpreadType=OptionSpreadType.BULL_PUT_SPREAD ):
        """Not implemented yet; always returns None.

        The sketch below (from QC staff, see link above) is kept as the
        design reference for the eventual implementation.
        """
        # --------------------------------------------------------------
        # Note, contracts is a list of items from the optionchain, like below
        #
        #   for symbol, chain in data.OptionChains.items():
        #       contracts = [contract for contract in chain]
        #       self.TradeOptions(contracts, underlying_price)
        # -------------------------------------------------------------
        #
        # def TradeOptions(self, contracts, underlying_price):
        #     # Get all put contracts
        #     all_puts = [contract for contract in contracts if contract.Right == OptionRight.Put]
        #     # Get all unique expiries
        #     expires = set([contract.Expiry for contract in all_puts])
        #     rankings_df = pd.DataFrame()
        #     for expiry in expires:
        #         puts = [contract for contract in all_puts if contract.Expiry == expiry]
        #         if len(puts) == 0:
        #             continue
        #         for option_combinations in combinations(puts, 2):
        #             buy_put = option_combinations[0]
        #             sell_put = option_combinations[1]
        #             strike_width = buy_put.Strike - sell_put.Strike
        #             net_cost = buy_put.LastPrice - sell_put.LastPrice
        #             # We ignore the contract multiplier here
        #             max_loss = net_cost
        #             max_profit = strike_width - net_cost
        #             # Calculate factors
        #             inverted_profit_loss_ratio = max_loss / max_profit if max_profit != 0 else float('inf')
        #             break_even_distance = underlying_price - buy_put.Strike + net_cost
        #             days_to_expiry = (expiry - self.Time).days
        #             # Save factor results
        #             row = pd.DataFrame({'inverted_profit_loss_ratio' : [inverted_profit_loss_ratio],
        #                                 'break_even_distance' : [break_even_distance],
        #                                 'days_to_expiry' : [days_to_expiry]},
        #                                index=[option_combinations])
        #             rankings_df = rankings_df.append(row)
        #     if not rankings_df.empty:
        #         # Rank put contracts by factors
        #         selected_contracts = rankings_df.rank().sum(axis=1).idxmin()
        #         # Create Bear Put Spread
        #         buy_symbol = selected_contracts[0].Symbol
        #         sell_symbol = selected_contracts[1].Symbol
        #         quantity = self.CalculateOrderQuantity(buy_symbol, 0.5)
        #         if quantity > 0:
        #             self.Buy(buy_symbol, quantity)
        #             self.Sell(sell_symbol, quantity)
        return

###############################
from AlgorithmImports import *

#
# TODO:
# [ ] 1. Make trades individually (no combo orders)
# [ ] 2. Go live on tradier paper
# [ ] 5. have QC buy / sell at worst price (bid/ask)
# [ ] 3. Accurately track PnL with Money manager
# [ ] 4. exit at 30% of max profit
# [ ] 5. Refactor
#

class ZeroDTE(QCAlgorithm):
    """Sells a SPY iron condor (bull put spread + bear call spread) once a day.

    Strikes are picked `pctFromPrice` percent away from the underlying close,
    with the long wings `wingWidth` strike positions further out. The option
    universe filter is ``Expiration(0, 0)``.
    """

    def Initialize(self):
        """Configure the backtest window, subscriptions, parameters and schedule."""
        ## Backtest Params
        self.SetStartDate(2023, 3, 1)
        self.SetEndDate(2023, 9, 29)
        self.SetCash(10000)

        ## Subscribe to asset and set benchmark
        self.ticker = "SPY"
        assetTimeFrame = Resolution.Hour
        self.symbol = self.AddEquity(self.ticker, assetTimeFrame).Symbol

        # self.SetBrokerageModel( TradierBrokerageModel(AccountType. Margin))

        self.minsAfterOpen = int(self.GetParameter("minsAfterOpen"))
        self.chainLength = int(self.GetParameter("chainLength"))
        self.wingWidth = int(self.GetParameter("wingWidth"))
        self.pctFromPrice = float(self.GetParameter("pctFromPrice"))

        self.useComboOrders = False  # True
        self.max_profit, self.max_loss = 0,0

        self.SetBenchmark(self.ticker)

        option = self.AddOption(self.ticker, assetTimeFrame)
        self.optionSymbol = option.Symbol
        option.SetFilter(lambda universe: universe.IncludeWeeklys().Strikes(-(self.chainLength//2), self.chainLength//2).Expiration(0, 0))

        # avoid getting assigned
        self.SetSecurityInitializer(CompositeSecurityInitializer(self.SecurityInitializer, FuncSecurityInitializer(self.CustomSecurityInitializer)))

        self.ScheduleRoutines()

    def ScheduleRoutines(self):
        """Schedule the entry check `minsAfterOpen` minutes after each market open."""
        self.Schedule.On(
            self.DateRules.EveryDay(self.symbol),
            self.TimeRules.AfterMarketOpen(self.symbol, self.minsAfterOpen),
            self.CheckForEntries)

    def _HasUnderlyingData(self, data):
        """Return True when not warming up and `data` holds a bar for the ticker."""
        if self.IsWarmingUp or (self.ticker not in data):
            return False
        return data[self.ticker] is not None

    def _SpreadNotFound(self):
        """Return the 'no spread' tuple matching the current order mode.

        Combo mode callers unpack two values, leg mode callers unpack three.
        """
        if self.useComboOrders:
            return False, None
        return False, None, None

    def OnData(self, data):
        """Liquidate when any invested holding is not an option.

        Note that ``Liquidate`` is called without a symbol.
        """
        if not self._HasUnderlyingData(data):
            return

        if not self.Portfolio.Invested:
            return

        # Check if we are holding underlying, if so, exit.
        for symbol, holding in self.Portfolio.items():
            # Check if the holding is not an option
            if holding.Invested and holding.Type != SecurityType.Option:
                self.Debug(f"Holding {symbol.Value} is a {holding.Type}. Liquidate.")
                self.Liquidate(tag=f"Holding {symbol.Value} is a {holding.Type}")

    def CheckForEntries(self):
        """Scheduled entry point: open a trade when flat and data is available."""
        if self.Portfolio.Invested:
            return

        data = self.CurrentSlice
        if not self._HasUnderlyingData(data):
            return

        self.OpenOptionsTrade(data)

    def PriceBelowShortStrike(self):
        """Return True when the underlying close is below the first short leg's strike.

        Returns False when no short option is held.
        """
        shortHoldings = [x for x in self.GetOptionsHoldings() if x.IsShort]
        if not shortHoldings:
            # Previously an IndexError when no short leg was held.
            return False
        shortPrice = shortHoldings[0].Security.StrikePrice
        return self.CurrentSlice[self.ticker].Close < shortPrice

    def ExpiresTomorrow(self):
        """Return True when the last invested option found expires tomorrow."""
        expiration = None
        for symbol in self.Securities.Keys:
            security = self.Securities[symbol]
            if security.Invested and security.Type == SecurityType.Option:
                expiration = security.Expiry

        if expiration is None:
            return False
        return expiration.date() == (self.Time + timedelta(days=1)).date()

    def MaxLossTargetReached(self):
        """Return True when the unrealized profit is below -50 percent."""
        profitPct = self.GetUnrealizedProfitPct()
        return (profitPct < -50)

    def ProfitLossTargetReached(self):
        """Return True when the unrealized profit is above +50 or below -50 percent."""
        profitPct = self.GetUnrealizedProfitPct()
        return (profitPct > 50) or (profitPct < -50)

    def GetOptionsHoldings(self):
        """Return the ``Holdings`` of every invested option security."""
        securities = self.Securities
        return [
            securities[symbol].Holdings
            for symbol in securities.Keys
            if securities[symbol].Invested and securities[symbol].Type == SecurityType.Option
        ]

    # Use OptionsMoneyManager
    def GetUnrealizedProfitPct(self):
        """Return the price-weighted unrealized profit percent of a two-leg position.

        Each leg's ``UnrealizedProfitPercent`` is weighted by its share of
        the summed average prices. The result is rounded to 3 decimals and
        multiplied by 100.

        Returns:
            The net percent, or ``0`` when there are not exactly two option
            holdings (everything is liquidated in that case) or when both
            average prices sum to zero.

        Raises:
            Exception: when both legs are on the same side (long/short).
        """
        optionHoldings = self.GetOptionsHoldings()

        if len(optionHoldings) != 2:
            self.Liquidate(tag="error with optin legs and unrealized pct")
            return 0

        first, second = optionHoldings
        if first.IsLong == second.IsLong:
            raise Exception("Both options position are in the same direction (long/short).")

        longLeg, shortLeg = (first, second) if first.IsLong else (second, first)
        long_price = longLeg.AveragePrice
        short_price = shortLeg.AveragePrice

        total_price = long_price + short_price
        if total_price == 0:
            # No meaningful weights; avoid ZeroDivisionError.
            return 0

        w_long = long_price / total_price
        w_short = short_price / total_price
        netPctChange = (w_long * longLeg.UnrealizedProfitPercent) + (w_short * shortLeg.UnrealizedProfitPercent)
        return (round(netPctChange,3) * 100)

    def OpenOptionsTrade(self, slice):
        """Open the iron condor when both the put and the call spread are available.

        Args:
            slice: Data slice holding the option chain and the underlying bar.
        """
        # Get the OptionChain
        chain = slice.OptionChains.get(self.optionSymbol, None)
        if not chain:
            return

        # Get the furthest expiration date of the contracts
        expiry = max(chain, key=lambda x: x.Expiry).Expiry
        qty = 1

        if self.useComboOrders:
            # Get ComboMarketOrders to trade. Doesn't work with Tradier
            gotBPS, bps_strategy = self.OpenBullPutSpread(slice, chain, expiry)
            gotBCS, bcs_strategy = self.OpenBearCallSpread(slice, chain, expiry)
        else:
            # Get Contracts to trade
            gotBPS, short_put, long_put = self.OpenBullPutSpread(slice, chain, expiry)
            gotBCS, short_call, long_call = self.OpenBearCallSpread(slice, chain, expiry)

        # if we got contracts, trade them
        if gotBPS and gotBCS:
            underlyingClose = self.CurrentSlice[self.ticker].Close
            if self.useComboOrders:
                self.Buy(bps_strategy, qty, tag=f"Open PUT BPS @ ${underlyingClose}")
                self.Buy(bcs_strategy, qty, tag=f"Open CALL BCS @ ${underlyingClose}")
            else:
                ## Without ComboMarketOrders
                # ---------------------------
                bpsLegs = f" -{short_put.ID.StrikePrice} +{long_put.ID.StrikePrice}"
                bcsLegs = f" -{short_call.ID.StrikePrice} +{long_call.ID.StrikePrice}"
                bpsTag = f"Open PUT BPS @ ${underlyingClose} | {bpsLegs}"
                bcsTag = f"Open CALL BCS @ ${underlyingClose} | {bcsLegs}"

                self.MarketOrder(short_put, -qty, tag=bpsTag)
                self.MarketOrder(long_put, qty, tag=bpsTag)
                self.MarketOrder(short_call, -qty, tag=bcsTag)
                self.MarketOrder(long_call, qty, tag=bcsTag)

                self.max_profit, self.max_loss = self.calculate_max_profit_loss(qty, short_put, long_put, short_call, long_call)

        if (not gotBCS and not gotBPS):
            self.Debug(f"{self.Time} | NO STRIKES: No favorable strikes found. Skipping trade")
        elif (not gotBCS):
            self.Debug(f"{self.Time} | NO CALLS: No favorable strikes found. Skipping trade")
        elif (not gotBPS):
            self.Debug(f"{self.Time} | NO PUTS: No favorable strikes found. Skipping trade")

    def OpenBullPutSpread(self, slice, chain, expiry):
        """Pick the short and long put for the bull put spread.

        The short put is the lowest strike that is at most `pctFromPrice`
        percent below the underlying close; the long put sits `wingWidth`
        strike positions below it.

        Returns:
            Combo mode: ``(True, option_strategy)`` or ``(False, None)``.
            Leg mode: ``(True, short_symbol, long_symbol)`` or
            ``(False, None, None)``.
        """
        # Select the put Option contracts with the furthest expiry
        puts = [i for i in chain if i.Expiry == expiry and i.Right == OptionRight.Put]
        if len(puts) == 0:
            return self._SpreadNotFound()

        put_strikes = sorted([x.Strike for x in puts])
        price = slice[self.ticker].Close

        if len(put_strikes) < self.wingWidth + 1:
            return self._SpreadNotFound()

        # Walk up from the lowest strike until one is within pctFromPrice.
        short_put_index = 0
        while short_put_index < len(put_strikes) and ((price - put_strikes[short_put_index])/price * 100) > self.pctFromPrice:
            short_put_index += 1

        # No strike is close enough (index ran off the end; indexing it
        # would raise IndexError), or there is no room below for the wing.
        if short_put_index >= len(put_strikes) or (short_put_index - self.wingWidth) < 0:
            return self._SpreadNotFound()

        short_put_strike = put_strikes[short_put_index]
        long_put_strike = put_strikes[short_put_index - self.wingWidth]

        if self.useComboOrders:
            option_strategy = OptionStrategies.BullPutSpread(self.optionSymbol, short_put_strike, long_put_strike, expiry)
            return True, option_strategy

        short_contract = [i for i in puts if i.Strike == short_put_strike][0]
        long_contract = [i for i in puts if i.Strike == long_put_strike][0]
        return True, short_contract.Symbol, long_contract.Symbol

    ## ---------------------------------------------------------------------------------
    def OpenBearCallSpread(self, slice, chain, expiry):
        """Pick the short and long call for the bear call spread.

        The short call is the lowest strike more than `pctFromPrice` percent
        above the underlying close; the long call sits `wingWidth` strike
        positions above it.

        Returns:
            Combo mode: ``(True, option_strategy)`` or ``(False, None)``.
            Leg mode: ``(True, short_symbol, long_symbol)`` or
            ``(False, None, None)``.
        """
        # Select the call Option contracts with the furthest expiry
        calls = [i for i in chain if i.Expiry == expiry and i.Right == OptionRight.Call]
        if len(calls) == 0:
            return self._SpreadNotFound()

        call_strikes = sorted([x.Strike for x in calls])
        price = slice[self.ticker].Close

        if len(call_strikes) < self.wingWidth + 1:
            return self._SpreadNotFound()

        # Walk up from the lowest strike until one is beyond pctFromPrice.
        short_call_index = 0
        while short_call_index < len(call_strikes) and ((call_strikes[short_call_index] - price)/price * 100) <= self.pctFromPrice:
            short_call_index += 1

        if short_call_index == len(call_strikes) or ((short_call_index + self.wingWidth) >= len(call_strikes)):
            return self._SpreadNotFound()

        short_call_strike = call_strikes[short_call_index]
        long_call_strike = call_strikes[short_call_index + self.wingWidth]

        if self.useComboOrders:
            option_strategy = OptionStrategies.BearCallSpread(self.optionSymbol, short_call_strike, long_call_strike, expiry)
            return True, option_strategy

        short_contract = [i for i in calls if i.Strike == short_call_strike][0]
        long_contract = [i for i in calls if i.Strike == long_call_strike][0]
        return True, short_contract.Symbol, long_contract.Symbol

    def CustomSecurityInitializer(self, security):
        """Initialise options with no assignment model, zero fees and a seeded price."""
        if Extensions.IsOption(security.Symbol.SecurityType):
            security.SetOptionAssignmentModel(NullOptionAssignmentModel())
            security.SetFeeModel(ConstantFeeModel(0))
            security.SetMarketPrice(self.GetLastKnownPrice(security))

    def calculate_max_profit_loss(self, quantity, short_put, long_put, short_call, long_call):
        """
        Calculate the maximum profit and loss for an iron condor trade.

        Short legs are priced at the bid and long legs at the ask, read from
        ``self.Securities``; strikes are read from each symbol's ``ID``.

        Args:
        - quantity: Number of iron condors
        - short_put, long_put, short_call, long_call: Symbols of the four legs

        Returns:
        - max_profit: Net credit for the whole trade (x100 per contract, x quantity)
        - max_loss: Width of the wider wing (x100, x quantity) minus the net credit
        """
        short_put_bid = self.Securities[short_put].BidPrice    # Bid price of the short put
        long_put_ask = self.Securities[long_put].AskPrice      # Ask price of the long put
        short_call_bid = self.Securities[short_call].BidPrice  # Bid price of the short call
        long_call_ask = self.Securities[long_call].AskPrice    # Ask price of the long call

        short_put_strike = short_put.ID.StrikePrice
        long_put_strike = long_put.ID.StrikePrice
        short_call_strike = short_call.ID.StrikePrice
        long_call_strike = long_call.ID.StrikePrice

        # Net credit received when opening the trade (100 shares per contract).
        net_credit = ((short_put_bid - long_put_ask) + (short_call_bid - long_call_ask)) * 100 * quantity

        put_strike_difference = short_put_strike - long_put_strike
        call_strike_difference = long_call_strike - short_call_strike

        # Only one side can finish in the money, so the worst case is set by
        # the WIDER wing. net_credit already includes quantity, so only the
        # width is scaled by it.
        strike_difference = max(put_strike_difference, call_strike_difference) * 100
        max_loss = strike_difference * quantity - net_credit
        max_profit = net_credit

        return max_profit, max_loss

    def calculate_percentage_max_profit(self,current_net_credit, initial_net_credit, quantity):
        """
        Calculate the percentage of the maximum profit achieved so far based on the current market value of the positions.

        Args:
        - current_net_credit: The current net credit of the open positions (if closed now)
        - initial_net_credit: The net credit received when the trade was opened
        - quantity: Number of iron condors

        Returns:
        - percentage_of_max_profit: The percentage of the maximum profit achieved
          (0.0 when the initial net credit is zero)
        """
        if initial_net_credit == 0:
            # No credit received: the percentage is undefined, report 0.
            return 0.0

        # The difference in credit, scaled by the number of condors.
        realized_profit = (initial_net_credit - current_net_credit) * quantity

        # The percentage of the maximum profit achieved
        percentage_of_max_profit = (realized_profit / initial_net_credit) * 100
        return percentage_of_max_profit