Overall Statistics
Total Orders
69
Average Win
3.32%
Average Loss
-2.81%
Compounding Annual Return
25.631%
Drawdown
3.800%
Expectancy
0.160
Start Equity
5000
End Equity
5504
Net Profit
10.080%
Sharpe Ratio
1.83
Sortino Ratio
3.048
Probabilistic Sharpe Ratio
89.314%
Loss Rate
47%
Win Rate
53%
Profit-Loss Ratio
1.18
Alpha
0.014
Beta
0.449
Annual Standard Deviation
0.067
Annual Variance
0.005
Information Ratio
-1.589
Tracking Error
0.075
Treynor Ratio
0.274
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
TQQQ 3297Y40EE4OKM|TQQQ UK280CGTCB51
Portfolio Turnover
1.52%
from AlgorithmImports import *
import numpy as np
from collections import defaultdict

class IBSVolatilityPendulum(QCAlgorithm):

    """
    ------------------------------------------------------------------------------------------------
    Summary:
        The Volatility Pendulum is an advanced options trading strategy 
        designed to capitalize on extreme Internal Bar Strength (IBS) 
        readings in the TQQQ ETF. By strategically selling Put Vertical 
        spreads during oversold conditions, this algorithm aims to generate 
        excess returns while managing downside risk.

    Strategy Documentation:
        https://docs.google.com/document/d/1rvoyZbaPbfEeT6bl4KbnbUrSK0f0em586Iu1wbK-Zoo/edit

    Entry Signal:
        - IBS < 0.2 (IBS = (Close - Low) / (High - Low))
    
    Strike Selection Criteria
        - Days to Expiration (DTE): 20-30 days
        - Higher (Short) Put: 28-40 delta
        - Lower (Long) Put: 1-2 strike steps below the short put
        - Spread Selection: Optimize for best risk/reward profile
            
    Position Sizing
        - Position Sizing: 0.3 * Portfolio.availableBuyingPower
        - Max Open Positions: 100


    Exit Signals (first to occur):
        - IBS > 0.8
        - DTE ≤ 1
        - Profit ≥ 25% of max potential profit
        - Loss ≥ 25% of max potential loss

    TODO: 
        - Implement Stop Loss logic
        - Understand why this option exercise keeps happening, even with DTE exit
            - 2023-04-22 00:00:00	 TQQQ 230421P00024500	Buy Option Exercise
        - Open position risking 30% of available buying power
        - When calculating DTEExit, it should account for weekends. 
        - Check margin requirements before attempting trade
        - Include transaction fees when calculating max loss / profit
        - Form and test hypotheses on time of day to enter
        
    ------------------------------------------------------------------------------------------------
    """

    ## Initialize the algo
    ## ------------------------
    def Initialize(self):
        """Bootstrap the algorithm by running every setup stage in order.

        Stages: backtest settings, parameters, data subscriptions, and
        finally the scheduled routines.
        """
        setup_stages = (
            self.InitAlgo,
            self.InitParams,
            self.InitData,
            self.ScheduleRoutines,
        )
        for run_stage in setup_stages:
            run_stage()


    ## Init backtest details
    ## ------------------------
    def InitAlgo(self):
        """Set the traded ticker, benchmark, backtest window and starting cash."""
        backtest_start = (2023, 3, 1)
        backtest_end = (2023, 8, 1)
        starting_cash = 5000

        # Ticker symbol the strategy trades.
        self.ticker = "TQQQ"

        # Benchmark used for reporting (buy and hold).
        self.SetBenchmark("SPY")

        self.SetStartDate(*backtest_start)
        self.SetEndDate(*backtest_end)
        self.SetCash(starting_cash)

    ## Init Local and external params
    ## --------------------------------
    def InitParams(self):
        """Load the externally supplied parameters and reset local state.

        Every external value is read through ``self.get_parameter`` and
        converted with ``int`` or ``float``. On/off switches are encoded as
        integers, where ``1`` means enabled.
        """

        def read_int(name):
            return int(self.get_parameter(name))

        def read_float(name):
            return float(self.get_parameter(name))

        def read_flag(name):
            return read_int(name) == 1

        self.exitMessage = ""
        self.targetProfitPct = read_int("targetProfitPct")
        self.targetLossPct = read_int("targetLossPct")
        self.qty = read_int("qty")
        self.addToPositions = read_flag("addToPositions")
        self.ibsExitThresh = read_float("ibsExitThresh")
        self.ibsEnterThresh = read_float("ibsEnterThresh")
        self.tradeOptions = read_flag("tradeOptions")
        self.checkIBSExit = read_flag("checkIBSExit")
        self.dteExit = read_int("dteExit")

        # Days-to-expiration window: from minDTE up to minDTE + dteDiff days.
        self.maxDTEDays = read_int("minDTE") + read_int("dteDiff")
        self.min_dte = timedelta(days=read_int("minDTE"))
        self.max_dte = timedelta(days=(self.maxDTEDays))

        # Local (non-parameter) state.
        self.goShort = False
        self.useComboOrders = True
        self.lastDailyBar = None

    ## Init Data Feed, Consolidators, etc
    ## -----------------------------------
    def InitData(self):
        """Subscribe to the underlying and attach a daily bar consolidator.

        Minute bars are requested when options are traded, daily bars
        otherwise. The most recent consolidated daily bar is kept in
        ``self.lastDailyBar``. The options feed is set up as well when
        ``self.tradeOptions`` is on.
        """
        if self.tradeOptions:
            resolution = Resolution.Minute
        else:
            resolution = Resolution.Daily

        underlying = self.AddEquity(self.ticker, resolution)
        self.symbol = underlying.symbol
        underlying.SetDataNormalizationMode(DataNormalizationMode.Raw)

        def remember_daily_bar(_sender, daily_bar):
            # Keep only the latest daily bar; the IBS checks read it later.
            self.lastDailyBar = daily_bar

        # One-day consolidator fed by the underlying's subscription.
        self.dailyConsolidator = TradeBarConsolidator(timedelta(days=1))
        self.dailyConsolidator.DataConsolidated += remember_daily_bar
        self.SubscriptionManager.AddConsolidator(self.symbol, self.dailyConsolidator)

        if self.tradeOptions:
            self.InitOptionsData()

    ## Init Options Data
    ## -----------------
    def InitOptionsData(self):
        """Subscribe to the option chain, install the contract filter and
        register the custom security initializer."""
        self.chainLength = 100

        option_feed = self.AddOption(self.ticker, Resolution.Minute)
        self.optionSymbol = option_feed.Symbol

        def contract_filter(universe):
            # Puts only (weeklies included), strike offsets from
            # -chainLength//2 up to 0, expiring inside the configured DTE window.
            lowest_strike_offset = -self.chainLength // 2
            candidates = universe.IncludeWeeklys()
            candidates = candidates.Strikes(lowest_strike_offset, 0)
            candidates = candidates.PutsOnly()
            return candidates.Expiration(self.min_dte, self.max_dte)

        option_feed.SetFilter(contract_filter)

        # Run the algorithm's current initializer first, then the custom one.
        default_initializer = self.SecurityInitializer
        custom_initializer = FuncSecurityInitializer(self.CustomSecurityInitializer)
        self.SetSecurityInitializer(
            CompositeSecurityInitializer(default_initializer, custom_initializer)
        )

    ## Schedule recurring logic (cron jobs)
    ## --------------------------------------    
    def ScheduleRoutines(self):
        """Register the scheduled entry and exit checks.

        Entries are checked once a day via ``AfterMarketOpen(ticker, 5)``.
        Exits are checked every 3 minutes when trading options, otherwise
        once a day via ``AfterMarketOpen(ticker, 6)``.
        """
        scheduler = self.Schedule

        # Daily entry check shortly after the open.
        entry_dates = self.DateRules.EveryDay()
        entry_time = self.TimeRules.AfterMarketOpen(self.ticker, 5)
        scheduler.On(entry_dates, entry_time, self.CheckForEntries)

        if self.tradeOptions:
            # Options: poll the exit rules intraday.
            exit_dates = self.DateRules.EveryDay(self.ticker)
            exit_time = self.TimeRules.Every(timedelta(minutes=3))
        else:
            # Equity: one exit check a day, right after the entry check.
            exit_dates = self.DateRules.EveryDay()
            exit_time = self.TimeRules.AfterMarketOpen(self.ticker, 6)

        scheduler.On(exit_dates, exit_time, self.CheckForExits)



    ## Check for entries
    ## Called as soon as IBS signal is ready,
    ## At a scheduled time every day.
    ## --------------------------------------
    def CheckForEntries(self):
        """Open a position when data is ready and the entry signal fired.

        Nothing happens when the portfolio is already invested unless
        ``addToPositions`` is on. With ``tradeOptions`` on, an options trade
        is opened; otherwise the full portfolio is allocated to the ticker.
        """
        if not self.DataIsReady():
            return
        if not self.EntrySignalFired():
            return

        # Already invested and not allowed to add to positions: stand aside.
        if self.Portfolio.Invested and not self.addToPositions:
            return

        if self.tradeOptions:
            self.OpenOptionsTrade()
        else:
            self.SetHoldings(self.ticker, 1)


    ## Convenience: Check if Data is Ready
    ## -----------------------------------
    def DataIsReady(self):
        """Return True when the current slice holds data for the ticker.

        Returns False, and logs a message, while the algorithm is warming
        up, when the ticker is missing from the slice, or when its entry
        is None.
        """
        current = self.CurrentSlice
        has_bar = (
            not self.IsWarmingUp
            and self.ticker in current
            and current[self.ticker] is not None
        )
        if has_bar:
            return True

        self.Log("\t\tNo data, or warming up ")
        return False

    ## Check for exits
    ## Called intraday at a scheduled time. 
    ##  1. IBS > 0.8            
    ##  2. DTE≤1
    ##  3. Profit ≥ 25% of max potential profit
    ##  4. Loss ≥ 25% of max potential loss
    ## -------------------------------------------------------
    def CheckForExits(self):
        """Close positions whose exit criteria are met.

        Does nothing unless data is ready, the market for ``self.symbol`` is
        open and the portfolio is invested. Exit rules, first to occur:

          1. IBS of the last daily bar above ``ibsExitThresh`` (only when
             ``checkIBSExit`` is on): liquidate everything and stop.
          2. Per put credit spread (options only), the first rule that matches:
               a. days to expiration <= ``dteExit``
               b. return on credit   >= ``targetProfitPct``
               c. percent of max loss >= ``targetLossPct``
          3. Options only: any invested non-option holding (e.g. shares of
             the underlying) is closed.
        """
        if not self.DataIsReady(): return
        if not self.is_market_open(self.symbol): return
        if not self.Portfolio.Invested: return

        # General Exit 1. IBS threshold breached
        # ----------------------------------------
        bar = self.lastDailyBar
        # A zero-range bar has no defined IBS, so it cannot trigger the exit.
        if self.checkIBSExit and bar is not None and bar.high != bar.low:
            ibsValue = (bar.close - bar.low) / (bar.high - bar.low)
            if ibsValue > self.ibsExitThresh:
                self.exitMessage = "IBS Exit Threshold breached. Exit all holdings"
                self.Liquidate(tag=f"{self.exitMessage}")
                return

        # Everything below only applies to options trading.
        if not self.tradeOptions:
            return

        def close_spread(short_position, long_position, message):
            # Record the reason and close both legs with the same order tag.
            self.exitMessage = message
            self.liquidate(short_position.Symbol, tag=message)
            self.liquidate(long_position.Symbol, tag=message)

        ## Check Individual Spreads for exit Criteria
        ## ---------------------------------------------------
        spreads = self.GetPutCreditSpreadsInPortfolio()
        priceInfo = f"{self.ticker} @ ${self.Securities[self.ticker].Price}"

        for (short_position, long_position) in spreads:
            spreadLabel = f"{short_position.Symbol.ID.StrikePrice} ~ {long_position.Symbol.ID.StrikePrice}"

            pctOfMaxLoss = self.percent_max_loss_achieved(short_position, long_position)
            roiPercentage = self.calculate_return_on_credit(short_position, long_position)

            # Whole days between now and the short leg's expiry date.
            expiry = short_position.Symbol.ID.Date
            days_till_expiration = (expiry - self.Time).days

            # Spread Exit 1. Expiration Date <= self.dteExit
            if days_till_expiration <= self.dteExit:
                close_spread(
                    short_position, long_position,
                    f'[❌ DTE Exit] {spreadLabel} - {days_till_expiration} DTE <= {self.dteExit} {priceInfo}')

            # Spread Exit 2. Return on credit reached the profit target
            elif roiPercentage >= self.targetProfitPct:
                close_spread(
                    short_position, long_position,
                    f'[❌ Take Profit] {spreadLabel} - {roiPercentage} Profit ≥ {self.targetProfitPct}% of max potential profit {priceInfo}')

            # Spread Exit 3. Loss reached the configured share of max loss
            elif pctOfMaxLoss >= self.targetLossPct:
                close_spread(
                    short_position, long_position,
                    f'[❌ Stop Loss] {spreadLabel} - {pctOfMaxLoss} Loss ≥ {self.targetLossPct}% of max potential loss {priceInfo}')

        ## 3. Close any holdings of the Underlying, if any
        ## -----------------------------------------------
        for symbol, holding in self.Portfolio.items():
            if (holding.Invested) and (holding.Type != SecurityType.Option):
                self.Debug(f"Holding {symbol.Value} is a {holding.Type}. Liquidate.")
                # Close only this holding: a bare Liquidate() would also
                # flatten every open option spread in the portfolio.
                self.Liquidate(symbol, tag=f"Holding {symbol.Value} is a {holding.Type}")
                    

    ## Go long when  IBS < 0.2
    ## ------------------------------
    def EntrySignalFired(self):
        """Return True when the last daily bar's IBS is below ``ibsEnterThresh``.

        IBS = (close - low) / (high - low). Returns False when no daily bar
        has been consolidated yet or when the bar has zero range.
        """
        bar = self.lastDailyBar
        if bar is None:
            return False

        # A flat bar (high == low) has no defined IBS.
        if bar.high == bar.low:
            return False

        ibs = (bar.close - bar.low) / (bar.high - bar.low)
        return ibs < self.ibsEnterThresh

    def CustomSecurityInitializer(self, security):
        """Seed a security's price and, for options, install backtest models.

        :param security: The security being initialized.

        Every security gets its market price set from ``GetLastKnownPrice``.
        Option securities additionally get a null assignment model, a zero
        fee model and the custom fill model defined in this file.
        """
        # Seed the price once; the same lookup used to be repeated inside
        # the option branch, costing a second identical request.
        security.SetMarketPrice(self.GetLastKnownPrice(security))

        if Extensions.IsOption(security.Symbol.SecurityType):
            security.SetOptionAssignmentModel(NullOptionAssignmentModel())
            security.SetFeeModel(ConstantFeeModel(0))
            security.SetFillModel(CustomFillModel())

    ## Open Options Trade
    ## ------------------------------
    def OpenOptionsTrade(self):
        """Sell the put credit spread with the best reward/risk ratio.

        Reads the option chain from the current slice, builds the candidate
        put pairs, picks the top pair and submits it as a bull put spread
        strategy order of ``self.qty`` units. Returns without trading when
        there is no chain or no acceptable pair.
        """
        chain = self.current_slice.OptionChains.get(self.optionSymbol, None)
        if not chain:
            return

        # NOTE: no contract is indexed directly here, so a chain object that
        # is truthy but yields no contracts simply produces no pairs below.
        topPair = self.GetTopRRRatioPair(self.GetPutPairs(chain))
        if topPair is None:
            return

        short_put = topPair[0]
        long_put = topPair[1]

        bps_strategy = OptionStrategies.BullPutSpread(self.optionSymbol, short_put.Strike, long_put.Strike, long_put.Expiry)
        if bps_strategy is None:
            self.Debug(f"{self.Time} | OptionStrategies.BullPutSpread Failed")
            return

        if not self.useComboOrders:
            # Leg-by-leg entry is not supported yet.
            self.Debug("Open position one leg at a time.")
            self.Debug("Not implemented yet.")
            self.quit()
            return

        # "[++]" marks an add-on to an existing position, "[+] " a fresh one.
        prefix      = "[++]" if self.Portfolio.Invested else "[+] "
        spreadInfo  = f"{bps_strategy.OptionLegs[0].Strike} ~ {bps_strategy.OptionLegs[1].Strike}"
        priceInfo   = f"{self.ticker} @ ${self.Securities[self.ticker].Price}"
        # Quoted credit: sell the short put at its bid, buy the long put at its ask.
        costBasis   = f"${round((short_put.BidPrice - long_put.AskPrice),2)} Received"

        self.Buy(bps_strategy, self.qty, tag=f"{prefix} {spreadInfo} {priceInfo} | {costBasis}")


    ## Get all possible put pairs that match our criteria:
    ##  1. Days to Expiration (DTE): 20-30 days
    ##  2. Strike Selection:
    ##  3. Higher (Short) Put: 28-40 delta
    ##  4. Lower (Long) Put: 1-2 strike steps below the short put
    ## -------------------------------------------------------
    def GetPutPairs(self, chain):
        """Build candidate (short put, long put) pairs from an option chain.

        :param chain: Iterable of option contracts exposing ``Right``,
                      ``Expiry``, ``Strike`` and ``Greeks.Delta``.
        :return: List of ``(high_put, low_put)`` tuples where
                 - both puts expire on the same date, within
                   ``[self.min_dte, self.max_dte]`` from ``self.Time``;
                 - ``high_put`` has a delta between -0.40 and -0.28;
                 - ``low_put`` sits 1 or 2 strike steps below ``high_put``,
                   counted on the strikes listed for that same expiry.
        """
        current_time = self.Time

        # Puts inside the DTE window, in chain order.
        eligible_puts = [
            contract for contract in chain
            if contract.Right == OptionRight.Put
            and self.min_dte <= contract.Expiry - current_time <= self.max_dte
        ]

        # Index the puts per expiry so strike steps are counted on the
        # strike grid that actually exists for the short put's expiry.
        # Different expiries can list different strikes.
        puts_by_expiry = defaultdict(list)
        for put in eligible_puts:
            puts_by_expiry[put.Expiry].append(put)

        strikes_by_expiry = {
            expiry: sorted({put.Strike for put in puts})
            for expiry, puts in puts_by_expiry.items()
        }

        put_pairs = []
        for high_put in eligible_puts:
            if not (-0.40 <= high_put.Greeks.Delta <= -0.28):
                continue

            strikes = strikes_by_expiry[high_put.Expiry]
            high_strike_index = strikes.index(high_put.Strike)

            # The one or two listed strikes directly below the short strike.
            possible_low_strikes = set(strikes[max(0, high_strike_index - 2):high_strike_index])

            for low_put in puts_by_expiry[high_put.Expiry]:
                if low_put.Strike in possible_low_strikes:
                    put_pairs.append((high_put, low_put))

        return put_pairs

    ## Get the Pair with the most favorable Reward/Risk ratio
    ## ------------------------------------------------------
    def GetTopRRRatioPair(self, put_pairs):
        """Pick the put pair with the highest reward/risk ratio.

        :param put_pairs: Iterable of ``(high_put, low_put)`` pairs.
        :return: ``(high_put, low_put)`` tuple of the best pair, or None
                 when no pair qualifies.

        For each pair the credit is ``high_put.BidPrice - low_put.AskPrice``
        (rounded to cents), the max loss is the strike difference minus the
        credit, and the ratio is credit / max loss (rounded to 2 decimals).
        Pairs with a negative credit or a non-positive max loss are skipped.
        On equal ratios the earliest pair wins.
        """

        def reward_risk(short_leg, long_leg):
            # None marks a pair that must not be considered at all.
            credit = round((short_leg.BidPrice - long_leg.AskPrice), 2)
            if credit < 0:
                return None
            risk = (short_leg.Strike - long_leg.Strike) - credit
            if not (risk > 0):
                return None
            return round((credit / risk), 2)

        ranked = []
        for short_leg, long_leg in put_pairs:
            ratio = reward_risk(short_leg, long_leg)
            if ratio is not None:
                ranked.append((ratio, (short_leg, long_leg)))

        if not ranked:
            return None

        # max() keeps the first of several equal ratios.
        _best_ratio, best_pair = max(ranked, key=lambda scored: scored[0])
        return best_pair

    # Example usage within the same class:
    # top_pair = self.GetTopRRRatioPair(put_pairs)


    ## Get all the PCS pairs in the portfolio
    ## Identify put credit spreads in the portfolio.
    ##    
    ## :return: List of tuples, each containing (short_contract, long_contract)
    ## ------------------------------------------
    def GetPutCreditSpreadsInPortfolio(self):
        """Find the put credit spreads currently held in the portfolio.

        Invested put option holdings are grouped by underlying and expiry.
        Within each group the holdings are ordered by strike, highest
        first, and every adjacent pair made of a short higher-strike put
        and a long lower-strike put counts as one spread.

        :return: List of ``(short_holding, long_holding)`` tuples.
        """
        # underlying -> expiry -> holdings
        grouped = defaultdict(lambda: defaultdict(list))

        for position in self.Portfolio.Values:
            is_open_put = (
                position.Type == SecurityType.Option
                and position.Symbol.ID.OptionRight == OptionRight.Put
                and position.Invested
            )
            if not is_open_put:
                continue
            grouped[position.Symbol.Underlying][position.Symbol.ID.Date].append(position)

        credit_spreads = []
        for by_expiry in grouped.values():
            for legs in by_expiry.values():
                ordered = sorted(
                    legs,
                    key=lambda leg: leg.Symbol.ID.StrikePrice,
                    reverse=True,
                )
                # Walk neighbouring strikes: (highest, next), (next, next-next), ...
                for upper, lower in zip(ordered, ordered[1:]):
                    short_over_long = upper.Quantity < 0 and lower.Quantity > 0
                    if not short_over_long:
                        continue
                    if upper.Symbol.ID.StrikePrice > lower.Symbol.ID.StrikePrice:
                        credit_spreads.append((upper, lower))

        return credit_spreads

    ## Calculate the percentage of max profit achieved for a put credit spread.
    ##
    ## :param short_position: The short put option position
    ## :param long_position: The long put option position
    ## :return: Percentage of max profit achieved (0-100)
    ## ------—------—------—------—------—------—------—------—------—------—
    def percent_max_profit_achieved(self, short_position, long_position):
        """Return how much of the spread's maximum profit has been captured.

        :param short_position: The short put option position
        :param long_position: The long put option position
        :return: Percentage of max profit achieved, clipped to 0-100
                 (0 when the initial net premium is zero)
        """
        credit_received = abs(short_position.AveragePrice) - abs(long_position.AveragePrice)
        cost_to_close = self.GetNetPremium(short_position.Symbol, long_position.Symbol)

        # No credit on record: the ratio is undefined, report 0.
        if credit_received == 0:
            return 0

        captured = (credit_received - cost_to_close) / credit_received * 100
        return np.clip(captured, 0, 100)

    ## Calculate the percentage of max loss achieved for a put credit spread.
    ##
    ## :param short_position: The short put option position
    ## :param long_position: The long put option position
    ## :return: Percentage of max loss achieved (0-100)
    ## ------------------------------------------------------------------------
    def percent_max_loss_achieved(self, short_position, long_position):
        """Return how much of the spread's maximum loss is currently realized.

        :param short_position: The short put option position
        :param long_position: The long put option position
        :return: Percentage of max loss achieved, clipped to 0-100
                 (0 when the max loss is not positive)

        Definitions, per share:
            credit       = |short average price| - |long average price|
            max loss     = (short strike - long strike) - credit
            current loss = current net premium - credit
        """
        initial_net_premium = abs(short_position.AveragePrice) - abs(long_position.AveragePrice)
        current_net_premium = self.GetNetPremium(short_position.Symbol, long_position.Symbol)

        spread_width = short_position.Symbol.ID.StrikePrice - long_position.Symbol.ID.StrikePrice
        max_loss = spread_width - initial_net_premium

        # Nothing at risk (or inconsistent prices): the ratio is undefined.
        if max_loss <= 0:
            return 0

        # Divide by the max loss itself. The credit is already netted out of
        # it, so subtracting it again would overstate the percentage, and
        # would turn it negative once the credit exceeds half the width.
        percent = (current_net_premium - initial_net_premium) / max_loss * 100
        return np.clip(percent, 0, 100)

    ## Calculate the return on the cost basis (credit received) of a put credit spread.
    ##
    ## :param short_position: The short put option position
    ## :param long_position: The long put option position
    ## :return: Percentage return on the credit received
    ## --------------------------------------------
    def calculate_return_on_credit(self, short_position, long_position):
        """Return the spread's profit or loss as a percentage of the credit received.

        :param short_position: The short put option position
        :param long_position: The long put option position
        :return: Percentage return on the credit received, rounded to 3
                 decimals (0 when the credit is zero)
        """
        credit = abs(short_position.AveragePrice) - abs(long_position.AveragePrice)
        close_cost = self.GetNetPremium(short_position.Symbol, long_position.Symbol)

        # No credit on record: the ratio is undefined, report 0.
        if credit == 0:
            return 0

        # Positive when closing now costs less than the credit collected.
        open_pnl = credit - close_cost
        return round(((open_pnl / credit) * 100), 3)

    ## Calculate the current net premium of a put credit spread.
    ##
    ## :param short_symbol: Symbol of the short put option
    ## :param long_symbol: Symbol of the long put option
    ## :return: Current net premium of the spread
    ## ----------------------------------------------------------------
    def GetNetPremium(self, short_symbol, long_symbol):
        """Return the current net premium of a put credit spread.

        :param short_symbol: Symbol of the short put option
        :param long_symbol: Symbol of the long put option
        :return: Absolute difference between the short leg's ask and the
                 long leg's bid; a leg whose quote is zero uses its
                 ``Price`` instead
        """
        short_leg = self.Securities[short_symbol]
        long_leg = self.Securities[long_symbol]

        # Buying back the short put costs its ask; fall back to the last
        # price when no ask is available.
        buy_back = short_leg.AskPrice
        if buy_back == 0:
            buy_back = short_leg.Price

        # Selling the long put earns its bid; same fallback when no bid.
        sell_out = long_leg.BidPrice
        if sell_out == 0:
            sell_out = long_leg.Price

        return abs(buy_back - sell_out)


######################################
## Custom Fill Model 
## TODO: Evaluate necessity
######################################
class CustomFillModel(FillModel):
    """Fill model that prices market orders at the quote: buys at the ask,
    sells at the bid."""

    def MarketFill(self, asset, order):
        """Fill a market order at the asset's current ask (buy) or bid (sell).

        :param asset: The security being traded; its ``AskPrice`` and
                      ``BidPrice`` are read
        :param order: The market order; its ``Direction`` picks the quote side
        :return: The fill produced by the base model, with ``FillPrice``
                 replaced by the quote when a quote is available
        """
        fill = super().MarketFill(asset, order)

        if order.Direction == OrderDirection.Buy:
            quote_price = asset.AskPrice
        else:
            quote_price = asset.BidPrice

        # A zero quote means no bid/ask is available. Keep the price chosen
        # by the base model rather than filling at $0, mirroring the
        # zero-quote fallback used when pricing spreads in this file.
        if quote_price != 0:
            fill.FillPrice = quote_price

        return fill