Overall Statistics |
Total Orders 32 Average Win 2.08% Average Loss -1.06% Compounding Annual Return 79.426% Drawdown 1.300% Expectancy 0.484 Start Equity 100000 End Equity 105036 Net Profit 5.036% Sharpe Ratio 6.081 Sortino Ratio 0 Probabilistic Sharpe Ratio 99.992% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 1.97 Alpha 0.272 Beta 0.108 Annual Standard Deviation 0.054 Annual Variance 0.003 Information Ratio -1.8 Tracking Error 0.1 Treynor Ratio 3.017 Total Fees $64.00 Estimated Strategy Capacity $360000.00 Lowest Capacity Asset SPXW 32DZDFXYLF3M6|SPX 31 Portfolio Turnover 0.50% |
#region imports from AlgorithmImports import * #endregion from Initialization import SetupBaseStructure from Alpha.Utils import Scanner, Order, Stats from Tools import ContractUtils, Logger, Underlying from Strategy import Leg, Position, OrderType, WorkingOrder """ NOTE: We can't use multiple inheritance in Python because this is a managed class. We will use composition instead so in order to call the methods of SetupBaseStructure we'll call then using self.setup.methodName(). ---------------------------------------------------------------------------------------------------------------------------------------- The base class for all the alpha models. It is used to setup the base structure of the algorithm and to run the strategies. This class has some configuration capabilities that can be used to setup the strategies more easily by just changing the configuration parameters. Here are the default values for the configuration parameters: scheduleStartTime: time(9, 30, 0) scheduleStopTime: None scheduleFrequency: timedelta(minutes = 5) maxActivePositions: 1 dte: 0 dteWindow: 0 ---------------------------------------------------------------------------------------------------------------------------------------- The workflow of the algorithm is the following: `Update` method gets called every minute - If the market is closed, the algorithm exits - If algorithm is warming up, the algorithm exits - The Scanner class is used to filter the option chain - If the chain is empty, the algorithm exits - The CreateInsights method is called - Inside the GetOrder method is called """ class Base(AlphaModel): # Internal counter for all the orders orderCount = 0 DEFAULT_PARAMETERS = { # The start time at which the algorithm will start scheduling the strategy execution # (to open new positions). No positions will be opened before this time "scheduleStartTime": time(9, 30, 0), # The stop time at which the algorithm will look to open a new position. "scheduleStopTime": None, # time(13, 0, 0), # Periodic interval with which the algorithm will check to open new positions "scheduleFrequency": timedelta(minutes=5), # Minimum time distance between opening two consecutive trades "minimumTradeScheduleDistance": timedelta(days=1), # If True, the order is not placed if the legs are already part of an existing position. "checkForDuplicatePositions": True, # Maximum number of open positions at any given time "maxActivePositions": 1, # Maximum quantity used to scale each position. If the target premium cannot be reached within this # quantity (i.e. premium received is too low), the position is not going to be opened "maxOrderQuantity": 1, # If True, the order is submitted as long as it does not exceed the maxOrderQuantity. "validateQuantity": True, # Days to Expiration "dte": 0, # The size of the window used to filter the option chain: options expiring in the range [dte-dteWindow, dte] will be selected "dteWindow": 0, # DTE Threshold. This is ignored if self.dte < self.dteThreshold "dteThreshold": 21, # Controls whether to use the furthest (True) or the earliest (False) expiration date when multiple expirations are available in the chain "useFurthestExpiry": True, # Controls whether to consider the DTE of the last closed position when opening a new one: # If True, the Expiry date of the new position is selected such that the open DTE is the nearest to the DTE of the closed position "dynamicDTESelection": False, # Coarse filter for the Universe selection. It selects nStrikes on both sides of the ATM strike for each available expiration "nStrikesLeft": 200, # 200 SPX @ 3820 & 3910C w delta @ 1.95 => 90/5 = 18 "nStrikesRight": 200, # 200 # Controls what happens when an open position reaches/crosses the dteThreshold ( -> DTE(openPosition) <= dteThreshold) # - If True, the position is closed as soon as the dteThreshold is reached, regardless of whether the position is profitable or not # - If False, once the dteThreshold is reached, the position is closed as soon as it is profitable "forceDteThreshold": False, # DIT Threshold. This is ignored if self.dte < self.ditThreshold "ditThreshold": None, "hardDitThreshold": None, # Controls what happens when an open position reaches/crosses the ditThreshold ( -> DIT(openPosition) >= ditThreshold) # - If True, the position is closed as soon as the ditThreshold is reached, regardless of whether the position is profitable or not # - If False, once the ditThreshold is reached, the position is closed as soon as it is profitable # - If self.hardDitThreashold is set, the position is closed once the hardDitThreashold is # crossed, regardless of whether forceDitThreshold is True or False "forceDitThreshold": False, # Slippage used to set Limit orders "slippage": 0.0, # Used when validateBidAskSpread = True. if the ratio between the bid-ask spread and the # mid-price is higher than this parameter, the order is not executed "bidAskSpreadRatio": 0.3, # If True, the order mid-price is validated to make sure the Bid-Ask spread is not too wide. # - The order is not submitted if the ratio between Bid-Ask spread of the entire order and its mid-price is more than self.bidAskSpreadRatio "validateBidAskSpread": False, # Control whether to allow multiple positions to be opened for the same Expiration date "allowMultipleEntriesPerExpiry": False, # Controls whether to include details on each leg (open/close fill price and descriptive statistics about mid-price, Greeks, and IV) "includeLegDetails": False, # The frequency (in minutes) with which the leg details are updated (used only if includeLegDetails = True) "legDatailsUpdateFrequency": 30, # Controls whether to track the details on each leg across the life of the trade "trackLegDetails": False, # Controls which greeks are included in the output log # "greeksIncluded": ["Delta", "Gamma", "Vega", "Theta", "Rho", "Vomma", "Elasticity"], "greeksIncluded": [], # Controls whether to compute the greeks for the strategy. If True, the greeks will be computed and stored in the contract under BSMGreeks. "computeGreeks": False, # The time (on expiration day) at which any position that is still open will closed "marketCloseCutoffTime": time(15, 45, 0), # Limit Order Management "useLimitOrders": True, # Adjustment factor applied to the Mid-Price to set the Limit Order: # - Credit Strategy: # Adj = 0.3 --> sets the Limit Order price 30% higher than the current Mid-Price # - Debit Strategy: # Adj = -0.2 --> sets the Limit Order price 20% lower than the current Mid-Price "limitOrderRelativePriceAdjustment": 0, # Set expiration for Limit orders. This tells us how much time a limit order will stay in pending mode before it gets a fill. "limitOrderExpiration": timedelta(hours=8), # Alternative method to set the absolute price (per contract) of the Limit Order. This method is used if a number is specified # Unless you know that your price target can get a fill, it is advisable to use a relative adjustment or you may never get your order filled # - Credit Strategy: # AbsolutePrice = 1.5 --> sets the Limit Order price at exactly 1.5$ # - Debit Strategy: # AbsolutePrice = -2.3 --> sets the Limit Order price at exactly -2.3$ "limitOrderAbsolutePrice": None, # Target <credit|debit> premium amount: used to determine the number of contracts needed to reach the desired target amount # - targetPremiumPct --> target premium is expressed as a percentage of the total Portfolio Net Liq (0 < targetPremiumPct < 1) # - targetPremium --> target premium is a fixed dollar amount # If both are specified, targetPremiumPct takes precedence. If none of them are specified, # the number of contracts specified by the maxOrderQuantity parameter is used. "targetPremiumPct": None, # You can't have one without the other in this case below. # Minimum premium accepted for opening a new position. Setting this to None disables it. "minPremium": None, # Maximum premium accepted for opening a new position. Setting this to None disables it. "maxPremium": None, "targetPremium": None, # Defines how the profit target is calculated. Valid options are (case insensitive): # - Premium: the profit target is a percentage of the premium paid/received. # - Theta: the profit target is calculated based on the theta value of the position evaluated # at self.thetaProfitDays from the time of entering the trade # - TReg: the profit target is calculated as a percentage of the TReg (MaxLoss + openPremium) # - Margin: the profit target is calculted as a percentage of the margin requirement (calculated based on # self.portfolioMarginStress percentage upside/downside movement of the underlying) "profitTargetMethod": "Premium", # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) "profitTarget": 0.6, # Number of days into the future at which the theta of the position is calculated. Used if profitTargetMethod = "Theta" "thetaProfitDays": None, # Delta and Wing size used for Naked Put/Call and Spreads "delta": 10, "wingSize": 10, # Put/Call delta for Iron Condor "putDelta": 10, "callDelta": 10, # Net delta for Straddle, Iron Fly and Butterfly (using ATM strike if netDelta = None) "netDelta": None, # Put/Call Wing size for Iron Condor, Iron Fly "putWingSize": 10, "callWingSize": 10, # Butterfly specific parameters "butteflyType": None, "butterflyLeftWingSize": 10, "butterflyRightWingSize": 10, # useSlice determines if we should use the chainOption slice data instead of optionProvider. Default is set to FALSE "useSlice": True, } def __init__(self, context): self.context = context # Set default name (use the class name) self.name = type(self).__name__ # Set the Strategy Name (optional) self.nameTag = self.name # Set the logger self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel) self.order = Order(context, self) # This adds all the parameters to the class. We can also access them via self.parameter("parameterName") self.context.structure.AddConfiguration(parent=self, **self.getMergedParameters()) # Initialize the contract utils self.contractUtils = ContractUtils(context) # Initialize the stats dictionary # This will hold any details related to the underlying. # For example, the underlying price at the time of opening of day self.stats = Stats() self.logger.debug(f'{self.name} -> __init__') @staticmethod def getNextOrderId(): Base.orderCount += 1 return Base.orderCount @classmethod def getMergedParameters(cls): # Merge the DEFAULT_PARAMETERS from both classes return {**cls.DEFAULT_PARAMETERS, **getattr(cls, "PARAMETERS", {})} @classmethod def parameter(cls, key, default=None): return cls.getMergedParameters().get(key, default) def update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]: insights = [] # Start the timer self.context.executionTimer.start('Alpha.Base -> Update') self.logger.debug(f'{self.name} -> update -> start') self.logger.debug(f'Is Warming Up: {self.context.IsWarmingUp}') self.logger.debug(f'Is Market Open: {self.context.IsMarketOpen(self.underlyingSymbol)}') self.logger.debug(f'Time: {self.context.Time}') # Exit if the algorithm is warming up or the market is closed (avoid processing orders on the last minute as these will be executed the following day) if self.context.IsWarmingUp or\ not self.context.IsMarketOpen(self.underlyingSymbol) or\ self.context.Time.time() >= time(16, 0, 0): return insights self.logger.debug(f'Did Alpha UPDATE after warmup?!?') # This thing just passes the data to the performance tool so we can keep track of all # symbols. This should not be needed if the culprit of the slonwess of backtesting is sorted. self.context.performance.OnUpdate(data) # Update the stats dictionary self.syncStats() # Check if the workingOrders are still OK to execute self.context.structure.checkOpenPositions() # Run the strategies to open new positions filteredChain, lastClosedOrderTag = Scanner(self.context, self).Call(data) self.logger.debug(f'Did Alpha SCAN') self.logger.debug(f'Last Closed Order Tag: {lastClosedOrderTag}') if filteredChain is not None: if self.stats.hasOptions == False: self.logger.info(f"Found options {self.context.Time.strftime('%A, %Y-%m-%d %H:%M')}") self.stats.hasOptions = True insights = self.CreateInsights(filteredChain, lastClosedOrderTag, data) elif self.stats.hasOptions is None and self.context.Time.time() >= time(9, 35, 0): self.stats.hasOptions = False self.logger.info(f"No options data for {self.context.Time.strftime('%A, %Y-%m-%d %H:%M')}") self.logger.debug(f"NOTE: Why could this happen? A: The filtering of the chain caused no contracts to be returned. Make sure to make a check on this.") # Stop the timer self.context.executionTimer.stop('Alpha.Base -> Update') return Insight.Group(insights) # Get the order with extra filters applied by the strategy def GetOrder(self, chain): raise NotImplementedError("GetOrder() not implemented") # Previous method CreateOptionPosition.py#OpenPosition def CreateInsights(self, chain, lastClosedOrderTag=None, data = Slice) -> List[Insight]: insights = [] # Call the getOrder method of the class implementing OptionStrategy order = self.getOrder(chain, data) # Execute the order # Exit if there is no order to process if order is None: return insights # Start the timer self.context.executionTimer.start('Alpha.Base -> CreateInsights') # Get the context context = self.context order = [order] if not isinstance(order, list) else order for o in order: self.logger.debug(f"CreateInsights -> strategyId: {o['strategyId']}, strikes: {o['strikes']}") for single_order in order: position, workingOrder = self.buildOrderPosition(single_order, lastClosedOrderTag) self.logger.debug(f"CreateInsights -> position: {position}") self.logger.debug(f"CreateInsights -> workingOrder: {workingOrder}") if position is None: continue if self.hasDuplicateLegs(single_order): self.logger.debug(f"CreateInsights -> Duplicate legs found in order: {single_order}") continue orderId = position.orderId orderTag = position.orderTag insights.extend(workingOrder.insights) # Add this position to the global dictionary context.allPositions[orderId] = position context.openPositions[orderTag] = orderId # Keep track of all the working orders context.workingOrders[orderTag] = {} # Map each contract to the openPosition dictionary (key: expiryStr) context.workingOrders[orderTag] = workingOrder self.logger.debug(f"CreateInsights -> insights: {insights}") # Stop the timer self.context.executionTimer.stop('Alpha.Base -> CreateInsights') return insights def buildOrderPosition(self, order, lastClosedOrderTag=None): # Get the context context = self.context # Get the list of contracts contracts = order["contracts"] self.logger.debug(f"buildOrderPosition -> contracts: {len(contracts)}") # Exit if there are no contracts if (len(contracts) == 0): return [None, None] useLimitOrders = self.useLimitOrders useMarketOrders = not useLimitOrders # Current timestamp currentDttm = self.context.Time strategyId = order["strategyId"] contractSide = order["contractSide"] # midPrices = order["midPrices"] strikes = order["strikes"] # IVs = order["IV"] expiry = order["expiry"] targetPremium = order["targetPremium"] maxOrderQuantity = order["maxOrderQuantity"] orderQuantity = order["orderQuantity"] bidAskSpread = order["bidAskSpread"] orderMidPrice = order["orderMidPrice"] limitOrderPrice = order["limitOrderPrice"] maxLoss = order["maxLoss"] targetProfit = order.get("targetProfit", None) # Expiry String expiryStr = expiry.strftime("%Y-%m-%d") self.logger.debug(f"buildOrderPosition -> expiry: {expiry}, expiryStr: {expiryStr}") # Validate the order prior to submit if ( # We have a minimum order quantity orderQuantity == 0 # The sign of orderMidPrice must be consistent with whether this is a credit strategy (+1) or debit strategy (-1) or np.sign(orderMidPrice) != 2 * int(order["creditStrategy"]) - 1 # Exit if the order quantity exceeds the maxOrderQuantity or (self.validateQuantity and orderQuantity > maxOrderQuantity) # Make sure the bid-ask spread is not too wide before opening the position. # Only for Market orders. In case of limit orders, this validation is done at the time of execution of the Limit order or (useMarketOrders and self.validateBidAskSpread and abs(bidAskSpread) > self.bidAskSpreadRatio * abs(orderMidPrice))): return [None, None] self.logger.debug(f"buildOrderPosition -> orderMidPrice: {orderMidPrice}, orderQuantity: {orderQuantity}, maxOrderQuantity: {maxOrderQuantity}") # Get the current price of the underlying underlyingPrice = self.contractUtils.getUnderlyingLastPrice(contracts[0]) # Get the Order Id and add it to the order dictionary orderId = self.getNextOrderId() # Create unique Tag to keep track of the order when the fill occurs orderTag = f"{strategyId}-{orderId}" strategyLegs = [] self.logger.debug(f"buildOrderPosition -> strategyLegs: {strategyLegs}") for contract in contracts: key = order["contractSideDesc"][contract.Symbol] leg = Leg( key=key, strike=strikes[key], expiry=order["contractExpiry"][key], contractSide=contractSide[contract.Symbol], symbol=contract.Symbol, contract=contract, ) strategyLegs.append(leg) position = Position( orderId=orderId, orderTag=orderTag, strategy=self, strategyTag=self.nameTag, strategyId=strategyId, legs=strategyLegs, expiry=expiry, expiryStr=expiryStr, targetProfit=targetProfit, linkedOrderTag=lastClosedOrderTag, contractSide=contractSide, openDttm=currentDttm, openDt=currentDttm.strftime("%Y-%m-%d"), openDTE=(expiry.date() - currentDttm.date()).days, limitOrder=useLimitOrders, targetPremium=targetPremium, orderQuantity=orderQuantity, maxOrderQuantity=maxOrderQuantity, openOrderMidPrice=orderMidPrice, openOrderMidPriceMin=orderMidPrice, openOrderMidPriceMax=orderMidPrice, openOrderBidAskSpread=bidAskSpread, openOrderLimitPrice=limitOrderPrice, # underlyingPriceAtOrderOpen=underlyingPrice, underlyingPriceAtOpen=underlyingPrice, openOrder=OrderType( limitOrderExpiryDttm=context.Time + self.limitOrderExpiration, midPrice=orderMidPrice, limitOrderPrice=limitOrderPrice, bidAskSpread=bidAskSpread, maxLoss=maxLoss ) ) self.logger.debug(f"buildOrderPosition -> position: {position}") # Create combo orders by using the provided method instead of always calling MarketOrder. insights = [] # Create the orders for contract in contracts: # Get the contract side (Long/Short) orderSide = contractSide[contract.Symbol] insight = Insight.Price( contract.Symbol, position.openOrder.limitOrderExpiryDttm, InsightDirection.Down if orderSide == -1 else InsightDirection.Up ) insights.append(insight) self.logger.debug(f"buildOrderPosition -> insights: {insights}") # Map each contract to the openPosition dictionary (key: expiryStr) workingOrder = WorkingOrder( positionKey=orderId, insights=insights, limitOrderPrice=limitOrderPrice, orderId=orderId, strategy=self, strategyTag=self.nameTag, useLimitOrder=useLimitOrders, orderType="open", fills=0 ) self.logger.debug(f"buildOrderPosition -> workingOrder: {workingOrder}") return [position, workingOrder] def hasDuplicateLegs(self, order): # Check if checkForDuplicatePositions is enabled if not self.checkForDuplicatePositions: return False # Get the context context = self.context # Get the list of contracts contracts = order["contracts"] openPositions = context.openPositions """ workingOrders = context.workingOrders # Get a list of orderIds from openPositions and workingOrders orderIds = list(openPositions.keys()) + [workingOrder.orderId for workingOrder in workingOrders.values()] # Iterate through the list of orderIds for orderId in orderIds: """ # Iterate through open positions for orderTag, orderId in list(openPositions.items()): position = context.allPositions[orderId] # Check if the expiry matches if position.expiryStr != order["expiry"].strftime("%Y-%m-%d"): continue # Check if the strategy matches (if allowMultipleEntriesPerExpiry is False) if not self.allowMultipleEntriesPerExpiry and position.strategyId == order["strategyId"]: return True # Compare legs position_legs = set((leg.strike, leg.contractSide) for leg in position.legs) order_legs = set((contract.Strike, order["contractSide"][contract.Symbol]) for contract in contracts) if position_legs == order_legs: return True return False """ This method is called every minute to update the stats dictionary. """ def syncStats(self): # Get the current day currentDay = self.context.Time.date() # Update the underlyingPriceAtOpen to be set at the start of each day underlying = Underlying(self.context, self.underlyingSymbol) if currentDay != self.stats.currentDay: self.logger.trace(f"Previous day: {self.stats.currentDay} data {self.stats.underlyingPriceAtOpen}, {self.stats.highOfTheDay}, {self.stats.lowOfTheDay}") self.stats.underlyingPriceAtOpen = underlying.Price() # Update the high/low of the day self.stats.highOfTheDay = underlying.Close() self.stats.lowOfTheDay = underlying.Close() # Add a dictionary to keep track of whether the price has touched the EMAs self.stats.touchedEMAs = {} self.logger.debug(f"Updating stats for {currentDay} Open: {self.stats.underlyingPriceAtOpen}, High: {self.stats.highOfTheDay}, Low: {self.stats.lowOfTheDay}") self.stats.currentDay = currentDay self.stats.hasOptions = None # This is like poor mans consolidator frequency = 5 # minutes # Continue the processing only if we are at the specified schedule if self.context.Time.minute % frequency != 0: return None # This should add the data for the underlying symbol chart. self.context.charting.updateCharts(symbol = self.underlyingSymbol) # Update the high/low of the day self.stats.highOfTheDay = max(self.stats.highOfTheDay, underlying.Close()) self.stats.lowOfTheDay = min(self.stats.lowOfTheDay, underlying.Close()) # The method will be called each time a consolidator is receiving data. We have a default one of 5 minutes # so if we need something to happen every 5 minutes this can be used for that. def dataConsolidated(self, sender, consolidated): pass def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: # Security additions and removals are pushed here. # This can be used for setting up algorithm state. # changes.AddedSecurities # changes.RemovedSecurities pass
#region imports from AlgorithmImports import * #endregion from .Base import Base class CCModel(Base): PARAMETERS = { # The start time at which the algorithm will start scheduling the strategy execution (to open new positions). No positions will be opened before this time "scheduleStartTime": time(9, 30, 0), # The stop time at which the algorithm will look to open a new position. "scheduleStopTime": time(16, 0, 0), # Periodic interval with which the algorithm will check to open new positions "scheduleFrequency": timedelta(minutes = 5), # Maximum number of open positions at any given time "maxActivePositions": 1, # Control whether to allow multiple positions to be opened for the same Expiration date "allowMultipleEntriesPerExpiry": False, # Minimum time distance between opening two consecutive trades "minimumTradeScheduleDistance": timedelta(minutes=10), # Days to Expiration "dte": 7, # The size of the window used to filter the option chain: options expiring in the range [dte-dteWindow, dte] will be selected "dteWindow": 14, "useLimitOrders": True, "limitOrderRelativePriceAdjustment": 0.2, # Alternative method to set the absolute price (per contract) of the Limit Order. This method is used if a number is specified "limitOrderAbsolutePrice": 0.30, "limitOrderExpiration": timedelta(minutes=15), # Coarse filter for the Universe selection. It selects nStrikes on both sides of the ATM # strike for each available expiration # Example: 200 SPX @ 3820 & 3910C w delta @ 1.95 => 90/5 = 18 "nStrikesLeft": 35, "nStrikesRight": 35, # TODO fix this and set it based on buying power. # "maxOrderQuantity": 25, "validateQuantity": False, "targetPremiumPct": 0.015, # Minimum premium accepted for opening a new position. Setting this to None disables it. "minPremium": 0.05, # Maximum premium accepted for opening a new position. Setting this to None disables it. "maxPremium": 0.8, # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) "profitTarget": 0.4, "bidAskSpreadRatio": 0.4, "validateBidAskSpread": True, "marketCloseCutoffTime": None, #time(15, 45, 0), # Put/Call Wing size for Iron Condor, Iron Fly # "targetPremium": 500, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context) # You can change the name here self.name = "CCModel" self.nameTag = "CCModel" self.ticker = "TSLA" self.context.structure.AddUnderlying(self, self.ticker) def getOrder(self, chain, data): if data.ContainsKey(self.underlyingSymbol): self.logger.debug(f"CCModel -> getOrder: Data contains key {self.underlyingSymbol}") # Based on maxActivePositions set to 1. We should already check if there is an open position or # working order. If there is, then this will not even run. call = self.order.getNakedOrder( chain, 'call', fromPrice = self.minPremium, toPrice = self.maxPremium, sell=True ) self.logger.debug(f"CCModel -> getOrder: Call: {call}") if call is not None: return [call] else: return None else: return None
#region imports from AlgorithmImports import * #endregion from .Base import Base from Data.GoogleSheetsData import GoogleSheetsData class FPLModel(Base): PARAMETERS = { # The start time at which the algorithm will start scheduling the strategy execution (to open new positions). No positions will be opened before this time "scheduleStartTime": time(9, 20, 0), # The stop time at which the algorithm will look to open a new position. "scheduleStopTime": None, # time(13, 0, 0), # Periodic interval with which the algorithm will check to open new positions "scheduleFrequency": timedelta(minutes = 5), # Maximum number of open positions at any given time "maxActivePositions": 2, # Days to Expiration "dte": 0, # The size of the window used to filter the option chain: options expiring in the range [dte-dteWindow, dte] will be selected "dteWindow": 0, "useLimitOrders": True, "limitOrderRelativePriceAdjustment": 0.2, "limitOrderAbsolutePrice": 0.5, "limitOrderExpiration": timedelta(hours=1), # Coarse filter for the Universe selection. It selects nStrikes on both sides of the ATM # strike for each available expiration # Example: 200 SPX @ 3820 & 3910C w delta @ 1.95 => 90/5 = 18 "nStrikesLeft": 18, "nStrikesRight": 18, "maxOrderQuantity": 40, # Minimum premium accepted for opening a new position. Setting this to None disables it. "minPremium": 0.50, # Maximum premium accepted for opening a new position. Setting this to None disables it. "maxPremium": 1.5, "profitTarget": 0.5, "bidAskSpreadRatio": 0.4, "validateBidAskSpread": True, "marketCloseCutoffTime": None, #time(15, 45, 0), # "targetPremium": 500, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context) # You can change the name here self.name = "FPLModel" self.nameTag = "FPL" self.ticker = "SPX" self.context.structure.AddUnderlying(self, self.ticker) self.customSymbol = self.context.AddData(GoogleSheetsData, "SPXT", Resolution.Minute).Symbol def getOrder(self, chain, data): if data.ContainsKey(self.customSymbol): self.logger.info(f'L: just got a new trade!! {data[self.customSymbol]}') print(f'P: just got a new trade!! {data[self.customSymbol]}') trade_instructions = data[self.customSymbol] tradeType = trade_instructions.Type condor = False self.logger.info(f'L: instructions: {trade_instructions}') print(f'P: instructions: {trade_instructions}') if tradeType == 'Call Credit Spreads': action = 'call' strike = trade_instructions.CallStrike elif tradeType == 'Put Credit Spreads': action = 'put' strike = trade_instructions.PutStrike elif tradeType == 'Iron Condor': callStrike = trade_instructions.CallStrike putStrike = trade_instructions.PutStrike condor = True else: return None if condor: return self.order.getIronCondorOrder( chain, callStrike = callStrike, putStrike = putStrike, callWingSize = 5, putWingSize = 5 ) else: return self.order.getSpreadOrder( chain, action, strike=strike, wingSize=5, sell=True ) else: return None # if not chain.ContainsKey('SPXTRADES'): # return [] # customTrades = chain['SPXTRADES'] # if customTrades is None: # return [] # # Check if the current time is past the instructed time # if self.context.Time < customTrades.Time: # return [] # # Use the customTrades data to generate insights # tradeType = customTrades.Type # call_strike = customTrades.CallStrike # put_strike = customTrades.PutStrike # minimum_premium = customTrades.MinimumPremium # self.Log(f'{data.EndTime}: Close: {data.Close}') # self.Plot(self.custom_data_symbol, 'Price', data.Close) # strike = self.context.underlyingPrice() + self.parameters["distance"]
# region imports from AlgorithmImports import * # endregion from .Base import Base class IBS(Base): PARAMETERS = { # The start time at which the algorithm will start scheduling the strategy execution (to open new positions). No positions will be opened before this time "scheduleStartTime": time(9, 30, 0), # The stop time at which the algorithm will look to open a new position. "scheduleStopTime": time(16, 0, 0), # Periodic interval with which the algorithm will check to open new positions "scheduleFrequency": timedelta(minutes = 5), # Maximum number of open positions at any given time "maxActivePositions": 10, # Control whether to allow multiple positions to be opened for the same Expiration date "allowMultipleEntriesPerExpiry": True, # Minimum time distance between opening two consecutive trades "minimumTradeScheduleDistance": timedelta(minutes=10), # Days to Expiration "dte": 0, # The size of the window used to filter the option chain: options expiring in the range [dte-dteWindow, dte] will be selected "dteWindow": 0, "useLimitOrders": True, "limitOrderRelativePriceAdjustment": 0.2, # Alternative method to set the absolute price (per contract) of the Limit Order. This method is used if a number is specified "limitOrderAbsolutePrice": 1.0, "limitOrderExpiration": timedelta(minutes=10), # Coarse filter for the Universe selection. It selects nStrikes on both sides of the ATM # strike for each available expiration # Example: 200 SPX @ 3820 & 3910C w delta @ 1.95 => 90/5 = 18 "nStrikesLeft": 18, "nStrikesRight": 18, # TODO fix this and set it based on buying power. # "maxOrderQuantity": 200, # COMMENT OUT this one below because it caused the orderQuantity to be 162 and maxOrderQuantity to be 10 so it would not place trades. "targetPremiumPct": 0.01, "validateQuantity": False, # Minimum premium accepted for opening a new position. Setting this to None disables it. "minPremium": 0.9, # Maximum premium accepted for opening a new position. Setting this to None disables it. "maxPremium": 1.2, # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) "profitTarget": 1.0, "bidAskSpreadRatio": 0.4, "validateBidAskSpread": True, "marketCloseCutoffTime": time(15, 45, 0), # Put/Call Wing size for Iron Condor, Iron Fly "putWingSize": 10, "callWingSize": 10, # "targetPremium": 500, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context) # You can change the name here self.name = "SPXic" self.nameTag = "SPXic" self.ticker = "SPX" self.context.structure.AddUnderlying(self, self.ticker) self.logger.debug(f"{self.__class__.__name__} -> __init__ -> AddUnderlying") self._ibs = InternalBarStrength() self.context.register_indicator(self.underlyingSymbol, self._ibs, Resolution.DAILY) def getOrder(self, chain, data): self.logger.debug(f"{self.__class__.__name__} -> getOrder -> start") self.logger.debug(f"SPXic -> getOrder -> data.ContainsKey(self.underlyingSymbol): {data.ContainsKey(self.underlyingSymbol)}") self.logger.debug(f"SPXic -> getOrder -> Underlying Symbol: {self.underlyingSymbol}") current_time = self.context.Time.time() market_open_time = time(9, 35, 0) # self.logger.debug(f"SPXic -> getOrder -> Current Time: {current_time}") # self.logger.debug(f"SPXic -> getOrder -> Market Open Time: {market_open_time}") # Check if it's market open time if current_time != market_open_time: # self.logger.debug(f"SPXic -> getOrder -> Not market open time, returning None") return None if data.ContainsKey(self.underlyingSymbol): ibs_value = self._ibs.current.value self.logger.debug(f"SPXic -> getOrder -> IBS Value: {ibs_value}") if ibs_value > 0.2: self.logger.debug(f"SPXic -> getOrder -> IBS Value > 0.2, returning None") return None self.logger.debug(f"SPXic -> getOrder: Data contains key {self.underlyingSymbol}") put = self.order.getSpreadOrder( chain, 'put', fromPrice=self.minPremium, toPrice=self.maxPremium, wingSize=self.putWingSize, sell=True ) self.logger.debug(f"SPXic -> getOrder: Put order details: {put}") if put is not None: return [put] else: self.logger.debug(f"SPXic -> getOrder: Data does not contain key {self.underlyingSymbol}, returning None") return None
#region imports from AlgorithmImports import * #endregion from .Base import Base class SPXButterfly(Base): PARAMETERS = { # The start time at which the algorithm will start scheduling the strategy execution (to open new positions). No positions will be opened before this time "scheduleStartTime": time(9, 30, 0), # The stop time at which the algorithm will look to open a new position. "scheduleStopTime": time(16, 0, 0), # Periodic interval with which the algorithm will check to open new positions "scheduleFrequency": timedelta(minutes = 15), # Maximum number of open positions at any given time "maxActivePositions": 30, # Control whether to allow multiple positions to be opened for the same Expiration date "allowMultipleEntriesPerExpiry": True, # Minimum time distance between opening two consecutive trades "minimumTradeScheduleDistance": timedelta(minutes=10), # Days to Expiration "dte": 0, # The size of the window used to filter the option chain: options expiring in the range [dte-dteWindow, dte] will be selected "dteWindow": 0, "useLimitOrders": True, "limitOrderRelativePriceAdjustment": 0.2, # Alternative method to set the absolute price (per contract) of the Limit Order. This method is used if a number is specified "limitOrderExpiration": timedelta(minutes=20), # Coarse filter for the Universe selection. It selects nStrikes on both sides of the ATM # strike for each available expiration # Example: 200 SPX @ 3820 & 3910C w delta @ 1.95 => 90/5 = 18 "nStrikesLeft": 18, "nStrikesRight": 18, # TODO fix this and set it based on buying power. "maxOrderQuantity": 1000, "targetPremiumPct": 0.015, # Minimum premium accepted for opening a new position. Setting this to None disables it. "minPremium": None, # Maximum premium accepted for opening a new position. Setting this to None disables it. "maxPremium": None, # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) # "profitTarget": 1.0, "bidAskSpreadRatio": 0.4, "validateBidAskSpread": True, "marketCloseCutoffTime": time(16, 10, 0), # Put/Call Wing size for Iron Condor, Iron Fly "butterflyLeftWingSize": 35, "butterflyRightWingSize": 35, # "targetPremium": 500, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context) # You can change the name here self.name = "SPXButterfly" self.nameTag = "SPXButterfly" self.ticker = "SPX" self.context.structure.AddUnderlying(self, self.ticker) def getOrder(self, chain, data): # Open trades at 13:00 if data.ContainsKey(self.underlyingSymbol): trade_times = [time(9, 45, 0)] current_time = self.context.Time.time() if current_time not in trade_times: return None fly = self.order.getIronFlyOrder( chain, callWingSize=self.butterflyLeftWingSize, putWingSize=self.butterflyRightWingSize, sell=True ) if fly is not None: return fly else: return None else: return None
#region imports from AlgorithmImports import * #endregion from .Base import Base class SPXCondor(Base): PARAMETERS = { # The start time at which the algorithm will start scheduling the strategy execution (to open new positions). No positions will be opened before this time "scheduleStartTime": time(9, 30, 0), # The stop time at which the algorithm will look to open a new position. "scheduleStopTime": time(16, 0, 0), # Periodic interval with which the algorithm will check to open new positions "scheduleFrequency": timedelta(minutes = 15), # Maximum number of open positions at any given time "maxActivePositions": 30, # Control whether to allow multiple positions to be opened for the same Expiration date "allowMultipleEntriesPerExpiry": True, # Minimum time distance between opening two consecutive trades "minimumTradeScheduleDistance": timedelta(minutes=10), # Days to Expiration "dte": 0, # The size of the window used to filter the option chain: options expiring in the range [dte-dteWindow, dte] will be selected "dteWindow": 0, "useLimitOrders": True, "limitOrderRelativePriceAdjustment": 0.2, # Alternative method to set the absolute price (per contract) of the Limit Order. This method is used if a number is specified "limitOrderAbsolutePrice": 0.90, "limitOrderExpiration": timedelta(minutes=5), # Coarse filter for the Universe selection. It selects nStrikes on both sides of the ATM # strike for each available expiration # Example: 200 SPX @ 3820 & 3910C w delta @ 1.95 => 90/5 = 18 "nStrikesLeft": 18, "nStrikesRight": 18, # TODO fix this and set it based on buying power. "maxOrderQuantity": 1000, "targetPremiumPct": 0.015, # Minimum premium accepted for opening a new position. Setting this to None disables it. "minPremium": None, # Maximum premium accepted for opening a new position. Setting this to None disables it. "maxPremium": None, # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) "profitTarget": 1.0, "bidAskSpreadRatio": 0.4, "validateBidAskSpread": True, "marketCloseCutoffTime": None, #time(15, 45, 0), # Put/Call Wing size for Iron Condor, Iron Fly "putWingSize": 5, "callWingSize": 5, # "targetPremium": 500, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context) # You can change the name here self.name = "SPXCondor" self.nameTag = "SPXCondor" self.ticker = "SPX" self.context.structure.AddUnderlying(self, self.ticker) def getOrder(self, chain, data): # Best time to open the trade: 9:45 + 10:15 + 12:30 + 13:00 + 13:30 + 13:45 + 14:00 + 15:00 + 15:15 + 15:45 # https://tradeautomationtoolbox.com/byob-ticks/?save=admZ4dG if data.ContainsKey(self.underlyingSymbol): trade_times = [time(9, 45, 0), time(13, 10, 0), time(15, 15, 0)] current_time = self.context.Time.time() if current_time not in trade_times: return None strike = self.order.strategyBuilder.getATMStrike(chain) condor = self.order.getIronCondorOrder( chain, callStrike=strike + 30, callWingSize=self.callWingSize, putStrike=strike - 30, putWingSize=self.putWingSize, sell=True ) if condor is not None: return condor else: return None else: return None
#region imports from AlgorithmImports import * #endregion from .Base import Base class SPXic(Base): PARAMETERS = { # The start time at which the algorithm will start scheduling the strategy execution (to open new positions). No positions will be opened before this time "scheduleStartTime": time(9, 30, 0), # The stop time at which the algorithm will look to open a new position. "scheduleStopTime": time(16, 0, 0), # Periodic interval with which the algorithm will check to open new positions "scheduleFrequency": timedelta(minutes = 5), # Maximum number of open positions at any given time "maxActivePositions": 10, # Control whether to allow multiple positions to be opened for the same Expiration date "allowMultipleEntriesPerExpiry": True, # Minimum time distance between opening two consecutive trades "minimumTradeScheduleDistance": timedelta(minutes=10), # Days to Expiration "dte": 0, # The size of the window used to filter the option chain: options expiring in the range [dte-dteWindow, dte] will be selected "dteWindow": 0, "useLimitOrders": True, "limitOrderRelativePriceAdjustment": 0.2, # Alternative method to set the absolute price (per contract) of the Limit Order. This method is used if a number is specified "limitOrderAbsolutePrice": 1.0, "limitOrderExpiration": timedelta(minutes=5), # Coarse filter for the Universe selection. It selects nStrikes on both sides of the ATM # strike for each available expiration # Example: 200 SPX @ 3820 & 3910C w delta @ 1.95 => 90/5 = 18 "nStrikesLeft": 18, "nStrikesRight": 18, # TODO fix this and set it based on buying power. # "maxOrderQuantity": 200, # COMMENT OUT this one below because it caused the orderQuantity to be 162 and maxOrderQuantity to be 10 so it would not place trades. "targetPremiumPct": 0.01, "validateQuantity": False, # Minimum premium accepted for opening a new position. Setting this to None disables it. "minPremium": 0.9, # Maximum premium accepted for opening a new position. Setting this to None disables it. "maxPremium": 1.2, # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) "profitTarget": 1.0, "bidAskSpreadRatio": 0.4, "validateBidAskSpread": True, "marketCloseCutoffTime": time(15, 45, 0), # Put/Call Wing size for Iron Condor, Iron Fly "putWingSize": 10, "callWingSize": 10, # "targetPremium": 500, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context) # You can change the name here self.name = "SPXic" self.nameTag = "SPXic" self.ticker = "SPX" self.context.structure.AddUnderlying(self, self.ticker) self.logger.debug(f"{self.__class__.__name__} -> __init__ -> AddUnderlying") def getOrder(self, chain, data): self.logger.debug(f"{self.__class__.__name__} -> getOrder -> start") self.logger.debug(f"SPXic -> getOrder -> data.ContainsKey(self.underlyingSymbol): {data.ContainsKey(self.underlyingSymbol)}") self.logger.debug(f"SPXic -> getOrder -> Underlying Symbol: {self.underlyingSymbol}") # Best time to open the trade: 9:45 + 10:15 + 12:30 + 13:00 + 13:30 + 13:45 + 14:00 + 15:00 + 15:15 + 15:45 # https://tradeautomationtoolbox.com/byob-ticks/?save=admZ4dG if data.ContainsKey(self.underlyingSymbol): self.logger.debug(f"SPXic -> getOrder: Data contains key {self.underlyingSymbol}") # trade_times = [time(9, 45, 0), time(10, 15, 0), time(12, 30, 0), time(13, 0, 0), time(13, 30, 0), time(13, 45, 0), time(14, 0, 0), time(15, 0, 0), time(15, 15, 0), time(15, 45, 0)] trade_times = [time(9, 45, 0), time(10, 15, 0), time(12, 30, 0), time(13, 0, 0), time(13, 30, 0), time(13, 45, 0), time(14, 0, 0)] # trade_times = [time(hour, minute, 0) for hour in range(9, 15) for minute in range(0, 60, 30) if not (hour == 15 and minute > 0)] # Remove the microsecond from the current time current_time = self.context.Time.time().replace(microsecond=0) self.logger.debug(f"SPXic -> getOrder -> current_time: {current_time}") self.logger.debug(f"SPXic -> getOrder -> trade_times: {trade_times}") self.logger.debug(f"SPXic -> getOrder -> current_time in trade_times: {current_time in trade_times}") if current_time not in trade_times: return None call = self.order.getSpreadOrder( chain, 'call', fromPrice=self.minPremium, toPrice=self.maxPremium, wingSize=self.callWingSize, sell=True ) put = self.order.getSpreadOrder( chain, 'put', fromPrice=self.minPremium, toPrice=self.maxPremium, wingSize=self.putWingSize, sell=True ) self.logger.debug(f"SPXic -> getOrder: Call: {call}") self.logger.debug(f"SPXic -> getOrder: Put: {put}") if call is not None and put is not None: return [call, put] else: return None else: return None
#region imports from AlgorithmImports import * #endregion import numpy as np from .OrderBuilder import OrderBuilder from Tools import ContractUtils, BSM, Logger from Strategy import Position class Order: def __init__(self, context, base): self.context = context self.base = base # Set the logger self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel) # Initialize the BSM pricing model self.bsm = BSM(context) # Initialize the contract utils self.contractUtils = ContractUtils(context) # Initialize the Strategy Builder self.strategyBuilder = OrderBuilder(context) # Function to evaluate the P&L of the position def fValue(self, spotPrice, contracts, sides=None, atTime=None, openPremium=None): # Compute the theoretical value at the given Spot price and point in time prices = np.array( [ self.bsm.bsmPrice( contract, sigma=contract.BSMImpliedVolatility, spotPrice=spotPrice, atTime=atTime, ) for contract in contracts ] ) # Total value of the position value = openPremium + sum(prices * np.array(sides)) return value def getPayoff(self, spotPrice, contracts, sides): # Exit if there are no contracts to process if len(contracts) == 0: return 0 # Initialize the counter n = 0 # initialize the payoff payoff = 0 for contract in contracts: # direction: Call -> +1, Put -> -1 direction = 2*int(contract.Right == OptionRight.Call)-1 # Add the payoff of the current contract payoff += sides[n] * max(0, direction * (spotPrice - contract.Strike)) # Increment the counter n += 1 # Return the payoff return payoff def computeOrderMaxLoss(self, contracts, sides): # Exit if there are no contracts to process if len(contracts) == 0: return 0 # Get the current price of the underlying UnderlyingLastPrice = self.contractUtils.getUnderlyingLastPrice(contracts[0]) # Evaluate the payoff at the extreme (spotPrice = 0) maxLoss = self.getPayoff(0, contracts, sides) # Evaluate the payoff at each strike for contract in contracts: maxLoss = min(maxLoss, self.getPayoff(contract.Strike, contracts, sides)) # Evaluate the payoff at the extreme (spotPrice = 10x higher) maxLoss = min(maxLoss, self.getPayoff(UnderlyingLastPrice*10, contracts, sides)) # Cap the payoff at zero: we are only interested in losses maxLoss = min(0, maxLoss) # Return the max loss return maxLoss def getMaxOrderQuantity(self): # Get the context context = self.context # Get the maximum order quantity parameter maxOrderQuantity = self.base.maxOrderQuantity # Get the targetPremiumPct targetPremiumPct = self.base.targetPremiumPct # Check if we are using dynamic premium targeting if targetPremiumPct != None: # Scale the maxOrderQuantity consistently with the portfolio growth maxOrderQuantity = round(maxOrderQuantity * (1 + context.Portfolio.TotalProfit / context.initialAccountValue)) # Make sure we don't go below the initial parameter value maxOrderQuantity = max(self.base.maxOrderQuantity, maxOrderQuantity) # Return the result return maxOrderQuantity def isDuplicateOrder(self, contracts, sides): # Loop through all working orders of this strategy for orderTag in list(self.context.workingOrders): # Get the current working order workingOrder = self.context.workingOrders.get(orderTag) # Check if the number of contracts of this working order is the same as the number of contracts in the input list if workingOrder and workingOrder.insights == len(contracts): # Initialize the isDuplicate flag. Assume it's duplicate unless we find a mismatch isDuplicate = True # Loop through each pair (contract, side) for contract, side in zip(contracts, sides): # Get the details of the contract contractInfo = workingOrder.get(contract.Symbol) # If we cannot find this contract then it's not a duplicate if contractInfo == None: isDuplicate = False break # Get the orderSide and expiryStr properties orderSide = contractInfo.get("orderSide") expiryStr = contractInfo.get("expiryStr") # Check for a mismatch if (orderSide != side # Found the contract but it's on a different side (Sell/Buy) or expiryStr != contract.Expiry.strftime("%Y-%m-%d") # Found the contract but it's on a different Expiry ): # It's not a duplicate. Brake this innermost loop isDuplicate = False break # Exit if we found a duplicate if isDuplicate: return isDuplicate # If we got this far, there are no duplicates return False def limitOrderPrice(self, sides, orderMidPrice): # Get the limitOrderAbsolutePrice limitOrderAbsolutePrice = self.base.limitOrderAbsolutePrice # Get the minPremium and maxPremium to determine the limit price based on that. minPremium = self.base.minPremium maxPremium = self.base.maxPremium # Get the limitOrderRelativePriceAdjustment limitOrderRelativePriceAdjustment = self.base.limitOrderRelativePriceAdjustment or 0.0 # Compute Limit Order price if limitOrderAbsolutePrice is not None: if abs(orderMidPrice) < 1e-5: limitOrderRelativePriceAdjustment = 0 else: # Compute the relative price adjustment (needed to adjust each leg with the same proportion) limitOrderRelativePriceAdjustment = limitOrderAbsolutePrice / orderMidPrice - 1 # Use the specified absolute price limitOrderPrice = limitOrderAbsolutePrice else: # Set the Limit Order price (including slippage) limitOrderPrice = orderMidPrice * (1 + limitOrderRelativePriceAdjustment) # Compute the total slippage totalSlippage = sum(list(map(abs, sides))) * self.base.slippage # Add slippage to the limit order limitOrderPrice -= totalSlippage # Adjust the limit order price based on minPremium and maxPremium if minPremium is not None and limitOrderPrice < minPremium: limitOrderPrice = minPremium if maxPremium is not None and limitOrderPrice > maxPremium: limitOrderPrice = maxPremium return limitOrderPrice # Create dictionary with the details of the order to be submitted def getOrderDetails(self, contracts, sides, strategy, sell=True, strategyId=None, expiry=None, sidesDesc=None): # Exit if there are no contracts to process if not contracts: return # Exit if we already have a working order for the same set of contracts and sides if self.isDuplicateOrder(contracts, sides): return # Get the context context = self.context # Set the Strategy Id (if not specified) strategyId = strategyId or strategy.replace(" ", "") # Get the Expiration from the first contract (unless otherwise specified expiry = expiry or contracts[0].Expiry # Get the last trading day for the given expiration date (in case it falls on a holiday) expiryLastTradingDay = self.context.lastTradingDay(expiry) # Set the date/time threshold by which the position must be closed (on the last trading day before expiration) expiryMarketCloseCutoffDttm = None if self.base.marketCloseCutoffTime != None: expiryMarketCloseCutoffDttm = datetime.combine(expiryLastTradingDay, self.base.marketCloseCutoffTime) # Dictionary to map each contract symbol to the side (short/long) contractSide = {} # Dictionary to map each contract symbol to its description contractSideDesc = {} # Dictionary to map each contract symbol to the actual contract object contractDictionary = {} # Dictionaries to keep track of all the strikes, Delta and IV strikes = {} delta = {} gamma = {} vega = {} theta = {} rho = {} vomma = {} elasticity = {} IV = {} midPrices = {} contractExpiry = {} # Compute the Greeks for each contract (if not already available) if self.base.computeGreeks: self.bsm.setGreeks(contracts) # Compute the Mid-Price and Bid-Ask spread for the full order orderMidPrice = 0.0 bidAskSpread = 0.0 # Get the slippage parameter (if available) slippage = self.base.slippage or 0.0 # Get the maximum order quantity maxOrderQuantity = self.getMaxOrderQuantity() # Get the targetPremiumPct targetPremiumPct = self.base.targetPremiumPct # Check if we are using dynamic premium targeting if targetPremiumPct != None: # Make sure targetPremiumPct is bounded to the range [0, 1]) targetPremiumPct = max(0.0, min(1.0, targetPremiumPct)) # Compute the target premium as a percentage of the total net portfolio value targetPremium = context.Portfolio.TotalPortfolioValue * targetPremiumPct else: targetPremium = self.base.targetPremium # Check if we have a description for the contracts if sidesDesc == None: # Temporary dictionaries to lookup a description optionTypeDesc = {OptionRight.Put: "Put", OptionRight.Call: "Call"} optionSideDesc = {-1: "short", 1: "long"} # create a description for each contract: <long|short><Call|Put> sidesDesc = list(map(lambda contract, side: f"{optionSideDesc[np.sign(side)]}{optionTypeDesc[contract.Right]}", contracts, sides)) n = 0 for contract in contracts: # Contract Side: +n -> Long, -n -> Short orderSide = sides[n] # Contract description (<long|short><Call|Put>) orderSideDesc = sidesDesc[n] # Store it in the dictionary contractSide[contract.Symbol] = orderSide contractSideDesc[contract.Symbol] = orderSideDesc contractDictionary[contract.Symbol] = contract # Set the strike in the dictionary -> "<short|long><Call|Put>": <strike> strikes[f"{orderSideDesc}"] = contract.Strike # Add the contract expiration time and add 16 hours to the market close contractExpiry[f"{orderSideDesc}"] = contract.Expiry + timedelta(hours = 16) if hasattr(contract, "BSMGreeks"): # Set the Greeks and IV in the dictionary -> "<short|long><Call|Put>": <greek|IV> delta[f"{orderSideDesc}"] = contract.BSMGreeks.Delta gamma[f"{orderSideDesc}"] = contract.BSMGreeks.Gamma vega[f"{orderSideDesc}"] = contract.BSMGreeks.Vega theta[f"{orderSideDesc}"] = contract.BSMGreeks.Theta rho[f"{orderSideDesc}"] = contract.BSMGreeks.Rho vomma[f"{orderSideDesc}"] = contract.BSMGreeks.Vomma elasticity[f"{orderSideDesc}"] = contract.BSMGreeks.Elasticity IV[f"{orderSideDesc}"] = contract.BSMImpliedVolatility # Get the latest mid-price midPrice = self.contractUtils.midPrice(contract) # Store the midPrice in the dictionary -> "<short|long><Call|Put>": midPrice midPrices[f"{orderSideDesc}"] = midPrice # Compute the bid-ask spread bidAskSpread += self.contractUtils.bidAskSpread(contract) # Adjusted mid-price (include slippage). Take the sign of orderSide to determine the direction of the adjustment # adjustedMidPrice = midPrice + np.sign(orderSide) * slippage # Keep track of the total credit/debit or the order orderMidPrice -= orderSide * midPrice # Increment counter n += 1 limitOrderPrice = self.limitOrderPrice(sides=sides, orderMidPrice=orderMidPrice) # Round the prices to the nearest cent orderMidPrice = round(orderMidPrice, 2) limitOrderPrice = round(limitOrderPrice, 2) # Determine which price is used to compute the order quantity if self.base.useLimitOrders: # Use the Limit Order price qtyMidPrice = limitOrderPrice else: # Use the contract mid-price qtyMidPrice = orderMidPrice if targetPremium == None: # No target premium was provided. Use maxOrderQuantity orderQuantity = maxOrderQuantity else: # Make sure we are not exceeding the available portfolio margin targetPremium = min(context.Portfolio.MarginRemaining, targetPremium) # Determine the order quantity based on the target premium if abs(qtyMidPrice) <= 1e-5: orderQuantity = 1 else: orderQuantity = abs(targetPremium / (qtyMidPrice * 100)) # Different logic for Credit vs Debit strategies if sell: # Credit order # Sell at least one contract orderQuantity = max(1, round(orderQuantity)) else: # Debit order # Make sure the total price does not exceed the target premium orderQuantity = math.floor(orderQuantity) # Get the current price of the underlying security = context.Securities[self.base.underlyingSymbol] underlyingPrice = context.GetLastKnownPrice(security).Price # Compute MaxLoss maxLoss = self.computeOrderMaxLoss(contracts, sides) # Get the Profit Target percentage is specified (default is 50%) profitTargetPct = self.base.parameter("profitTarget", 0.5) # Compute T-Reg margin based on the MaxLoss TReg = min(0, orderMidPrice + maxLoss) * orderQuantity portfolioMarginStress = self.context.portfolioMarginStress if self.base.computeGreeks: # Compute the projected P&L of the position following a % movement of the underlying up or down portfolioMargin = min( 0, self.fValue(underlyingPrice * (1-portfolioMarginStress), contracts, sides=sides, atTime=context.Time, openPremium=midPrice), self.fValue(underlyingPrice * (1+portfolioMarginStress), contracts, sides=sides, atTime=context.Time, openPremium=midPrice) ) * orderQuantity order = { "strategyId": strategyId, "expiry": expiry, "orderMidPrice": orderMidPrice, "limitOrderPrice": limitOrderPrice, "bidAskSpread": bidAskSpread, "orderQuantity": orderQuantity, "maxOrderQuantity": maxOrderQuantity, "targetPremium": targetPremium, "strikes": strikes, "sides": sides, "sidesDesc": sidesDesc, "contractSide": contractSide, "contractSideDesc": contractSideDesc, "contracts": contracts, "contractExpiry": contractExpiry, "creditStrategy": sell, "maxLoss": maxLoss, "expiryLastTradingDay": expiryLastTradingDay, "expiryMarketCloseCutoffDttm": expiryMarketCloseCutoffDttm } # Create order details # order = {"expiry": expiry # , "expiryStr": expiry.strftime("%Y-%m-%d") # , "expiryLastTradingDay": expiryLastTradingDay # , "expiryMarketCloseCutoffDttm": expiryMarketCloseCutoffDttm # , "strategyId": strategyId # , "strategy": strategy # , "sides": sides # , "sidesDesc": sidesDesc # , "contractExpiry": contractExpiry # , "contractSide": contractSide # , "contractSideDesc": contractSideDesc # , "contractDictionary": contractDictionary # , "strikes": strikes # , "midPrices": midPrices # , "delta": delta # , "gamma": gamma # , "vega": vega # , "theta": theta # , "rho": rho # , "vomma": vomma # , "elasticity": elasticity # , "IV": IV # , "contracts": contracts # , "targetPremium": targetPremium # , "maxOrderQuantity": maxOrderQuantity # , "orderQuantity": orderQuantity # , "creditStrategy": sell # , "maxLoss": maxLoss # , "TReg": TReg # , "portfolioMargin": portfolioMargin # , "open": {"orders": [] # , "fills": 0 # , "filled": False # , "stalePrice": False # , "orderMidPrice": orderMidPrice # , "limitOrderPrice": limitOrderPrice # , "qtyMidPrice": qtyMidPrice # , "limitOrder": parameters["useLimitOrders"] # , "limitOrderExpiryDttm": context.Time + parameters["limitOrderExpiration"] # , "bidAskSpread": bidAskSpread # , "fillPrice": 0.0 # } # , "close": {"orders": [] # , "fills": 0 # , "filled": False # , "stalePrice": False # , "orderMidPrice": 0.0 # , "fillPrice": 0.0 # } # } # Determine the method used to calculate the profit target profitTargetMethod = self.base.parameter("profitTargetMethod", "Premium").lower() thetaProfitDays = self.base.parameter("thetaProfitDays", 0) # Set a custom profit target unless we are using the default Premium based methodology if profitTargetMethod != "premium": if profitTargetMethod == "theta" and thetaProfitDays > 0: # Calculate the P&L of the position at T+[thetaProfitDays] thetaPnL = self.fValue(underlyingPrice, contracts, sides=sides, atTime=context.Time + timedelta(days=thetaProfitDays), openPremium=midPrice) # Profit target is a percentage of the P&L calculated at T+[thetaProfitDays] profitTargetAmt = profitTargetPct * abs(thetaPnL) * orderQuantity elif profitTargetMethod == "treg": # Profit target is a percentage of the TReg requirement profitTargetAmt = profitTargetPct * abs(TReg) * orderQuantity elif profitTargetMethod == "margin": # Profit target is a percentage of the margin requirement profitTargetAmt = profitTargetPct * abs(portfolioMargin) * orderQuantity else: pass # Set the target profit for the position order["targetProfit"] = profitTargetAmt return order def getNakedOrder(self, contracts, type, strike = None, delta = None, fromPrice = None, toPrice = None, sell = True): if sell: # Short option contract sides = [-1] strategy = f"Short {type.title()}" else: # Long option contract sides = [1] strategy = f"Long {type.title()}" type = type.lower() if type == "put": # Get all Puts with a strike lower than the given strike and delta lower than the given delta sorted_contracts = self.strategyBuilder.getPuts(contracts, toDelta = delta, toStrike = strike, fromPrice = fromPrice, toPrice = toPrice) elif type == "call": # Get all Calls with a strike higher than the given strike and delta lower than the given delta sorted_contracts = self.strategyBuilder.getCalls(contracts, toDelta = delta, fromStrike = strike, fromPrice = fromPrice, toPrice = toPrice) else: self.logger.error(f"Input parameter type = {type} is invalid. Valid values: Put|Call.") return # Check if we got any contracts if len(sorted_contracts): # Create order details order = self.getOrderDetails([sorted_contracts[0]], sides, strategy, sell) # Return the order return order # Create order details for a Straddle order def getStraddleOrder(self, contracts, strike = None, netDelta = None, sell = True): if sell: # Short Straddle sides = [-1, -1] strategy = "Short Straddle" else: # Long Straddle sides = [1, 1] strategy = "Long Straddle" # Delta strike selection (in case the Iron Fly is not centered on the ATM strike) delta = None # Make sure the netDelta is less than 50 if netDelta != None and abs(netDelta) < 50: delta = 50 + netDelta if strike == None and delta == None: # Standard Straddle: get the ATM contracts legs = self.strategyBuilder.getATM(contracts) else: legs = [] # This is a Straddle centered at the given strike or Net Delta. # Get the Put at the requested delta or strike puts = self.strategyBuilder.getPuts(contracts, toDelta = delta, toStrike = strike) if(len(puts) > 0): put = puts[0] # Get the Call at the same strike as the Put calls = self.strategyBuilder.getCalls(contracts, fromStrike = put.Strike) if(len(calls) > 0): call = calls[0] # Collect both legs legs = [put, call] # Create order details order = self.getOrderDetails(legs, sides, strategy, sell) # Return the order return order # Create order details for a Strangle order def getStrangleOrder(self, contracts, callDelta = None, putDelta = None, callStrike = None, putStrike = None, sell = True): if sell: # Short Strangle sides = [-1, -1] strategy = "Short Strangle" else: # Long Strangle sides = [1, 1] strategy = "Long Strangle" # Get all Puts with a strike lower than the given putStrike and delta lower than the given putDelta puts = self.strategyBuilder.getPuts(contracts, toDelta = putDelta, toStrike = putStrike) # Get all Calls with a strike higher than the given callStrike and delta lower than the given callDelta calls = self.strategyBuilder.getCalls(contracts, toDelta = callDelta, fromStrike = callStrike) # Get the two contracts legs = [] if len(puts) > 0 and len(calls) > 0: legs = [puts[0], calls[0]] # Create order details order = self.getOrderDetails(legs, sides, strategy, sell) # Return the order return order def getSpreadOrder(self, contracts, type, strike = None, delta = None, wingSize = None, sell = True, fromPrice = None, toPrice = None, premiumOrder = "max"): if sell: # Credit Spread sides = [-1, 1] strategy = f"{type.title()} Credit Spread" else: # Debit Spread sides = [1, -1] strategy = f"{type.title()} Debit Spread" # Get the legs of the spread legs = self.strategyBuilder.getSpread(contracts, type, strike = strike, delta = delta, wingSize = wingSize, fromPrice = fromPrice, toPrice = toPrice, premiumOrder = premiumOrder) self.logger.debug(f"getSpreadOrder -> legs: {legs}") self.logger.debug(f"getSpreadOrder -> sides: {sides}") self.logger.debug(f"getSpreadOrder -> strategy: {strategy}") self.logger.debug(f"getSpreadOrder -> sell: {sell}") # Exit if we couldn't get both legs of the spread if len(legs) != 2: return # Create order details order = self.getOrderDetails(legs, sides, strategy, sell) # Return the order return order def getIronCondorOrder(self, contracts, callDelta = None, putDelta = None, callStrike = None, putStrike = None, callWingSize = None, putWingSize = None, sell = True): if sell: # Sell Iron Condor: [longPut, shortPut, shortCall, longCall] sides = [1, -1, -1, 1] strategy = "Iron Condor" else: # Buy Iron Condor: [shortPut, longPut, longCall, shortCall] sides = [-1, 1, 1, -1] strategy = "Reverse Iron Condor" # Get the Put spread puts = self.strategyBuilder.getSpread(contracts, "Put", strike = putStrike, delta = putDelta, wingSize = putWingSize, sortByStrike = True) # Get the Call spread calls = self.strategyBuilder.getSpread(contracts, "Call", strike = callStrike, delta = callDelta, wingSize = callWingSize) # Collect all legs legs = puts + calls # Exit if we couldn't get all legs of the Iron Condor if len(legs) != 4: return # Create order details order = self.getOrderDetails(legs, sides, strategy, sell) # Return the order return order def getIronFlyOrder(self, contracts, netDelta = None, strike = None, callWingSize = None, putWingSize = None, sell = True): if sell: # Sell Iron Fly: [longPut, shortPut, shortCall, longCall] sides = [1, -1, -1, 1] strategy = "Iron Fly" else: # Buy Iron Fly: [shortPut, longPut, longCall, shortCall] sides = [-1, 1, 1, -1] strategy = "Reverse Iron Fly" # Delta strike selection (in case the Iron Fly is not centered on the ATM strike) delta = None # Make sure the netDelta is less than 50 if netDelta != None and abs(netDelta) < 50: delta = 50 + netDelta if strike == None and delta == None: # Standard ATM Iron Fly strike = self.strategyBuilder.getATMStrike(contracts) # Get the Put spread puts = self.strategyBuilder.getSpread(contracts, "Put", strike = strike, delta = delta, wingSize = putWingSize, sortByStrike = True) # Get the Call spread with the same strike as the first leg of the Put spread calls = self.strategyBuilder.getSpread(contracts, "Call", strike = puts[-1].Strike, wingSize = callWingSize) # Collect all legs legs = puts + calls # Exit if we couldn't get all legs of the Iron Fly if len(legs) != 4: return # Create order details order = self.getOrderDetails(legs, sides, strategy, sell) # Return the order return order def getButterflyOrder(self, contracts, type, netDelta = None, strike = None, leftWingSize = None, rightWingSize = None, sell = False): # Make sure the wing sizes are set leftWingSize = leftWingSize or rightWingSize or 1 rightWingSize = rightWingSize or leftWingSize or 1 if sell: # Sell Butterfly: [short<Put|Call>, 2 long<Put|Call>, short<Put|Call>] sides = [-1, 2, -1] strategy = "Credit Butterfly" else: # Buy Butterfly: [long<Put|Call>, 2 short<Put|Call>, long<Put|Call>] sides = [1, -2, 1] strategy = "Debit Butterfly" # Create a custom description for each side to uniquely identify the wings: # Sell Butterfly: [leftShort<Put|Call>, 2 Long<Put|Call>, rightShort<Put|Call>] # Buy Butterfly: [leftLong<Put|Call>, 2 Short<Put|Call>, rightLong<Put|Call>] optionSides = {-1: "Short", 1: "Long"} sidesDesc = list(map(lambda side, prefix: f"{prefix}{optionSides[np.sign(side)]}{type.title()}", sides, ["left", "", "right"])) # Delta strike selection (in case the Butterfly is not centered on the ATM strike) delta = None # Make sure the netDelta is less than 50 if netDelta != None and abs(netDelta) < 50: if type.lower() == "put": # Use Put delta delta = 50 + netDelta else: # Use Call delta delta = 50 - netDelta if strike == None and delta == None: # Standard ATM Butterfly strike = self.strategyBuilder.getATMStrike(contracts) type = type.lower() if type == "put": # Get the Put spread (sorted by strike in ascending order) putSpread = self.strategyBuilder.getSpread(contracts, "Put", strike = strike, delta = delta, wingSize = leftWingSize, sortByStrike = True) # Exit if we couldn't get all legs of the Iron Fly if len(putSpread) != 2: return # Get the middle strike (second entry in the list) middleStrike = putSpread[1].Strike # Find the right wing of the Butterfly (add a small offset to the fromStrike in order to avoid selecting the middle strike as a wing) wings = self.strategyBuilder.getPuts(contracts, fromStrike = middleStrike + 0.1, toStrike = middleStrike + rightWingSize) # Exit if we could not find the wing if len(wings) == 0: return # Combine all the legs legs = putSpread + wings[0] elif type == "call": # Get the Call spread (sorted by strike in ascending order) callSpread = self.strategyBuilder.getSpread(contracts, "Call", strike = strike, delta = delta, wingSize = rightWingSize) # Exit if we couldn't get all legs of the Iron Fly if len(callSpread) != 2: return # Get the middle strike (first entry in the list) middleStrike = callSpread[0].Strike # Find the left wing of the Butterfly (add a small offset to the toStrike in order to avoid selecting the middle strike as a wing) wings = self.strategyBuilder.getCalls(contracts, fromStrike = middleStrike - leftWingSize, toStrike = middleStrike - 0.1) # Exit if we could not find the wing if len(wings) == 0: return # Combine all the legs legs = wings[0] + callSpread else: self.logger.error(f"Input parameter type = {type} is invalid. Valid values: Put|Call.") return # Exit if we couldn't get both legs of the spread if len(legs) != 3: return # Create order details order = self.getOrderDetails(legs, sides, strategy, sell = sell, sidesDesc = sidesDesc) # Return the order return order def getCustomOrder(self, contracts, types, deltas = None, sides = None, sidesDesc = None, strategy = "Custom", sell = None): # Make sure the Sides parameter has been specified if not sides: self.logger.error("Input parameter sides cannot be null. No order will be returned.") return # Make sure the Sides and Deltas parameters are of the same length if not deltas or len(deltas) != len(sides): self.logger.error(f"Input parameters deltas = {deltas} and sides = {sides} must have the same length. No order will be returned.") return # Convert types into a list if it is a string if isinstance(types, str): types = [types] * len(sides) # Make sure the Sides and Types parameters are of the same length if not types or len(types) != len(sides): self.logger.error(f"Input parameters types = {types} and sides = {sides} must have the same length. No order will be returned.") return legs = [] midPrice = 0 for side, type, delta in zip(sides, types, deltas): # Get all Puts with a strike lower than the given putStrike and delta lower than the given putDelta deltaContracts = self.strategyBuilder.getContracts(contracts, type = type, toDelta = delta, reverse = type.lower() == "put") # Exit if we could not find the contract if not deltaContracts: return # Append the contract to the list of legs legs = legs + [deltaContracts[0]] # Update the mid-price midPrice -= self.contractUtils.midPrice(deltaContracts[0]) * side # Automatically determine if this is a credit or debit strategy (unless specified) if sell is None: sell = midPrice > 0 # Create order details order = self.getOrderDetails(legs, sides, strategy, sell = sell, sidesDesc = sidesDesc) # Return the order return order
# region imports from AlgorithmImports import * # endregion from Tools import Logger, ContractUtils, BSM """ This is an Order builder class. It will get the proper contracts i need to create the order per parameters. """ class OrderBuilder: # \param[in] context is a reference to the QCAlgorithm instance. The following attributes are used from the context: # - slippage: (Optional) controls how the mid-price of an order is adjusted to include slippage. # - targetPremium: (Optional) used to determine how many contracts to buy/sell. # - maxOrderQuantity: (Optional) Caps the number of contracts that are bought/sold (Default: 1). # If targetPremium == None -> This is the number of contracts bought/sold. # If targetPremium != None -> The order is executed only if the number of contracts required # to reach the target credit/debit does not exceed the maxOrderQuantity def __init__(self, context): # Set the context (QCAlgorithm object) self.context = context # Initialize the BSM pricing model self.bsm = BSM(context) # Set the logger self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel) # Initialize the contract utils self.contractUtils = ContractUtils(context) # Returns True/False based on whether the option contract is of the specified type (Call/Put) def optionTypeFilter(self, contract, type = None): if type is None: return True type = type.lower() if type == "put": return contract.Right == OptionRight.Put elif type == "call": return contract.Right == OptionRight.Call else: return True # Return the ATM contracts (Put/Call or both) def getATM(self, contracts, type = None): # Initialize result atm_contracts = [] # Sort the contracts based on how close they are to the current price of the underlying. # Filter them by the selected contract type (Put/Call or both) sorted_contracts = sorted([contract for contract in contracts if self.optionTypeFilter(contract, type) ] , key = lambda x: abs(x.Strike - self.contractUtils.getUnderlyingLastPrice(x)) , reverse = False ) # Check if any contracts were returned after the filtering if len(sorted_contracts) > 0: if type == None or type.lower() == "both": # Select the first two contracts (one Put and one Call) Ncontracts = min(len(sorted_contracts), 2) else: # Select the first contract (either Put or Call, based on the type specified) Ncontracts = 1 # Extract the selected contracts atm_contracts = sorted_contracts[0:Ncontracts] # Return result return atm_contracts def getATMStrike(self, contracts): ATMStrike = None # Get the ATM contracts atm_contracts = self.getATM(contracts) # Check if any contracts were found if len(atm_contracts) > 0: # Get the Strike of the first contract ATMStrike = atm_contracts[0].Strike # Return result return ATMStrike # Returns the Strike of the contract with the closest Delta # Assumptions: # - Input list contracts must be sorted by ascending strike # - All contracts in the list must be of the same type (Call|Put) def getDeltaContract(self, contracts, delta = None): # Skip processing if the option type or Delta has not been specified if delta == None or not contracts: return leftIdx = 0 rightIdx = len(contracts)-1 # Compute the Greeks for the contracts at the extremes self.bsm.setGreeks([contracts[leftIdx], contracts[rightIdx]]) # ####################################################### # Check if the requested Delta is outside of the range # ####################################################### if contracts[rightIdx].Right == OptionRight.Call: # Check if the furthest OTM Call has a Delta higher than the requested Delta if abs(contracts[rightIdx].BSMGreeks.Delta) > delta/100.0: # The requested delta is outside the boundary, return the strike of the furthest OTM Call return contracts[rightIdx] # Check if the furthest ITM Call has a Delta lower than the requested Delta elif abs(contracts[leftIdx].BSMGreeks.Delta) < delta/100.0: # The requested delta is outside the boundary, return the strike of the furthest ITM Call return contracts[leftIdx] else: # Check if the furthest OTM Put has a Delta higher than the requested Delta if abs(contracts[leftIdx].BSMGreeks.Delta) > delta/100.0: # The requested delta is outside the boundary, return the strike of the furthest OTM Put return contracts[leftIdx] # Check if the furthest ITM Put has a Delta lower than the requested Delta elif abs(contracts[rightIdx].BSMGreeks.Delta) < delta/100.0: # The requested delta is outside the boundary, return the strike of the furthest ITM Put return contracts[rightIdx] # The requested Delta is inside the range, use the Bisection method to find the contract with the closest Delta while (rightIdx-leftIdx) > 1: # Get the middle point middleIdx = round((leftIdx + rightIdx)/2.0) middleContract = contracts[middleIdx] # Compute the greeks for the contract in the middle self.bsm.setGreeks(middleContract) contractDelta = contracts[middleIdx].BSMGreeks.Delta # Determine which side we need to continue the search if(abs(contractDelta) > delta/100.0): if middleContract.Right == OptionRight.Call: # The requested Call Delta is on the right side leftIdx = middleIdx else: # The requested Put Delta is on the left side rightIdx = middleIdx else: if middleContract.Right == OptionRight.Call: # The requested Call Delta is on the left side rightIdx = middleIdx else: # The requested Put Delta is on the right side leftIdx = middleIdx # At this point where should only be two contracts remaining: choose the contract with the closest Delta deltaContract = sorted([contracts[leftIdx], contracts[rightIdx]] , key = lambda x: abs(abs(x.BSMGreeks.Delta) - delta/100.0) , reverse = False )[0] return deltaContract def getDeltaStrike(self, contracts, delta = None): deltaStrike = None # Get the contract with the closest Delta deltaContract = self.getDeltaContract(contracts, delta = delta) # Check if we got any contract if deltaContract != None: # Get the strike deltaStrike = deltaContract.Strike # Return the strike return deltaStrike def getFromDeltaStrike(self, contracts, delta = None, default = None): fromDeltaStrike = default # Get the call with the closest Delta deltaContract = self.getDeltaContract(contracts, delta = delta) # Check if we found the contract if deltaContract: if abs(deltaContract.BSMGreeks.Delta) >= delta/100.0: # The contract is in the required range. Get the Strike fromDeltaStrike = deltaContract.Strike else: # Calculate the offset: +0.01 in case of Puts, -0.01 in case of Calls offset = 0.01 * (2*int(deltaContract.Right == OptionRight.Put)-1) # The contract is outside of the required range. Get the Strike and add (Put) or subtract (Call) a small offset so we can filter for contracts above/below this strike fromDeltaStrike = deltaContract.Strike + offset return fromDeltaStrike def getToDeltaStrike(self, contracts, delta = None, default = None): toDeltaStrike = default # Get the put with the closest Delta deltaContract = self.getDeltaContract(contracts, delta = delta) # Check if we found the contract if deltaContract: if abs(deltaContract.BSMGreeks.Delta) <= delta/100.0: # The contract is in the required range. Get the Strike toDeltaStrike = deltaContract.Strike else: # Calculate the offset: +0.01 in case of Calls, -0.01 in case of Puts offset = 0.01 * (2*int(deltaContract.Right == OptionRight.Call)-1) # The contract is outside of the required range. Get the Strike and add (Call) or subtract (Put) a small offset so we can filter for contracts above/below this strike toDeltaStrike = deltaContract.Strike + offset return toDeltaStrike def getPutFromDeltaStrike(self, contracts, delta = None): return self.getFromDeltaStrike(contracts, delta = delta, default = 0.0) def getCallFromDeltaStrike(self, contracts, delta = None): return self.getFromDeltaStrike(contracts, delta = delta, default = float('Inf')) def getPutToDeltaStrike(self, contracts, delta = None): return self.getToDeltaStrike(contracts, delta = delta, default = float('Inf')) def getCallToDeltaStrike(self, contracts, delta = None): return self.getToDeltaStrike(contracts, delta = delta, default = 0) def getContracts(self, contracts, type = None, fromDelta = None, toDelta = None, fromStrike = None, toStrike = None, fromPrice = None, toPrice = None, reverse = False): # Make sure all constraints are set fromStrike = fromStrike or 0 fromPrice = fromPrice or 0 toStrike = toStrike or float('inf') toPrice = toPrice or float('inf') # Get the Put contracts, sorted by ascending strike. Apply the Strike/Price constraints puts = [] if type == None or type.lower() == "put": puts = sorted([contract for contract in contracts if self.optionTypeFilter(contract, "Put") # Strike constraint and (fromStrike <= contract.Strike <= toStrike) # The option contract is tradable and self.contractUtils.getSecurity(contract).IsTradable # Option price constraint (based on the mid-price) and (fromPrice <= self.contractUtils.midPrice(contract) <= toPrice) ] , key = lambda x: x.Strike , reverse = False ) # Get the Call contracts, sorted by ascending strike. Apply the Strike/Price constraints calls = [] if type == None or type.lower() == "call": calls = sorted([contract for contract in contracts if self.optionTypeFilter(contract, "Call") # Strike constraint and (fromStrike <= contract.Strike <= toStrike) # The option contract is tradable and self.contractUtils.getSecurity(contract).IsTradable # Option price constraint (based on the mid-price) and (fromPrice <= self.contractUtils.midPrice(contract) <= toPrice) ] , key = lambda x: x.Strike , reverse = False ) deltaFilteredPuts = puts deltaFilteredCalls = calls # Check if we need to filter by Delta if (fromDelta or toDelta): # Find the strike range for the Puts based on the From/To Delta putFromDeltaStrike = self.getPutFromDeltaStrike(puts, delta = fromDelta) putToDeltaStrike = self.getPutToDeltaStrike(puts, delta = toDelta) # Filter the Puts based on the delta-strike range deltaFilteredPuts = [contract for contract in puts if putFromDeltaStrike <= contract.Strike <= putToDeltaStrike ] # Find the strike range for the Calls based on the From/To Delta callFromDeltaStrike = self.getCallFromDeltaStrike(calls, delta = fromDelta) callToDeltaStrike = self.getCallToDeltaStrike(calls, delta = toDelta) # Filter the Puts based on the delta-strike range. For the calls, the Delta decreases with increasing strike, so the order of the filter is inverted deltaFilteredCalls = [contract for contract in calls if callToDeltaStrike <= contract.Strike <= callFromDeltaStrike ] # Combine the lists and Sort the contracts by their strike in the specified order. result = sorted(deltaFilteredPuts + deltaFilteredCalls , key = lambda x: x.Strike , reverse = reverse ) # Return result return result def getPuts(self, contracts, fromDelta = None, toDelta = None, fromStrike = None, toStrike = None, fromPrice = None, toPrice = None): # Sort the Put contracts by their strike in reverse order. Filter them by the specified criteria (Delta/Strike/Price constrains) return self.getContracts(contracts , type = "Put" , fromDelta = fromDelta , toDelta = toDelta , fromStrike = fromStrike , toStrike = toStrike , fromPrice = fromPrice , toPrice = toPrice , reverse = True ) def getCalls(self, contracts, fromDelta = None, toDelta = None, fromStrike = None, toStrike = None, fromPrice = None, toPrice = None): # Sort the Call contracts by their strike in ascending order. Filter them by the specified criteria (Delta/Strike/Price constrains) return self.getContracts(contracts , type = "Call" , fromDelta = fromDelta , toDelta = toDelta , fromStrike = fromStrike , toStrike = toStrike , fromPrice = fromPrice , toPrice = toPrice , reverse = False ) # Get the wing contract at the requested distance # Assumptions: # - The input contracts are sorted by increasing distance from the ATM (ascending order for Calls, descending order for Puts) # - The first contract in the list is assumed to be one of the legs of the spread, and it is used to determine the distance for the wing def getWing(self, contracts, wingSize = None): # Make sure the wingSize is specified wingSize = wingSize or 0 # Initialize output wingContract = None if len(contracts) > 1 and wingSize > 0: # Get the short strike firstLegStrike = contracts[0].Strike # keep track of the wing size based on the long contract being selected currentWings = 0 # Loop through all contracts for contract in contracts[1:]: # Select the long contract as long as it is within the specified wing size if abs(contract.Strike - firstLegStrike) <= wingSize: currentWings = abs(contract.Strike - firstLegStrike) wingContract = contract else: # We have exceeded the wing size, check if the distance to the requested wing size is closer than the contract previously selected if (abs(contract.Strike - firstLegStrike) - wingSize < wingSize - currentWings): wingContract = contract break ### Loop through all contracts ### if wingSize > 0 return wingContract # Get Spread contracts (Put or Call) def getSpread(self, contracts, type, strike = None, delta = None, wingSize = None, sortByStrike = False, fromPrice = None, toPrice = None, premiumOrder = 'max'): # Type is a required parameter if type == None: self.logger.error(f"Input parameter type = {type} is invalid. Valid values: 'Put'|'Call'") return type = type.lower() if type == "put": # Get all Puts with a strike lower than the given strike and delta lower than the given delta sorted_contracts = self.getPuts(contracts, toDelta = delta, toStrike = strike) elif type == "call": # Get all Calls with a strike higher than the given strike and delta lower than the given delta sorted_contracts = self.getCalls(contracts, toDelta = delta, fromStrike = strike) else: self.logger.error(f"Input parameter type = {type} is invalid. Valid values: 'Put'|'Call'") return # Initialize the result and the best premium best_spread = [] best_premium = -float('inf') if premiumOrder == 'max' else float('inf') self.logger.debug(f"wingSize: {wingSize}, premiumOrder: {premiumOrder}, fromPrice: {fromPrice}, toPrice: {toPrice}, sortByStrike: {sortByStrike}, strike: {strike}") if strike is not None: wing = self.getWing(sorted_contracts, wingSize = wingSize) self.logger.debug(f"STRIKE: wing: {wing}") # Check if we have any contracts if(len(sorted_contracts) > 0): # Add the first leg best_spread.append(sorted_contracts[0]) if wing != None: # Add the wing best_spread.append(wing) else: # Iterate over sorted contracts for i in range(len(sorted_contracts) - 1): # Get the wing wing = self.getWing(sorted_contracts[i:], wingSize = wingSize) self.logger.debug(f"NO STRIKE: wing: {wing}") if wing is not None: # Calculate the net premium net_premium = abs(self.contractUtils.midPrice(sorted_contracts[i]) - self.contractUtils.midPrice(wing)) self.logger.debug(f"fromPrice: {fromPrice} <= net_premium: {net_premium} <= toPrice: {toPrice}") # Check if the net premium is within the specified price range if fromPrice <= net_premium <= toPrice: # Check if this spread has a better premium if (premiumOrder == 'max' and net_premium > best_premium) or (premiumOrder == 'min' and net_premium < best_premium): best_spread = [sorted_contracts[i], wing] best_premium = net_premium # By default, the legs of a spread are sorted based on their distance from the ATM strike. # - For Call spreads, they are already sorted by increasing strike # - For Put spreads, they are sorted by decreasing strike # In some cases it might be more convenient to return the legs ordered by their strike (i.e. in case of Iron Condors/Flys) if sortByStrike: best_spread = sorted(best_spread, key = lambda x: x.Strike, reverse = False) return best_spread # Get Put Spread contracts def getPutSpread(self, contracts, strike = None, delta = None, wingSize = None, sortByStrike = False): return self.getSpread(contracts, "Put", strike = strike, delta = delta, wingSize = wingSize, sortByStrike = sortByStrike) # Get Put Spread contracts def getCallSpread(self, contracts, strike = None, delta = None, wingSize = None, sortByStrike = True): return self.getSpread(contracts, "Call", strike = strike, delta = delta, wingSize = wingSize, sortByStrike = sortByStrike)
#region imports from AlgorithmImports import * #endregion from Tools import BSM, Logger class Scanner: def __init__(self, context, base): self.context = context self.base = base # Initialize the BSM pricing model self.bsm = BSM(context) # Dictionary to keep track of all the available expiration dates at any given date self.expiryList = {} # Set the logger self.logger = Logger(context, className = type(self).__name__, logLevel = context.logLevel) def Call(self, data) -> [Dict, str]: # Start the timer self.context.executionTimer.start('Alpha.Utils.Scanner -> Call') self.logger.trace(f'{self.base.name} -> Call -> start') if self.isMarketClosed(): self.logger.trace(" -> Market is closed.") return None, None self.logger.debug(f'Market not closed') if not self.isWithinScheduledTimeWindow(): self.logger.trace(" -> Not within scheduled time window.") return None, None self.logger.debug(f'Within scheduled time window') if self.hasReachedMaxActivePositions(): self.logger.trace(" -> Already reached max active positions.") return None, None self.logger.trace(f'Not max active positions') # Get the option chain chain = self.base.dataHandler.getOptionContracts(data) self.logger.trace(f'Number of contracts in chain: {len(chain) if chain else 0}') # Exit if we got no chains if chain is None: self.logger.debug(" -> No chains inside currentSlice!") return None, None self.logger.trace('We have chains inside currentSlice') self.syncExpiryList(chain) self.logger.debug(f'Expiry List: {self.expiryList}') # Exit if we haven't found any Expiration cycles to process if not self.expiryList: self.logger.trace(" -> No expirylist.") return None, None self.logger.debug(f'We have expirylist {self.expiryList}') # Run the strategy filteredChain, lastClosedOrderTag = self.Filter(chain) self.logger.trace(f'Filtered Chain Count: {len(filteredChain) if filteredChain else 0}') self.logger.debug(f'Last Closed Order Tag: {lastClosedOrderTag}') # Stop the timer self.context.executionTimer.stop('Alpha.Utils.Scanner -> Call') return filteredChain, lastClosedOrderTag # Filter the contracts to buy and sell based on the defined AlphaModel/Strategy def Filter(self, chain): # Start the timer self.context.executionTimer.start("Alpha.Utils.Scanner -> Filter") # Get the context context = self.context self.logger.debug(f'Context: {context}') # DTE range dte = self.base.dte dteWindow = self.base.dteWindow # Controls whether to select the furthest or the earliest expiry date useFurthestExpiry = self.base.useFurthestExpiry # Controls whether to enable dynamic selection of the expiry date dynamicDTESelection = self.base.dynamicDTESelection # Controls whether to allow multiple entries for the same expiry date allowMultipleEntriesPerExpiry = self.base.allowMultipleEntriesPerExpiry self.logger.debug(f'Allow Multiple Entries Per Expiry: {allowMultipleEntriesPerExpiry}') # Set the DTE range (make sure values are not negative) minDte = max(0, dte - dteWindow) maxDte = max(0, dte) self.logger.debug(f'Min DTE: {minDte}') self.logger.debug(f'Max DTE: {maxDte}') # Get the minimum time distance between consecutive trades minimumTradeScheduleDistance = self.base.parameter("minimumTradeScheduleDistance", timedelta(hours=0)) # Make sure the minimum required amount of time has passed since the last trade was opened if (self.context.lastOpenedDttm is not None and context.Time < (self.context.lastOpenedDttm + minimumTradeScheduleDistance)): return None, None self.logger.debug(f'Min Trade Schedule Distance: {minimumTradeScheduleDistance}') # Check if the expiryList was specified as an input if self.expiryList is None: # List of expiry dates, sorted in reverse order self.expiryList = sorted(set([ contract.Expiry for contract in chain if minDte <= (contract.Expiry.date() - context.Time.date()).days <= maxDte ]), reverse=True) self.logger.debug(f'Expiry List: {self.expiryList}') # Log the list of expiration dates found in the chain self.logger.debug(f"Expiration dates in the chain: {len(self.expiryList)}") for expiry in self.expiryList: self.logger.debug(f" -> {expiry}") self.logger.debug(f'Expiry List: {self.expiryList}') # Exit if we haven't found any Expiration cycles to process if not self.expiryList: # Stop the timer self.context.executionTimer.stop() return None, None self.logger.debug('No expirylist') # Get the DTE of the last closed position lastClosedDte = None lastClosedOrderTag = None if self.context.recentlyClosedDTE: while (self.context.recentlyClosedDTE): # Pop the oldest entry in the list (FIFO) lastClosedTradeInfo = self.context.recentlyClosedDTE.pop(0) if lastClosedTradeInfo["closeDte"] >= minDte: lastClosedDte = lastClosedTradeInfo["closeDte"] lastClosedOrderTag = lastClosedTradeInfo["orderTag"] # We got a good entry, get out of the loop break self.logger.debug(f'Last Closed DTE: {lastClosedDte}') self.logger.debug(f'Last Closed Order Tag: {lastClosedOrderTag}') # Check if we need to do dynamic DTE selection if dynamicDTESelection and lastClosedDte is not None: # Get the expiration with the nearest DTE as that of the last closed position expiry = sorted(self.expiryList, key=lambda expiry: abs((expiry.date( ) - context.Time.date()).days - lastClosedDte), reverse=False)[0] else: # Determine the index used to select the expiry date: # useFurthestExpiry = True -> expiryListIndex = 0 (takes the first entry -> furthest expiry date since the expiry list is sorted in reverse order) # useFurthestExpiry = False -> expiryListIndex = -1 (takes the last entry -> earliest expiry date since the expiry list is sorted in reverse order) expiryListIndex = int(useFurthestExpiry) - 1 # Get the expiry date expiry = list(self.expiryList.get(self.context.Time.date()))[expiryListIndex] # expiry = list(self.expiryList.keys())[expiryListIndex] self.logger.debug(f'Expiry: {expiry}') # Convert the date to a string expiryStr = expiry.strftime("%Y-%m-%d") filteredChain = None openPositionsExpiries = [self.context.allPositions[orderId].expiryStr for orderId in self.context.openPositions.values()] # Proceed if we have not already opened a position on the given expiration (unless we are allowed to open multiple positions on the same expiry date) if (allowMultipleEntriesPerExpiry or expiryStr not in openPositionsExpiries): # Filter the contracts in the chain, keep only the ones expiring on the given date filteredChain = self.filterByExpiry(chain, expiry=expiry) self.logger.debug(f'Number of items in Filtered Chain: {len(filteredChain) if filteredChain else 0}') # Stop the timer self.context.executionTimer.stop("Alpha.Utils.Scanner -> Filter") return filteredChain, lastClosedOrderTag def isMarketClosed(self) -> bool: # Exit if the algorithm is warming up or the market is closed return self.context.IsWarmingUp or not self.context.IsMarketOpen(self.base.underlyingSymbol) def isWithinScheduledTimeWindow(self) -> bool: # Compute the schedule start datetime scheduleStartDttm = datetime.combine(self.context.Time.date(), self.base.scheduleStartTime) self.logger.debug(f'Schedule Start Datetime: {scheduleStartDttm}') # Exit if we have not reached the schedule start datetime if self.context.Time < scheduleStartDttm: self.logger.debug('Current time is before the schedule start datetime') return False # Check if we have a schedule stop datetime if self.base.scheduleStopTime is not None: # Compute the schedule stop datetime scheduleStopDttm = datetime.combine(self.context.Time.date(), self.base.scheduleStopTime) self.logger.debug(f'Schedule Stop Datetime: {scheduleStopDttm}') # Exit if we have exceeded the stop datetime if self.context.Time > scheduleStopDttm: self.logger.debug('Current time is after the schedule stop datetime') return False minutesSinceScheduleStart = round((self.context.Time - scheduleStartDttm).seconds / 60) self.logger.debug(f'Minutes Since Schedule Start: {minutesSinceScheduleStart}') scheduleFrequencyMinutes = round(self.base.scheduleFrequency.seconds / 60) self.logger.debug(f'Schedule Frequency Minutes: {scheduleFrequencyMinutes}') # Exit if we are not at the right scheduled interval isWithinWindow = minutesSinceScheduleStart % scheduleFrequencyMinutes == 0 self.logger.debug(f'Is Within Scheduled Time Window: {isWithinWindow}') return isWithinWindow def hasReachedMaxActivePositions(self) -> bool: # Filter openPositions and workingOrders by strategyTag openPositionsByStrategy = {tag: pos for tag, pos in self.context.openPositions.items() if self.context.allPositions[pos].strategyTag == self.base.nameTag} workingOrdersByStrategy = {tag: order for tag, order in self.context.workingOrders.items() if order.strategyTag == self.base.nameTag} # Do not open any new positions if we have reached the maximum for this strategy return (len(openPositionsByStrategy) + len(workingOrdersByStrategy)) >= self.base.maxActivePositions def syncExpiryList(self, chain): # The list of expiry dates will change once a day (at most). See if we have already processed this list for the current date if self.context.Time.date() in self.expiryList: # Get the expiryList from the dictionary expiry = self.expiryList.get(self.context.Time.date()) else: # Start the timer self.context.executionTimer.start("Alpha.Utils.Scanner -> syncExpiryList") # Set the DTE range (make sure values are not negative) minDte = max(0, self.base.dte - self.base.dteWindow) maxDte = max(0, self.base.dte) # Get the list of expiry dates, sorted in reverse order expiry = sorted( set( [contract.Expiry for contract in chain if minDte <= (contract.Expiry.date() - self.context.Time.date()).days <= maxDte] ), reverse=True ) # Only add the list to the dictionary if we found at least one expiry date if expiry: # Add the list to the dictionary self.expiryList[self.context.Time.date()] = expiry else: self.logger.debug(f"No expiry dates found in the chain! {self.context.Time.strftime('%Y-%m-%d %H:%M')}')}}") # Stop the timer self.context.executionTimer.stop("Alpha.Utils.Scanner -> syncExpiryList") def filterByExpiry(self, chain, expiry=None, computeGreeks=False): # Start the timer self.context.executionTimer.start("Alpha.Utils.Scanner -> filterByExpiry") # Check if the expiry date has been specified if expiry is not None: # Filter contracts based on the requested expiry date filteredChain = [ contract for contract in chain if contract.Expiry.date() == expiry.date() ] else: # No filtering filteredChain = chain # Check if we need to compute the Greeks for every single contract (this is expensive!) # By default, the Greeks are only calculated while searching for the strike with the # requested delta, so there should be no need to set computeGreeks = True if computeGreeks: self.bsm.setGreeks(filteredChain) # Stop the timer self.context.executionTimer.stop("Alpha.Utils.Scanner -> filterByExpiry") # Return the filtered contracts return filteredChain
#region imports from AlgorithmImports import * #endregion class Stats: def __init__(self): self._stats = {} def __setattr__(self, key, value): if key == '_stats': super().__setattr__(key, value) else: self._stats[key] = value def __getattr__(self, key): return self._stats.get(key, None) def __delattr__(self, key): if key in self._stats: del self._stats[key] else: raise AttributeError(f"No such attribute: {key}")
#region imports from AlgorithmImports import * #endregion # Your New Python File from .Scanner import Scanner from .Order import Order from .OrderBuilder import OrderBuilder from .Stats import Stats
#region imports from AlgorithmImports import * #endregion # Your New Python File from .FPLModel import FPLModel from .SPXic import SPXic from .CCModel import CCModel from .SPXButterfly import SPXButterfly from .SPXCondor import SPXCondor from .IBS import IBS
#region imports from AlgorithmImports import * from collections import deque from scipy import stats from numpy import mean, array #endregion # Indicator from https://www.satyland.com/atrlevels by Saty # Use like this: # # self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol # self.ATRLevels = ATRLevels("ATRLevels", length = 14) # algorithm.RegisterIndicator(self.ticker, self.ATRLevels, Resolution.Daily) # self.algorithm.WarmUpIndicator(self.ticker, self.ATRLevels, Resolution.Daily) # // Set the appropriate timeframe based on trading mode # timeframe_func() => # timeframe = "D" # if trading_type == day_trading # timeframe := "D" # else if trading_type == multiday_trading # timeframe := "W" # else if trading_type == swing_trading # timeframe := "M" # else if trading_type == position_trading # timeframe := "3M" # else # timeframe := "D" class ATRLevels(PythonIndicator): TriggerPercentage = 0.236 MiddlePercentage = 0.618 def __init__(self, name, length = 14): # default indicator definition super().__init__() self.Name = name self.Value = 0 self.Time = datetime.min # set automatic warmup period + 1 day self.WarmUpPeriod = length + 1 self.length = length self.ATR = AverageTrueRange(self.length) # Holds 2 values the current close and the previous day/period close. self.PreviousCloseQueue = deque(maxlen=2) # Indicator to hold the period close, high, low, open self.PeriodHigh = Identity('PeriodHigh') self.PeriodLow = Identity('PeriodLow') self.PeriodOpen = Identity('PeriodOpen') @property def IsReady(self) -> bool: return self.ATR.IsReady def Update(self, input) -> bool: # update all the indicators with the new data dataPoint = IndicatorDataPoint(input.Symbol, input.EndTime, input.Close) bar = TradeBar(input.Time, input.Symbol, input.Open, input.High, input.Low, input.Close, input.Volume) ## Update SMA with data time and volume # symbolSMAv.Update(tuple.Index, tuple.volume) # symbolRSI.Update(tuple.Index, tuple.close) # symbolADX.Update(bar) # symbolATR.Update(bar) # symbolSMA.Update(tuple.Index, tuple.close) self.ATR.Update(bar) self.PreviousCloseQueue.appendleft(dataPoint) self.PeriodHigh.Update(input.Time, input.High) self.PeriodLow.Update(input.Time, input.Low) self.PeriodOpen.Update(input.Time, input.Open) if self.ATR.IsReady and len(self.PreviousCloseQueue) == 2: self.Time = input.Time self.Value = self.PreviousClose().Value return self.IsReady # Returns the previous close value of the period. # @return [Float] def PreviousClose(self): if len(self.PreviousCloseQueue) == 1: return None return self.PreviousCloseQueue[0] # Bear level method. This is represented usually as a yellow line right under the close line. # @return [Float] def LowerTrigger(self): return self.PreviousClose().Value - (self.TriggerPercentage * self.ATR.Current.Value) # biggest value 1ATR # Lower Midrange level. This is under the lowerTrigger (yellow line) and above the -1ATR line(lowerATR) # @return [Float] def LowerMiddle(self): return self.PreviousClose().Value - (self.MiddlePercentage * self.ATR.Current.Value) # Lower -1ATR level. # @return [Float] def LowerATR(self): return self.PreviousClose().Value - self.ATR.Current.Value # Lower Extension level. # @return [Float] def LowerExtension(self): return self.LowerATR() - (self.TriggerPercentage * self.ATR.Current.Value) # Lower Midrange Extension level. # @return [Float] def LowerMiddleExtension(self): return self.LowerATR() - (self.MiddlePercentage * self.ATR.Current.Value) # Lower -2ATR level. # @return [Float] def Lower2ATR(self): return self.LowerATR() - self.ATR.Current.Value # Lower -2ATR Extension level. # @return [Float] def Lower2ATRExtension(self): return self.Lower2ATR() - (self.TriggerPercentage * self.ATR.Current.Value) # Lower -2ATR Midrange Extension level. # @return [Float] def Lower2ATRMiddleExtension(self): return self.Lower2ATR() - (self.MiddlePercentage * self.ATR.Current.Value) # Lower -3ATR level. # @return [Float] def Lower3ATR(self): return self.Lower2ATR() - self.ATR.Current.Value def BearLevels(self): return [ self.LowerTrigger(), self.LowerMiddle(), self.LowerATR(), self.LowerExtension(), self.LowerMiddleExtension(), self.Lower2ATR(), self.Lower2ATRExtension(), self.Lower2ATRMiddleExtension(), self.Lower3ATR() ] # Bull level method. This is represented usually as a blue line right over the close line. # @return [Float] def UpperTrigger(self): return self.PreviousClose().Value + (self.TriggerPercentage * self.ATR.Current.Value) # biggest value 1ATR # Upper Midrange level. # @return [Float] def UpperMiddle(self): return self.PreviousClose().Value + (self.MiddlePercentage * self.ATR.Current.Value) # Upper 1ATR level. # @return [Float] def UpperATR(self): return self.PreviousClose().Value + self.ATR.Current.Value # Upper Extension level. # @return [Float] def UpperExtension(self): return self.UpperATR() + (self.TriggerPercentage * self.ATR.Current.Value) # Upper Midrange Extension level. # @return [Float] def UpperMiddleExtension(self): return self.UpperATR() + (self.MiddlePercentage * self.ATR.Current.Value) # Upper 2ATR level. def Upper2ATR(self): return self.UpperATR() + self.ATR.Current.Value # Upper 2ATR Extension level. # @return [Float] def Upper2ATRExtension(self): return self.Upper2ATR() + (self.TriggerPercentage * self.ATR.Current.Value) # Upper 2ATR Midrange Extension level. # @return [Float] def Upper2ATRMiddleExtension(self): return self.Upper2ATR() + (self.MiddlePercentage * self.ATR.Current.Value) # Upper 3ATR level. # @return [Float] def Upper3ATR(self): return self.Upper2ATR() + self.ATR.Current.Value def BullLevels(self): return [ self.UpperTrigger(), self.UpperMiddle(), self.UpperATR(), self.UpperExtension(), self.UpperMiddleExtension(), self.Upper2ATR(), self.Upper2ATRExtension(), self.Upper2ATRMiddleExtension(), self.Upper3ATR() ] def NextLevel(self, LevelNumber, bull = False, bear = False): dayOpen = self.PreviousClose().Value allLevels = [dayOpen] + self.BearLevels() + self.BullLevels() allLevels = sorted(allLevels, key = lambda x: x, reverse = False) bearLs = sorted(filter(lambda x: x <= dayOpen, allLevels), reverse = True) bullLs = list(filter(lambda x: x >= dayOpen, allLevels)) if bull: return bullLs[LevelNumber] if bear: return bearLs[LevelNumber] return None def Range(self): return self.PeriodHigh.Current.Value - self.PeriodLow.Current.Value def PercentOfAtr(self): return (self.Range() / self.ATR.Current.Value) * 100 def Warmup(self, history): for index, row in history.iterrows(): self.Update(row) # Method to return a string with the bull and bear levels. # @return [String] def ToString(self): return "Bull Levels: [{}]; Bear Levels: [{}]".format(self.BullLevels(), self.BearLevels())
#region imports from AlgorithmImports import * from .ATRLevels import ATRLevels #endregion # Your New Python File
#region imports from AlgorithmImports import * #endregion import math from datetime import datetime, timedelta """ The GoogleSheetsData class reads data from the Google Sheets CSV link directly during live mode. In backtesting mode, you can use a static CSV file saved in the local directory with the same format as the Google Sheets file. The format should be like this: datetime,type,put_strike,call_strike,minimum_premium 2023-12-23 14:00:00,Iron Condor,300,350,0.50 2023-12-24 14:00:00,Bear Call Spread,0,360,0.60 2023-12-25 14:00:00,Bull Put Spread,310,0,0.70 Replace the google_sheet_csv_link variable in the GetSource method with your actual Google Sheets CSV link. Example for alpha model: class MyAlphaModel(AlphaModel): def Update(self, algorithm, data): if not data.ContainsKey('SPY_TradeInstructions'): return [] trade_instructions = data['SPY_TradeInstructions'] if trade_instructions is None: return [] # Check if the current time is past the instructed time if algorithm.Time < trade_instructions.Time: return [] # Use the trade_instructions data to generate insights type = trade_instructions.Type call_strike = trade_instructions.CallStrike put_strike = trade_instructions.PutStrike minimum_premium = trade_instructions.MinimumPremium insights = [] if type == "Iron Condor": insights.extend(self.GenerateIronCondorInsights(algorithm, call_strike, put_strike, minimum_premium)) elif type == "Bear Call Spread": insights.extend(self.GenerateBearCallSpreadInsights(algorithm, call_strike, minimum_premium)) elif type == "Bull Put Spread": insights.extend(self.GenerateBullPutSpreadInsights(algorithm, put_strike, minimum_premium)) return insights """ class GoogleSheetsData(PythonData): def GetSource(self, config, date, isLiveMode): google_sheet_csv_link = 'https://docs.google.com/spreadsheets/d/e/2PACX-1vS9oNUoYqY-u0WnLuJRCb8pSuQKcLStK8RaTfs5Cm9j6iiYNpx82iJuAc3D32zytXA4EiosfxjWKyJp/pub?gid=509927026&single=true&output=csv' if isLiveMode: return SubscriptionDataSource(google_sheet_csv_link, SubscriptionTransportMedium.Streaming) else: return SubscriptionDataSource(google_sheet_csv_link, SubscriptionTransportMedium.RemoteFile) # if isLiveMode: # # Replace the link below with your Google Sheets CSV link # google_sheet_csv_link = 'https://docs.google.com/spreadsheets/d/e/2PACX-1vS9oNUoYqY-u0WnLuJRCb8pSuQKcLStK8RaTfs5Cm9j6iiYNpx82iJuAc3D32zytXA4EiosfxjWKyJp/pub?gid=509927026&single=true&output=csv' # return SubscriptionDataSource(google_sheet_csv_link, SubscriptionTransportMedium.RemoteFile) # # In backtesting, you can use a static CSV file saved in the local directory # return SubscriptionDataSource("trade_instructions.csv", SubscriptionTransportMedium.LocalFile) def Reader(self, config, line, date, isLiveMode): if not line.strip(): return None columns = line.split(',') if columns[0] == 'datetime': return None trade = GoogleSheetsData() trade.Symbol = config.Symbol trade.Value = float(columns[2]) or float(columns[3]) # Parse the datetime and adjust the timezone trade_time = datetime.strptime(columns[0], "%Y-%m-%d %H:%M:%S") - timedelta(hours=7) # Round up the minute to the nearest 5 minutes minute = 5 * math.ceil(trade_time.minute / 5) # If the minute is 60, set it to 0 and add 1 hour if minute == 60: trade_time = trade_time.replace(minute=0, hour=trade_time.hour+1) else: trade_time = trade_time.replace(minute=minute) trade.Time = trade_time # trade.EndTime = trade.Time + timedelta(hours=4) trade["Type"] = columns[1] trade["PutStrike"] = float(columns[2]) trade["CallStrike"] = float(columns[3]) trade["MinimumPremium"] = float(columns[4]) return trade
#region imports from AlgorithmImports import * #endregion from .Base import Base class AutoExecutionModel(Base): def __init__(self, context): # Call the Base class __init__ method super().__init__(context)
from AlgorithmImports import * from Tools import ContractUtils, Logger from Execution.Utils import MarketOrderHandler, LimitOrderHandler, LimitOrderHandlerWithCombo """ """ class Base(ExecutionModel): DEFAULT_PARAMETERS = { # Retry decrease/increase percentage. Each time we try and get a fill we are going to decrease the limit price # by this percentage. "retryChangePct": 1.0, # Minimum price percentage accepted as limit price. If the limit price set is 0.5 and this value is 0.8 then # the minimum price accepted will be 0.4 "minPricePct": 0.7, # The limit order price initial adjustmnet. This will add some leeway to the limit order price so we can try and get # some more favorable price for the user than the algo set price. So if we set this to 0.1 (10%) and our limit price # is 0.5 then we will try and fill the order at 0.55 first. "orderAdjustmentPct": -0.2, # The increment we are going to use to adjust the limit price. This is used to # properly adjust the price for SPX options. If the limit price is 0.5 and this # value is 0.01 then we are going to try and fill the order at 0.51, 0.52, 0.53, etc. "adjustmentIncrement": None, # 0.01, # Speed of fill. Option taken from https://optionalpha.com/blog/smartpricing-released. # Can be: "Normal", "Fast", "Patient" # "Normal" will retry every 3 minutes, "Fast" every 1 minute, "Patient" every 5 minutes. "speedOfFill": "Fast", # maxRetries is the maximum number of retries we are going to do to try # and get a fill. This is calculated based on the speedOfFill and this # value is just for reference. "maxRetries": 10, } def __init__(self, context): self.context = context # Calculate maxRetries based on speedOfFill speedOfFill = self.parameter("speedOfFill") if speedOfFill == "Patient": self.maxRetries = 7 elif speedOfFill == "Normal": self.maxRetries = 5 elif speedOfFill == "Fast": self.maxRetries = 3 else: raise ValueError("Invalid speedOfFill value") self.targetsCollection = PortfolioTargetCollection() self.contractUtils = ContractUtils(context) # Set the logger self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel) self.marketOrderHandler = MarketOrderHandler(context, self) self.limitOrderHandler = LimitOrderHandler(context, self) # self.limitOrderHandler = LimitOrderHandlerWithCombo(context, self) self.logger.debug(f"{self.__class__.__name__} -> __init__") # Gets or sets the maximum spread compare to current price in percentage. # self.acceptingSpreadPercent = Math.Abs(acceptingSpreadPercent) # self.executionTimeThreshold = timedelta(minutes=10) # self.openExecutedOrders = {} self.context.structure.AddConfiguration(parent=self, **self.getMergedParameters()) @classmethod def getMergedParameters(cls): # Merge the DEFAULT_PARAMETERS from both classes return {**cls.DEFAULT_PARAMETERS, **getattr(cls, "PARAMETERS", {})} @classmethod def parameter(cls, key, default=None): return cls.getMergedParameters().get(key, default) def Execute(self, algorithm, targets): self.context.executionTimer.start('Execution.Base -> Execute') # Use this section to check if a target is in the workingOrder dict self.targetsCollection.AddRange(targets) self.logger.debug(f"{self.__class__.__name__} -> Execute -> targets: {targets}") self.logger.debug(f"{self.__class__.__name__} -> Execute -> targets count: {len(targets)}") self.logger.debug(f"{self.__class__.__name__} -> Execute -> workingOrders: {self.context.workingOrders}") self.logger.debug(f"{self.__class__.__name__} -> Execute -> allPositions: {self.context.allPositions}") # Check if the workingOrders are still OK to execute self.context.structure.checkOpenPositions() self.logger.debug(f"{self.__class__.__name__} -> Execute -> checkOpenPositions") for order in list(self.context.workingOrders.values()): position = self.context.allPositions[order.orderId] useLimitOrders = order.useLimitOrder useMarketOrders = not useLimitOrders self.logger.debug(f"Processing order: {order.orderId}") self.logger.debug(f"Order details: {order}") self.logger.debug(f"Position details: {position}") self.logger.debug(f"Use Limit Orders: {useLimitOrders}") self.logger.debug(f"Use Market Orders: {useMarketOrders}") if useMarketOrders: self.marketOrderHandler.call(position, order) elif useLimitOrders: self.limitOrderHandler.call(position, order) # if not self.targetsCollection.IsEmpty: # for target in targets: # order = Helper().findIn( # self.context.workingOrders.values(), # lambda v: any(t == target for t in v.targets) # ) # orders[order.orderId] = order # for order in orders.values(): # position = self.context.allPositions[order.orderId] # useLimitOrders = order.useLimitOrder # useMarketOrders = not useLimitOrders # if useMarketOrders: # self.executeMarketOrder(position, order) # elif useLimitOrders: # self.executeLimitOrder(position, order) self.targetsCollection.ClearFulfilled(algorithm) # Update the charts after execution self.context.charting.updateCharts() self.context.executionTimer.stop('Execution.Base -> Execute')
#region imports from AlgorithmImports import * #endregion from .Base import Base class SPXExecutionModel(Base): PARAMETERS = { # Retry decrease/increase percentage. Each time we try and get a fill we are going to decrease the limit price # by this percentage. "retryChangePct": -0.05, # Minimum price percentage accepted as limit price. If the limit price set is 0.5 and this value is 0.8 then # the minimum price accepted will be 0.4 "minPricePct": 0.5, # The limit order price initial adjustmnet. This will add some leeway to the limit order price so we can try and get # some more favorable price for the user than the algo set price. So if we set this to 0.1 (10%) and our limit price # is 0.5 then we will try and fill the order at 0.55 first. "orderAdjustmentPct": -0.5, # The increment we are going to use to adjust the limit price. This is used to # properly adjust the price for SPX options. If the limit price is 0.5 and this # value is 0.01 then we are going to try and fill the order at 0.51, 0.52, 0.53, etc. "adjustmentIncrement": 0.05, # Speed of fill. Option taken from https://optionalpha.com/blog/smartpricing-released. # Can be: "Normal", "Fast", "Patient" # "Normal" will retry every 3 minutes, "Fast" every 1 minute, "Patient" every 5 minutes. "speedOfFill": "Fast", # maxRetries is the maximum number of retries we are going to do to try # and get a fill. This is calculated based on the speedOfFill and this # value is just for reference. "maxRetries": 10, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context)
from AlgorithmImports import * """ Started discussion on this here: https://chat.openai.com/chat/b5be32bf-850a-44ba-80fc-44f79a7df763 Use like this in main.py file: percent_of_spread = 0.5 timeout = timedelta(minutes=2) self.SetExecution(SmartPricingExecutionModel(percent_of_spread, timeout)) """ class SmartPricingExecutionModel(ExecutionModel): def __init__(self, percent_of_spread, timeout): self.percent_of_spread = percent_of_spread self.timeout = timeout self.order_tickets = dict() def Execute(self, algorithm, targets): for target in targets: symbol = target.Symbol quantity = target.Quantity # If an order already exists for the symbol, skip if symbol in self.order_tickets: continue # Get the bid-ask spread and apply the user-defined percentage security = algorithm.Securities[symbol] if security.BidPrice != 0 and security.AskPrice != 0: spread = security.AskPrice - security.BidPrice adjusted_spread = spread * self.percent_of_spread if quantity > 0: limit_price = security.BidPrice + adjusted_spread else: limit_price = security.AskPrice - adjusted_spread # Submit the limit order with the calculated price ticket = algorithm.LimitOrder(symbol, quantity, limit_price) self.order_tickets[symbol] = ticket # Set the order expiration expiration = algorithm.UtcTime + self.timeout # ticket.Update(new UpdateOrderFields { TimeInForce = TimeInForce.GoodTilDate(expiration) }) def OnOrderEvent(self, algorithm, order_event): if order_event.Status.IsClosed(): order = algorithm.Transactions.GetOrderById(order_event.OrderId) symbol = order.Symbol if symbol in self.order_tickets: del self.order_tickets[symbol]
#region imports from AlgorithmImports import * #endregion from Tools import ContractUtils, Logger, Underlying # Your New Python File class LimitOrderHandler: def __init__(self, context, base): self.context = context self.contractUtils = ContractUtils(context) self.base = base # Set the logger self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel) def call(self, position, order): # Start the timer self.context.executionTimer.start() # Get the context context = self.context # Get the Limit order details # Get the order type: open|close orderType = order.orderType # This updates prices and stats for the order position.updateOrderStats(context, orderType) # This updates the stats for the position position.updateStats(context, orderType) execOrder = position[f"{orderType}Order"] ticket = None orderTransactionIds = execOrder.transactionIds self.logger.debug(f"orderTransactionIds: {orderTransactionIds}") self.logger.debug(f"order.lastRetry: {order.lastRetry}") self.logger.debug(f"self.sinceLastRetry(context, order, timedelta(minutes = 1)): {self.sinceLastRetry(context, order, timedelta(minutes = 1))}") # Exit if we are not at the right scheduled interval if orderTransactionIds and (order.lastRetry is None or self.sinceLastRetry(context, order, timedelta(minutes = 1))): """ IMPORTANT!!: Why do we cancel? If we update the ticket with the new price then we risk execution while updating the price of the rest of the combo order causing discrepancies. """ for id in orderTransactionIds: ticket = context.Transactions.GetOrderTicket(id) ticket.Cancel('Cancelled trade and trying with new prices') # store when we last canceled/retried and check with current time if like 2-3 minutes passed before we retry again. self.makeLimitOrder(position, order, retry = True) # NOTE: If combo limit orders will execute limit orders instead of market orders then let's use this method. # self.updateComboLimitOrder(position, orderTransactionIds) elif not orderTransactionIds: self.makeLimitOrder(position, order) # Stop the timer self.context.executionTimer.stop() def makeLimitOrder(self, position, order, retry = False): context = self.context # Get the Limit order details # Get the order type: open|close orderType = order.orderType limitOrderPrice = self.limitOrderPrice(order) execOrder = position[f"{orderType}Order"] # Keep track of the midPrices of this order for faster debugging execOrder.priceProgressList.append(round(execOrder.midPrice, 2)) orderTag = position.orderTag # Get the contracts contracts = [v.contract for v in position.legs] # Get the order quantity orderQuantity = position.orderQuantity # Sign of the order: open -> 1 (use orderSide as is), close -> -1 (reverse the orderSide) orderSign = 2 * int(orderType == "open") - 1 # Get the order sides orderSides = np.array([c.contractSide for c in position.legs]) # Define the legs of the combo order legs = [] isComboOrder = len(contracts) > 1 # Log the parameters used to validate the order self.logger.debug(f"Executing Limit Order to {orderType} the position:") self.logger.debug(f" - orderType: {orderType}") self.logger.debug(f" - orderTag: {orderTag}") self.logger.debug(f" - underlyingPrice: {Underlying(context, position.underlyingSymbol()).Price()}") self.logger.debug(f" - strikes: {[c.Strike for c in contracts]}") self.logger.debug(f" - orderQuantity: {orderQuantity}") self.logger.debug(f" - midPrice: {execOrder.midPrice} (limitOrderPrice: {limitOrderPrice})") self.logger.debug(f" - bidAskSpread: {execOrder.bidAskSpread}") # Calculate the adjustment value based on the difference between the limit price and the total midPrice # TODO: this might have to be changed if we start buying options instead of selling for premium. if orderType == "close": adjustmentValue = self.calculateAdjustmentValueBought( execOrder=execOrder, limitOrderPrice=limitOrderPrice, retries=order.fillRetries, nrContracts=len(contracts) ) else: adjustmentValue = self.calculateAdjustmentValueSold( execOrder=execOrder, limitOrderPrice=limitOrderPrice, retries=order.fillRetries, nrContracts=len(contracts) ) # IMPORTANT!! Because ComboLimitOrder right now still executes market orders we should not use it. We need to use ComboLegLimitOrder and that will work. for n, contract in enumerate(contracts): # Set the order side: -1 -> Sell, +1 -> Buy orderSide = orderSign * orderSides[n] if orderSide != 0: newLimitPrice = self.contractUtils.midPrice(contract) + adjustmentValue if orderSide == -1 else self.contractUtils.midPrice(contract) - adjustmentValue # round the price or we get an error like: # Adjust the limit price to meet brokerage precision requirements increment = self.base.adjustmentIncrement if self.base.adjustmentIncrement is not None else 0.05 newLimitPrice = round(newLimitPrice / increment) * increment newLimitPrice = round(newLimitPrice, 1) # Ensure the price is rounded to two decimal places newLimitPrice = max(newLimitPrice, increment) # make sure the price is never 0. At least the increment. self.logger.info(f"{orderType.upper()} {orderQuantity} {orderTag}, {contract.Symbol}, newLimitPrice: {newLimitPrice}") if isComboOrder: legs.append(Leg.Create(contract.Symbol, orderSide, newLimitPrice)) else: newTicket = context.LimitOrder(contract.Symbol, orderQuantity, newLimitPrice, tag=orderTag) execOrder.transactionIds = [newTicket.OrderId] log_message = f"{orderType.upper()} {orderQuantity} {orderTag}, " log_message += f"{[c.Strike for c in contracts]} @ Mid: {round(execOrder.midPrice, 2)}, " log_message += f"NewLimit: {round(sum([l.OrderPrice * l.Quantity for l in legs]), 2)}, " log_message += f"Limit: {round(limitOrderPrice, 2)}, " log_message += f"DTTM: {execOrder.limitOrderExpiryDttm}, " log_message += f"Spread: ${round(execOrder.bidAskSpread, 2)}, " log_message += f"Bid & Ask: {[(round(self.contractUtils.bidPrice(c), 2), round(self.contractUtils.askPrice(c),2)) for c in contracts]}, " log_message += f"Volume: {[self.contractUtils.volume(c) for c in contracts]}, " log_message += f"OpenInterest: {[self.contractUtils.openInterest(c) for c in contracts]}" if orderType.lower() == 'close': log_message += f", Reason: {position.closeReason}" # To limit logs just log every 25 minutes self.logger.info(log_message) ### for contract in contracts if isComboOrder: # Execute by using a multi leg order if we have multiple sides. newTicket = context.ComboLegLimitOrder(legs, orderQuantity, tag=orderTag) execOrder.transactionIds = [t.OrderId for t in newTicket] # Store the last retry on this order. This is not ideal but the only way to handle combo limit orders on QC as the comboLimitOrder and all the others # as soon as you update one leg it will execute and mess it up. if retry: order.lastRetry = context.Time order.fillRetries += 1 # increment the number of fill tries def limitOrderPrice(self, order): orderType = order.orderType limitOrderPrice = order.limitOrderPrice # Just use a default limit price that is supposed to be the smallest prossible. # The limit order price of 0 can happen if the trade is worthless. if limitOrderPrice == 0 and orderType == 'close': limitOrderPrice = 0.05 return limitOrderPrice def sinceLastRetry(self, context, order, frequency = timedelta(minutes = 3)): if order.lastRetry is None: return True timeSinceLastRetry = context.Time - order.lastRetry minutesSinceLastRetry = timedelta(minutes = round(timeSinceLastRetry.seconds / 60)) return minutesSinceLastRetry % frequency == timedelta(minutes=0) def calculateAdjustmentValueSold(self, execOrder, limitOrderPrice, retries=0, nrContracts=1): if self.base.orderAdjustmentPct is None and self.base.adjustmentIncrement is None: raise ValueError("orderAdjustmentPct or adjustmentIncrement must be set in the parameters") # Adjust the limitOrderPrice limitOrderPrice += self.base.orderAdjustmentPct * limitOrderPrice # Increase the price by orderAdjustmentPct min_price = self.base.minPricePct * limitOrderPrice # Minimum allowed price is % of limitOrderPrice # Calculate the range and step if self.base.adjustmentIncrement is None: # Calculate the step based on the bidAskSpread and the number of retries step = execOrder.bidAskSpread / self.base.maxRetries else: step = self.base.adjustmentIncrement # Start with the preferred price target_price = execOrder.midPrice + step # If we have retries, adjust the target price accordingly if retries > 0: target_price -= retries * step # Ensure the target price does not fall below the minimum limit if target_price < min_price: target_price = min_price # Round the target price to the nearest multiple of adjustmentIncrement target_price = round(target_price / step) * step # Calculate the adjustment value adjustment_value = (target_price - execOrder.midPrice) / nrContracts return adjustment_value def calculateAdjustmentValueBought(self, execOrder, limitOrderPrice, retries=0, nrContracts=1): if self.base.orderAdjustmentPct is None and self.base.adjustmentIncrement is None: raise ValueError("orderAdjustmentPct or adjustmentIncrement must be set in the parameters") # Adjust the limitOrderPrice limitOrderPrice += self.base.orderAdjustmentPct * limitOrderPrice # Increase the price by orderAdjustmentPct increment = self.base.retryChangePct * limitOrderPrice # Increment value for each retry max_price = self.base.minPricePct * limitOrderPrice # Maximum allowed price is % of limitOrderPrice # Start with the preferred price target_price = max_price # If we have retries, increment the target price accordingly if retries > 0: target_price += retries * increment # Ensure the target price does not exceed the maximum limit if target_price > limitOrderPrice: target_price = limitOrderPrice # Calculate the range and step if self.base.adjustmentIncrement is None: # Calculate the step based on the bidAskSpread and the number of retries step = execOrder.bidAskSpread / self.base.maxRetries else: step = self.base.adjustmentIncrement # Round the target price to the nearest multiple of adjustmentIncrement target_price = round(target_price / step) * step # Calculate the adjustment value adjustment_value = (target_price - execOrder.midPrice) / nrContracts return adjustment_value """ def updateComboLimitOrder(self, position, orderTransactionIds): context = self.context for id in orderTransactionIds: ticket = context.Transactions.GetOrderTicket(id) # store when we last canceled/retried and check with current time if like 2-3 minutes passed before we retry again. leg = next((leg for leg in position.legs if ticket.Symbol == leg.symbol), None) contract = leg.contract # To update the limit price of the combo order, you only need to update the limit price of one of the leg orders. # The Update method returns an OrderResponse to signal the success or failure of the update request. if ticket and ticket.Status is not OrderStatus.Filled: newLimitPrice = self.contractUtils.midPrice(contract) + 0.1 if leg.isSold else self.contractUtils.midPrice(contract) - 0.1 update_settings = UpdateOrderFields() update_settings.LimitPrice = newLimitPrice response = ticket.Update(update_settings) # Check if the update was successful if response.IsSuccess: self.logger.debug(f"Order updated successfully for {ticket.Symbol}") """
#region imports from AlgorithmImports import * #endregion from Tools import ContractUtils, Logger, Underlying # Your New Python File class LimitOrderHandlerWithCombo: def __init__(self, context, base): self.context = context self.contractUtils = ContractUtils(context) self.base = base # Set the logger self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel) def call(self, position, order): # Start the timer self.context.executionTimer.start() # Get the context context = self.context # Get the Limit order details # Get the order type: open|close orderType = order.orderType # This updates prices and stats for the order position.updateOrderStats(context, orderType) # This updates the stats for the position position.updateStats(context, orderType) execOrder = position[f"{orderType}Order"] ticket = None orderTransactionIds = execOrder.transactionIds self.logger.debug(f"orderTransactionIds: {orderTransactionIds}") self.logger.debug(f"order.lastRetry: {order.lastRetry}") self.logger.debug(f"self.sinceLastRetry(context, order, timedelta(minutes = 1)): {self.sinceLastRetry(context, order, timedelta(minutes = 1))}") # Exit if we are not at the right scheduled interval if orderTransactionIds and (order.lastRetry is None or self.sinceLastRetry(context, order, timedelta(minutes = 1))): self.updateComboLimitOrder(position, order, orderTransactionIds) elif not orderTransactionIds: self.makeLimitOrder(position, order) # Stop the timer self.context.executionTimer.stop() def makeLimitOrder(self, position, order, retry = False): context = self.context orderType = order.orderType limitOrderPrice = self.limitOrderPrice(order) execOrder = position[f"{orderType}Order"] # Keep track of the midPrices of this order for faster debugging execOrder.priceProgressList.append(round(execOrder.midPrice, 2)) orderTag = position.orderTag # Get the contracts contracts = [v.contract for v in position.legs] # Get the order quantity orderQuantity = position.orderQuantity # Sign of the order: open -> 1 (use orderSide as is), close -> -1 (reverse the orderSide) orderSign = 2 * int(orderType == "open") - 1 # Get the order sides orderSides = np.array([c.contractSide for c in position.legs]) # Define the legs of the combo order legs = [] for n, contract in enumerate(contracts): # Set the order side: -1 -> Sell, +1 -> Buy orderSide = orderSign * orderSides[n] if orderSide != 0: legs.append(Leg.Create(contract.Symbol, orderSide)) # Calculate the new limit price newLimitPrice = self.calculateNewLimitPrice(position, execOrder, limitOrderPrice, order.fillRetries, len(contracts), orderType) # Log the parameters used to validate the order self.logOrderDetails(position, order) # Execute the combo limit order newTicket = context.ComboLimitOrder(legs, orderQuantity, newLimitPrice, tag=orderTag) execOrder.transactionIds = [t.OrderId for t in newTicket] # Log the order execution self.logOrderExecution(position, order, newLimitPrice) # Update order information if it's a retry if retry: order.lastRetry = context.Time order.fillRetries += 1 def updateComboLimitOrder(self, position, order, orderTransactionIds): context = self.context orderType = order.orderType execOrder = position[f"{orderType}Order"] # Calculate the new limit price limitOrderPrice = self.limitOrderPrice(order) newLimitPrice = self.calculateNewLimitPrice(position, execOrder, limitOrderPrice, order.fillRetries, len(position.legs), orderType) # Get the first order ticket (we only need to update one for the combo order) ticket = context.Transactions.GetOrderTicket(orderTransactionIds[0]) if ticket and ticket.Status != OrderStatus.Filled: update_settings = UpdateOrderFields() update_settings.LimitPrice = newLimitPrice response = ticket.Update(update_settings) if response.IsSuccess: self.logger.debug(f"Combo order updated successfully. New limit price: {newLimitPrice}") else: self.logger.warning(f"Failed to update combo order: {response.ErrorCode}") # Log the update self.logOrderExecution(position, order, newLimitPrice, action="UPDATED") # Update order information order.lastRetry = context.Time order.fillRetries += 1 # increment the number of fill tries def calculateNewLimitPrice(self, position, execOrder, limitOrderPrice, retries, nrContracts, orderType): if orderType == "close": adjustmentValue = self.calculateAdjustmentValueBought( execOrder=execOrder, limitOrderPrice=limitOrderPrice, retries=retries, nrContracts=nrContracts ) else: adjustmentValue = self.calculateAdjustmentValueSold( execOrder=execOrder, limitOrderPrice=limitOrderPrice, retries=retries, nrContracts=nrContracts ) # Determine if it's a credit or debit strategy isCredit = position.isCreditStrategy if isCredit: # For credit strategies, we want to receive at least this much (negative value) newLimitPrice = -(abs(execOrder.midPrice) - adjustmentValue) if orderType == "open" else -(abs(execOrder.midPrice) + adjustmentValue) else: # For debit strategies, we're willing to pay up to this much (positive value) newLimitPrice = execOrder.midPrice + adjustmentValue if orderType == "open" else execOrder.midPrice - adjustmentValue # Adjust the limit price to meet brokerage precision requirements increment = self.base.adjustmentIncrement if self.base.adjustmentIncrement is not None else 0.05 newLimitPrice = round(newLimitPrice / increment) * increment newLimitPrice = round(newLimitPrice, 2) # Ensure the price is rounded to two decimal places # Ensure the price is never 0 and maintains the correct sign if isCredit: newLimitPrice = min(newLimitPrice, -increment) else: newLimitPrice = max(newLimitPrice, increment) return newLimitPrice def logOrderDetails(self, position, order): orderType = order.orderType execOrder = position[f"{orderType}Order"] contracts = [v.contract for v in position.legs] self.logger.debug(f"Executing Limit Order to {orderType} the position:") self.logger.debug(f" - orderType: {orderType}") self.logger.debug(f" - orderTag: {position.orderTag}") self.logger.debug(f" - underlyingPrice: {Underlying(self.context, position.underlyingSymbol()).Price()}") self.logger.debug(f" - strikes: {[c.Strike for c in contracts]}") self.logger.debug(f" - orderQuantity: {position.orderQuantity}") self.logger.debug(f" - midPrice: {execOrder.midPrice} (limitOrderPrice: {self.limitOrderPrice(order)})") self.logger.debug(f" - bidAskSpread: {execOrder.bidAskSpread}") def logOrderExecution(self, position, order, newLimitPrice, action=None): orderType = order.orderType execOrder = position[f"{orderType}Order"] contracts = [v.contract for v in position.legs] action = action or orderType.upper() log_message = f"{action} {position.orderQuantity} {position.orderTag}, " log_message += f"{[c.Strike for c in contracts]} @ Mid: {round(execOrder.midPrice, 2)}, " log_message += f"NewLimit: {round(newLimitPrice, 2)}, " log_message += f"Limit: {round(self.limitOrderPrice(order), 2)}, " log_message += f"DTTM: {execOrder.limitOrderExpiryDttm}, " log_message += f"Spread: ${round(execOrder.bidAskSpread, 2)}, " log_message += f"Bid & Ask: {[(round(self.contractUtils.bidPrice(c), 2), round(self.contractUtils.askPrice(c),2)) for c in contracts]}, " log_message += f"Volume: {[self.contractUtils.volume(c) for c in contracts]}, " log_message += f"OpenInterest: {[self.contractUtils.openInterest(c) for c in contracts]}" if orderType.lower() == 'close': log_message += f", Reason: {position.closeReason}" # To limit logs just log every 25 minutes self.logger.info(log_message) def limitOrderPrice(self, order): orderType = order.orderType limitOrderPrice = order.limitOrderPrice # Just use a default limit price that is supposed to be the smallest prossible. # The limit order price of 0 can happen if the trade is worthless. if limitOrderPrice == 0 and orderType == 'close': limitOrderPrice = 0.05 return limitOrderPrice def sinceLastRetry(self, context, order, frequency = timedelta(minutes = 3)): if order.lastRetry is None: return True timeSinceLastRetry = context.Time - order.lastRetry minutesSinceLastRetry = timedelta(minutes = round(timeSinceLastRetry.seconds / 60)) return minutesSinceLastRetry % frequency == timedelta(minutes=0) def calculateAdjustmentValueSold(self, execOrder, limitOrderPrice, retries=0, nrContracts=1): if self.base.orderAdjustmentPct is None and self.base.adjustmentIncrement is None: raise ValueError("orderAdjustmentPct or adjustmentIncrement must be set in the parameters") # Adjust the limitOrderPrice limitOrderPrice += self.base.orderAdjustmentPct * limitOrderPrice # Increase the price by orderAdjustmentPct min_price = self.base.minPricePct * limitOrderPrice # Minimum allowed price is % of limitOrderPrice # Calculate the range and step if self.base.adjustmentIncrement is None: # Calculate the step based on the bidAskSpread and the number of retries step = execOrder.bidAskSpread / self.base.maxRetries else: step = self.base.adjustmentIncrement step = max(step, 0.01) # Ensure the step is at least 0.01 # Start with the preferred price target_price = execOrder.midPrice + step # If we have retries, adjust the target price accordingly if retries > 0: target_price -= retries * step # Ensure the target price does not fall below the minimum limit if target_price < min_price: target_price = min_price # Round the target price to the nearest multiple of adjustmentIncrement target_price = round(target_price / step) * step # Calculate the adjustment value adjustment_value = (target_price - execOrder.midPrice) / nrContracts return adjustment_value def calculateAdjustmentValueBought(self, execOrder, limitOrderPrice, retries=0, nrContracts=1): if self.base.orderAdjustmentPct is None and self.base.adjustmentIncrement is None: raise ValueError("orderAdjustmentPct or adjustmentIncrement must be set in the parameters") # Adjust the limitOrderPrice limitOrderPrice += self.base.orderAdjustmentPct * limitOrderPrice # Increase the price by orderAdjustmentPct increment = self.base.retryChangePct * limitOrderPrice # Increment value for each retry max_price = self.base.minPricePct * limitOrderPrice # Maximum allowed price is % of limitOrderPrice # Start with the preferred price target_price = max_price # If we have retries, increment the target price accordingly if retries > 0: target_price += retries * increment # Ensure the target price does not exceed the maximum limit if target_price > limitOrderPrice: target_price = limitOrderPrice # Calculate the range and step if self.base.adjustmentIncrement is None: # Calculate the step based on the bidAskSpread and the number of retries step = execOrder.bidAskSpread / self.base.maxRetries else: step = self.base.adjustmentIncrement # Round the target price to the nearest multiple of adjustmentIncrement target_price = round(target_price / step) * step # Calculate the adjustment value adjustment_value = (target_price - execOrder.midPrice) / nrContracts return adjustment_value
#region imports from AlgorithmImports import * #endregion from Tools import ContractUtils, Logger, Underlying # Your New Python File class MarketOrderHandler: def __init__(self, context, base): self.context = context self.base = base self.contractUtils = ContractUtils(context) # Set the logger self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel) def call(self, position, order): # Start the timer self.context.executionTimer.start() # Get the context context = self.context orderTag = position.orderTag orderQuantity = position.orderQuantity orderType = order.orderType contracts = [v.contract for v in position.legs] orderSides = [v.contractSide for v in position.legs] bidAskSpread = sum(list(map(self.contractUtils.bidAskSpread, contracts))) midPrice = sum(side * self.contractUtils.midPrice(contract) for side, contract in zip(orderSides, contracts)) underlying = Underlying(context, position.underlyingSymbol()) orderSign = 2 * int(orderType == "open") - 1 execOrder = position[f"{orderType}Order"] execOrder.midPrice = midPrice # Check if the order already has transaction IDs orderTransactionIds = execOrder.transactionIds if orderTransactionIds: self.logger.debug(f"Market order already placed. Waiting for execution. Transaction IDs: {orderTransactionIds}") return # This updates prices and stats for the order position.updateOrderStats(context, orderType) # This updates the stats for the position position.updateStats(context, orderType) # Keep track of the midPrices of this order for faster debugging execOrder.priceProgressList.append(round(midPrice, 2)) isComboOrder = len(position.legs) > 1 legs = [] # Loop through all contracts for contract in position.legs: # Get the order side orderSide = contract.contractSide * orderSign # Get the order quantity quantity = contract.quantity # Get the contract symbol symbol = contract.symbol # Get the contract object security = context.Securities[symbol] # get the target target = next(t for t in order.targets if t.Symbol == symbol) # calculate remaining quantity to be ordered # quantity = OrderSizing.GetUnorderedQuantity(context, target, security) self.logger.debug(f"{orderType} contract {symbol}:") self.logger.debug(f" - orderSide: {orderSide}") self.logger.debug(f" - quantity: {quantity}") self.logger.debug(f" - orderTag: {orderTag}") if orderSide != 0: if isComboOrder: # If we are doing market orders, we need to create the legs of the combo order legs.append(Leg.Create(symbol, orderSide)) else: # Send the Market order (asynchronous = True -> does not block the execution in case of partial fills) context.MarketOrder( symbol, orderSide * quantity, asynchronous=True, tag=orderTag ) ### Loop through all contracts # Log the parameters used to validate the order log_message = f"{orderType.upper()} {orderQuantity} {orderTag}, " log_message += f"{[c.Strike for c in contracts]} @ Mid: {round(midPrice, 2)}" if orderType.lower() == 'close': log_message += f", Reason: {position.closeReason}" self.logger.info(log_message) self.logger.debug(f"Executing Market Order to {orderType} the position:") self.logger.debug(f" - orderType: {orderType}") self.logger.debug(f" - orderTag: {orderTag}") self.logger.debug(f" - underlyingPrice: {underlying.Price()}") self.logger.debug(f" - strikes: {[c.Strike for c in contracts]}") self.logger.debug(f" - orderQuantity: {orderQuantity}") self.logger.debug(f" - midPrice: {midPrice}") self.logger.debug(f" - bidAskSpread: {bidAskSpread}") # Execute only if we have multiple legs (sides) per order and no existing transaction IDs if ( len(legs) > 0 and not orderTransactionIds # Validate the bid-ask spread to make sure it's not too wide and not (position.strategyParam("validateBidAskSpread") and abs(bidAskSpread) > position.strategyParam("bidAskSpreadRatio")*abs(midPrice)) ): order_result = context.ComboMarketOrder( legs, orderQuantity, asynchronous=True, tag=orderTag ) execOrder.transactionIds = [t.OrderId for t in order_result] # Stop the timer self.context.executionTimer.stop()
#region imports from AlgorithmImports import * #endregion from .LimitOrderHandler import LimitOrderHandler from .LimitOrderHandlerWithCombo import LimitOrderHandlerWithCombo from .MarketOrderHandler import MarketOrderHandler
#region imports from AlgorithmImports import * #endregion # Your New Python File from .AutoExecutionModel import AutoExecutionModel from .SmartPricingExecutionModel import SmartPricingExecutionModel from .SPXExecutionModel import SPXExecutionModel
#region imports from AlgorithmImports import * #endregion class AlwaysBuyingPowerModel(BuyingPowerModel): def __init__(self, context): super().__init__() self.context = context def HasSufficientBuyingPowerForOrder(self, parameters): # custom behavior: this model will assume that there is always enough buying power hasSufficientBuyingPowerForOrderResult = HasSufficientBuyingPowerForOrderResult(True) self.context.logger.debug(f"CustomBuyingPowerModel: {hasSufficientBuyingPowerForOrderResult.IsSufficient}") return hasSufficientBuyingPowerForOrderResult
#region imports from AlgorithmImports import * #endregion import numpy as np # Custom Fill model based on Beta distribution: # - Orders are filled based on a Beta distribution skewed towards the mid-price with Sigma = bidAskSpread/6 (-> 99% fills within the bid-ask spread) class BetaFillModel(ImmediateFillModel): # Initialize Random Number generator with a fixed seed (for replicability) random = np.random.RandomState(1234) def __init__(self, context): self.context = context def MarketFill(self, asset, order): # Start the timer self.context.executionTimer.start() # Get the random number generator random = BetaFillModel.random # Compute the Bid-Ask spread bidAskSpread = abs(asset.AskPrice - asset.BidPrice) # Compute the Mid-Price midPrice = 0.5 * (asset.AskPrice + asset.BidPrice) # Call the parent method fill = super().MarketFill(asset, order) # Setting the parameters of the Beta distribution: # - The shape parameters (alpha and beta) are chosen such that the fill is "reasonably close" to the mid-price about 96% of the times # - How close -> The fill price is within 15% of half the bid-Ask spread if order.Direction == OrderDirection.Sell: # Beta distribution in the range [Bid-Price, Mid-Price], skewed towards the Mid-Price # - Fill price is within the range [Mid-Price - 0.15*bidAskSpread/2, Mid-Price] with about 96% probability offset = asset.BidPrice alpha = 20 beta = 1 else: # Beta distribution in the range [Mid-Price, Ask-Price], skewed towards the Mid-Price # - Fill price is within the range [Mid-Price, Mid-Price + 0.15*bidAskSpread/2] with about 96% probability offset = midPrice alpha = 1 beta = 20 # Range (width) of the Beta distribution range = bidAskSpread / 2.0 # Compute the new fillPrice (centered around the midPrice) fillPrice = round(offset + range * random.beta(alpha, beta), 2) # Update the FillPrice attribute fill.FillPrice = fillPrice # Stop the timer self.context.executionTimer.stop() # Return the fill return fill
#region imports from AlgorithmImports import * #endregion import re import numpy as np from Tools import Logger, Helper """ Details about order types: /// New order pre-submission to the order processor (0) New = 0, /// Order submitted to the market (1) Submitted = 1, /// Partially filled, In Market Order (2) PartiallyFilled = 2, /// Completed, Filled, In Market Order (3) Filled = 3, /// Order cancelled before it was filled (5) Canceled = 5, /// No Order State Yet (6) None = 6, /// Order invalidated before it hit the market (e.g. insufficient capital) (7) Invalid = 7, /// Order waiting for confirmation of cancellation (6) CancelPending = 8, /// Order update submitted to the market (9) UpdateSubmitted = 9 """ class HandleOrderEvents: def __init__(self, context, orderEvent): self.context = context self.orderEvent = orderEvent self.logger = Logger(self.context, className=type(self.context).__name__, logLevel=self.context.logLevel) # section: handle order events from main.py def Call(self): # Get the context context = self.context orderEvent = self.orderEvent # Start the timer context.executionTimer.start() # Process only Fill events if not (orderEvent.Status == OrderStatus.Filled or orderEvent.Status == OrderStatus.PartiallyFilled): return if(orderEvent.IsAssignment): # TODO: Liquidate the assigned position. # Eventually figure out which open position it belongs to and close that position. return # Get the orderEvent id orderEventId = orderEvent.OrderId # Retrieve the order associated to this events order = context.Transactions.GetOrderById(orderEventId) # Get the order tag. Remove any warning text that might have been added in case of Fills at Stale Price orderTag = re.sub(" - Warning.*", "", order.Tag) # TODO: Additionally check for OTM Underlying order.Tag that would mean it expired worthless. # if orderEvent.FillPrice == 0.0: # position = next((position for position in context.allPositions if any(leg.symbol == orderEvent.Symbol for leg in position.legs)), None) # context.workingOrders.pop(position.orderTag) # Get the working order (if available) workingOrder = context.workingOrders.get(orderTag) # Exit if this order tag is not in the list of open orders. if workingOrder == None: return # Get the position from the openPositions openPosition = context.openPositions.get(orderTag) if openPosition is None: return # Retrieved the book position (this it the full entry inside allPositions that will be converted into a CSV record) # bookPosition = context.allPositions[orderId] bookPosition = context.allPositions[openPosition] contractInfo = Helper().findIn( bookPosition.legs, lambda c: c.symbol == orderEvent.Symbol ) # Exit if we couldn't find the contract info. if contractInfo == None: return # Get the order id and expiryStr value for the contract orderId = bookPosition.orderId # contractInfo["orderId"] positionKey = bookPosition.orderTag # contractInfo["positionKey"] expiryStr = contractInfo.expiry # contractInfo["expiryStr"] orderType = workingOrder.orderType # contractInfo["orderType"] # Log the order event self.logger.debug(f" -> Processing order id {orderId} (orderTag: {orderTag} - orderType: {orderType} - Expiry: {expiryStr})") # Get the contract associated to this order event contract = contractInfo.contract # openPosition["contractDictionary"][orderEvent.Symbol] # Get the description associated with this contract contractDesc = contractInfo.key # openPosition["contractSideDesc"][orderEvent.Symbol] # Get the quantity used to open the position positionQuantity = bookPosition.orderQuantity # openPosition["orderQuantity"] # Get the side of each leg (-n -> Short, +n -> Long) contractSides = np.array([c.contractSide for c in bookPosition.legs]) # np.array(openPosition["sides"]) # Leg Quantity legQuantity = abs(bookPosition.contractSide[orderEvent.Symbol]) # Total legs quantity in the whole position Nlegs = sum(abs(contractSides)) # get the position order block execOrder = bookPosition[f"{orderType}Order"] # Check if the contract was filled at a stale price (Warnings in the orderTag) if re.search(" - Warning.*", order.Tag): self.logger.warning(order.Tag) execOrder.stalePrice = True bookPosition[f"{orderType}StalePrice"] = True # Add the order to the list of openPositions orders (only if this is the first time the order is filled - in case of partial fills) # if contractInfo["fills"] == 0: # openPosition[f"{orderType}Order"]["orders"].append(order) # Update the number of filled contracts associated with this order workingOrder.fills += abs(orderEvent.FillQuantity) # Remove this order entry from the self.workingOrders[orderTag] dictionary if it has been fully filled # if workingOrder.fills == legQuantity * positionQuantity: # removedOrder = context.workingOrders.pop(orderTag) # # Update the stats of the given contract inside the bookPosition (reverse the sign of the FillQuantity: Sell -> credit, Buy -> debit) # bookPosition.updateContractStats(openPosition, contract, orderType = orderType, fillPrice = - np.sign(orderEvent.FillQuantity) * orderEvent.FillPrice) # Update the counter of positions that have been filled execOrder.fills += abs(orderEvent.FillQuantity) execOrder.fillPrice -= np.sign(orderEvent.FillQuantity) * orderEvent.FillPrice # Get the total amount of the transaction transactionAmt = orderEvent.FillQuantity * orderEvent.FillPrice * 100 # Check if this is a fill order for an entry position if orderType == "open": # Update the openPremium field to include the current transaction (use "-=" to reverse the side of the transaction: Short -> credit, Long -> debit) bookPosition.openPremium -= transactionAmt else: # This is an order for the exit position # Update the closePremium field to include the current transaction (use "-=" to reverse the side of the transaction: Sell -> credit, Buy -> debit) bookPosition.closePremium -= transactionAmt # Check if all legs have been filled if execOrder.fills == Nlegs*positionQuantity: execOrder.filled = True bookPosition.updateOrderStats(context, orderType) # Remove the working order now that it has been filled context.workingOrders.pop(orderTag) # Set the time when the full order was filled bookPosition[orderType + "FilledDttm"] = context.Time # Record the order mid price bookPosition[orderType + "OrderMidPrice"] = execOrder.midPrice # All of this for the logger.info orderTypeUpper = orderType.upper() premium = round(bookPosition[f'{orderType}Premium'], 2) fillPrice = round(execOrder.fillPrice, 2) message = f" >>> {orderTypeUpper}: {orderTag}, Premium: ${premium} @ ${fillPrice}" if orderTypeUpper == "CLOSE": PnL = round(bookPosition.PnL, 2) percentage = round(bookPosition.PnL / bookPosition.openPremium * 100, 2) message += f"; P&L: ${PnL} ({percentage}%)" self.logger.info(message) self.logger.info(f"Working order progress of prices: {execOrder.priceProgressList}") self.logger.info(f"Position progress of prices: {bookPosition.priceProgressList}") self.logger.debug(f"The {orderType} event happened:") self.logger.debug(f" - orderType: {orderType}") self.logger.debug(f" - orderTag: {orderTag}") self.logger.debug(f" - premium: ${bookPosition[f'{orderType}Premium']}") self.logger.debug(f" - {orderType} price: ${round(execOrder.fillPrice, 2)}") context.charting.plotTrade(bookPosition, orderType) if orderType == "open": # Trigger an update of the charts context.statsUpdated = True # Marks the date/time of the most recenlty opened position context.lastOpenedDttm = context.Time # Store the credit received (needed to determine the stop loss): value is per share (divided by 100) execOrder.premium = bookPosition.openPremium / 100 # Check if the entire position has been closed if orderType == "close" and bookPosition.openOrder.filled and bookPosition.closeOrder.filled: # Compute P&L for the position positionPnL = bookPosition.openPremium + bookPosition.closePremium # Store the PnL for the position bookPosition.PnL = positionPnL # Now we can remove the position from the self.openPositions dictionary context.openPositions.pop(orderTag) # Compute the DTE at the time of closing the position closeDte = (contract.Expiry.date() - context.Time.date()).days # Collect closing trade info closeTradeInfo = {"orderTag": orderTag, "closeDte": closeDte} # Add this trade info to the FIFO list context.recentlyClosedDTE.append(closeTradeInfo) # ########################### # Collect Performance metrics # ########################### context.charting.updateStats(bookPosition) # Stop the timer context.executionTimer.stop() # ENDsection: handle order events from main.py
#region imports from AlgorithmImports import * #endregion # Custom class: fills orders at the mid-price class MidPriceFillModel(ImmediateFillModel): def __init__(self, context): self.context = context def MarketFill(self, asset, order): # Start the timer self.context.executionTimer.start() # Call the parent method fill = super().MarketFill(asset, order) # Compute the new fillPrice (at the mid-price) fillPrice = round(0.5 * (asset.AskPrice + asset.BidPrice), 2) # Update the FillPrice attribute fill.FillPrice = fillPrice # Stop the timer self.context.executionTimer.stop() # Return the fill return fill
#region imports from AlgorithmImports import * #endregion from Tools import Timer, Logger, DataHandler, Underlying, Charting from Initialization import AlwaysBuyingPowerModel, BetaFillModel, TastyWorksFeeModel """ This class is used to setup the base structure of the algorithm in the main.py file. It is used to setup the logger, the timer, the brokerage model, the security initializer, the option chain filter function and the benchmark. It is also used to schedule an event to get the underlying price at market open. The class has chainable methods for Setup and AddUnderlying. How to use it: 1. Import the class 2. Create an instance of the class in the Initialize method of the algorithm 3. Call the AddUnderlying method to add the underlying and the option chain to the algorithm Example: from Initialization import SetupBaseStructure class Algorithm(QCAlgorithm): def Initialize(self): # Set the algorithm base variables and structures self.structure = SetupBaseStructure(self) self.structure.Setup() # Add the alpha model and that will add the underlying and the option chain to the # algorithm self.SetAlpha(AlphaModel(self)) class AlphaModel: def __init__(self, context): # Store the context as a class variable self.context = context # Add the underlying and the option chain to the algorithm self.context.structure.AddUnderlying(self, "SPX") """ class SetupBaseStructure: # Default parameters DEFAULT_PARAMETERS = { "creditStrategy": True, # ----------------------------- # THESE BELOW ARE GENERAL PARAMETERS "backtestMarketCloseCutoffTime": time(15, 45, 0), # Controls whether to include Cancelled orders (Limit orders that didn't fill) in the final output "includeCancelledOrders": True, # Risk Free Rate for the Black-Scholes-Merton model "riskFreeRate": 0.001, # Upside/Downside stress applied to the underlying to calculate the portfolio margin requirement of the position "portfolioMarginStress": 0.12, # Controls the memory (in minutes) of EMA process. The exponential decay # is computed such that the contribution of each value decays by 95% # after <emaMemory> minutes (i.e. decay^emaMemory = 0.05) "emaMemory": 200, } # Initialize the algorithm # The context is the class that contains all the variables that are shared across the different classes def __init__(self, context): # Store the context as a class variable self.context = context def Setup(self): self.context.positions = {} # Set the logger self.context.logger = Logger(self.context, className=type(self.context).__name__, logLevel=self.context.logLevel) # Set the timer to monitor the execution performance self.context.executionTimer = Timer(self.context) self.context.logger.debug(f'{self.__class__.__name__} -> Setup') # Set brokerage model and margin account self.context.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin) # override security position group model self.context.Portfolio.SetPositions(SecurityPositionGroupModel.Null) # Set requested data resolution self.context.universe_settings.resolution = self.context.timeResolution # Keep track of the option contract subscriptions self.context.optionContractsSubscriptions = [] # Set Security Initializer self.context.SetSecurityInitializer(self.CompleteSecurityInitializer) # Initialize the dictionary to keep track of all positions self.context.allPositions = {} # Dictionary to keep track of all open positions self.context.openPositions = {} # Create dictionary to keep track of all the working orders. It stores orderTags self.context.workingOrders = {} # Create FIFO list to keep track of all the recently closed positions (needed for the Dynamic DTE selection) self.context.recentlyClosedDTE = [] # Keep track of when was the last position opened self.context.lastOpenedDttm = None # Keep track of all strategies instances. We mainly need this to filter through them in case # we want to call some general method. self.context.strategies = [] # Array to keep track of consolidators self.context.consolidators = {} # Dictionary to keep track of all leg details across time self.positionTracking = {} # Assign the DEFAULT_PARAMETERS self.AddConfiguration(**SetupBaseStructure.DEFAULT_PARAMETERS) self.SetBacktestCutOffTime() # Set charting self.context.charting = Charting( self.context, openPositions=False, Stats=False, PnL=False, WinLossStats=False, Performance=True, LossDetails=False, totalSecurities=False, Trades=True ) return self # Called every time a security (Option or Equity/Index) is initialized def CompleteSecurityInitializer(self, security: Security) -> None: '''Initialize the security with raw prices''' self.context.logger.debug(f"{self.__class__.__name__} -> CompleteSecurityInitializer -> Security: {security}") # Disable buying power on the security: https://www.quantconnect.com/docs/v2/writing-algorithms/live-trading/trading-and-orders#10-Disable-Buying-Power security.set_buying_power_model(BuyingPowerModel.NULL) if self.context.LiveMode: return self.context.executionTimer.start() security.SetDataNormalizationMode(DataNormalizationMode.Raw) security.SetMarketPrice(self.context.GetLastKnownPrice(security)) # security.SetBuyingPowerModel(AlwaysBuyingPowerModel(self.context)) # override margin requirements # security.SetBuyingPowerModel(ConstantBuyingPowerModel(1)) if security.Type == SecurityType.Equity: # This is for stocks security.VolatilityModel = StandardDeviationOfReturnsVolatilityModel(30) history = self.context.History(security.Symbol, 31, Resolution.Daily) if history.empty or 'close' not in history.columns: self.context.executionTimer.stop() return for time, row in history.loc[security.Symbol].iterrows(): trade_bar = TradeBar(time, security.Symbol, row.open, row.high, row.low, row.close, row.volume) security.VolatilityModel.Update(security, trade_bar) elif security.Type in [SecurityType.Option, SecurityType.IndexOption]: # This is for options. security.SetFillModel(BetaFillModel(self.context)) # security.SetFillModel(MidPriceFillModel(self)) security.SetFeeModel(TastyWorksFeeModel()) security.PriceModel = OptionPriceModels.CrankNicolsonFD() # security.set_option_assignment_model(NullOptionAssignmentModel()) if security.Type == SecurityType.IndexOption: # disable option assignment. This is important for SPX but we disable for all for now. security.SetOptionAssignmentModel(NullOptionAssignmentModel()) self.context.executionTimer.stop() def ClearSecurity(self, security: Security) -> None: """ Remove any additional data or settings associated with the security. """ # Remove the security from the optionContractsSubscriptions dictionary if security.Symbol in self.context.optionContractsSubscriptions: self.context.optionContractsSubscriptions.remove(security.Symbol) # Remove the security from the algorithm self.context.RemoveSecurity(security.Symbol) def SetBacktestCutOffTime(self) -> None: # Determine what is the last trading day of the backtest self.context.endOfBacktestCutoffDttm = None if hasattr(self.context, "EndDate") and self.context.EndDate is not None: self.context.endOfBacktestCutoffDttm = datetime.combine(self.context.lastTradingDay(self.context.EndDate), self.context.backtestMarketCloseCutoffTime) def AddConfiguration(self, parent=None, **kwargs) -> None: """ Dynamically add attributes to the self.context object. :param parent: Parent object to which the attributes will be added. :param kwargs: Keyword arguments containing attribute names and their values. """ parent = parent or self.context for attr_name, attr_value in kwargs.items(): setattr(parent, attr_name, attr_value) # Add the underlying and the option chain to the algorithm. We define the number of strikes left and right, # the dte and the dte window. These parameters are used in the option chain filter function. # @param ticker [string] def AddUnderlying(self, strategy, ticker): self.context.strategies.append(strategy) # Store the algorithm base variables strategy.ticker = ticker self.context.logger.debug(f"{self.__class__.__name__} -> AddUnderlying -> Ticker: {ticker}") # Add the underlying and the option chain to the algorithm strategy.dataHandler = DataHandler(self.context, ticker, strategy) underlying = strategy.dataHandler.AddUnderlying(self.context.timeResolution) # Set data normalization mode to Raw underlying.SetDataNormalizationMode(DataNormalizationMode.Raw) self.context.logger.debug(f"{self.__class__.__name__} -> AddUnderlying -> Underlying: {underlying}") # Keep track of the option contract subscriptions self.context.optionContractsSubscriptions = [] # Store the symbol for the option and the underlying strategy.underlyingSymbol = underlying.Symbol # REGION FOR USING SLICE INSTEAD OF PROVIDER if strategy.useSlice: option = strategy.dataHandler.AddOptionsChain(underlying, self.context.timeResolution) # Set the option chain filter function option.SetFilter(strategy.dataHandler.SetOptionFilter) self.context.logger.debug(f"{self.__class__.__name__} -> AddUnderlying -> Option: {option}") strategy.optionSymbol = option.Symbol else: strategy.optionSymbol = None # Set the benchmark. self.context.SetBenchmark(underlying.Symbol) self.context.logger.debug(f"{self.__class__.__name__} -> AddUnderlying -> Benchmark: {self.context.Benchmark}") # Creating a 5-minute consolidator. # self.AddConsolidators(strategy.underlyingSymbol, 5) # !IMPORTANT # ! this schedule needs to happen only once on initialization. That means the method AddUnderlying # ! needs to be called only once either in the main.py file or in the AlphaModel class. self.context.Schedule.On( self.context.DateRules.EveryDay(strategy.underlyingSymbol), self.context.TimeRules.AfterMarketOpen(strategy.underlyingSymbol, minutesAfterOpen=1), self.MarketOpenStructure ) return self def AddConsolidators(self, symbol, minutes=5): consolidator = TradeBarConsolidator(timedelta(minutes=minutes)) # Subscribe to the DataConsolidated event consolidator.DataConsolidated += self.onDataConsolidated self.context.SubscriptionManager.AddConsolidator(symbol, consolidator) self.context.consolidators[symbol] = consolidator def onDataConsolidated(self, sender, bar): for strategy in self.context.strategies: # We don't have the underlying added yet, so we can't get the price. if strategy.underlyingSymbol == None: return strategy.dataConsolidated(sender, bar) self.context.charting.updateUnderlying(bar) # NOTE: this is not needed anymore as we have another method in alpha that handles it. def MarketOpenStructure(self): """ The MarketOpenStructure method is part of the SetupBaseStructure class, which is used to set up the base structure of the algorithm in the main.py file. This specific method is designed to be called at market open every day to update the price of the underlying security. It first checks if the underlying symbol has been added to the context, and if not, it returns without performing any action. If the underlying symbol is available, it creates an instance of the Underlying class using the context and the symbol. Finally, it updates the underlying price at the market open by calling the Price() method on the Underlying instance. Example: Schedule the MarketOpenStructure method to be called at market open self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.AfterMarketOpen(self.strategy.underlyingSymbol, 0), base_structure.MarketOpenStructure) Other methods, like OnData, can now access the updated underlying price using self.context.underlyingPriceAtOpen """ for strategy in self.context.strategies: # We don't have the underlying added yet, so we can't get the price. if strategy.underlyingSymbol == None: return underlying = Underlying(self.context, strategy.underlyingSymbol) strategy.underlyingPriceAtOpen = underlying.Price() # This just clears the workingOrders that are supposed to be expired or unfilled. It can happen when an order is not filled # for it to stay in check until next day. This will clear that out. Similar method to the monitor one. def checkOpenPositions(self): self.context.executionTimer.start() # Iterate over all option contracts and remove the expired ones from the for symbol, security in self.context.Securities.items(): # Check if the security is an option if security.Type == SecurityType.Option and security.HasData: # Check if the option has expired if security.Expiry.date() < self.context.Time.date(): self.context.logger.debug(f" >>> EXPIRED SECURITY-----> Removing expired {security.Expiry.date()} option contract {security.Symbol} from the algorithm.") # Remove the expired option contract self.ClearSecurity(security) # Remove the expired positions from the openPositions dictionary. These are positions that expired # worthless or were closed before expiration. for orderTag, orderId in list(self.context.openPositions.items()): position = self.context.allPositions[orderId] # Check if we need to cancel the order if any(self.context.Time > leg.expiry for leg in position.legs): # Remove this position from the list of open positions self.context.charting.updateStats(position) self.context.logger.debug(f" >>> EXPIRED POSITION-----> Removing expired position {orderTag} from the algorithm.") self.context.openPositions.pop(orderTag) # Remove the expired positions from the workingOrders dictionary. These are positions that expired # without being filled completely. for order in list(self.context.workingOrders.values()): position = self.context.allPositions[order.orderId] orderTag = position.orderTag orderId = position.orderId orderType = order.orderType execOrder = position[f"{orderType}Order"] # Check if we need to cancel the order if self.context.Time > execOrder.limitOrderExpiryDttm or any(self.context.Time > leg.expiry for leg in position.legs): self.context.logger.debug(f" >>> EXPIRED ORDER-----> Removing expired order {orderTag} from the algorithm.") # Remove this position from the list of open positions if orderTag in self.context.openPositions: self.context.openPositions.pop(orderTag) # Remove the cancelled position from the final output unless we are required to include it if not self.context.includeCancelledOrders: self.context.allPositions.pop(orderId) # Remove the order from the self.context.workingOrders dictionary if orderTag in self.context.workingOrders: self.context.workingOrders.pop(orderTag) # Mark the order as being cancelled position.cancelOrder(self.context, orderType=orderType, message=f"order execution expiration or legs expired") self.context.executionTimer.stop()
#region imports from AlgorithmImports import * #endregion class TastyWorksFeeModel: def GetOrderFee(self, parameters): optionFee = min(10, parameters.Order.AbsoluteQuantity * 0.5) transactionFee = parameters.Order.AbsoluteQuantity * 0.14 return OrderFee(CashAmount(optionFee + transactionFee, 'USD'))
#region imports from AlgorithmImports import * from .AlwaysBuyingPowerModel import AlwaysBuyingPowerModel from .BetaFillModel import BetaFillModel from .MidPriceFillModel import MidPriceFillModel from .TastyWorksFeeModel import TastyWorksFeeModel from .SetupBaseStructure import SetupBaseStructure from .HandleOrderEvents import HandleOrderEvents #endregion
#region imports from AlgorithmImports import * #endregion from Initialization import SetupBaseStructure from Strategy import WorkingOrder from Tools import Underlying class Base(RiskManagementModel): DEFAULT_PARAMETERS = { # The frequency (in minutes) with which each position is managed "managePositionFrequency": 1, # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) "profitTarget": 0.8, # Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received) # The position is closed (Market Order) if: # Position P&L < -abs(openPremium) * stopLossMultiplier # where: # - openPremium is the premium received (positive) in case of credit strategies # - openPremium is the premium paid (negative) in case of debit strategies # # Credit Strategies (i.e. $2 credit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit) # - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$) # Debit Strategies (i.e. $4 debit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit) # - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$) # self.stopLossMultiplier = 3 * self.profitTarget # self.stopLossMultiplier = 0.6 "stopLossMultiplier": 1.9, # Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars) "capStopLoss": True, } def __init__(self, context): self.context = context self.context.structure.AddConfiguration(parent=self, **self.getMergedParameters()) self.context.logger.debug(f"{self.__class__.__name__} -> __init__") @classmethod def getMergedParameters(cls): # Merge the DEFAULT_PARAMETERS from both classes return {**cls.DEFAULT_PARAMETERS, **getattr(cls, "PARAMETERS", {})} @classmethod def parameter(cls, key, default=None): return cls.getMergedParameters().get(key, default) # @param algorithm [QCAlgorithm] The algorithm argument that the methods receive is an instance of the base QCAlgorithm class, not your subclass of it. # @param targets [List[PortfolioTarget]] The list of targets to be ordered def ManageRisk(self, algorithm: QCAlgorithm, targets: List[PortfolioTarget]) -> List[PortfolioTarget]: # Start the timer self.context.executionTimer.start('Monitor.Base -> ManageRisk') # We are basically ignoring the current portfolio targets to be assessed for risk # and building our own based on the current open positions targets = [] self.context.logger.debug(f"{self.__class__.__name__} -> ManageRisk -> start") managePositionFrequency = max(self.managePositionFrequency, 1) # Continue the processing only if we are at the specified schedule if self.context.Time.minute % managePositionFrequency != 0: return [] # Method to allow child classes access to the manageRisk method before any changes are made self.preManageRisk() self.context.logger.debug(f"{self.__class__.__name__} -> ManageRisk -> preManageRisk") # Loop through all open positions for orderTag, orderId in list(self.context.openPositions.items()): # Skip this contract if in the meantime it has been removed by the onOrderEvent if orderTag not in self.context.openPositions: continue self.context.logger.debug(f"{self.__class__.__name__} -> ManageRisk -> looping through open positions") # Get the book position bookPosition = self.context.allPositions[orderId] # Get the order id orderId = bookPosition.orderId # Get the order tag orderTag = bookPosition.orderTag self.context.logger.debug(f"{self.__class__.__name__} -> ManageRisk -> looping through open positions -> orderTag: {orderTag}, orderId: {orderId}") # Check if this is a fully filled position if bookPosition.openOrder.filled is False: continue # Possible Scenarios: # - Credit Strategy: # -> openPremium > 0 # -> profitTarget <= 1 # -> stopLossMultiplier >= 1 # -> maxLoss = Depending on the strategy # - Debit Strategy: # -> openPremium < 0 # -> profitTarget >= 0 # -> stopLossMultiplier <= 1 # -> maxLoss = openPremium # Get the current value of the position bookPosition.getPositionValue(self.context) # Extract the positionPnL (per share) positionPnL = bookPosition.positionPnL # Exit if the positionPnL is not available (bid-ask spread is too wide) if positionPnL is None: return [] bookPosition.updatePnLRange(self.context.Time.date(), positionPnL) self.context.logger.debug(f"{self.__class__.__name__} -> ManageRisk -> looping through open positions -> orderTag: {orderTag}, orderId: {orderId} -> bookPosition: {bookPosition}") # Special method to monitor the position and handle custom actions on it. self.monitorPosition(bookPosition) # Initialize the closeReason closeReason = [] # Check if we've hit the stop loss threshold stopLossFlg = self.checkStopLoss(bookPosition) if stopLossFlg: closeReason.append("Stop Loss trigger") profitTargetFlg = self.checkProfitTarget(bookPosition) if profitTargetFlg: closeReason.append("Profit target") # Check if we've hit the Dit threshold hardDitStopFlg, softDitStopFlg = self.checkDitThreshold(bookPosition) if hardDitStopFlg: closeReason.append("Hard Dit cutoff") elif softDitStopFlg: closeReason.append("Soft Dit cutoff") # Check if we've hit the Dte threshold hardDteStopFlg, softDteStopFlg = self.checkDteThreshold(bookPosition) if hardDteStopFlg: closeReason.append("Hard Dte cutoff") elif softDteStopFlg: closeReason.append("Soft Dte cutoff") # Check if this is the last trading day before expiration and we have reached the cutoff time expiryCutoffFlg = self.checkMarketCloseCutoffDttm(bookPosition) if expiryCutoffFlg: closeReason.append("Expiration date cutoff") # Check if this is the last trading day before expiration and we have reached the cutoff time endOfBacktestCutoffFlg = self.checkEndOfBacktest() if endOfBacktestCutoffFlg: closeReason.append("End of backtest cutoff") # Set the stopLossFlg = True to force a Market Order stopLossFlg = True # Check any custom condition from the strategy to determine closure. shouldCloseFlg, customReasons = self.shouldClose(bookPosition) if shouldCloseFlg: closeReason.append(customReasons or "It should close from child") # A custom method to handle self.context.logger.debug(f"{self.__class__.__name__} -> ManageRisk -> looping through open positions -> orderTag: {orderTag}, orderId: {orderId} -> shouldCloseFlg: {shouldCloseFlg}, customReasons: {customReasons}") # Update the stats of each contract # TODO: add back this section # if self.strategyParam("includeLegDetails") and self.context.Time.minute % self.strategyParam("legDatailsUpdateFrequency") == 0: # for contract in position["contracts"]: # self.updateContractStats(bookPosition, position, contract) # if self.strategyParam("trackLegDetails"): # underlyingPrice = self.context.GetLastKnownPrice(self.context.Securities[self.context.underlyingSymbol]).Price # self.context.positionTracking[orderId][self.context.Time][f"{self.name}.underlyingPrice"] = underlyingPrice # self.context.positionTracking[orderId][self.context.Time][f"{self.name}.PnL"] = positionPnL # Check if we need to close the position if ( profitTargetFlg # We hit the profit target or stopLossFlg # We hit the stop loss (making sure we don't exceed the max loss in case of spreads) or hardDteStopFlg # The position must be closed when reaching the DTE threshold (hard stop) or softDteStopFlg # Soft DTE stop: close as soon as it is profitable or hardDitStopFlg # The position must be closed when reaching the DIT threshold (hard stop) or softDitStopFlg # Soft DIT stop: close as soon as it is profitable or expiryCutoffFlg # This is the last trading day before expiration, we have reached the cutoff time or endOfBacktestCutoffFlg # This is the last trading day before the end of the backtest -> Liquidate all positions or shouldCloseFlg # This will be the flag that is defined by the child classes of monitor ): # Close the position targets = self.closePosition(bookPosition, closeReason, stopLossFlg=stopLossFlg) # Stop the timer self.context.executionTimer.stop('Monitor.Base -> ManageRisk') return targets """ Method to allow child classes access to the manageRisk method before any changes are made """ def preManageRisk(self): pass """ Special method to monitor the position and handle custom actions on it. These actions can be: - add a working order to open a hedge position to defend the current one - add a working order to increase the size of the position to improve avg price """ def monitorPosition(self, position): pass """ Another special method that should be ovewritten by child classes. This method can look for indicators or other decisions to close the position. """ def shouldClose(self, position): pass def checkMarketCloseCutoffDttm(self, position): if position.strategyParam('marketCloseCutoffTime') != None: return self.context.Time >= position.expiryMarketCloseCutoffDttm(self.context) else: return False def checkStopLoss(self, position): # Get the Stop Loss multiplier stopLossMultiplier = self.stopLossMultiplier capStopLoss = self.capStopLoss # Get the amount of credit received to open the position openPremium = position.openOrder.premium # Get the quantity used to open the position positionQuantity = position.orderQuantity # Maximum Loss (pre-computed at the time of creating the order) maxLoss = position.openOrder.maxLoss * positionQuantity if capStopLoss: # Add the premium to compute the net loss netMaxLoss = maxLoss + openPremium else: netMaxLoss = float("-Inf") stopLoss = None # Check if we are using a stop loss if stopLossMultiplier is not None: # Set the stop loss amount stopLoss = -abs(openPremium) * stopLossMultiplier # Extract the positionPnL (per share) positionPnL = position.positionPnL # Tolerance level, e.g., 0.05 for 5% tolerance = 0.05 # Check if we've hit the stop loss threshold or are within the tolerance range stopLossFlg = False if stopLoss is not None and (netMaxLoss <= positionPnL <= stopLoss or netMaxLoss <= positionPnL <= stopLoss * (1 + tolerance)): stopLossFlg = True # Keep track of the midPrices of this order for faster debugging position.priceProgressList.append(round(position.orderMidPrice, 2)) return stopLossFlg def checkProfitTarget(self, position): # Get the amount of credit received to open the position openPremium = position.openOrder.premium # Extract the positionPnL (per share) positionPnL = position.positionPnL # Get the target profit amount (if it has been set at the time of creating the order) targetProfit = position.targetProfit # Set the target profit amount if the above step returned no value if targetProfit is None and self.profitTarget is not None: targetProfit = abs(openPremium) * self.profitTarget # Tolerance level, e.g., 0.05 for 5% tolerance = 0.05 # Check if we hit the profit target or are within the tolerance range profitTargetFlg = False if targetProfit is not None and (positionPnL >= targetProfit or positionPnL >= targetProfit * (1 - tolerance)): profitTargetFlg = True return profitTargetFlg def checkDitThreshold(self, position): # Get the book position bookPosition = self.context.allPositions[position.orderId] # How many days has this position been in trade for currentDit = (self.context.Time.date() - bookPosition.openFilledDttm.date()).days hardDitStopFlg = False softDitStopFlg = False # Extract the positionPnL (per share) positionPnL = position.positionPnL # Check for DTE stop if ( position.strategyParam("ditThreshold") is not None # The ditThreshold has been specified and position.strategyParam("dte") > position.strategyParam("ditThreshold") # We are using the ditThreshold only if the open DTE was larger than the threshold and currentDit >= position.strategyParam("ditThreshold") # We have reached the DTE threshold ): # Check if this is a hard DTE cutoff if ( position.strategyParam("forceDitThreshold") is True or (position.strategyParam("hardDitThreshold") is not None and currentDit >= position.strategyParam("hardDitThreshold")) ): hardDitStopFlg = True # closeReason = closeReason or "Hard DIT cutoff" # Check if this is a soft DTE cutoff elif positionPnL >= 0: softDitStopFlg = True # closeReason = closeReason or "Soft DIT cutoff" return hardDitStopFlg, softDitStopFlg def checkDteThreshold(self, position): hardDteStopFlg = False softDteStopFlg = False # Extract the positionPnL (per share) positionPnL = position.positionPnL # How many days to expiration are left for this position currentDte = (position.expiry.date() - self.context.Time.date()).days # Check for DTE stop if ( position.strategyParam("dteThreshold") is not None # The dteThreshold has been specified and position.strategyParam("dte") > position.strategyParam("dteThreshold") # We are using the dteThreshold only if the open DTE was larger than the threshold and currentDte <= position.strategyParam("dteThreshold") # We have reached the DTE threshold ): # Check if this is a hard DTE cutoff if position.strategyParam("forceDteThreshold") is True: hardDteStopFlg = True # closeReason = closeReason or "Hard DTE cutoff" # Check if this is a soft DTE cutoff elif positionPnL >= 0: softDteStopFlg = True # closeReason = closeReason or "Soft DTE cutoff" return hardDteStopFlg, softDteStopFlg def checkEndOfBacktest(self): if self.context.endOfBacktestCutoffDttm is not None and self.context.Time >= self.context.endOfBacktestCutoffDttm: return True return False def closePosition(self, position, closeReason, stopLossFlg=False): # Start the timer self.context.executionTimer.start() targets = [] # Get the context context = self.context # Get the strategy parameters # parameters = self.parameters # Get Order Id and expiration orderId = position.orderId # expiryStr = position.expiryStr orderTag = position.orderTag orderMidPrice = position.orderMidPrice limitOrderPrice = position.limitOrderPrice bidAskSpread = position.bidAskSpread # Get the details currently open position openPosition = context.openPositions[orderTag] # Get the book position bookPosition = context.allPositions[orderId] # Get the last trading day before expiration expiryLastTradingDay = bookPosition.expiryLastTradingDay(context) # Get the date/time threshold by which the position must be closed (on the last trading day before expiration) expiryMarketCloseCutoffDttm = None if bookPosition.strategyParam("marketCloseCutoffTime") != None: expiryMarketCloseCutoffDttm = bookPosition.expiryMarketCloseCutoffDttm(context) # Get the contracts and their side contracts = [l.contract for l in bookPosition.legs] contractSide = bookPosition.contractSide # Set the expiration threshold at 15:40 of the expiration date (but no later than the market close cut-off time). expirationThreshold = None if expiryMarketCloseCutoffDttm != None: expirationThreshold = min(expiryLastTradingDay + timedelta(hours=15, minutes=40), expiryMarketCloseCutoffDttm + bookPosition.strategyParam("limitOrderExpiration")) # Set the expiration date for the Limit order. Make sure it does not exceed the expiration threshold limitOrderExpiryDttm = min(context.Time + bookPosition.strategyParam("limitOrderExpiration"), expirationThreshold) else: limitOrderExpiryDttm = min(context.Time + bookPosition.strategyParam("limitOrderExpiration"), expiryLastTradingDay + timedelta(hours=15, minutes=40)) # Determine if we are going to use a Limit Order useLimitOrders = ( # Check if we are supposed to use Limit orders as a default bookPosition.strategyParam("useLimitOrders") # Make sure there is enough time left to expiration. # Once we cross the expiration threshold (10 minutes from market close on the expiration day) we are going to submit a Market order and (expirationThreshold is None or context.Time <= expirationThreshold) # It's not a stop loss (stop losses are executed through a Market order) and not stopLossFlg ) # Determine if we are going to use a Market Order useMarketOrders = not useLimitOrders # Get the price of the underlying at the time of closing the position priceAtClose = None if context.Securities.ContainsKey(bookPosition.underlyingSymbol()): if context.Securities[bookPosition.underlyingSymbol()] is not None: priceAtClose = context.Securities[bookPosition.underlyingSymbol()].Close else: self.context.logger.warning("priceAtClose is None") # Set the midPrice for the order to close bookPosition.closeOrder.orderMidPrice = orderMidPrice # Set the Limit order expiration. bookPosition.closeOrder.limitOrderExpiryDttm = limitOrderExpiryDttm # Set the timestamp when the closing order is created bookPosition.closeDttm = context.Time # Set the date when the closing order is created bookPosition.closeDt = context.Time.strftime("%Y-%m-%d") # Set the price of the underlying at the time of submitting the order to close bookPosition.underlyingPriceAtOrderClose = priceAtClose # Set the price of the underlying at the time of submitting the order to close: # - This is the same as underlyingPriceAtOrderClose in case of Market Orders # - In case of Limit orders, this is the actual price of the underlying at the time when the Limit Order was triggered (price is updated later by the manageLimitOrders method) bookPosition.underlyingPriceAtClose = priceAtClose # Set the mid-price of the position at the time of closing bookPosition.closeOrderMidPrice = orderMidPrice bookPosition.closeOrderMidPriceMin = orderMidPrice bookPosition.closeOrderMidPriceMax = orderMidPrice # Set the Limit Order price of the position at the time of closing bookPosition.closeOrderLimitPrice = limitOrderPrice bookPosition.closeOrder.limitOrderPrice = limitOrderPrice # Set the close DTE bookPosition.closeDTE = (bookPosition.expiry.date() - context.Time.date()).days # Set the Days in Trade bookPosition.DIT = (context.Time.date() - bookPosition.openFilledDttm.date()).days # Set the close reason bookPosition.closeReason = closeReason if useMarketOrders: # Log the parameters used to validate the order self.context.logger.debug("Executing Market Order to close the position:") self.context.logger.debug(f" - orderTag: {orderTag}") self.context.logger.debug(f" - strikes: {[c.Strike for c in contracts]}") self.context.logger.debug(f" - orderQuantity: {bookPosition.orderQuantity}") self.context.logger.debug(f" - midPrice: {orderMidPrice}") self.context.logger.debug(f" - bidAskSpread: {bidAskSpread}") self.context.logger.debug(f" - closeReason: {closeReason}") # Store the Bid-Ask spread at the time of executing the order bookPosition["closeOrderBidAskSpread"] = bidAskSpread # legs = [] # isComboOrder = len(contracts) > 1 if useMarketOrders: position.limitOrder = False elif useLimitOrders: position.limitOrder = True for leg in position.legs: # Extract order parameters symbol = leg.symbol orderSide = leg.orderSide # orderQuantity = leg.orderQuantity # TODO: I'm not sure about this order side check here if orderSide != 0: targets.append(PortfolioTarget(symbol, orderSide)) # Submit the close orders context.workingOrders[orderTag] = WorkingOrder( targets=targets, orderId=orderId, useLimitOrder=useLimitOrders, limitOrderPrice=limitOrderPrice, orderType="close", fills=0 ) # Stop the timer context.executionTimer.stop() return targets # Optional: Be notified when securities change def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: pass
#region imports from AlgorithmImports import * #endregion from .Base import Base from CustomIndicators import ATRLevels from Tools import Underlying from Strategy import WorkingOrder class CCMonitor(Base): DEFAULT_PARAMETERS = { # The frequency (in minutes) with which each position is managed "managePositionFrequency": 5, # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) "profitTarget": 1.5, # Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received) # The position is closed (Market Order) if: # Position P&L < -abs(openPremium) * stopLossMultiplier # where: # - openPremium is the premium received (positive) in case of credit strategies # - openPremium is the premium paid (negative) in case of debit strategies # # Credit Strategies (i.e. $2 credit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit) # - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$) # Debit Strategies (i.e. $4 debit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit) # - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$) # self.stopLossMultiplier = 3 * self.profitTarget # self.stopLossMultiplier = 0.6 "stopLossMultiplier": None, # Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars) "capStopLoss": True, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context) self.fiveMinuteITM = {} self.HODLOD = {} self.triggerHODLOD = {} # The dictionary of consolidators self.consolidators = dict() # self.ATRLevels = ATRLevels("ATRLevels", length = 14) # EMAs for the 8, 21 and 34 periods self.EMAs = {8: {}, 21: {}, 34: {}} # self.stdDevs = {} # Add a dictionary to keep track of whether the position reached 50% profit self.reachedHalfProfit = {} def monitorPosition(self, position): pass def shouldClose(self, position): return False, None
#region imports from AlgorithmImports import * #endregion from .Base import Base from CustomIndicators import ATRLevels from Tools import Underlying from Strategy import WorkingOrder class FPLMonitorModel(Base): DEFAULT_PARAMETERS = { # The frequency (in minutes) with which each position is managed "managePositionFrequency": 1, # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) "profitTarget": 0.5, # Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received) # The position is closed (Market Order) if: # Position P&L < -abs(openPremium) * stopLossMultiplier # where: # - openPremium is the premium received (positive) in case of credit strategies # - openPremium is the premium paid (negative) in case of debit strategies # # Credit Strategies (i.e. $2 credit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit) # - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$) # Debit Strategies (i.e. $4 debit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit) # - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$) # self.stopLossMultiplier = 3 * self.profitTarget # self.stopLossMultiplier = 0.6 "stopLossMultiplier": 2, # Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars) "capStopLoss": True, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context) self.fiveMinuteITM = {} self.HODLOD = {} self.triggerHODLOD = {} # The dictionary of consolidators self.consolidators = dict() self.ATRLevels = ATRLevels("ATRLevels", length = 14) # EMAs for the 8, 21 and 34 periods self.EMAs = {8: {}, 21: {}, 34: {}} # self.stdDevs = {} # Add a dictionary to keep track of whether the position reached 50% profit self.reachedHalfProfit = {} def monitorPosition(self, position): """ TODO: # These can be complementar - check if 2x(1.0) premium was reached and increase position quantity - check if 2.5x(1.5) premium was reached and increase position """ symbol = position.underlyingSymbol() underlying = Underlying(self.context, position.underlyingSymbol()) # Check if any price in the priceProgressList reached 50% profit if any(abs(price*100) / abs(position.openOrder.fillPrice*100) <= 0.5 for price in position.priceProgressList): self.reachedHalfProfit[position.orderTag] = True # Check if the price of the position reaches 1.0 premium (adding a buffer so we can try and get a fill at 1.0) if any(price / abs(position.openOrder.fillPrice) >= 0.9 for price in position.priceProgressList): # Increase the quantity by 50% new_quantity = position.Quantity * 1.5 orderTag = position.orderTag orderId = position.orderId self.context.workingOrders[orderTag] = WorkingOrder( orderId=orderId, useLimitOrder=True, orderType="update", limitOrderPrice=1.0, fills=0, quantity=new_quantity ) bar = underlying.Security().GetLastData() stats = position.strategy.stats if bar is not None: high = bar.High low = bar.Low for period, emas in self.EMAs.items(): if symbol in emas: ema = emas[symbol] if ema.IsReady: if low <= ema.Current.Value <= high: # The price has touched the EMA stats.touchedEMAs[symbol] = True def shouldClose(self, position): """ TODO: - check if ATR indicator has been breached and exit - check if half premium was reached and close 50%-80% of position (not really possible now as we have a True/False return) """ score = 0 reason = "" stats = position.strategy.stats # Assign a score of 3 if 5m ITM threshold is met if position.orderTag in self.fiveMinuteITM and self.fiveMinuteITM[position.orderTag]: score += 3 reason = "5m ITM" # Assign a score of 1 if HOD/LOD breach occurs if position.orderTag in self.HODLOD and self.HODLOD[position.orderTag] and position.underlyingSymbol() in stats.touchedEMAs and stats.touchedEMAs[position.underlyingSymbol()]: score += 1 reason = "HOD/LOD" # Assign a score of 2 if the position reached 50% profit and is now at break-even or slight loss if position.orderTag in self.reachedHalfProfit and self.reachedHalfProfit[position.orderTag] and position.positionPnL <= 0: score += 2 reason = "Reached 50% profit" # Return True if the total score is 3 or more if score >= 3: return True, reason return False, "" def preManageRisk(self): # Check if it's time to plot and return if it's not the 1 hour mark if self.context.Time.minute % 60 != 0: return # Plot ATR Levels on the "Underlying Price" chart for i, level in enumerate(self.ATRLevels.BullLevels()[:3]): self.context.Plot("Underlying Price", f"Bull Level {i+1}", level) for i, level in enumerate(self.ATRLevels.BearLevels()[:3]): self.context.Plot("Underlying Price", f"Bear Level {i+1}", level) # Loop through all open positions for _orderTag, orderId in list(self.context.openPositions.items()): # Get the book position bookPosition = self.context.allPositions[orderId] # TODO: # if price is 1.0x premium received, increase position quantity # if price is 1.5x premium received, increase position quantity return super().preManageRisk() def on5MinuteData(self, sender: object, consolidated_bar: TradeBar) -> None: """ On a new 5m bar we check if we should close the position. """ # pass for _orderTag, orderId in list(self.context.openPositions.items()): # Get the book position bookPosition = self.context.allPositions[orderId] # if bookPosition.strategyId == "IronCondor": # continue # self.handleHODLOD(bookPosition, consolidated_bar) self.handleFiveMinuteITM(bookPosition, consolidated_bar) def on15MinuteData(self, sender: object, consolidated_bar: TradeBar) -> None: for _orderTag, orderId in list(self.context.openPositions.items()): # Get the book position bookPosition = self.context.allPositions[orderId] if bookPosition.strategyId == "IronCondor": continue self.handleHODLOD(bookPosition, consolidated_bar) # pass def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: super().OnSecuritiesChanged(algorithm, changes) for security in changes.AddedSecurities: if security.Type != SecurityType.Equity and security.Type != SecurityType.Index: continue self.context.logger.info(f"Adding consolidator for {security.Symbol}") self.consolidators[security.Symbol] = [] # Creating a 5-minute consolidator. consolidator5m = TradeBarConsolidator(timedelta(minutes=5)) consolidator5m.DataConsolidated += self.on5MinuteData self.context.SubscriptionManager.AddConsolidator(security.Symbol, consolidator5m) self.consolidators[security.Symbol].append(consolidator5m) # Creating a 15-minute consolidator. consolidator15m = TradeBarConsolidator(timedelta(minutes=15)) consolidator15m.DataConsolidated += self.on15MinuteData self.context.SubscriptionManager.AddConsolidator(security.Symbol, consolidator15m) self.consolidators[security.Symbol].append(consolidator15m) # Creating the Daily ATRLevels indicator self.context.RegisterIndicator(security.Symbol, self.ATRLevels, Resolution.Daily) self.context.WarmUpIndicator(security.Symbol, self.ATRLevels, Resolution.Daily) # Creating the EMAs for period in self.EMAs.keys(): ema = ExponentialMovingAverage(period) self.EMAs[period][security.Symbol] = ema self.context.RegisterIndicator(security.Symbol, ema, consolidator15m) # Creating the Standard Deviation indicator # self.stdDevs[security.Symbol] = StandardDeviation(20) # self.context.RegisterIndicator(security.Symbol, self.stdDevs[security.Symbol], consolidator5m) # NOTE: commented out as for some reason in the middle of the backtest SPX is removed from the universe???! # for security in changes.RemovedSecurities: # if security.Type != SecurityType.Equity and security.Type != SecurityType.Index: # continue # if security.Symbol not in self.consolidators: # continue # self.context.logger.info(f"Removing consolidator for {security.Symbol}") # consolidator = self.consolidators.pop(security.Symbol) # self.context.SubscriptionManager.RemoveConsolidator(security.Symbol, consolidator) # consolidator.DataConsolidated -= self.onFiveMinuteData def handleHODLOD(self, bookPosition, consolidated_bar): stats = bookPosition.strategy.stats # Get the high/low of the day before the update highOfDay = stats.highOfTheDay lowOfDay = stats.lowOfTheDay # currentDay = self.context.Time.date() if bookPosition.orderTag not in self.triggerHODLOD: self.triggerHODLOD[bookPosition.orderTag] = RollingWindow[bool](2) # basically wait 25 minutes before triggering if bookPosition.strategyId == 'CallCreditSpread' and consolidated_bar.Close > highOfDay: self.triggerHODLOD[bookPosition.orderTag].Add(True) elif bookPosition.strategyId == "PutCreditSpread" and consolidated_bar.Close < lowOfDay: self.triggerHODLOD[bookPosition.orderTag].Add(True) if bookPosition.orderTag in self.triggerHODLOD: # Check if all values are True and the RollingWindow is full if all(self.triggerHODLOD[bookPosition.orderTag]) and self.triggerHODLOD[bookPosition.orderTag].IsReady: self.HODLOD[bookPosition.orderTag] = True def handleFiveMinuteITM(self, bookPosition, consolidated_bar): soldLeg = [leg for leg in bookPosition.legs if leg.isSold][0] # Check if we should close the position if bookPosition.strategyId == 'CallCreditSpread' and consolidated_bar.Close > soldLeg.strike: self.fiveMinuteITM[bookPosition.orderTag] = True elif bookPosition.strategyId == "PutCreditSpread" and consolidated_bar.Close < soldLeg.strike: self.fiveMinuteITM[bookPosition.orderTag] = True
#region imports from AlgorithmImports import * #endregion from .Base import Base class HedgeRiskManagementModel(Base): def __init__(self, context): # Call the Base class __init__ method super().__init__(context)
# region imports from AlgorithmImports import * # endregion from .Base import Base from CustomIndicators import ATRLevels from Tools import Underlying from Strategy import WorkingOrder class IBSMonitor(Base): DEFAULT_PARAMETERS = { # The frequency (in minutes) with which each position is managed "managePositionFrequency": 5, # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) "profitTarget": 1.2, # Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received) # The position is closed (Market Order) if: # Position P&L < -abs(openPremium) * stopLossMultiplier # where: # - openPremium is the premium received (positive) in case of credit strategies # - openPremium is the premium paid (negative) in case of debit strategies # # Credit Strategies (i.e. $2 credit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit) # - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$) # Debit Strategies (i.e. $4 debit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit) # - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$) # self.stopLossMultiplier = 3 * self.profitTarget # self.stopLossMultiplier = 0.6 "stopLossMultiplier": 2, # Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars) "capStopLoss": True, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context) self.fiveMinuteITM = {} self.HODLOD = {} self.triggerHODLOD = {} # The dictionary of consolidators self.consolidators = dict() # self.ATRLevels = ATRLevels("ATRLevels", length = 14) # EMAs for the 8, 21 and 34 periods self.EMAs = {8: {}, 21: {}, 34: {}} # self.stdDevs = {} # Add a dictionary to keep track of whether the position reached 50% profit self.reachedHalfProfit = {} def monitorPosition(self, position): pass def shouldClose(self, position): return False, None
#region imports from AlgorithmImports import * #endregion from .Base import Base class NoStopLossModel(Base): DEFAULT_PARAMETERS = { # The frequency (in minutes) with which each position is managed "managePositionFrequency": 1, # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) "profitTarget": 0.9, # Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received) # The position is closed (Market Order) if: # Position P&L < -abs(openPremium) * stopLossMultiplier # where: # - openPremium is the premium received (positive) in case of credit strategies # - openPremium is the premium paid (negative) in case of debit strategies # # Credit Strategies (i.e. $2 credit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit) # - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$) # Debit Strategies (i.e. $4 debit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit) # - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$) # self.stopLossMultiplier = 3 * self.profitTarget # self.stopLossMultiplier = 0.6 "stopLossMultiplier": None, # Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars) "capStopLoss": True, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context)
#region imports from AlgorithmImports import * #endregion from .Base import Base from CustomIndicators import ATRLevels from Tools import Underlying from Strategy import WorkingOrder class SPXButterflyMonitor(Base): DEFAULT_PARAMETERS = { # The frequency (in minutes) with which each position is managed "managePositionFrequency": 5, # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) "profitTarget": 1, # Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received) # The position is closed (Market Order) if: # Position P&L < -abs(openPremium) * stopLossMultiplier # where: # - openPremium is the premium received (positive) in case of credit strategies # - openPremium is the premium paid (negative) in case of debit strategies # # Credit Strategies (i.e. $2 credit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit) # - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$) # Debit Strategies (i.e. $4 debit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit) # - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$) # self.stopLossMultiplier = 3 * self.profitTarget # self.stopLossMultiplier = 0.6 "stopLossMultiplier": 1.9, # Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars) "capStopLoss": True, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context) self.fiveMinuteITM = {} self.HODLOD = {} self.triggerHODLOD = {} # The dictionary of consolidators self.consolidators = dict() # self.ATRLevels = ATRLevels("ATRLevels", length = 14) # EMAs for the 8, 21 and 34 periods self.EMAs = {8: {}, 21: {}, 34: {}} # self.stdDevs = {} # Add a dictionary to keep track of whether the position reached 50% profit self.reachedHalfProfit = {} def monitorPosition(self, position): pass def shouldClose(self, position): """ TODO: - check if ATR indicator has been breached and exit - check if half premium was reached and close 50%-80% of position (not really possible now as we have a True/False return) """ score = 0 reason = "" stats = position.strategy.stats # Assign a score of 3 if 5m ITM threshold is met # if position.orderTag in self.fiveMinuteITM and self.fiveMinuteITM[position.orderTag]: # score += 3 # reason = "5m ITM" # Assign a score of 1 if HOD/LOD breach occurs # if position.orderTag in self.HODLOD and self.HODLOD[position.orderTag] and position.underlyingSymbol() in stats.touchedEMAs and stats.touchedEMAs[position.underlyingSymbol()]: # score += 1 # reason = "HOD/LOD" # Assign a score of 2 if the position reached 50% profit and is now at break-even or slight loss # if position.orderTag in self.reachedHalfProfit and self.reachedHalfProfit[position.orderTag] and position.positionPnL <= 0: # score += 2 # reason = "Reached 50% profit" # Return True if the total score is 3 or more # if score >= 3: # return True, reason return False, ""
#region imports from AlgorithmImports import * #endregion from .Base import Base from CustomIndicators import ATRLevels from Tools import Underlying from Strategy import WorkingOrder class SPXCondorMonitor(Base): DEFAULT_PARAMETERS = { # The frequency (in minutes) with which each position is managed "managePositionFrequency": 5, # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) "profitTarget": 1.2, # Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received) # The position is closed (Market Order) if: # Position P&L < -abs(openPremium) * stopLossMultiplier # where: # - openPremium is the premium received (positive) in case of credit strategies # - openPremium is the premium paid (negative) in case of debit strategies # # Credit Strategies (i.e. $2 credit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit) # - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$) # Debit Strategies (i.e. $4 debit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit) # - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$) # self.stopLossMultiplier = 3 * self.profitTarget # self.stopLossMultiplier = 0.6 "stopLossMultiplier": 2, # Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars) "capStopLoss": True, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context) self.fiveMinuteITM = {} self.HODLOD = {} self.triggerHODLOD = {} # The dictionary of consolidators self.consolidators = dict() # self.ATRLevels = ATRLevels("ATRLevels", length = 14) # EMAs for the 8, 21 and 34 periods self.EMAs = {8: {}, 21: {}, 34: {}} # self.stdDevs = {} # Add a dictionary to keep track of whether the position reached 50% profit self.reachedHalfProfit = {} def monitorPosition(self, position): pass def shouldClose(self, position): return False, None
#region imports from AlgorithmImports import * #endregion from .Base import Base from CustomIndicators import ATRLevels from Tools import Underlying from Strategy import WorkingOrder class SPXicMonitor(Base): DEFAULT_PARAMETERS = { # The frequency (in minutes) with which each position is managed "managePositionFrequency": 1, # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) "profitTarget": 1.5, # Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received) # The position is closed (Market Order) if: # Position P&L < -abs(openPremium) * stopLossMultiplier # where: # - openPremium is the premium received (positive) in case of credit strategies # - openPremium is the premium paid (negative) in case of debit strategies # # Credit Strategies (i.e. $2 credit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit) # - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$) # Debit Strategies (i.e. $4 debit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit) # - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$) # self.stopLossMultiplier = 3 * self.profitTarget # self.stopLossMultiplier = 0.6 "stopLossMultiplier": 1.2, # Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars) "capStopLoss": True, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context) self.fiveMinuteITM = {} self.HODLOD = {} self.triggerHODLOD = {} # The dictionary of consolidators self.consolidators = dict() # self.ATRLevels = ATRLevels("ATRLevels", length = 14) # EMAs for the 8, 21 and 34 periods self.EMAs = {8: {}, 21: {}, 34: {}} # self.stdDevs = {} # Add a dictionary to keep track of whether the position reached 50% profit self.reachedHalfProfit = {} def monitorPosition(self, position): pass def shouldClose(self, position): return False, None
#region imports from AlgorithmImports import * #endregion from .Base import Base class StopLossModel(Base): DEFAULT_PARAMETERS = { # The frequency (in minutes) with which each position is managed "managePositionFrequency": 1, # Profit Target Factor (Multiplier of the premium received/paid when the position was opened) "profitTarget": 0.7, # Stop Loss Multiplier, expressed as a function of the profit target (rather than the credit received) # The position is closed (Market Order) if: # Position P&L < -abs(openPremium) * stopLossMultiplier # where: # - openPremium is the premium received (positive) in case of credit strategies # - openPremium is the premium paid (negative) in case of debit strategies # # Credit Strategies (i.e. $2 credit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $1 profit) # - stopLossMultiplier = 2 * profitTarget (i.e. -abs(openPremium) * stopLossMultiplier = -abs(2) * 2 * 0.5 = -2 --> stop if P&L < -2$) # Debit Strategies (i.e. $4 debit): # - profitTarget < 1 (i.e. 0.5 -> 50% profit target -> $2 profit) # - stopLossMultiplier < 1 (You can't lose more than the debit paid. i.e. stopLossMultiplier = 0.6 --> stop if P&L < -2.4$) # self.stopLossMultiplier = 3 * self.profitTarget # self.stopLossMultiplier = 0.6 "stopLossMultiplier": 2.0, # Ensures that the Stop Loss does not exceed the theoretical loss. (Set to False for Credit Calendars) "capStopLoss": True, } def __init__(self, context): # Call the Base class __init__ method super().__init__(context)
#region imports from AlgorithmImports import * #endregion # Your New Python File from .HedgeRiskManagementModel import HedgeRiskManagementModel from .NoStopLossModel import NoStopLossModel from .StopLossModel import StopLossModel from .FPLMonitorModel import FPLMonitorModel from .SPXicMonitor import SPXicMonitor from .CCMonitor import CCMonitor from .SPXButterflyMonitor import SPXButterflyMonitor from .SPXCondorMonitor import SPXCondorMonitor from .IBSMonitor import IBSMonitor
#region imports from AlgorithmImports import * #endregion from Tools import Helper # https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/portfolio-construction/key-concepts # Portfolio construction scaffolding class; basic method args. class Base(PortfolioConstructionModel): def __init__(self, context): self.context = context self.context.logger.debug(f"{self.__class__.__name__} -> __init__") # Create list of PortfolioTarget objects from Insights def CreateTargets(self, algorithm: QCAlgorithm, insights: List[Insight]) -> List[PortfolioTarget]: # super().CreateTargets(algorithm, insights) targets = [] for insight in insights: self.context.logger.debug(f'Insight: {insight.Id}') # Let's find the order that this insight belongs to order = Helper().findIn( self.context.workingOrders.values(), lambda v: any(i.Id == insight.Id for i in v.insights)) position = self.context.allPositions[order.orderId] target = PortfolioTarget(insight.Symbol, insight.Direction * position.orderQuantity) self.context.logger.debug(f'Target: {target.Symbol} {target.Quantity}') order.targets.append(target) targets.append(target) return targets # Determines if the portfolio should rebalance based on the provided rebalancing func # def IsRebalanceDue(self, insights: List[Insight], algorithmUtc: datetime) -> bool: # return True # # Determines the target percent for each insight # def DetermineTargetPercent(self, activeInsights: List[Insight]) -> Dict[Insight, float]: # return {} # # Gets the target insights to calculate a portfolio target percent for, they will be piped to DetermineTargetPercent() # def GetTargetInsights(self) -> List[Insight]: # return [] # # Determine if the portfolio construction model should create a target for this insight # def ShouldCreateTargetForInsight(self, insight: Insight) -> bool: # return True # OPTIONAL: Security change details # def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: # # Security additions and removals are pushed here. # # This can be used for setting up algorithm state. # # changes.AddedSecurities: # # changes.RemovedSecurities: # pass
#region imports from AlgorithmImports import * #endregion from .Base import Base class OptionsPortfolioConstruction(Base): def __init__(self, context): # Call the Base class __init__ method super().__init__(context)
#region imports from AlgorithmImports import * #endregion # https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/portfolio-construction/key-concepts # Portfolio construction scaffolding class; basic method args. class OptionsPortfolioConstructionModel(PortfolioConstructionModel): def __init__(self, context): pass # Create list of PortfolioTarget objects from Insights def CreateTargets(self, algorithm: QCAlgorithm, insights: List[Insight]) -> List[PortfolioTarget]: return [] # Determines if the portfolio should rebalance based on the provided rebalancing func def IsRebalanceDue(self, insights: List[Insight], algorithmUtc: datetime) -> bool: return True # Determines the target percent for each insight def DetermineTargetPercent(self, activeInsights: List[Insight]) -> Dict[Insight, float]: return {} # Gets the target insights to calculate a portfolio target percent for, they will be piped to DetermineTargetPercent() def GetTargetInsights(self) -> List[Insight]: return [] # Determine if the portfolio construction model should create a target for this insight def ShouldCreateTargetForInsight(self, insight: Insight) -> bool: return True # OPTIONAL: Security change details def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: # Security additions and removals are pushed here. # This can be used for setting up algorithm state. # changes.AddedSecurities: # changes.RemovedSecurities: pass
#region imports from AlgorithmImports import * #endregion # Your New Python File from .OptionsPortfolioConstruction import OptionsPortfolioConstruction
#region imports from AlgorithmImports import * #endregion import matplotlib.pyplot as plt import mplfinance import numpy as np from pandas.plotting import register_matplotlib_converters register_matplotlib_converters() # Your New Python File class Charting: def __init__(self, data, symbol = None): self.data = data self.symbol = symbol def plot(self): mplfinance.plot(self.data, type='candle', style='charles', title=f'{self.symbol.Value if self.symbol else "General"} OHLC', ylabel='Price ($)', figratio=(15, 10))
#region imports from AlgorithmImports import * #endregion # Your New Python File from .Charting import Charting
#region imports from AlgorithmImports import * #endregion import dataclasses from dataclasses import dataclass, field from operator import attrgetter from typing import Dict, List, Optional from Tools import ContractUtils import importlib from Tools import Helper, ContractUtils, Logger, Underlying """ Use it like this: position_key = "some_key" # Replace with an appropriate key position_data = Position(orderId="12345", orderTag="SPX_Put", Strategy="CreditPutSpread", StrategyTag="CPS", expiryStr="20220107", openDttm="2022-01-07 09:30:00", openDt="2022-01-07", openDTE=0, targetPremium=500, orderQuantity=1, maxOrderQuantity=5, openOrderMidPrice=10.0, openOrderMidPriceMin=9.0, openOrderMidPriceMax=11.0, openOrderBidAskSpread=1.0, openOrderLimitPrice=10.0, underlyingPriceAtOpen=4500.0) # Create Leg objects for sold and bought options sold_put_leg = Leg(leg_type="SoldPut", option_symbol="SPXW220107P4500", quantity=-1, strike=4500, expiry="20220107") bought_put_leg = Leg(leg_type="BoughtPut", option_symbol="SPXW220107P4490", quantity=1, strike=4490, expiry="20220107") # Add the Leg objects to the Position's legs attribute position_data.legs.extend([sold_put_leg, bought_put_leg]) # Add the Position to the self.positions dictionary self.positions[position_key] = position_data """ @dataclass class _ParentBase: # With the __getitem__ and __setitem__ methods here we are transforming the # dataclass into a regular dict. This method is to allow getting fields using ["field"] def __getitem__(self, key): return super().__getattribute__(key) def __setitem__(self, key, value): return super().__setattr__(key, value) r"""Skip default fields in :func:`~dataclasses.dataclass` :func:`object representation <repr()>`. Notes ----- Credit: Pietro Oldrati, 2022-05-08, Unilicense https://stackoverflow.com/a/72161437/1396928 """ def __repr__(self): """Omit default fields in object representation.""" nodef_f_vals = ( (f.name, attrgetter(f.name)(self)) for f in dataclasses.fields(self) if attrgetter(f.name)(self) != f.default ) nodef_f_repr = ", ".join(f"{name}={value}" for name, value in nodef_f_vals) return f"{self.__class__.__name__}({nodef_f_repr})" # recursive method that checks the fields of each dataclass and calls asdict if we have another dataclass referenced # otherwise it just builds a dictionary and assigns the values and keys. def asdict(self): result = {} for f in dataclasses.fields(self): fieldValue = attrgetter(f.name)(self) if isinstance(fieldValue, dict): result[f.name] = {} for k, v in fieldValue.items(): if hasattr(type(v), "__dataclass_fields__"): result[f.name][k] = v.asdict() else: result[f.name][k] = v elif hasattr(type(fieldValue), "__dataclass_fields__"): result[f.name] = fieldValue.asdict() else: if fieldValue != f.default: result[f.name] = fieldValue return result @dataclass class WorkingOrder(_ParentBase): positionKey: str = "" insights: List[Insight] = field(default_factory=list) targets: List[PortfolioTarget] = field(default_factory=list) orderId: str = "" strategy: str = "" # Ex: FPLModel actual class strategyTag: str = "" # Ex: FPLModel orderType: str = "" fills: int = 0 useLimitOrder: bool = True limitOrderPrice: float = 0.0 lastRetry: Optional[datetime.date] = None fillRetries: int = 0 # number retries to get a fill @dataclass class Leg(_ParentBase): key: str = "" expiry: Optional[datetime.date] = None contractSide: int = 0 # TODO: this one i think would be the one to use instead of self.contractSide symbol: str = "" quantity: int = 0 strike: float = 0.0 contract: OptionContract = None # attributes used for order placement # orderSide: int # TODO: also this i'm not sure what it brings as i can use contractSide. # orderQuantity: int # limitPrice: float @property def isCall(self): return self.contract.Right == OptionRight.Call @property def isPut(self): return self.contract.Right == OptionRight.Put @property def isSold(self): return self.contractSide == -1 @property def isBought(self): return self.contractSide == 1 @dataclass class OrderType(_ParentBase): premium: float = 0.0 fills: int = 0 limitOrderExpiryDttm: str = "" limitOrderPrice: float = 0.0 bidAskSpread: float = 0.0 midPrice: float = 0.0 midPriceMin: float = 0.0 midPriceMax: float = 0.0 limitPrice: float = 0.0 fillPrice: float = 0.0 openPremium: float = 0.0 stalePrice: bool = False filled: bool = False maxLoss: float = 0.0 transactionIds: List[int] = field(default_factory=list) priceProgressList: List[float] = field(default_factory=list) @dataclass class Position(_ParentBase): """ The position class should have a structure to hold data and attributes that define it's functionality. Like what the target premium should be or what the slippage should be. """ # These are structural attributes that never change. orderId: str = "" # Ex: 1 orderTag: str = "" # Ex: PutCreditSpread-1 strategy: str = "" # Ex: FPLModel actual class strategyTag: str = "" # Ex: FPLModel strategyId: str = "" # Ex: PutCreditSpread, IronCondor expiryStr: str = "" expiry: Optional[datetime.date] = None linkedOrderTag: str = "" targetPremium: float = 0.0 orderQuantity: int = 0 maxOrderQuantity: int = 0 targetProfit: Optional[float] = None legs: List[Leg] = field(default_factory=list) contractSide: Dict[str, int] = field(default_factory=dict) # These are attributes that change based on the position's lifecycle. # The first set of attributes are set when the position is opened. # Attributes that hold data about the order type openOrder: OrderType = field(default_factory=OrderType) closeOrder: OrderType = field(default_factory=OrderType) # Open attributes that will be set when the position is opened. openDttm: str = "" openDt: str = "" openDTE: int = 0 openOrderMidPrice: float = 0.0 openOrderMidPriceMin: float = 0.0 openOrderMidPriceMax: float = 0.0 openOrderBidAskSpread: float = 0.0 openOrderLimitPrice: float = 0.0 openPremium: float = 0.0 underlyingPriceAtOpen: float = 0.0 openFilledDttm: float = 0.0 openStalePrice: bool = False # Attributes that hold the current state of the position orderMidPrice: float = 0.0 limitOrderPrice: float = 0.0 bidAskSpread: float = 0.0 positionPnL: float = 0.0 # Close attributes that will be set when the position is closed. closeDttm: str = "" closeDt: str = "" closeDTE: float = float("NaN") closeOrderMidPrice: float = 0.0 closeOrderMidPriceMin: float = 0.0 closeOrderMidPriceMax: float = 0.0 closeOrderBidAskSpread: float = float("NaN") closeOrderLimitPrice: float = 0.0 closePremium: float = 0.0 underlyingPriceAtClose: float = float("NaN") underlyingPriceAtOrderClose: float = float("NaN") DIT: int = 0 # days in trade closeStalePrice: bool = False closeReason: List[str] = field(default_factory=list, init=False) # Other attributes that will hold the P&L and other stats. PnL: float = 0.0 PnLMin: float = 0.0 PnLMax: float = 0.0 PnLMinDIT: float = 0.0 PnLMaxDIT: float = 0.0 # Attributes that determine the status of the position. orderCancelled: bool = False filled: bool = False limitOrder: bool = False # True if we want the order to be a limit order when it is placed. priceProgressList: List[float] = field(default_factory=list) def underlyingSymbol(self): if not self.legs: raise ValueError(f"Missing legs/contracts") contracts = [v.symbol for v in self.legs] return contracts[0].Underlying def strategyModule(self): try: strategy_module = importlib.import_module(f'Alpha.{self.strategy.name}') strategy_class = getattr(strategy_module, self.strategy.name) return strategy_class except (ImportError, AttributeError): raise ValueError(f"Unknown strategy: {self.strategy}") def strategyParam(self, parameter_name): """ // Create a Position instance pos = Position( orderId="123", orderTag="ABC", strategy="TestAlphaModel", strategyTag="XYZ", expiryStr="2023-12-31" ) // Get targetProfit parameter from the position's strategy print(pos.strategyParam('targetProfit')) // 0.5 """ return self.strategyModule().parameter(parameter_name) @property def isCreditStrategy(self): return self.strategyId in ["PutCreditSpread", "CallCreditSpread", "IronCondor", "IronFly", "CreditButterfly", "ShortStrangle", "ShortStraddle", "ShortCall", "ShortPut"] @property def isDebitStrategy(self): return self.strategyId in ["DebitButterfly", "ReverseIronFly", "ReverseIronCondor", "CallDebitSpread", "PutDebitSpread", "LongStrangle", "LongStraddle", "LongCall", "LongPut"] # Slippage used to set Limit orders def getPositionValue(self, context): # Start the timer context.executionTimer.start() contractUtils = ContractUtils(context) # Get the amount of credit received to open the position openPremium = self.openOrder.premium orderQuantity = self.orderQuantity slippage = self.strategyParam("slippage") # Loop through all legs of the open position orderMidPrice = 0.0 limitOrderPrice = 0.0 bidAskSpread = 0.0 for leg in self.legs: contract = leg.contract # Reverse the original contract side orderSide = -self.contractSide[leg.symbol] # Compute the Bid-Ask spread bidAskSpread += contractUtils.bidAskSpread(contract) # Get the latest mid-price midPrice = contractUtils.midPrice(contract) # Adjusted mid-price (including slippage) adjustedMidPrice = midPrice + orderSide * slippage # Total order mid-price orderMidPrice -= orderSide * midPrice # Total Limit order mid-price (including slippage) limitOrderPrice -= orderSide * adjustedMidPrice # Add the parameters needed to place a Market/Limit order if needed leg.orderSide = orderSide leg.orderQuantity = orderQuantity leg.limitPrice = adjustedMidPrice # Check if the mid-price is positive: avoid closing the position if the Bid-Ask spread is too wide (more than 25% of the credit received) positionPnL = openPremium + orderMidPrice * orderQuantity if self.strategyParam("validateBidAskSpread") and bidAskSpread > self.strategyParam("bidAskSpreadRatio") * openPremium: context.logger.trace(f"The Bid-Ask spread is too wide. Open Premium: {openPremium}, Mid-Price: {orderMidPrice}, Bid-Ask Spread: {bidAskSpread}") positionPnL = None # Store the full mid-price of the position self.orderMidPrice = orderMidPrice # Store the Limit Order mid-price of the position (including slippage) self.limitOrderPrice = limitOrderPrice # Store the full bid-ask spread of the position self.bidAskSpread = bidAskSpread # Store the position PnL self.positionPnL = positionPnL # Stop the timer context.executionTimer.stop() def updateStats(self, context, orderType): underlying = Underlying(context, self.underlyingSymbol()) # If we do use combo orders then we might not need to do this check as it has the midPrice in there. # Store the price of the underlying at the time of submitting the Market Order self[f"underlyingPriceAt{orderType.title()}"] = underlying.Close() def updateOrderStats(self, context, orderType): # Start the timer context.executionTimer.start() # leg = next((leg for leg in self.legs if contract.Symbol == leg.symbol), None) # Get the side of the contract at the time of opening: -1 -> Short +1 -> Long # contractSide = leg.contractSide contractUtils = ContractUtils(context) # Get the contracts contracts = [v.contract for v in self.legs] # Get the slippage slippage = self.strategyParam("slippage") or 0.0 # Sign of the order: open -> 1 (use orderSide as is), close -> -1 (reverse the orderSide) orderSign = 2*int(orderType == "open")-1 # Sign of the transaction: open -> -1, close -> +1 transactionSign = -orderSign # Get the mid price of each contract prices = np.array(list(map(contractUtils.midPrice, contracts))) # Get the order sides orderSides = np.array([c.contractSide for c in self.legs]) # Total slippage totalSlippage = sum(abs(orderSides)) * slippage # Compute the total order price (including slippage) # This calculates the sum of contracts midPrice so the midPrice difference between contracts. midPrice = transactionSign * sum(orderSides * prices) - totalSlippage # Compute Bid-Ask spread bidAskSpread = sum(list(map(contractUtils.bidAskSpread, contracts))) # Store the Open/Close Fill Price (if specified) closeFillPrice = self.closeOrder.fillPrice order = self[f"{orderType}Order"] # Keep track of the Limit order mid-price range order.midPriceMin = min(order.midPriceMin, midPrice) order.midPriceMax = max(order.midPriceMax, midPrice) order.midPrice = midPrice order.bidAskSpread = bidAskSpread # Exit if we don't need to include the details # if not self.strategyParam("includeLegDetails") or context.Time.minute % self.strategyParam("legDatailsUpdateFrequency") != 0: # return # # Get the EMA memory factor # emaMemory = self.strategyParam("emaMemory") # # Compute the decay such that the contribution of each new value drops to 5% after emaMemory iterations # emaDecay = 0.05**(1.0/emaMemory) # # Update the counter (used for the average) # bookPosition["statsUpdateCount"] += 1 # statsUpdateCount = bookPosition["statsUpdateCount"] # # Compute the Greeks (retrieve it as a dictionary) # greeks = self.bsm.computeGreeks(contract).__dict__ # # Add the midPrice and PnL values to the greeks dictionary to generalize the processing loop # greeks["midPrice"] = midPrice # # List of variables for which we are going to update the stats # #vars = ["midPrice", "Delta", "Gamma", "Vega", "Theta", "Rho", "Vomma", "Elasticity", "IV"] # vars = [var.title() for var in self.strategyParam("greeksIncluded")] + ["midPrice", "IV"] # Get the fill price at the open openFillPrice = self.openOrder.fillPrice # Check if the fill price is set if not math.isnan(openFillPrice): # Compute the PnL of position. openPremium will be positive for credit and closePremium will be negative so we just add them together. self.PnL = self.openPremium + self.closePremium # Add the PnL to the list of variables for which we want to update the stats # vars.append("PnL") # greeks["PnL"] = PnL # for var in vars: # # Set the name of the field to be updated # fieldName = f"{fieldPrefix}.{var}" # strategyLeg = positionStrategyLeg[var] # # Get the latest value from the dictionary # fieldValue = greeks[var] # # Special case for the PnL # if var == "PnL" and statsUpdateCount == 2: # # Initialize the EMA for the PnL # strategyLeg.EMA = fieldValue # # Update the Min field # strategyLeg.Min = min(strategyLeg.Min, fieldValue) # # Update the Max field # strategyLeg.Max = max(strategyLeg.Max, fieldValue) # # Update the Close field (this is the most recent value of the greek) # strategyLeg.Close = fieldValue # # Update the EMA field (IMPORTANT: this must be done before we update the Avg field!) # strategyLeg.EMA = emaDecay * strategyLeg.EMA + (1-emaDecay)*fieldValue # # Update the Avg field # strategyLeg.Avg = (strategyLeg.Avg*(statsUpdateCount-1) + fieldValue)/statsUpdateCount # if self.strategyParam("trackLegDetails") and var == "IV": # if context.Time not in context.positionTracking[self.orderId]: # context.positionTracking[self.orderId][context.Time] = {"orderId": self.orderId # , "Time": context.Time # } # context.positionTracking[self.orderId][context.Time][fieldName] = fieldValue # Stop the timer context.executionTimer.stop() def updatePnLRange(self, currentDate, positionPnL): # How many days has this position been in trade for # currentDit = (self.context.Time.date() - bookPosition.openFilledDttm.date()).days currentDit = (currentDate - self.openFilledDttm.date()).days # Keep track of the P&L range throughout the life of the position (mark the DIT of when the Min/Max PnL occurs) if 100 * positionPnL < self.PnLMax: self.PnLMinDIT = currentDit self.PnLMin = min(self.PnLMin, 100 * positionPnL) if 100 * positionPnL > self.PnLMax: self.PnLMaxDIT = currentDit self.PnLMax = max(self.PnLMax, 100 * positionPnL) def expiryLastTradingDay(self, context): # Get the last trading day for the given expiration date (in case it falls on a holiday) return context.lastTradingDay(self.expiry) def expiryMarketCloseCutoffDttm(self, context): # Set the date/time threshold by which the position must be closed (on the last trading day before expiration) return datetime.combine(self.expiryLastTradingDay(context), self.strategyParam("marketCloseCutoffTime")) def cancelOrder(self, context, orderType = 'open', message = ''): self.orderCancelled = True execOrder = self[f"{orderType}Order"] orderTransactionIds = execOrder.transactionIds context.logger.info(f" >>> CANCEL-----> {orderType} order with message: {message}") context.logger.debug("Expired or the limit order was not filled in the allocated time.") context.logger.info(f"Cancel {self.orderTag} & Progress of prices: {execOrder.priceProgressList}") context.logger.info(f"Position progress of prices: {self.priceProgressList}") context.charting.updateStats(self) for id in orderTransactionIds: context.logger.info(f"Canceling order: {id}") ticket = context.Transactions.GetOrderTicket(id) ticket.Cancel()
#region imports from AlgorithmImports import * #endregion from .Position import Position, Leg, OrderType, WorkingOrder
#region imports from AlgorithmImports import * #endregion import pytest from unittest.mock import patch, call import pandas as pd @pytest.fixture def logger(mock_algorithm, mocked_logger): return mocked_logger(mock_algorithm, className="TestClass", logLevel=3) def test_logger_initialization(mock_algorithm, mocked_logger): logger = mocked_logger(mock_algorithm, className="TestClass", logLevel=3) assert logger.context == mock_algorithm assert logger.className == "TestClass" assert logger.logLevel == 3 def test_log_method(logger, mock_algorithm): with patch('sys._getframe') as mock_frame: mock_frame.return_value.f_code.co_name = 'test_function' logger.Log("Test message", trsh=2) mock_algorithm.Log.assert_called_once_with(" INFO -> TestClass.test_function: Test message") @pytest.mark.parametrize("method,expected_prefix", [ ("error", "ERROR"), ("warning", "WARNING"), ("info", "INFO"), ("debug", "DEBUG"), ("trace", "TRACE") ]) def test_log_levels(logger, mock_algorithm, method, expected_prefix): with patch('sys._getframe') as mock_frame: mock_frame.return_value.f_code.co_name = 'test_function' getattr(logger, method)("Test message") mock_algorithm.Log.assert_called_once_with(f" {expected_prefix} -> TestClass.test_function: Test message") def test_log_level_filtering(mock_algorithm, mocked_logger): logger = mocked_logger(mock_algorithm, className="TestClass", logLevel=2) with patch('sys._getframe') as mock_frame: mock_frame.return_value.f_code.co_name = 'test_function' logger.error("Error message") logger.warning("Warning message") logger.info("Info message") logger.debug("Debug message") logger.trace("Trace message") assert mock_algorithm.Log.call_count == 3 mock_algorithm.Log.assert_has_calls([ call(" ERROR -> TestClass.test_function: Error message"), call(" WARNING -> TestClass.test_function: Warning message"), call(" INFO -> TestClass.test_function: Info message") ]) def test_dataframe_logging(logger, mock_algorithm): test_data = [ {'name': 'Alice', 'age': 30}, {'name': 'Bob', 'age': 25} ] expected_output = "\n name age\nAlice 30\n Bob 25" with patch('sys._getframe') as mock_frame: mock_frame.return_value.f_code.co_name = 'test_function' with patch('pandas.DataFrame.to_string', return_value=expected_output): logger.dataframe(test_data) mock_algorithm.Log.assert_called_once_with(f" INFO -> TestClass.test_function: {expected_output}") def test_dataframe_logging_empty_data(logger, mock_algorithm): test_data = [] logger.dataframe(test_data) mock_algorithm.Log.assert_not_called() def test_dataframe_logging_dict_input(logger, mock_algorithm): test_data = {'name': ['Alice', 'Bob'], 'age': [30, 25]} expected_output = "\n name age\nAlice 30\n Bob 25" with patch('sys._getframe') as mock_frame: mock_frame.return_value.f_code.co_name = 'test_function' with patch('pandas.DataFrame.to_string', return_value=expected_output): logger.dataframe(test_data) mock_algorithm.Log.assert_called_once_with(f" INFO -> TestClass.test_function: {expected_output}")
#region imports from AlgorithmImports import * #endregion import sys import os sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../'))) import pytest from unittest.mock import patch, MagicMock, call import time def test_timer_initialization(mock_algorithm, mocked_timer): timer = mocked_timer(mock_algorithm) assert timer.context == mock_algorithm assert timer.performance == {} def test_timer_start(mock_algorithm, mocked_timer): timer = mocked_timer(mock_algorithm) with patch('time.perf_counter', return_value=100.0): timer.start('test_method') assert 'test_method' in timer.performance assert timer.performance['test_method']['startTime'] == 100.0 def test_timer_stop(mock_algorithm, mocked_timer): timer = mocked_timer(mock_algorithm) timer.performance['test_method'] = timer.performanceTemplate.copy() timer.performance['test_method']['startTime'] = 100.0 with patch('time.perf_counter', return_value=150.0): timer.stop('test_method') performance = timer.performance['test_method'] assert performance['calls'] == 1 assert performance['elapsedLast'] == 50.0 assert performance['elapsedMin'] == 50.0 assert performance['elapsedMax'] == 50.0 assert performance['elapsedTotal'] == 50.0 assert performance['elapsedMean'] == 50.0 def test_timer_show_stats(mock_algorithm, mocked_timer): timer = mocked_timer(mock_algorithm) timer.performance['method1'] = { 'calls': 2, 'elapsedMin': 10.0, 'elapsedMean': 15.0, 'elapsedMax': 20.0, 'elapsedTotal': 30.0, 'elapsedLast': 15.0, 'startTime': None } timer.performance['method2'] = { 'calls': 1, 'elapsedMin': 5.0, 'elapsedMean': 5.0, 'elapsedMax': 5.0, 'elapsedTotal': 5.0, 'elapsedLast': 5.0, 'startTime': None } timer.showStats() # Check that Log method was called with the correct arguments expected_calls = [ call("Execution Stats (method1):"), call(" --> calls:2"), call(" --> elapsedMin:0:00:10"), call(" --> elapsedMean:0:00:15"), call(" --> elapsedMax:0:00:20"), call(" --> elapsedTotal:0:00:30"), call(" --> elapsedLast:0:00:15"), call("Execution Stats (method2):"), call(" --> calls:1"), call(" --> elapsedMin:0:00:05"), call(" --> elapsedMean:0:00:05"), call(" --> elapsedMax:0:00:05"), call(" --> elapsedTotal:0:00:05"), call(" --> elapsedLast:0:00:05"), call("Summary:"), call(" --> elapsedTotal: 0:00:35") ] mock_algorithm.Log.assert_has_calls(expected_calls, any_order=True) def test_timer_multiple_methods(mock_algorithm, mocked_timer): timer = mocked_timer(mock_algorithm) with patch('time.perf_counter') as mock_time: mock_time.side_effect = [100.0, 150.0, 200.0, 300.0] timer.start('method1') timer.stop('method1') timer.start('method2') timer.stop('method2') assert 'method1' in timer.performance assert 'method2' in timer.performance assert timer.performance['method1']['elapsedTotal'] == 50.0 assert timer.performance['method2']['elapsedTotal'] == 100.0
#region imports from AlgorithmImports import * #endregion # Your New Python File
#region imports from AlgorithmImports import * #endregion # Your New Python File
#region imports from AlgorithmImports import * #endregion import pytest from unittest.mock import MagicMock, patch @pytest.fixture def mock_resolution(): return MagicMock(Minute="Minute", Hour="Hour", Daily="Daily") @pytest.fixture def mock_algorithm_imports(mock_resolution): mock_imports = MagicMock() mock_imports.Resolution = mock_resolution return mock_imports @pytest.fixture(autouse=True) def patch_algorithm_imports(mock_algorithm_imports): with patch.dict('sys.modules', {'AlgorithmImports': mock_algorithm_imports}): yield mock_algorithm_imports @pytest.fixture def mock_algorithm(): return MagicMock() @pytest.fixture def mock_qc_data(): return MagicMock() @pytest.fixture def mock_symbol(): return MagicMock() @pytest.fixture def mock_resolution_class(): class MockResolution: Minute = "Minute" Hour = "Hour" Daily = "Daily" return MockResolution @pytest.fixture def mocked_timer(patch_algorithm_imports): from Tools.Timer import Timer return Timer @pytest.fixture def mocked_logger(patch_algorithm_imports): from Tools.Logger import Logger return Logger
#region imports from AlgorithmImports import * #endregion ######################################################################################## # # # Licensed under the Apache License, Version 2.0 (the "License"); # # you may not use this file except in compliance with the License. # # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # # # # Unless required by applicable law or agreed to in writing, software # # distributed under the License is distributed on an "AS IS" BASIS, # # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # # See the License for the specific language governing permissions and # # limitations under the License. # # # # Copyright [2021] [Rocco Claudio Cannizzaro] # # # ######################################################################################## import numpy as np from math import * from scipy import optimize from scipy.stats import norm from Tools import Logger, ContractUtils class BSM: def __init__(self, context, tradingDays = 365.0): # Set the context self.context = context # Set the logger self.logger = Logger(context, className = type(self).__name__, logLevel = context.logLevel) # Initialize the contract utils self.contractUtils = ContractUtils(context) # Set the IR self.riskFreeRate = context.riskFreeRate # Set the number of trading days self.tradingDays = tradingDays def isITM(self, contract, spotPrice = None): # Get the current price of the underlying unless otherwise specified if spotPrice == None: spotPrice = self.contractUtils.getUnderlyingLastPrice(contract) if contract.Right == OptionRight.Call: # A Call option is in the money if the underlying price is above the strike price return contract.Strike < spotPrice else: # A Put option is in the money if the underlying price is below the strike price return spotPrice < contract.Strike def bsmD1(self, contract, sigma, tau = None, ir = None, spotPrice = None, atTime = None): # Get the DTE as a fraction of a year if tau == None: tau = self.optionTau(contract, atTime = atTime) # Use the risk free rate unless otherwise specified if ir == None: ir = self.riskFreeRate # Get the current price of the underlying unless otherwise specified if spotPrice == None: spotPrice = self.contractUtils.getUnderlyingLastPrice(contract) # Strike price strikePrice = contract.Strike # Check edge cases: # - The contract is expired -> tau = 0 # - The IV could not be computed (deep ITM or far OTM options) -> sigma = 0 if tau == 0 or sigma == 0: # Set the sign based on whether it is a Call (+1) or a Put (-1) sign = 2*int(contract.Right == OptionRight.Call)-1 if(self.isITM(contract, spotPrice = spotPrice)): # Deep ITM options: # - Call: d1 = Inf -> Delta = Norm.CDF(d1) = 1 # - Put: d1 = -Inf -> Delta = -Norm.CDF(-d1) = -1 d1 = sign * float('inf') else: # Far OTM options: # - Call: d1 = -Inf -> Delta = Norm.CDF(d1) = 0 # - Put: d1 = Inf -> Delta = -Norm.CDF(-d1) = 0 d1 = sign * float('-inf') else: d1 = (np.log(spotPrice/strikePrice) + (ir + 0.5*sigma**2)*tau)/(sigma * np.sqrt(tau)) return d1 def bsmD2(self, contract, sigma, tau = None, d1 = None, ir = None, spotPrice = None, atTime = None): # Get the DTE as a fraction of a year if tau == None: tau = self.optionTau(contract, atTime = atTime) if d1 == None: d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice) # Compute D2 d2 = d1 - sigma * np.sqrt(tau) return d2 # Compute the DTE as a time fraction of the year def optionTau(self, contract, atTime = None): if atTime == None: atTime = self.context.Time # Get the expiration date and add 16 hours to the market close expiryDttm = contract.Expiry + timedelta(hours = 16) # Time until market close timeDiff = expiryDttm - atTime # Days to expiration: use the fraction of minutes until market close in case of 0-DTE (390 minutes = 6.5h -> from 9:30 to 16:00) dte = max(0, timeDiff.days, timeDiff.seconds/(60.0*390.0)) # DTE as a fraction of a year tau = dte/self.tradingDays return tau # Pricing of a European option based on the Black Scholes Merton model (without dividends) def bsmPrice(self, contract, sigma, tau = None, ir = None, spotPrice = None, atTime = None): # Get the DTE as a fraction of a year if tau == None: tau = self.optionTau(contract, atTime = atTime) # Get the current price of the underlying unless otherwise specified if spotPrice == None: spotPrice = self.contractUtils.getUnderlyingLastPrice(contract) # Compute D1 d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice) # Compute D2 d2 = self.bsmD2(contract, sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice) # X*e^(-r*tau) Xert = contract.Strike * np.exp(-self.riskFreeRate*tau) #Price the option if contract.Right == OptionRight.Call: # Call Option theoreticalPrice = norm.cdf(d1)*spotPrice - norm.cdf(d2)*Xert else: # Put Option theoreticalPrice = norm.cdf(-d2)*Xert - norm.cdf(-d1)*spotPrice return theoreticalPrice # Compute the Theta of an option def bsmTheta(self, contract, sigma, tau = None, d1 = None, d2 = None, ir = None, spotPrice = None, atTime = None): # Get the DTE as a fraction of a year if tau == None: tau = self.optionTau(contract, atTime = atTime) # Get the current price of the underlying unless otherwise specified if spotPrice == None: spotPrice = self.contractUtils.getUnderlyingLastPrice(contract) # Compute D1 if d1 == None: d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice) # Compute D2 if d2 == None: d2 = self.bsmD2(contract, sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice) # -S*N'(d1)*sigma/(2*sqrt(tau)) SNs = -(spotPrice * norm.pdf(d1) * sigma) / (2.0 * np.sqrt(tau)) # r*X*e^(-r*tau) rXert = self.riskFreeRate * contract.Strike * np.exp(-self.riskFreeRate*tau) # Compute Theta (divide by the number of trading days to get a daily Theta value) if contract.Right == OptionRight.Call: theta = (SNs - rXert * norm.cdf(d2))/self.tradingDays else: theta = (SNs + rXert * norm.cdf(-d2))/self.tradingDays return theta # Compute the Theta of an option def bsmRho(self, contract, sigma, tau = None, d1 = None, d2 = None, ir = None, spotPrice = None, atTime = None): # Get the DTE as a fraction of a year if tau == None: tau = self.optionTau(contract, atTime = atTime) # Get the current price of the underlying unless otherwise specified if spotPrice == None: spotPrice = self.contractUtils.getUnderlyingLastPrice(contract) # Compute D1 if d1 == None: d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice) # Compute D2 if d2 == None: d2 = self.bsmD2(contract, sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice) # tau*X*e^(-r*tau) tXert = tau * self.riskFreeRate * contract.Strike * np.exp(-self.riskFreeRate*tau) # Compute Theta if contract.Right == OptionRight.Call: rho = tXert * norm.cdf(d2) else: rho = -tXert * norm.cdf(-d2) return rho # Compute the Gamma of an option def bsmGamma(self, contract, sigma, tau = None, d1 = None, ir = None, spotPrice = None, atTime = None): # Get the DTE as a fraction of a year if tau == None: tau = self.optionTau(contract, atTime = atTime) # Get the current price of the underlying unless otherwise specified if spotPrice == None: spotPrice = self.contractUtils.getUnderlyingLastPrice(contract) # Compute D1 if d1 == None: d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice) # Compute Gamma if(sigma == 0 or tau == 0): gamma = float('inf') else: gamma = norm.pdf(d1) / (spotPrice * sigma * np.sqrt(tau)) return gamma # Compute the Vega of an option def bsmVega(self, contract, sigma, tau = None, d1 = None, ir = None, spotPrice = None, atTime = None): # Get the DTE as a fraction of a year if tau == None: tau = self.optionTau(contract, atTime = atTime) # Get the current price of the underlying unless otherwise specified if spotPrice == None: spotPrice = self.contractUtils.getUnderlyingLastPrice(contract) # Compute D1 if d1 == None: d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice) # Compute Vega vega = spotPrice * norm.pdf(d1) * np.sqrt(tau) return vega # Compute the Vomma of an option def bsmVomma(self, contract, sigma, tau = None, d1 = None, d2 = None, ir = None, spotPrice = None, atTime = None): # Get the DTE as a fraction of a year if tau == None: tau = self.optionTau(contract, atTime = atTime) # Get the current price of the underlying unless otherwise specified if spotPrice == None: spotPrice = self.contractUtils.getUnderlyingLastPrice(contract) # Compute D1 if d1 == None: d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice) # Compute D2 if d2 == None: d2 = self.bsmD2(contract, sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice) # Compute Vomma if(sigma == 0): vomma = float('inf') else: vomma = spotPrice * norm.pdf(d1) * np.sqrt(tau) * d1 * d2 / sigma return vomma # Compute Implied Volatility from the price of an option def bsmIV(self, contract, tau = None, saveIt = False): # Start the timer self.context.executionTimer.start() # Inner function used to compute the root def f(sigma, contract, tau): return self.bsmPrice(contract, sigma = sigma, tau = tau) - self.contractUtils.midPrice(contract) # First order derivative (Vega) def fprime(sigma, contract, tau): return self.bsmVega(contract, sigma = sigma, tau = tau) # Second order derivative (Vomma) def fprime2(sigma, contract, tau): return self.bsmVomma(contract, sigma = sigma, tau = tau) # Initialize the IV to zero in case anything goes wrong IV = 0 # Initialize the flag to mark whether we were able to find the root converged = False # Find the root -> Implied Volatility: Use Halley's method try: # Start the search at the lastest known value for the IV (if previously calculated) x0 = 0.1 if hasattr(contract, "BSMImpliedVolatility"): x0 = contract.BSMImpliedVolatility sol = optimize.root_scalar(f, x0 = x0, args = (contract, tau), fprime = fprime, fprime2 = fprime2, method = 'halley', xtol = 1e-6) # Get the convergence status converged = sol.converged # Set the IV if we found the root if converged: IV = sol.root except: pass # Fallback method (Bisection) if Halley's optimization failed if not converged: # Find the root -> Implied Volatility try: sol = optimize.root_scalar(f, bracket = [0.0001, 2], args = (contract, tau), xtol = 1e-6) # Get the convergence status converged = sol.converged # Set the IV if we found the root if converged: IV = sol.root except: pass # Check if we need to save the IV as an attribute of the contract object if saveIt: contract.BSMImpliedVolatility = IV # Stop the timer self.context.executionTimer.stop() # Return the result return IV # Compute the Delta of an option def bsmDelta(self, contract, sigma, tau = None, d1 = None, ir = None, spotPrice = None, atTime = None): if d1 == None: if tau == None: # Get the DTE as a fraction of a year tau = self.optionTau(contract, atTime = atTime) # Compute D1 d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice) ### if (d1 == None) # Compute option delta (rounded to 2 digits) if contract.Right == OptionRight.Call: delta = norm.cdf(d1) else: delta = -norm.cdf(-d1) return delta def computeGreeks(self, contract, sigma = None, ir = None, spotPrice = None, atTime = None, saveIt = False): # Start the timer self.context.executionTimer.start("Tools.BSMLibrary -> computeGreeks") # Avoid recomputing the Greeks if we have already done it for this time bar if hasattr(contract, "BSMGreeks") and contract.BSMGreeks.lastUpdated == self.context.Time: return contract.BSMGreeks # Get the DTE as a fraction of a year tau = self.optionTau(contract, atTime = atTime) if sigma == None: # Compute Implied Volatility sigma = self.bsmIV(contract, tau = tau, saveIt = saveIt) ### if (sigma == None) # Get the current price of the underlying unless otherwise specified if spotPrice == None: spotPrice = self.contractUtils.getUnderlyingLastPrice(contract) # Compute D1 d1 = self.bsmD1(contract, sigma, tau = tau, ir = ir, spotPrice = spotPrice) # Compute D2 d2 = self.bsmD2(contract, sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice) # First order derivatives delta = self.bsmDelta(contract, sigma = sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice) theta = self.bsmTheta(contract, sigma, tau = tau, d1 = d1, d2 = d2, ir = ir, spotPrice = spotPrice) vega = self.bsmVega(contract, sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice) rho = self.bsmRho(contract, sigma, tau = tau, d1 = d1, d2 = d2, ir = ir, spotPrice = spotPrice) # Second Order derivatives gamma = self.bsmGamma(contract, sigma, tau = tau, d1 = d1, ir = ir, spotPrice = spotPrice) vomma = self.bsmVomma(contract, sigma, tau = tau, d1 = d1, d2 = d2, ir = ir, spotPrice = spotPrice) # Lambda (a.k.a. elasticity or leverage: the percentage change in option value per percentage change in the underlying price) elasticity = delta * np.float64(spotPrice)/np.float64(self.contractUtils.midPrice(contract)) # Create a Greeks object greeks = BSMGreeks(delta = delta , gamma = gamma , vega = vega , theta = theta , rho = rho , vomma = vomma , elasticity = elasticity , IV = sigma , lastUpdated = self.context.Time ) # Check if we need to save the Greeks as an attribute of the contract object if saveIt: contract.BSMGreeks = greeks # Stop the timer self.context.executionTimer.stop("Tools.BSMLibrary -> computeGreeks") return greeks # Compute and store the Greeks for a list of contracts def setGreeks(self, contracts, sigma = None, ir = None): # Start the timer self.context.executionTimer.start("Tools.BSMLibrary -> setGreeks") if isinstance(contracts, list): # Loop through all contracts for contract in contracts: # Get the current price of the underlying spotPrice = self.contractUtils.getUnderlyingLastPrice(contract) # Compute the Greeks for the contract self.computeGreeks(contract, sigma = sigma, ir = ir, spotPrice = spotPrice, saveIt = True) else: # Get the current price of the underlying spotPrice = self.contractUtils.getUnderlyingLastPrice(contracts) # Compute the Greeks on a single contract self.computeGreeks(contracts, sigma = sigma, ir = ir, spotPrice = spotPrice, saveIt = True) # Log the contract details self.logger.trace(f"Contract: {contracts.Symbol}") self.logger.trace(f" -> Contract Mid-Price: {self.contractUtils.midPrice(contracts)}") self.logger.trace(f" -> Spot: {spotPrice}") self.logger.trace(f" -> Strike: {contracts.Strike}") self.logger.trace(f" -> Type: {'Call' if contracts.Right == OptionRight.Call else 'Put'}") self.logger.trace(f" -> IV: {contracts.BSMImpliedVolatility}") self.logger.trace(f" -> Delta: {contracts.BSMGreeks.Delta}") self.logger.trace(f" -> Gamma: {contracts.BSMGreeks.Gamma}") self.logger.trace(f" -> Vega: {contracts.BSMGreeks.Vega}") self.logger.trace(f" -> Theta: {contracts.BSMGreeks.Theta}") self.logger.trace(f" -> Rho: {contracts.BSMGreeks.Rho}") self.logger.trace(f" -> Vomma: {contracts.BSMGreeks.Vomma}") self.logger.trace(f" -> Elasticity: {contracts.BSMGreeks.Elasticity}") # Stop the timer self.context.executionTimer.stop("Tools.BSMLibrary -> setGreeks") return class BSMGreeks: def __init__(self, delta = None, gamma = None, vega = None, theta = None, rho = None, vomma = None, elasticity = None, IV = None, lastUpdated = None, precision = 5): self.Delta = self.roundIt(delta, precision) self.Gamma = self.roundIt(gamma, precision) self.Vega = self.roundIt(vega, precision) self.Theta = self.roundIt(theta, precision) self.Rho = self.roundIt(rho, precision) self.Vomma = self.roundIt(vomma, precision) self.Elasticity = self.roundIt(elasticity, precision) self.IV = self.roundIt(IV, precision) self.lastUpdated = lastUpdated def roundIt(self, value, precision = None): if precision: return round(value, precision) else: return value
#region imports from AlgorithmImports import * #endregion from Tools import Underlying class Charting: def __init__(self, context, openPositions=True, Stats=True, PnL=True, WinLossStats=True, Performance=True, LossDetails=True, totalSecurities=False, Trades=True, Distribution=True): self.context = context self.resample = datetime.min # QUANTCONNECT limitations in terms of charts # Tier Max Series Max Data Points per Series # Free 10 4,000 # Quant Researcher 10 8,000 # Team 25 16,000 # Trading Firm 25 32,000 # Institution 100 96,000 # Max datapoints set to 4000 (free), 8000 (researcher), 16000 (team) (the maximum allowed by QC) self.resamplePeriod = (context.EndDate - context.StartDate) / 8_000 # Max number of series allowed self.maxSeries = 10 self.charts = [] # Create an object to store all the stats self.stats = CustomObject() # Store the details about which charts will be plotted (there is a maximum of 10 series per backtest) self.stats.plot = CustomObject() self.stats.plot.openPositions = openPositions self.stats.plot.Stats = Stats self.stats.plot.PnL = PnL self.stats.plot.WinLossStats = WinLossStats self.stats.plot.Performance = Performance self.stats.plot.LossDetails = LossDetails self.stats.plot.totalSecurities = totalSecurities self.stats.plot.Trades = Trades self.stats.plot.Distribution = Distribution # Initialize performance metrics self.stats.won = 0 self.stats.lost = 0 self.stats.winRate = 0.0 self.stats.premiumCaptureRate = 0.0 self.stats.totalCredit = 0.0 self.stats.totalDebit = 0.0 self.stats.PnL = 0.0 self.stats.totalWinAmt = 0.0 self.stats.totalLossAmt = 0.0 self.stats.averageWinAmt = 0.0 self.stats.averageLossAmt = 0.0 self.stats.maxWin = 0.0 self.stats.maxLoss = 0.0 self.stats.testedCall = 0 self.stats.testedPut = 0 totalSecurities = Chart("Total Securities") totalSecurities.AddSeries(Series('Total Securities', SeriesType.Line, 0)) # Setup Charts if openPositions: activePositionsPlot = Chart('Open Positions') activePositionsPlot.AddSeries(Series('Open Positions', SeriesType.Line, '')) self.charts.append(activePositionsPlot) if Stats: statsPlot = Chart('Stats') statsPlot.AddSeries(Series('Won', SeriesType.Line, '', Color.Green)) statsPlot.AddSeries(Series('Lost', SeriesType.Line, '', Color.Red)) self.charts.append(statsPlot) if PnL: pnlPlot = Chart('Profit and Loss') pnlPlot.AddSeries(Series('PnL', SeriesType.Line, '')) self.charts.append(pnlPlot) if WinLossStats: winLossStatsPlot = Chart('Win and Loss Stats') winLossStatsPlot.AddSeries(Series('Average Win', SeriesType.Line, '$', Color.Green)) winLossStatsPlot.AddSeries(Series('Average Loss', SeriesType.Line, '$', Color.Red)) self.charts.append(winLossStatsPlot) if Performance: performancePlot = Chart('Performance') performancePlot.AddSeries(Series('Win Rate', SeriesType.Line, '%')) performancePlot.AddSeries(Series('Premium Capture', SeriesType.Line, '%')) self.charts.append(performancePlot) # Loss Details chart. Only relevant in case of credit strategies if LossDetails: lossPlot = Chart('Loss Details') lossPlot.AddSeries(Series('Short Put Tested', SeriesType.Line, '')) lossPlot.AddSeries(Series('Short Call Tested', SeriesType.Line, '')) self.charts.append(lossPlot) if Trades: tradesPlot = Chart('Trades') tradesPlot.AddSeries(CandlestickSeries('UNDERLYING', '$')) tradesPlot.AddSeries(Series("OPEN TRADE", SeriesType.Scatter, "", Color.Green, ScatterMarkerSymbol.Triangle)) tradesPlot.AddSeries(Series("CLOSE TRADE", SeriesType.Scatter, "", Color.Red, ScatterMarkerSymbol.TriangleDown)) self.charts.append(tradesPlot) if Distribution: distributionPlot = Chart('Distribution') distributionPlot.AddSeries(Series('Distribution', SeriesType.Bar, '')) self.charts.append(distributionPlot) # Add the charts to the context for chart in self.charts: self.context.AddChart(chart) # TODO: consider this for strategies. # Call the chart initialization method of each strategy (give a chance to setup custom charts) # for strategy in self.strategies: # strategy.setupCharts() # Add the first data point to the charts self.updateCharts() def updateUnderlying(self, bar): # Add the latest data point to the underlying chart # self.context.Plot("UNDERLYING", "UNDERLYING", bar) self.context.Plot("Trades", "UNDERLYING", bar) def updateCharts(self, symbol=None): # Start the timer self.context.executionTimer.start() # TODO: consider this for strategies. # Call the updateCharts method of each strategy (give a chance to update any custom charts) # for strategy in self.strategies: # strategy.updateCharts() # Exit if there is nothing to update if self.context.Time.time() >= time(15, 59, 0): return # self.context.logger.info(f"Time: {self.context.Time}, Resample: {self.resample}") # In order to not exceed the maximum number of datapoints, we resample the charts. if self.context.Time <= self.resample: return self.resample = self.context.Time + self.resamplePeriod plotInfo = self.stats.plot if plotInfo.Trades: # If symbol is defined then we print the symbol data on the chart if symbol is not None: underlying = Underlying(self.context, symbol) self.context.Plot("Trades", "UNDERLYING", underlying.Security().GetLastData()) if plotInfo.totalSecurities: self.context.Plot("Total Securities", "Total Securities", self.context.Securities.Count) # Add the latest stats to the plots if plotInfo.openPositions: self.context.Plot("Open Positions", "Open Positions", self.context.openPositions.Count) if plotInfo.Stats: self.context.Plot("Stats", "Won", self.stats.won) self.context.Plot("Stats", "Lost", self.stats.lost) if plotInfo.PnL: self.context.Plot("Profit and Loss", "PnL", self.stats.PnL) if plotInfo.WinLossStats: self.context.Plot("Win and Loss Stats", "Average Win", self.stats.averageWinAmt) self.context.Plot("Win and Loss Stats", "Average Loss", self.stats.averageLossAmt) if plotInfo.Performance: self.context.Plot("Performance", "Win Rate", self.stats.winRate) self.context.Plot("Performance", "Premium Capture", self.stats.premiumCaptureRate) if plotInfo.LossDetails: self.context.Plot("Loss Details", "Short Put Tested", self.stats.testedPut) self.context.Plot("Loss Details", "Short Call Tested", self.stats.testedCall) if plotInfo.Distribution: self.context.Plot("Distribution", "Distribution", 0) # Stop the timer self.context.executionTimer.stop() def plotTrade(self, trade, orderType): # Start the timer self.context.executionTimer.start() # Add the trade to the chart strikes = [] for leg in trade.legs: if trade.isCreditStrategy: if leg.isSold: strikes.append(leg.strike) else: if leg.isBought: strikes.append(leg.strike) # self.context.logger.info(f"plotTrades!! : Strikes: {strikes}") if orderType == "open": for strike in strikes: self.context.Plot("Trades", "OPEN TRADE", strike) else: for strike in strikes: self.context.Plot("Trades", "CLOSE TRADE", strike) # NOTE: this can not be made because there is a limit of 10 Series on all charts so it will fail! # for strike in strikes: # self.context.Plot("Trades", f"TRADE {strike}", strike) # Stop the timer self.context.executionTimer.stop() def updateStats(self, closedPosition): # Start the timer self.context.executionTimer.start() orderId = closedPosition.orderId # Get the position P&L positionPnL = closedPosition.PnL # Get the price of the underlying at the time of closing the position priceAtClose = closedPosition.underlyingPriceAtClose if closedPosition.isCreditStrategy: # Update total credit (the position was opened for a credit) self.stats.totalCredit += closedPosition.openPremium # Update total debit (the position was closed for a debit) self.stats.totalDebit += closedPosition.closePremium else: # Update total credit (the position was closed for a credit) self.stats.totalCredit += closedPosition.closePremium # Update total debit (the position was opened for a debit) self.stats.totalDebit += closedPosition.openPremium # Update the total P&L self.stats.PnL += positionPnL # Update Win/Loss counters if positionPnL > 0: self.stats.won += 1 self.stats.totalWinAmt += positionPnL self.stats.maxWin = max(self.stats.maxWin, positionPnL) self.stats.averageWinAmt = self.stats.totalWinAmt / self.stats.won else: self.stats.lost += 1 self.stats.totalLossAmt += positionPnL self.stats.maxLoss = min(self.stats.maxLoss, positionPnL) self.stats.averageLossAmt = -self.stats.totalLossAmt / self.stats.lost # Check if this is a Credit Strategy if closedPosition.isCreditStrategy: # Get the strikes for the sold contracts sold_puts = [leg.strike for leg in closedPosition.legs if leg.isSold and leg.isPut] sold_calls = [leg.strike for leg in closedPosition.legs if leg.isSold and leg.isCall] if sold_puts and sold_calls: # Get the short put and short call strikes shortPutStrike = min(sold_puts) shortCallStrike = max(sold_calls) # Check if the short Put is in the money if priceAtClose <= shortPutStrike: self.stats.testedPut += 1 # Check if the short Call is in the money elif priceAtClose >= shortCallStrike: self.stats.testedCall += 1 # Check if the short Put is being tested elif (priceAtClose-shortPutStrike) < (shortCallStrike - priceAtClose): self.stats.testedPut += 1 # The short Call is being tested else: self.stats.testedCall += 1 # Update the Win Rate if ((self.stats.won + self.stats.lost) > 0): self.stats.winRate = 100*self.stats.won/(self.stats.won + self.stats.lost) if self.stats.totalCredit > 0: self.stats.premiumCaptureRate = 100*self.stats.PnL/self.stats.totalCredit # Trigger an update of the charts self.updateCharts() self.plotTrade(closedPosition, "close") # Stop the timer self.context.executionTimer.stop() # Dummy class useful to create empty objects class CustomObject: pass
#region imports from AlgorithmImports import * #endregion from .Logger import Logger class ContractUtils: def __init__(self, context): # Set the context self.context = context # Set the logger self.logger = Logger(context, className=type(self).__name__, logLevel=context.logLevel) def getUnderlyingPrice(self, symbol): security = self.context.Securities[symbol] return self.context.GetLastKnownPrice(security).Price def getUnderlyingLastPrice(self, contract): # Get the context context = self.context # Get the object from the Securities dictionary if available (pull the latest price), else use the contract object itself if contract.UnderlyingSymbol in context.Securities: security = context.Securities[contract.UnderlyingSymbol] # Check if we have found the security if security is not None: # Get the last known price of the security return context.GetLastKnownPrice(security).Price else: # Get the UnderlyingLastPrice attribute of the contract return contract.UnderlyingLastPrice def getSecurity(self, contract): # Get the Securities object Securities = self.context.Securities # Check if we can extract the Symbol attribute if hasattr(contract, "Symbol") and contract.Symbol in Securities: # Get the security from the Securities dictionary if available (pull the latest price), else use the contract object itself security = Securities[contract.Symbol] else: # Use the contract itself security = contract return security # Returns the mid-price of an option contract def midPrice(self, contract): security = self.getSecurity(contract) return 0.5 * (security.BidPrice + security.AskPrice) def volume(self, contract): security = self.getSecurity(contract) return security.Volume def openInterest(self, contract): security = self.getSecurity(contract) return security.OpenInterest def delta(self, contract): security = self.getSecurity(contract) return security.Delta def gamma(self, contract): security = self.getSecurity(contract) return security.Gamma def theta(self, contract): security = self.getSecurity(contract) return security.Theta def vega(self, contract): security = self.getSecurity(contract) return security.Vega def rho(self, contract): security = self.getSecurity(contract) return security.Rho def bidPrice(self, contract): security = self.getSecurity(contract) return security.BidPrice def askPrice(self, contract): security = self.getSecurity(contract) return security.AskPrice def bidAskSpread(self, contract): security = self.getSecurity(contract) return abs(security.AskPrice - security.BidPrice)
#region imports from AlgorithmImports import * #endregion from .Underlying import Underlying from .ProviderOptionContract import ProviderOptionContract import operator class DataHandler: # The supported cash indices by QC https://www.quantconnect.com/docs/v2/writing-algorithms/datasets/tickdata/us-cash-indices#05-Supported-Indices # These need to be added using AddIndex instead of AddEquity CashIndices = ['VIX','SPX','NDX'] def __init__(self, context, ticker, strategy): self.ticker = ticker self.context = context self.strategy = strategy # Method to add the ticker[String] data to the context. # @param resolution [Resolution] # @return [Symbol] def AddUnderlying(self, resolution=Resolution.Minute): if self.__CashTicker(): return self.context.AddIndex(self.ticker, resolution=resolution) else: return self.context.AddEquity(self.ticker, resolution=resolution) # SECTION BELOW IS FOR ADDING THE OPTION CHAIN # Method to add the option chain data to the context. # @param resolution [Resolution] def AddOptionsChain(self, underlying, resolution=Resolution.Minute): if self.ticker == "SPX": # Underlying is SPX. We'll load and use SPXW and ignore SPX options (these close in the AM) return self.context.AddIndexOption(underlying.Symbol, "SPXW", resolution) elif self.__CashTicker(): # Underlying is an index return self.context.AddIndexOption(underlying.Symbol, resolution) else: # Underlying is an equity return self.context.AddOption(underlying.Symbol, resolution) # Should be called on an option object like this: option.SetFilter(self.OptionFilter) # !This method is called every minute if the algorithm resolution is set to minute def SetOptionFilter(self, universe): # Start the timer self.context.executionTimer.start('Tools.DataHandler -> SetOptionFilter') self.context.logger.debug(f"SetOptionFilter -> universe: {universe}") # Include Weekly contracts # nStrikes contracts to each side of the ATM # Contracts expiring in the range (DTE-5, DTE) filteredUniverse = universe.Strikes(-self.strategy.nStrikesLeft, self.strategy.nStrikesRight)\ .Expiration(max(0, self.strategy.dte - self.strategy.dteWindow), max(0, self.strategy.dte))\ .IncludeWeeklys() self.context.logger.debug(f"SetOptionFilter -> filteredUniverse: {filteredUniverse}") # Stop the timer self.context.executionTimer.stop('Tools.DataHandler -> SetOptionFilter') return filteredUniverse # SECTION BELOW HANDLES OPTION CHAIN PROVIDER METHODS def optionChainProviderFilter(self, symbols, min_strike_rank, max_strike_rank, minDte, maxDte): self.context.executionTimer.start('Tools.DataHandler -> optionChainProviderFilter') self.context.logger.debug(f"optionChainProviderFilter -> symbols count: {len(symbols)}") if len(symbols) == 0: self.context.logger.warning("No symbols provided to optionChainProviderFilter") return None filteredSymbols = [symbol for symbol in symbols if minDte <= (symbol.ID.Date.date() - self.context.Time.date()).days <= maxDte] self.context.logger.debug(f"Filtered symbols count: {len(filteredSymbols)}") self.context.logger.debug(f"Context Time: {self.context.Time.date()}") unique_dates = set(symbol.ID.Date.date() for symbol in symbols) self.context.logger.debug(f"Unique symbol dates: {unique_dates}") self.context.logger.debug(f"optionChainProviderFilter -> filteredSymbols: {filteredSymbols}") if not filteredSymbols: self.context.logger.warning("No symbols left after date filtering") return None if not self.__CashTicker(): filteredSymbols = [x for x in filteredSymbols if self.context.Securities[x.ID.Symbol].IsTradable] self.context.logger.debug(f"Tradable filtered symbols count: {len(filteredSymbols)}") if not filteredSymbols: self.context.logger.warning("No tradable symbols left after filtering") return None underlying = Underlying(self.context, self.strategy.underlyingSymbol) underlyingLastPrice = underlying.Price() self.context.logger.debug(f"Underlying last price: {underlyingLastPrice}") if underlyingLastPrice is None: self.context.logger.warning("Underlying price is None") return None try: atm_strike = sorted(filteredSymbols, key=lambda x: abs(x.ID.StrikePrice - underlyingLastPrice))[0].ID.StrikePrice except IndexError: self.context.logger.error("Unable to find ATM strike. Check if filteredSymbols is empty or if strike prices are available.") return None self.context.logger.debug(f"ATM strike: {atm_strike}") strike_list = sorted(set([i.ID.StrikePrice for i in filteredSymbols])) atm_strike_rank = strike_list.index(atm_strike) min_strike = strike_list[max(0, atm_strike_rank + min_strike_rank + 1)] max_strike = strike_list[min(atm_strike_rank + max_strike_rank - 1, len(strike_list)-1)] selectedSymbols = [symbol for symbol in filteredSymbols if min_strike <= symbol.ID.StrikePrice <= max_strike] self.context.logger.debug(f"Selected symbols count: {len(selectedSymbols)}") contracts = [] for symbol in selectedSymbols: self.AddOptionContracts([symbol], resolution=self.context.timeResolution) contract = ProviderOptionContract(symbol, underlyingLastPrice, self.context) contracts.append(contract) self.context.executionTimer.stop('Tools.DataHandler -> optionChainProviderFilter') return contracts def getOptionContracts(self, slice=None): self.context.executionTimer.start('Tools.DataHandler -> getOptionContracts') contracts = None minDte = max(0, self.strategy.dte - self.strategy.dteWindow) maxDte = max(0, self.strategy.dte) self.context.logger.debug(f"getOptionContracts -> minDte: {minDte}") self.context.logger.debug(f"getOptionContracts -> maxDte: {maxDte}") if slice: for chain in slice.OptionChains: if self.strategy.optionSymbol == None or chain.Key != self.strategy.optionSymbol: continue if chain.Value.Contracts.Count != 0: contracts = [ contract for contract in chain.Value if minDte <= (contract.Expiry.date() - self.context.Time.date()).days <= maxDte ] self.context.logger.debug(f"getOptionContracts -> number of contracts from slice: {len(contracts) if contracts else 0}") if contracts is None: symbols = self.context.OptionChainProvider.GetOptionContractList(self.ticker, self.context.Time) contracts = self.optionChainProviderFilter(symbols, -self.strategy.nStrikesLeft, self.strategy.nStrikesRight, minDte, maxDte) self.context.executionTimer.stop('Tools.DataHandler -> getOptionContracts') return contracts # Method to add option contracts data to the context. # @param contracts [Array] # @param resolution [Resolution] # @return [Symbol] def AddOptionContracts(self, contracts, resolution = Resolution.Minute): # Add this contract to the data subscription so we can retrieve the Bid/Ask price if self.__CashTicker(): for contract in contracts: if contract not in self.context.optionContractsSubscriptions: self.context.AddIndexOptionContract(contract, resolution) self.context.optionContractsSubscriptions.append(contract) else: for contract in contracts: if contract not in self.context.optionContractsSubscriptions: self.context.AddOptionContract(contract, resolution) self.context.optionContractsSubscriptions.append(contract) # PRIVATE METHODS # Internal method to determine if we are using a cashticker to add the data. # @returns [Boolean] def __CashTicker(self): return self.ticker in self.CashIndices
#region imports from AlgorithmImports import * #endregion class Helper: def findIn(self, data, condition): return next((v for v in data if condition(v)), None)
#region imports from AlgorithmImports import * #endregion import sys import pandas as pd from collections import deque class Logger: def __init__(self, context, className=None, logLevel=0, buffer_size=100): self.context = context self.className = className self.logLevel = logLevel self.log_buffer = deque(maxlen=buffer_size) self.current_pattern = [] self.pattern_count = 0 def Log(self, msg, trsh=0): if self.logLevel < trsh: return className = f"{self.className}." if self.className else "" prefix = ["ERROR", "WARNING", "INFO", "DEBUG", "TRACE"][min(trsh, 4)] log_msg = f"{prefix} -> {className}{sys._getframe(2).f_code.co_name}: {msg}" self.process_log(log_msg) def process_log(self, log_msg): if not self.current_pattern: self.print_log(log_msg) self.current_pattern.append(log_msg) else: pattern_index = self.find_pattern_start(log_msg) if pattern_index == -1: self.print_pattern() self.print_log(log_msg) self.current_pattern.append(log_msg) else: if pattern_index == 0: self.pattern_count += 1 else: self.print_pattern() self.print_log("--- New log cycle starts ---") self.current_pattern = self.current_pattern[pattern_index:] self.pattern_count = 1 self.log_buffer.append(log_msg) def find_pattern_start(self, log_msg): for i in range(len(self.current_pattern)): if log_msg == self.current_pattern[i]: if self.is_pattern_repeating(i): return i return -1 def is_pattern_repeating(self, start_index): pattern_length = len(self.current_pattern) - start_index if len(self.log_buffer) < pattern_length: return False return list(self.log_buffer)[-pattern_length:] == self.current_pattern[start_index:] def print_pattern(self): if self.pattern_count > 1: self.print_log(f"The following pattern repeated {self.pattern_count} times:") for msg in self.current_pattern: self.print_log(f" {msg}") elif self.pattern_count == 1: for msg in self.current_pattern: self.print_log(msg) self.pattern_count = 0 def print_log(self, msg): self.context.Log(msg) def error(self, msg): self.Log(msg, trsh=0) def warning(self, msg): self.Log(msg, trsh=1) def info(self, msg): self.Log(msg, trsh=2) def debug(self, msg): self.Log(msg, trsh=3) def trace(self, msg): self.Log(msg, trsh=4) def dataframe(self, data): if isinstance(data, list): columns = list(data[0].keys()) else: columns = list(data.keys()) df = pd.DataFrame(data, columns=columns) if df.shape[0] > 0: self.info(f"\n{df.to_string(index=False)}") def __del__(self): self.print_pattern()
#region imports from AlgorithmImports import * #endregion class Performance: def __init__(self, context): self.context = context self.logger = self.context.logger self.dailyTracking = datetime.now() self.seenSymbols = set() self.tradedSymbols = set() self.chainSymbols = set() self.tradedToday = False self.tracking = {} def endOfDay(self, symbol): day_summary = { "Time": (datetime.now() - self.dailyTracking).total_seconds(), "Portfolio": len(self.context.Portfolio), "Invested": sum(1 for kvp in self.context.Portfolio if kvp.Value.Invested), "Seen": len(self.seenSymbols), "Traded": len(self.tradedSymbols), "Chains": len(self.chainSymbols) } # Convert Symbol instance to string symbol_str = str(symbol) # Ensure the date is in the tracking dictionary date_key = self.context.Time.date() if date_key not in self.tracking: self.tracking[date_key] = {} # Ensure the symbol is in the tracking dictionary if symbol_str not in self.tracking[date_key]: self.tracking[date_key][symbol_str] = {} # Store the day summary self.tracking[date_key][symbol_str] = day_summary self.dailyTracking = datetime.now() self.tradedToday = False def OnOrderEvent(self, orderEvent): if orderEvent.Status == OrderStatus.Filled or orderEvent.Status == OrderStatus.PartiallyFilled: if orderEvent.Quantity > 0: self.logger.trace(f"Filled {orderEvent.Symbol}") self.tradedSymbols.add(orderEvent.Symbol) self.tradedToday = True else: self.logger.trace(f"Unwound {orderEvent.Symbol}") def OnUpdate(self, data): if data.OptionChains: for kvp in data.OptionChains: chain = kvp.Value # Access the OptionChain from the KeyValuePair self.chainSymbols.update([oc.Symbol for oc in chain]) if not self.tradedToday: for optionContract in (contract for contract in chain if contract.Symbol not in self.tradedSymbols): self.seenSymbols.add(optionContract.Symbol) def show(self, csv=False): if csv: self.context.Log("Day,Symbol,Time,Portfolio,Invested,Seen,Traded,Chains") for day in sorted(self.tracking.keys()): for symbol, stats in self.tracking[day].items(): if csv: self.context.Log(f"{day},{symbol},{stats['Time']},{stats['Portfolio']},{stats['Invested']},{stats['Seen']},{stats['Traded']},{stats['Chains']}") else: self.context.Log(f"{day} - {symbol}: {stats}")
# region imports from AlgorithmImports import * # endregion from datetime import datetime class ProviderOptionContract: def __init__(self, symbol, underlying_price, context): self.Symbol = symbol self.Underlying = symbol.Underlying self.UnderlyingSymbol = symbol.Underlying self.ID = symbol.ID self.UnderlyingLastPrice = underlying_price self.security = context.Securities[symbol] @property def Expiry(self): return self.ID.Date @property def Strike(self): return self.ID.StrikePrice @property def Right(self): return self.ID.OptionRight @property def BidPrice(self): return self.security.BidPrice @property def AskPrice(self): return self.security.AskPrice @property def LastPrice(self): return self.security.Price # Add any other properties or methods you commonly use from OptionContract
#region imports from AlgorithmImports import * #endregion from .Underlying import Underlying import operator class StrictDataHandler: # The supported cash indices by QC https://www.quantconnect.com/docs/v2/writing-algorithms/datasets/tickdata/us-cash-indices#05-Supported-Indices # These need to be added using AddIndex instead of AddEquity CashIndices = ['VIX','SPX','NDX'] def __init__(self, context, ticker, strategy): self.ticker = ticker self.context = context self.strategy = strategy # Method to add the ticker[String] data to the context. # @param resolution [Resolution] # @return [Symbol] def AddUnderlying(self, resolution=Resolution.Minute): if self.__CashTicker(): return self.context.AddIndex(self.ticker, resolution=resolution) else: return self.context.AddEquity(self.ticker, resolution=resolution) # SECTION BELOW IS FOR ADDING THE OPTION CHAIN # Method to add the option chain data to the context. # @param resolution [Resolution] def AddOptionsChain(self, underlying, resolution=Resolution.Minute): if self.ticker == "SPX": # Underlying is SPX. We'll load and use SPXW and ignore SPX options (these close in the AM) return self.context.AddIndexOption(underlying.Symbol, "SPXW", resolution) elif self.__CashTicker(): # Underlying is an index return self.context.AddIndexOption(underlying.Symbol, resolution) else: # Underlying is an equity return self.context.AddOption(underlying.Symbol, resolution) # Should be called on an option object like this: option.SetFilter(self.OptionFilter) # !This method is called every minute if the algorithm resolution is set to minute def SetOptionFilter(self, universe): # Start the timer self.context.executionTimer.start('Tools.DataHandler -> SetOptionFilter') self.context.logger.debug(f"SetOptionFilter -> universe: {universe}") # Include Weekly contracts # nStrikes contracts to each side of the ATM # Contracts expiring in the range (DTE-5, DTE) filteredUniverse = universe.Strikes(-self.strategy.nStrikesLeft, self.strategy.nStrikesRight)\ .Expiration(max(0, self.strategy.dte - self.strategy.dteWindow), max(0, self.strategy.dte))\ .IncludeWeeklys() self.context.logger.debug(f"SetOptionFilter -> filteredUniverse: {filteredUniverse}") # Stop the timer self.context.executionTimer.stop('Tools.DataHandler -> SetOptionFilter') return filteredUniverse # SECTION BELOW HANDLES OPTION CHAIN PROVIDER METHODS def optionChainProviderFilter(self, symbols, min_strike_rank, max_strike_rank, minDte, maxDte): self.context.executionTimer.start('Tools.DataHandler -> optionChainProviderFilter') self.context.logger.debug(f"optionChainProviderFilter -> symbols count: {len(symbols)}") # Check if we got any symbols to process if len(symbols) == 0: return None # Filter the symbols based on the expiry range filteredSymbols = [symbol for symbol in symbols if minDte <= (symbol.ID.Date.date() - self.context.Time.date()).days <= maxDte ] self.context.logger.debug(f"Context Time: {self.context.Time.date()}") unique_dates = set(symbol.ID.Date.date() for symbol in symbols) self.context.logger.debug(f"Unique symbol dates: {unique_dates}") self.context.logger.debug(f"optionChainProviderFilter -> filteredSymbols: {filteredSymbols}") # Exit if there are no symbols for the selected expiry range if not filteredSymbols: return None # If this is not a cash ticker, filter out any non-tradable symbols. # to escape the error `Backtest Handled Error: The security with symbol 'SPY 220216P00425000' is marked as non-tradable.` if not self.__CashTicker(): filteredSymbols = [x for x in filteredSymbols if self.context.Securities[x.ID.Symbol].IsTradable] underlying = Underlying(self.context, self.strategy.underlyingSymbol) # Get the latest price of the underlying underlyingLastPrice = underlying.Price() # Find the ATM strike atm_strike = sorted(filteredSymbols ,key = lambda x: abs(x.ID.StrikePrice - underlying.Price()) )[0].ID.StrikePrice # Get the list of available strikes strike_list = sorted(set([i.ID.StrikePrice for i in filteredSymbols])) # Find the index of ATM strike in the sorted strike list atm_strike_rank = strike_list.index(atm_strike) # Get the Min and Max strike price based on the specified number of strikes min_strike = strike_list[max(0, atm_strike_rank + min_strike_rank + 1)] max_strike = strike_list[min(atm_strike_rank + max_strike_rank - 1, len(strike_list)-1)] # Get the list of symbols within the selected strike range selectedSymbols = [symbol for symbol in filteredSymbols if min_strike <= symbol.ID.StrikePrice <= max_strike ] self.context.logger.debug(f"optionChainProviderFilter -> selectedSymbols: {selectedSymbols}") # Loop through all Symbols and create a list of OptionContract objects contracts = [] for symbol in selectedSymbols: # Create the OptionContract contract = self.context.OptionContract(symbol, symbol.Underlying) self.AddOptionContracts([contract], resolution = self.context.timeResolution) # Set the BidPrice contract.BidPrice = self.Securities[contract.Symbol].BidPrice # Set the AskPrice contract.AskPrice = self.Securities[contract.Symbol].AskPrice # Set the UnderlyingLastPrice contract.UnderlyingLastPrice = underlyingLastPrice # Add this contract to the output list contracts.append(contract) self.context.executionTimer.stop('Tools.DataHandler -> optionChainProviderFilter') # Return the list of contracts return contracts def getOptionContracts(self, slice): # Start the timer self.context.executionTimer.start('Tools.DataHandler -> getOptionContracts') contracts = None # Set the DTE range (make sure values are not negative) minDte = max(0, self.strategy.dte - self.strategy.dteWindow) maxDte = max(0, self.strategy.dte) self.context.logger.debug(f"getOptionContracts -> minDte: {minDte}") self.context.logger.debug(f"getOptionContracts -> maxDte: {maxDte}") # Loop through all chains for chain in slice.OptionChains: # Look for the specified optionSymbol if chain.Key != self.strategy.optionSymbol: continue # Make sure there are any contracts in this chain if chain.Value.Contracts.Count != 0: # Put the contracts into a list so we can cache the Greeks across multiple strategies contracts = [ contract for contract in chain.Value if minDte <= (contract.Expiry.date() - self.context.Time.date()).days <= maxDte ] self.context.logger.debug(f"getOptionContracts -> number of contracts: {len(contracts) if contracts else 0}") # If no chains were found, use OptionChainProvider to see if we can find any contracts # Only do this for short term expiration contracts (DTE < 3) where slice.OptionChains usually fails to retrieve any chains # We don't want to do this all the times for performance reasons if contracts == None and self.strategy.dte < 3: # Get the list of available option Symbols symbols = self.context.OptionChainProvider.GetOptionContractList(self.ticker, self.context.Time) # Get the contracts contracts = self.optionChainProviderFilter(symbols, -self.strategy.nStrikesLeft, self.strategy.nStrikesRight, minDte, maxDte) # Stop the timer self.context.executionTimer.stop('Tools.DataHandler -> getOptionContracts') return contracts # Method to add option contracts data to the context. # @param contracts [Array] # @param resolution [Resolution] # @return [Symbol] def AddOptionContracts(self, contracts, resolution = Resolution.Minute): # Add this contract to the data subscription so we can retrieve the Bid/Ask price if self.__CashTicker(): for contract in contracts: if contract.Symbol not in self.context.optionContractsSubscriptions: self.context.AddIndexOptionContract(contract, resolution) else: for contract in contracts: if contract.Symbol not in self.context.optionContractsSubscriptions: self.context.AddOptionContract(contract, resolution) # PRIVATE METHODS # Internal method to determine if we are using a cashticker to add the data. # @returns [Boolean] def __CashTicker(self): return self.ticker in self.CashIndices
#region imports from AlgorithmImports import * #endregion import time as timer import math from datetime import timedelta class Timer: performanceTemplate = { "calls": 0.0, "elapsedMin": float('Inf'), "elapsedMean": None, "elapsedMax": float('-Inf'), "elapsedTotal": 0.0, "elapsedLast": None, "startTime": None, } def __init__(self, context): self.context = context self.performance = {} def start(self, methodName=None): # Get the name of the calling method methodName = methodName or sys._getframe(1).f_code.co_name # Get current performance stats performance = self.performance.get(methodName, Timer.performanceTemplate.copy()) # Get the startTime performance["startTime"] = timer.perf_counter() # Save it back in the dictionary self.performance[methodName] = performance def stop(self, methodName=None): # Get the name of the calling method methodName = methodName or sys._getframe(1).f_code.co_name # Get current performance stats performance = self.performance.get(methodName) # Compute the elapsed elapsed = timer.perf_counter() - performance["startTime"] # Update the stats performance["calls"] += 1 performance["elapsedLast"] = elapsed performance["elapsedMin"] = min(performance["elapsedMin"], elapsed) performance["elapsedMax"] = max(performance["elapsedMax"], elapsed) performance["elapsedTotal"] += elapsed performance["elapsedMean"] = performance["elapsedTotal"]/performance["calls"] def showStats(self, methodName=None): methods = methodName or self.performance.keys() total_elapsed = 0.0 # Initialize total elapsed time for method in methods: performance = self.performance.get(method) if performance: self.context.Log(f"Execution Stats ({method}):") for key in performance: if key != "startTime": if key == "calls" or performance[key] == None: value = performance[key] elif math.isinf(performance[key]): value = None else: value = timedelta(seconds=performance[key]) self.context.Log(f" --> {key}:{value}") total_elapsed += performance.get("elapsedTotal", 0) # Accumulate elapsedTotal else: self.context.Log(f"There are no execution stats available for method {method}!") # Print the total elapsed time over all methods self.context.Log("Summary:") self.context.Log(f" --> elapsedTotal: {timedelta(seconds=total_elapsed)}")
#region imports from AlgorithmImports import * #endregion """ Underlying class for the Options Strategy Framework. This class is used to get the underlying price. Example: self.underlying = Underlying(self, self.underlyingSymbol) self.underlyingPrice = self.underlying.Price() """ class Underlying: def __init__(self, context, underlyingSymbol): self.context = context self.underlyingSymbol = underlyingSymbol def Security(self): return self.context.Securities[self.underlyingSymbol] # Returns the underlying symbol current price. def Price(self): return self.Security().Price def Close(self): return self.Security().Close
#region imports from AlgorithmImports import * from .Timer import Timer from .Logger import Logger from .ContractUtils import ContractUtils from .DataHandler import DataHandler from .Underlying import Underlying from .BSMLibrary import BSM, BSMGreeks from .Helper import Helper from .Charting import Charting from .Performance import Performance from .ProviderOptionContract import ProviderOptionContract #endregion
# region imports from AlgorithmImports import * # endregion import numpy as np import pandas as pd # The custom algo imports from Execution import AutoExecutionModel, SmartPricingExecutionModel, SPXExecutionModel from Monitor import HedgeRiskManagementModel, NoStopLossModel, StopLossModel, FPLMonitorModel, SPXicMonitor, CCMonitor, SPXButterflyMonitor, SPXCondorMonitor, IBSMonitor from PortfolioConstruction import OptionsPortfolioConstruction # The alpha models from Alpha import FPLModel, CCModel, SPXic, SPXButterfly, SPXCondor, IBS # The execution classes from Initialization import SetupBaseStructure, HandleOrderEvents from Tools import Performance """ Algorithm Structure Case v1: 1. We run the SetupBaseStructure.Setup() that will set the defaults for all the holders of data and base configuration 2. We have inside each AlphaModel a set of default parameters that will not be assigned to the context. - This means that each AlphaModel (Strategy) will have their own configuration defined in each class. - The AlphaModel will add the Underlying and options chains required - The QC algo will call the AlphaModel#Update method every 1 minute (self.timeResolution) - The Update method will call the AlphaModel#getOrder method - The getOrder method should use self.order (Alpha.Utils.Order) methods to get the options - The options returned will use the Alpha.Utils.Scanner and the Alpha.Utils.OrderBuilder classes - The final returned method requred to be returned by getOrder method is the Order#getOrderDetails - The Update method now in AlphaModel will use the getOrder method output to create Insights """ class CentralAlgorithm(QCAlgorithm): def Initialize(self): # WARNING!! If your are going to trade SPX 0DTE options then make sure you set the startDate after July 1st 2022. # This is the start of the data we have. self.SetStartDate(2024, 1, 3) self.SetEndDate(2024, 2, 4) # self.SetStartDate(2024, 4, 1) # self.SetEndDate(2024, 4, 30) # self.SetEndDate(2022, 9, 15) # Warmup for some days # self.SetWarmUp(timedelta(14)) # Logging level: # -> 0 = ERROR # -> 1 = WARNING # -> 2 = INFO # -> 3 = DEBUG # -> 4 = TRACE (Attention!! This can consume your entire daily log limit) self.logLevel = 3 if self.LiveMode else 2 # Set the initial account value self.initialAccountValue = 100_000 self.SetCash(self.initialAccountValue) # Time Resolution self.timeResolution = Resolution.Minute # Set Export method self.CSVExport = False # Should the trade log be displayed self.showTradeLog = False # Show the execution statistics self.showExecutionStats = False # Show the performance statistics self.showPerformanceStats = False # Set the algorithm base variables and structures self.structure = SetupBaseStructure(self).Setup() self.performance = Performance(self) # Set the algorithm framework models # self.SetAlpha(FPLModel(self)) # self.SetAlpha(SPXic(self)) # self.SetAlpha(CCModel(self)) # self.SetAlpha(SPXButterfly(self)) # self.SetAlpha(SPXCondor(self)) self.SetAlpha(IBS(self)) self.SetPortfolioConstruction(OptionsPortfolioConstruction(self)) # self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel()) # self.SetExecution(SpreadExecutionModel()) self.SetExecution(SPXExecutionModel(self)) # self.SetExecution(AutoExecutionModel(self)) # self.SetExecution(SmartPricingExecutionModel(self)) # self.SetExecution(ImmediateExecutionModel()) # self.SetRiskManagement(NoStopLossModel(self)) # self.SetRiskManagement(StopLossModel(self)) # self.SetRiskManagement(FPLMonitorModel(self)) # self.SetRiskManagement(SPXicMonitor(self)) # self.SetRiskManagement(CCMonitor(self)) # self.SetRiskManagement(SPXButterflyMonitor(self)) # self.SetRiskManagement(SPXCondorMonitor(self)) self.SetRiskManagement(IBSMonitor(self)) # Initialize the security every time that a new one is added def OnSecuritiesChanged(self, changes): for security in changes.AddedSecurities: self.structure.CompleteSecurityInitializer(security) for security in changes.RemovedSecurities: self.structure.ClearSecurity(security) def OnEndOfDay(self, symbol): self.structure.checkOpenPositions() self.performance.endOfDay(symbol) def OnOrderEvent(self, orderEvent): # Start the timer self.executionTimer.start() # Log the order event self.logger.debug(orderEvent) self.performance.OnOrderEvent(orderEvent) HandleOrderEvents(self, orderEvent).Call() # Loop through all strategies # for strategy in self.strategies: # # Call the Strategy orderEvent handler # strategy.handleOrderEvent(orderEvent) # Stop the timer self.executionTimer.stop() def OnEndOfAlgorithm(self) -> None: # Convert the dictionary into a Pandas Data Frame # dfAllPositions = pd.DataFrame.from_dict(self.allPositions, orient = "index") # Convert the dataclasses into Pandas Data Frame dfAllPositions = pd.json_normalize(obj.asdict() for k,obj in self.allPositions.items()) if self.showExecutionStats: self.Log("") self.Log("---------------------------------") self.Log(" Execution Statistics ") self.Log("---------------------------------") self.executionTimer.showStats() self.Log("") if self.showPerformanceStats: self.Log("---------------------------------") self.Log(" Performance Statistics ") self.Log("---------------------------------") self.performance.show() self.Log("") self.Log("") if self.showTradeLog: self.Log("---------------------------------") self.Log(" Trade Log ") self.Log("---------------------------------") self.Log("") if self.CSVExport: # Print the csv header self.Log(dfAllPositions.head(0).to_csv(index = False, header = True, line_terminator = " ")) # Print the data frame to the log in csv format (one row at a time to avoid QC truncation limitation) for i in range(0, len(dfAllPositions.index)): self.Log(dfAllPositions.iloc[[i]].to_csv(index = False, header = False, line_terminator = " ")) else: self.Log(f"\n#{dfAllPositions.to_string()}") self.Log("") def lastTradingDay(self, expiry): # Get the trading calendar tradingCalendar = self.TradingCalendar # Find the last trading day for the given expiration date lastDay = list(tradingCalendar.GetDaysByType(TradingDayType.BusinessDay, expiry - timedelta(days = 20), expiry))[-1].Date return lastDay