Overall Statistics |
Total Orders 4 Average Win 3.89% Average Loss -4.94% Compounding Annual Return -13.801% Drawdown 1.300% Expectancy -0.106 Start Equity 5000 End Equity 4938 Net Profit -1.240% Sharpe Ratio -8.48 Sortino Ratio -6.811 Probabilistic Sharpe Ratio 0.002% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 0.79 Alpha -0.137 Beta 0.076 Annual Standard Deviation 0.018 Annual Variance 0 Information Ratio 0.774 Tracking Error 0.101 Treynor Ratio -2.03 Total Fees $0.00 Estimated Strategy Capacity $1000.00 Lowest Capacity Asset TQQQ 32BINNWRRXQKM|TQQQ UK280CGTCB51 Portfolio Turnover 0.67% |
from AlgorithmImports import *
import numpy as np
from collections import defaultdict


class IBSVolatilityPendulum(QCAlgorithm):
    """
    ------------------------------------------------------------------------------------------------
    Summary:
        The Volatility Pendulum is an advanced options trading strategy designed to
        capitalize on extreme Internal Bar Strength (IBS) readings in the TQQQ ETF.
        By strategically selling Put Vertical spreads during oversold conditions,
        this algorithm aims to generate excess returns while managing downside risk.

    Strategy Documentation:
        https://docs.google.com/document/d/1rvoyZbaPbfEeT6bl4KbnbUrSK0f0em586Iu1wbK-Zoo/edit

    Entry Signal:
        - IBS < 0.2 (IBS = (Close - Low) / (High - Low))

    Strike Selection Criteria
        - Days to Expiration (DTE): 20-30 days
        - Higher (Short) Put: 28-40 delta
        - Lower (Long) Put: 1-2 strike steps below the short put
        - Spread Selection: Optimize for best risk/reward profile

    Position Sizing
        - Position Sizing: 0.3 * Portfolio.availableBuyingPower
        - Max Open Positions: 100

    Exit Signals (first to occur):
        - IBS > 0.8
        - DTE ≤ 1
        - Profit ≥ 25% of max potential profit
        - Loss ≥ 25% of max potential loss

    TODO:
        - Pick strikes based on ATR instead of Delta
        - Implement Stop Loss logic (underlying)
        - Implement Stop Loss logic (pct of max loss)
        - Money Management
          Open position risking 30% of available buying power
        - Add tag for % of loss / profit for the spread
        - Understand why this option exercise keeps happening, even with DTE exit
            - 2023-04-22 00:00:00 TQQQ 230421P00024500 Buy Option Exercise
        - When calculating DTEExit, it should account for weekends.
        - Check margin requirements before attempting trade
        - Include transaction fees when calculating max loss / profit
        - Form and test hypotheses on time of day to enter
    ------------------------------------------------------------------------------------------------
    """

    # Delta band a put must fall in to be considered as the short (higher) leg.
    SHORT_PUT_DELTA_MIN = -0.40
    SHORT_PUT_DELTA_MAX = -0.28
    # The long (lower) leg may sit at most this many strike steps below the short leg.
    MAX_LONG_STRIKE_STEPS = 2

    ## Initialize the algo
    ## ------------------------
    def Initialize(self):
        """Entry point: configure the backtest, parameters, data feeds and schedules."""
        self.InitAlgo()
        self.InitParams()
        self.InitData()
        self.ScheduleRoutines()

    ## Init backtest details
    ## ------------------------
    def InitAlgo(self):
        """Set the traded ticker, benchmark, backtest window and starting cash."""
        self.ticker = "TQQQ"            # Ticker symbol to trade
        self.SetBenchmark("SPY")        # Benchmark for reporting (buy and hold)
        self.SetStartDate(2023, 9, 11)  # Backtest start date
        self.SetEndDate(2023, 10, 11)   # Backtest end date
        self.SetCash(5000)              # Starting portfolio balance

    ## Init Local and external params
    ## --------------------------------
    def InitParams(self):
        """Read the externally supplied parameters and initialise local state.

        Boolean switches are passed as integers; a value of 1 means enabled.
        Every parameter is converted eagerly, so a missing one raises here.
        """
        self.exitMessage = ""
        self.targetProfitPct = int(self.get_parameter("targetProfitPct"))
        self.targetLossPct = int(self.get_parameter("targetLossPct"))
        self.qty = int(self.get_parameter("qty"))
        self.addToPositions = int(self.get_parameter("addToPositions")) == 1
        self.ibsExitThresh = float(self.get_parameter("ibsExitThresh"))
        self.ibsEnterThresh = float(self.get_parameter("ibsEnterThresh"))
        self.tradeOptions = int(self.get_parameter("tradeOptions")) == 1
        self.checkIBSExit = int(self.get_parameter("checkIBSExit")) == 1
        self.dteExit = int(self.get_parameter("dteExit"))

        # Filtering criteria for days to expiration (DTE): the upper bound is
        # expressed as an offset ("dteDiff") from the lower bound ("minDTE").
        min_dte_days = int(self.get_parameter("minDTE"))
        self.maxDTEDays = min_dte_days + int(self.get_parameter("dteDiff"))
        self.min_dte = timedelta(days=min_dte_days)
        self.max_dte = timedelta(days=self.maxDTEDays)

        self.goShort = False
        self.useComboOrders = True
        self.lastDailyBar = None

    ## Init Data Feed, Consolidators, etc
    ## -----------------------------------
    def InitData(self):
        """Subscribe to the underlying and consolidate it into daily bars.

        Minute resolution is used when trading options, daily otherwise. The
        most recent consolidated daily bar is kept in ``self.lastDailyBar``.
        """
        res = Resolution.Minute if self.tradeOptions else Resolution.Daily
        equity = self.AddEquity(self.ticker, res)
        self.symbol = equity.symbol
        equity.SetDataNormalizationMode(DataNormalizationMode.Raw)

        # Set up the daily bar consolidator; the handler only stores the bar.
        self.dailyConsolidator = TradeBarConsolidator(timedelta(days=1))
        self.dailyConsolidator.DataConsolidated += lambda _, dailyBar: setattr(self, 'lastDailyBar', dailyBar)
        self.SubscriptionManager.AddConsolidator(self.symbol, self.dailyConsolidator)

        if self.tradeOptions:
            self.InitOptionsData()

    ## Init Options Data
    ## -----------------
    def InitOptionsData(self):
        """Subscribe to the option chain (puts only, within the DTE window).

        Also installs ``CustomSecurityInitializer`` on top of the existing
        security initializer.
        """
        self.chainLength = 100
        option = self.AddOption(self.ticker, Resolution.Minute)
        self.optionSymbol = option.Symbol
        option.SetFilter(
            lambda universe: universe.IncludeWeeklys()
            .Strikes(-self.chainLength//2, 0)
            .PutsOnly()
            .Expiration(self.min_dte, self.max_dte)
        )
        self.SetSecurityInitializer(
            CompositeSecurityInitializer(
                self.SecurityInitializer,
                FuncSecurityInitializer(self.CustomSecurityInitializer),
            )
        )

    ## Schedule recurring logic (cron jobs)
    ## --------------------------------------
    def ScheduleRoutines(self):
        """Register the scheduled entry check and exit check."""
        # Check for entry signals 5 minutes after the open, every day.
        self.Schedule.On(
            self.DateRules.EveryDay(),
            self.TimeRules.AfterMarketOpen(self.ticker, 5),
            self.CheckForEntries,
        )

        if self.tradeOptions:
            # Options mode: check for exits every 3 minutes.
            self.Schedule.On(
                self.DateRules.EveryDay(self.ticker),
                self.TimeRules.Every(timedelta(minutes=3)),
                self.CheckForExits,
            )
        else:
            # Equity mode: check for exits once, 6 minutes after the open.
            self.Schedule.On(
                self.DateRules.EveryDay(),
                self.TimeRules.AfterMarketOpen(self.ticker, 6),
                self.CheckForExits,
            )

    ## Check for entries
    ## Called at a scheduled time every day.
    ## --------------------------------------
    def CheckForEntries(self):
        """Open a position when the entry signal fires.

        A new position is opened only when the portfolio is flat, unless
        ``addToPositions`` is enabled.
        """
        if not self.DataIsReady():
            return
        if not self.EntrySignalFired():
            return
        if self.Portfolio.Invested and not self.addToPositions:
            return

        if self.tradeOptions:
            self.OpenOptionsTrade()
        else:
            self.SetHoldings(self.ticker, 1)

    ## Convenience: Check if Data is Ready
    ## -----------------------------------
    def DataIsReady(self):
        """Return True when the current slice holds data for the ticker and warm-up is over."""
        data = self.CurrentSlice
        if self.IsWarmingUp or (self.ticker not in data) or (data[self.ticker] is None):
            self.Log("\t\tNo data, or warming up ")
            return False
        return True

    ## Check for exits
    ## Called intraday at a scheduled time.
    ##   1. IBS above the exit threshold (closes everything)
    ##   2. DTE <= dteExit
    ##   3. Return on credit >= targetProfitPct
    ##   4. Percent of max loss >= targetLossPct
    ## -------------------------------------------------------
    def CheckForExits(self):
        """Evaluate all exit rules and close positions that meet one."""
        if not self.DataIsReady():
            return
        if not self.is_market_open(self.symbol):
            return

        self.Plot(f"{self.ticker}: Price", "Ticker Price", self.Securities[self.ticker].Price)

        if not self.Portfolio.Invested:
            return

        # General Exit 1. IBS threshold breached -> flatten the whole portfolio.
        if self.checkIBSExit:
            ibsValue = self._InternalBarStrength()
            if ibsValue is not None and ibsValue > self.ibsExitThresh:
                self.exitMessage = "IBS Exit Threshold breached. Exit all holdings"
                self.Liquidate(tag=f"{self.exitMessage}")
                return

        # NOTE(review): indentation was lost in this file; the underlying sweep
        # is assumed to belong to the options branch (it would otherwise close
        # the equity-mode position on every check) -- confirm.
        if self.tradeOptions:
            self._CheckSpreadExits()
            self._CloseUnderlyingHoldings()

    def _CheckSpreadExits(self):
        """Apply the DTE / take-profit / stop-loss rules to each open put credit spread."""
        priceInfo = f"{self.ticker} @ ${self.Securities[self.ticker].Price}"

        for short_position, long_position in self.GetPutCreditSpreadsInPortfolio():
            currentValue = round(self.GetNetPremium(short_position.Symbol, long_position.Symbol), 2)
            curValLabel = f"[${currentValue} Spread Value]"
            spreadLabel = f"{short_position.Symbol.ID.StrikePrice}~{long_position.Symbol.ID.StrikePrice}"
            unrealizedPnL = round(self._InitialCredit(short_position, long_position) - currentValue, 2)
            iconLabel = "❌" if unrealizedPnL <= 0 else "✅"

            pctOfMaxLoss = self.percent_max_loss_achieved(short_position, long_position)
            roiPercentage = self.calculate_return_on_credit(short_position, long_position)

            # Whole calendar days between now and the short leg's expiry.
            expiry = short_position.Symbol.ID.Date
            days_till_expiration = (expiry - self.Time).days

            # Spread Exit 1. Days to expiration <= self.dteExit
            if days_till_expiration <= self.dteExit:
                message = f'[{iconLabel}][{spreadLabel}][DTE Exit] | {curValLabel} | {unrealizedPnL} PnL | {days_till_expiration} DTE <= {self.dteExit} {priceInfo}'
            # Spread Exit 2. Return on credit >= target profit percentage
            elif roiPercentage >= self.targetProfitPct:
                message = f'[{iconLabel}][{spreadLabel}][Take Profit] | {curValLabel} | {unrealizedPnL} PnL | {roiPercentage}% ROI Profit ≥ {self.targetProfitPct}% of max potential profit {priceInfo}'
            # Spread Exit 3. Loss >= target percentage of max potential loss
            elif pctOfMaxLoss >= self.targetLossPct:
                message = f'[{iconLabel}][{spreadLabel}][Stop Loss] | {curValLabel} | {unrealizedPnL} PnL | {roiPercentage}% ROI Loss ≥ {self.targetLossPct}% of max potential loss {priceInfo}'
            else:
                continue

            self._CloseSpread(short_position, long_position, message)

    def _CloseSpread(self, short_position, long_position, message):
        """Liquidate both legs of a spread, tagging the orders with ``message``."""
        self.exitMessage = message
        self.liquidate(short_position.Symbol, tag=self.exitMessage)
        self.liquidate(long_position.Symbol, tag=self.exitMessage)

    def _CloseUnderlyingHoldings(self):
        """Liquidate when any invested holding is not an option."""
        for symbol, holding in self.Portfolio.items():
            if holding.Invested and holding.Type != SecurityType.Option:
                self.Debug(f"Holding {symbol.Value} is a {holding.Type}. Liquidate.")
                # NOTE(review): Liquidate is called without a symbol, so it is
                # not restricted to this holding -- confirm that flattening the
                # whole portfolio is intended here.
                self.Liquidate(tag=f"Holding {symbol.Value} is a {holding.Type}")

    def _InternalBarStrength(self):
        """Return the IBS of the last daily bar, or None when it cannot be computed.

        IBS = (close - low) / (high - low). None is returned when no daily bar
        has been consolidated yet or when the bar has zero range.
        """
        bar = self.lastDailyBar
        if bar is None or bar.high == bar.low:
            return None
        return (bar.close - bar.low) / (bar.high - bar.low)

    ## Go long when IBS is below the entry threshold
    ## ------------------------------
    def EntrySignalFired(self):
        """Return True when the last daily bar's IBS is below ``ibsEnterThresh``."""
        ibsValue = self._InternalBarStrength()
        return ibsValue is not None and ibsValue < self.ibsEnterThresh

    def CustomSecurityInitializer(self, security):
        """Seed a new security's price and configure option-specific models.

        The original seeded the market price twice with the same call; the
        redundant second call was dropped.
        """
        security.SetMarketPrice(self.GetLastKnownPrice(security))
        # NOTE(review): indentation was lost in this file; the fee and fill
        # models are assumed to be option-only along with the assignment
        # model -- confirm.
        if Extensions.IsOption(security.Symbol.SecurityType):
            security.SetOptionAssignmentModel(NullOptionAssignmentModel())
            security.SetFeeModel(ConstantFeeModel(0))
            security.SetFillModel(CustomFillModel())

    ## Open Options Trade
    ## ------------------------------
    def OpenOptionsTrade(self):
        """Sell the bull put spread with the best reward/risk ratio, if any.

        Does nothing when the current slice has no chain for the option symbol.
        """
        current_slice = self.current_slice

        # Get the OptionChain
        chain = current_slice.OptionChains.get(self.optionSymbol, None)
        if not chain:
            return

        pairs = self.GetPutPairs(chain)
        topPair = self.GetTopRRRatioPair(pairs)
        if topPair is None:
            # NOTE(review): this stops the whole algorithm whenever no eligible
            # pair exists at entry time; kept as in the original -- confirm
            # that a plain return is not what was intended.
            return self.quit()

        short_put, long_put = topPair
        bps_strategy = OptionStrategies.BullPutSpread(
            self.optionSymbol, short_put.Strike, long_put.Strike, long_put.Expiry
        )

        if bps_strategy is None:
            self.Debug(f"{self.Time} | OptionStrategies.BullPutSpread Failed")
            return

        if not self.useComboOrders:
            self.Debug("Open position one leg at a time.")
            self.Debug("Not implemented yet.")
            self.quit()
            return

        # Combo orders (original author's note: does not work with Tradier).
        prefix = "[++]" if self.Portfolio.Invested else "[+] "
        spreadInfo = f"{bps_strategy.OptionLegs[0].Strike} ~ {bps_strategy.OptionLegs[1].Strike}"
        priceInfo = f"{self.ticker} @ ${self.Securities[self.ticker].Price}"
        costBasis = f"${round((short_put.BidPrice - long_put.AskPrice),2)} Received"
        self.Buy(bps_strategy, self.qty, tag=f"{prefix} {spreadInfo} {priceInfo} | {costBasis}")

    ## Get all possible put pairs that match our criteria:
    ##   1. Days to Expiration (DTE) inside [min_dte, max_dte]
    ##   2. Higher (Short) Put: delta inside the configured band
    ##   3. Lower (Long) Put: 1-2 strike steps below the short put, same expiry
    ## -------------------------------------------------------
    def GetPutPairs(self, chain):
        """Return every (short_put, long_put) candidate pair from ``chain``.

        :param chain: iterable of option contracts
        :return: list of (higher-strike put, lower-strike put) tuples, in chain order
        """
        current_time = self.Time

        # Puts whose time to expiry lies inside the DTE window.
        eligible_puts = [
            contract for contract in chain
            if contract.Right == OptionRight.Put
            and self.min_dte <= contract.Expiry - current_time <= self.max_dte
        ]

        # Short-leg candidates: delta inside the configured band.
        high_puts = [
            put for put in eligible_puts
            if self.SHORT_PUT_DELTA_MIN <= put.Greeks.Delta <= self.SHORT_PUT_DELTA_MAX
        ]

        # Strike ladder built from all eligible puts (across expiries).
        strike_prices = sorted(set(put.Strike for put in eligible_puts))

        put_pairs = []
        for high_put in high_puts:
            high_strike_index = strike_prices.index(high_put.Strike)
            first_low_index = max(0, high_strike_index - self.MAX_LONG_STRIKE_STEPS)
            possible_low_strikes = strike_prices[first_low_index:high_strike_index]

            put_pairs.extend(
                (high_put, low_put)
                for low_put in eligible_puts
                if low_put.Expiry == high_put.Expiry and low_put.Strike in possible_low_strikes
            )

        return put_pairs

    ## Get the Pair with the most favorable Reward/Risk ratio
    ## ------------------------------------------------------
    def GetTopRRRatioPair(self, put_pairs):
        """Return the pair with the highest credit / max-loss ratio, or None.

        Pairs with a negative credit or a non-positive max loss are skipped.
        On ties the first pair encountered wins.

        :param put_pairs: iterable of (short_put, long_put) contract tuples
        :return: the best (short_put, long_put) tuple, or None when none qualifies
        """
        top_pair = None
        highest_rr_ratio = float('-inf')

        self.Debug("------------------------")
        self.Debug(self.Time.strftime("%Y-%m-%d %H:%M:%S"))
        self.Debug("Pairs Under Consideration:")
        self.Debug("spread | collected credit | reward-risk ratio")

        for high_put, low_put in put_pairs:
            # Credit collected: sell the short put at the bid, buy the long put at the ask.
            collected_credit = round((high_put.BidPrice - low_put.AskPrice), 2)
            if collected_credit < 0:
                continue

            # Max loss: difference in strikes minus the collected credit.
            max_loss = round((high_put.Strike - low_put.Strike) - collected_credit, 2)
            if max_loss <= 0:
                # No ratio can be computed. The original's debug line read
                # rr_ratio here, which was unbound or stale from a prior pair.
                continue

            rr_ratio = round((collected_credit / max_loss), 2)
            if rr_ratio > highest_rr_ratio:
                highest_rr_ratio = rr_ratio
                top_pair = (high_put, low_put)

            self.Debug(f" {high_put.Strike} ~ {low_put.Strike} | {collected_credit} / {max_loss}| RR:{rr_ratio}")

        self.Debug("------------------------")
        return top_pair

    ## Get all the PCS pairs in the portfolio
    ## ------------------------------------------
    def GetPutCreditSpreadsInPortfolio(self):
        """Identify put credit spreads among the invested holdings.

        Invested put holdings are grouped by underlying and expiry, sorted by
        strike (descending), and each adjacent pair made of a short
        higher-strike put and a long lower-strike put is reported.

        :return: list of tuples, each containing (short_holding, long_holding)
        """
        # Group put options by underlying symbol and expiration.
        put_options = defaultdict(lambda: defaultdict(list))
        for holding in self.Portfolio.Values:
            if (holding.Type == SecurityType.Option
                    and holding.Symbol.ID.OptionRight == OptionRight.Put
                    and holding.Invested):
                put_options[holding.Symbol.Underlying][holding.Symbol.ID.Date].append(holding)

        credit_spreads = []
        for by_expiry in put_options.values():
            for puts in by_expiry.values():
                sorted_puts = sorted(puts, key=lambda x: x.Symbol.ID.StrikePrice, reverse=True)
                # Walk adjacent strikes: (higher, lower).
                for higher_strike, lower_strike in zip(sorted_puts, sorted_puts[1:]):
                    if (higher_strike.Quantity < 0
                            and lower_strike.Quantity > 0
                            and higher_strike.Symbol.ID.StrikePrice > lower_strike.Symbol.ID.StrikePrice):
                        credit_spreads.append((higher_strike, lower_strike))

        return credit_spreads

    def _InitialCredit(self, short_position, long_position):
        """Return the net credit at entry: |short average price| - |long average price|."""
        return abs(short_position.AveragePrice) - abs(long_position.AveragePrice)

    ## Calculate the percentage of max profit achieved for a put credit spread.
    ## ------------------------------------------------------------------------
    def percent_max_profit_achieved(self, short_position, long_position):
        """Return how much of the initial credit has been captured, clipped to 0-100.

        :param short_position: The short put option position
        :param long_position: The long put option position
        :return: Percentage of max profit achieved (0-100); 0 when the initial credit is 0
        """
        initial_net_premium = self._InitialCredit(short_position, long_position)
        current_net_premium = self.GetNetPremium(short_position.Symbol, long_position.Symbol)

        if initial_net_premium == 0:
            return 0  # Avoid division by zero

        percent = (initial_net_premium - current_net_premium) / initial_net_premium * 100
        return np.clip(percent, 0, 100)  # Ensure result is between 0% and 100%

    ## Calculate the percentage of max loss achieved for a put credit spread.
    ## ------------------------------------------------------------------------
    def percent_max_loss_achieved(self, short_position, long_position):
        """Return the current loss as a percentage of the max loss, clipped to 0-100.

        Max loss is the strike width minus the initial credit; the current
        loss is the current net premium minus the initial credit.

        The original divided by ``max_loss - initial_credit``, subtracting the
        credit a second time. That overstated the percentage and, once the
        credit reached half the strike width, made the denominator zero or
        negative so the result was always 0 and the stop-loss could not fire.

        :param short_position: The short put option position
        :param long_position: The long put option position
        :return: Percentage of max loss achieved (0-100); 0 when max loss is not positive
        """
        initial_net_premium = self._InitialCredit(short_position, long_position)
        current_net_premium = self.GetNetPremium(short_position.Symbol, long_position.Symbol)

        spread_width = short_position.Symbol.ID.StrikePrice - long_position.Symbol.ID.StrikePrice
        max_loss = spread_width - initial_net_premium
        if max_loss <= 0:
            return 0  # No loss is possible; also avoids division by zero

        percent = (current_net_premium - initial_net_premium) / max_loss * 100
        return np.clip(percent, 0, 100)  # Ensure result is between 0% and 100%

    ## Calculate the return on the cost basis (credit received) of a put credit spread.
    ## --------------------------------------------
    def calculate_return_on_credit(self, short_position, long_position):
        """Return the unrealized P/L as a percentage of the initial credit.

        :param short_position: The short put option position
        :param long_position: The long put option position
        :return: Percentage return on the credit received (3 decimals); 0 when the credit is 0
        """
        initial_credit = self._InitialCredit(short_position, long_position)
        current_cost = self.GetNetPremium(short_position.Symbol, long_position.Symbol)

        if initial_credit == 0:
            return 0  # Avoid division by zero

        profit_loss = initial_credit - current_cost
        return round(((profit_loss / initial_credit) * 100), 3)

    ## Calculate the current net premium of a put credit spread.
    ## ----------------------------------------------------------------
    def GetNetPremium(self, short_symbol, long_symbol):
        """Return the current cost to close the spread, as an absolute value.

        :param short_symbol: Symbol of the short put option
        :param long_symbol: Symbol of the long put option
        :return: |short ask - long bid|, using the last price for any leg whose quote is 0
        """
        short_option = self.Securities[short_symbol]
        long_option = self.Securities[long_symbol]

        # Ask for the short put (cost to buy back), bid for the long put
        # (what we would receive to sell); fall back to last price when 0.
        short_price = short_option.AskPrice or short_option.Price
        long_price = long_option.BidPrice or long_option.Price

        return abs(short_price - long_price)  # Absolute value for consistency


######################################
## Custom Fill Model
## TODO: Evaluate necessity
######################################
class CustomFillModel(FillModel):
    """Fill market orders at the quote: ask for buys, bid for sells."""

    def MarketFill(self, asset, order):
        """Return the base model's market fill with the price moved to the bid/ask.

        When the relevant quote is 0 the base model's fill price is kept, so
        an order is never filled at a price of zero. This matches
        ``GetNetPremium``, which also distrusts zero quotes.
        """
        if order.Direction == OrderDirection.Buy:
            quote_price = asset.AskPrice
        else:
            quote_price = asset.BidPrice

        fill = super().MarketFill(asset, order)
        if quote_price > 0:
            fill.FillPrice = quote_price
        return fill