Created with Highcharts 12.1.2EquityJan 2024Jan…Feb 2024Mar 2024Apr 2024May 2024Jun 2024Jul 2024Aug 2024Sep 2024Oct 2024Nov 2024Dec 2024Jan 2025Feb 202595k100k105k110k-4-2001010200M050k100k0510
Overall Statistics
Total Orders
48
Average Win
0.52%
Average Loss
-0.30%
Compounding Annual Return
4.783%
Drawdown
2.800%
Expectancy
0.554
Start Equity
100000
End Equity
105199.46
Net Profit
5.199%
Sharpe Ratio
-0.683
Sortino Ratio
-0.759
Probabilistic Sharpe Ratio
51.869%
Loss Rate
43%
Win Rate
57%
Profit-Loss Ratio
1.75
Alpha
-0.042
Beta
0.161
Annual Standard Deviation
0.032
Annual Variance
0.001
Information Ratio
-1.619
Tracking Error
0.092
Treynor Ratio
-0.134
Total Fees
$72.00
Estimated Strategy Capacity
$75000000.00
Lowest Capacity Asset
EWG R735QTJ8XC9X
Portfolio Turnover
1.57%
from AlgorithmImports import *
from datetime import datetime, timedelta


def get_third_friday(year, month):
    """
    Get the date of the third Friday of the given month and year.
    """
    # Start with the first day of the month
    first_day = datetime(year, month, 1)
    first_friday = first_day + timedelta(days=(4 - first_day.weekday() + 7) % 7)
    
    # Add two weeks to get to the third Friday
    third_friday = first_friday + timedelta(weeks=2)
    
    return third_friday.date()

def is_monthly_expiration_week(current_date):
    """
    Check if current date is in the monthly options expiration week.
    Returns:
        tuple: (bool, int, str) - (is_expiration_week, days_until_expiration, day_description)
              days_until_expiration will be:
              2 for Wednesday before expiration
              1 for Thursday before expiration
              0 for expiration Friday
              -1 if not in expiration week
              day_description will describe the current day (e.g., 'Wednesday before expiration').
    """

    third_friday = get_third_friday(current_date.year, current_date.month)
    
    # Find the Wednesday of expiration week
    expiration_wednesday = third_friday - timedelta(days=2)  # Wednesday
    expiration_thursday = third_friday - timedelta(days=1)    # Thursday
    
    # Calculate days until expiration
    days_until = (third_friday - current_date).days
    
    # Check if current date is within Wednesday to Friday of expiration week
    if expiration_wednesday <= current_date <= third_friday:
        if days_until == 2:
            return 2
        elif days_until == 1:
            return 1
    
    return False
# region imports
from AlgorithmImports import *
from date_utils import get_third_friday, is_monthly_expiration_week
# endregion

class MonthlyIVAlgorithm(QCAlgorithm):
    def Initialize(self):
        self.set_start_date(2024, 1, 1)
        self.set_end_date(2025, 1, 30)
        self.InitCash = 100000
        self.set_cash(self.InitCash)

        # Strategy Parameters - can be optimized
        self.params = {
            # Portfolio Construction
            'low_iv_allocation': 0.33,      # Allocation to low IV assets
            'mid_iv_allocation': 0.33,      # Allocation to mid IV assets
            'high_iv_allocation': 0.33,     # Allocation to high IV assets
            
            # IV Categorization
            'low_iv_percentile': 25,       # Threshold for low IV category (0-25th percentile)
            'high_iv_percentile': 75,      # Threshold for high IV category (75-100th percentile)
            
            # Risk Management
            'min_sharpe_ratio': 0.0,       # Minimum Sharpe ratio for inclusion
            'max_position_size': 0.15,     # Maximum allocation % to any single position
            'min_position_size': 0.01,     # Minimum allocation % to any single position
            
            # Volatility Parameters
            'lookback_period': 252,        # Days for calculating Sharpe ratio
            'risk_free_rate': 0.04,        # Annual risk-free rate for Sharpe calculation
            
            # Rebalancing Parameters
            'days_before_expiry': 1,       # Days before expiry to rebalance
            
            # Sector/Asset Constraints
            'max_correlation': 0.7,        # Maximum correlation between assets
            
            # Technical Indicators
            'use_trend_filter': True,      # Use trend filter for entries
            'trend_lookback': 50,          # Days for trend calculation
            'momentum_lookback': 20,       # Days for momentum calculation
            'rsi_oversold': 30,            # RSI oversold threshold
            'rsi_overbought': 70,          # RSI overbought threshold
        }

        self.MKT = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.spy = []

        self.tickers = ["TLT", "LQD", "IEF", "BNDX", "EMB", "XLU", "GLD", "SLV", "UNG", "CPER", "DBA", "XLP", "NLR",
                       "SPY", "ACWI", "IWM", "EFA", "EWG", "EWU", "EWJ", "IYR", "VNQI", "XLF", "XLB", "XLE", "IXJ",
                       "ARKK", "SMH", "XLK", "CQQQ", "EMQQ", "FXI", "EEM", "EWZ", "INDA", "EWT", "ILF", "EUFN", "XBI", "XRT", "XHB", "XOP", "XLY"]


        self.symbols = [self.add_equity(ticker, Resolution.DAILY).Symbol for ticker in self.tickers]

        self.universe_settings.asynchronous = True

        for symbol in self.symbols:
            option = self.add_option(symbol, Resolution.DAILY)
            option.set_filter(self._contract_selector)
        
        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.BeforeMarketClose('SPY', 0), 
            self.record_vars)


    def _contract_selector(self, option_filter_universe: OptionFilterUniverse) -> OptionFilterUniverse:
        return (
            option_filter_universe
            .include_weeklys()
            #.include_weeklys(False) # monthlies only
            .expiration(0, 45)
            .IV(0, 100)    # Filter contracts between 0% and 100%
        )
   
    def get_forward_implied_volatility(self, underlying_symbol):
        # Fetch the option chain for the given symbol
        option_chain = self.option_chain(underlying_symbol, flatten=True).data_frame
        self.Debug(f"Fetched option chain for {underlying_symbol} at {self.Time}")

        if option_chain is None or option_chain.empty:
            self.Debug(f"Option chain empty: {option_chain}")
            self.Log(f"Option chain empty: {option_chain}")
            return None

        # Calculate the next month
        next_month = self.Time.month % 12 + 1  # Get the next month
        next_year = self.Time.year if next_month > self.Time.month else self.Time.year + 1

        # Filter for options expiring in the next month (long-term options)
        next_month_options = option_chain[
            (option_chain['expiry'].dt.month == next_month) &
            (option_chain['expiry'].dt.year == next_year)
        ]

        if next_month_options.empty:
            self.error(f"No options available for the next month ({next_month})")
            self.Log(f"No options available for the next month ({next_month})")
            return None

        # Filter for options expiring this month (short-term options)
        current_month_options = option_chain[
            (option_chain['expiry'].dt.month == self.Time.month) &
            (option_chain['expiry'].dt.year == self.Time.year)
        ]

        if current_month_options.empty:
            self.error(f"No options available for the current month ({self.Time.month})")
            self.Log(f"No options available for the current month ({self.Time.month})")
            return None

        # Calculate moneyness for both sets of options
        underlying_price = self.Securities[underlying_symbol].Price
        option_chain['moneyness'] = option_chain['strike'] / underlying_price

        # Select ATM options for current month based on moneyness
        atm_threshold = 0.05  # Only options within 5% of ATM are selected.
        short_term_atm_options = current_month_options[
            abs(option_chain['moneyness'] - 1) <= atm_threshold
        ]
        # Select ATM options for next month
        long_term_atm_options = next_month_options[
            abs(option_chain['moneyness'] - 1) <= atm_threshold
        ]

        if short_term_atm_options.empty:
            self.error(f"No ATM options available for the current month ({self.Time.month})")
            self.Log(f"No ATM options available for the current month ({self.Time.month})")
            return None

        if long_term_atm_options.empty:
            self.error(f"No ATM options available for the next month ({next_month})")
            self.Log(f"No ATM options available for the next month ({next_month})")
            return None

        # Extract the implied volatility for both sets of ATM options
        short_term_iv = short_term_atm_options['impliedvolatility'].mean()
        long_term_iv = long_term_atm_options['impliedvolatility'].mean()

        # Calculate the time to expiry between options (in years)
        T1 = 0
        T2 = (next_month_options['expiry'].iloc[0] - self.Time).days / 365.0

        if T2 == T1:
            self.Debug(f"Error: Time to expiry for both options is the same for {underlying_symbol}. Cannot calculate forward IV.")
            self.Log(f"Error: Time to expiry for both options is the same for {underlying_symbol}. Cannot calculate forward IV.")
            return None

        if (T2 * long_term_iv**2 - T1 * short_term_iv**2) < 0:
            self.Debug(f"Error: The calculated value for forward IV is negative for {underlying_symbol}, T1={T1} T2={T2}")
            self.Log(f"Error: The calculated value for forward IV is negative for {underlying_symbol}, T1={T1} T2={T2}")
            return None

        # Calculate forward implied volatility using the formula
        try:
            forward_iv = math.sqrt((T2 * long_term_iv**2 - T1 * short_term_iv**2) / (T2 - T1))
            return forward_iv
        except ValueError as e:
            self.Debug(f"Math error while calculating forward IV for {underlying_symbol}: {e}")
            return None

    def get_sharpe_ratio(self, symbol, lookback_period=None):
        """
        Calculate the Sharpe Ratio for a given symbol using QuantConnect's History API
        
        Args:
            symbol: The stock symbol to calculate Sharpe Ratio for
            lookback_period: Number of trading days to look back (default 1 year)
        
        Returns:
            float: The Sharpe Ratio value
        """
        if lookback_period is None:
            lookback_period = self.params.get('lookback_period', 252)

        # Get historical daily price data
        history = self.History(symbol, lookback_period, Resolution.Daily)
        if history.empty:
            self.Debug(f"No historical data found for {symbol}")
            return None
        
        # Calculate daily returns
        returns = history['close'].pct_change().dropna()
        
        # Calculate excess returns over risk-free rate
        risk_free_rate = self.params['risk_free_rate']
        excess_returns = returns - (risk_free_rate / self.params['lookback_period'])  # Daily risk-free rate
        
        # Calculate Sharpe Ratio
        sharpe_ratio = np.sqrt(self.params['lookback_period']) * (excess_returns.mean() / excess_returns.std())
        
        return sharpe_ratio
  
    def categorize_by_iv(self, forward_iv_values):

        # Convert to numpy array for percentile calculation
        iv_values = np.array(list(forward_iv_values.values()))

        p_low = np.percentile(iv_values, self.params['low_iv_percentile'])
        p_high = np.percentile(iv_values, self.params['high_iv_percentile'])
        
        # Calculate percentile thresholds
        p25 = np.percentile(iv_values, 25)
        p75 = np.percentile(iv_values, 75)
        
        
        # Initialize dictionaries for each category
        low_iv = {}
        mid_iv = {}
        high_iv = {}
        
        # Categorize each ticker based on its IV value
        for ticker, iv in forward_iv_values.items():
            if iv <= p_low:
                low_iv[ticker] = iv
            elif iv <= p_high:
                mid_iv[ticker] = iv
            else:
                high_iv[ticker] = iv
        
        self.Debug(f"Low IV (0-25th): {len(low_iv)} tickers")
        self.Debug(f"Mid IV (26-75th): {len(mid_iv)} tickers")
        self.Debug(f"High IV (76-100th): {len(high_iv)} tickers")
        
        return low_iv, mid_iv, high_iv

    def calculate_category_weights(self, category_tickers, sharpe_ratios):
        """
        Calculate weights within a category based on Sharpe ratios
        """
        # Get Sharpe ratios for tickers in this category (only positive values)
        category_sharpes = {ticker: sharpe_ratios[ticker] for ticker in category_tickers 
                          if ticker in sharpe_ratios and sharpe_ratios[ticker] > self.params['min_sharpe_ratio']}
        
        if not category_sharpes:
            return {}
            
        # Calculate total Sharpe ratio
        total_sharpe = sum(category_sharpes.values())
        
        # Calculate weights based on Sharpe ratio contribution
        weights = {ticker: sharpe/total_sharpe for ticker, sharpe in category_sharpes.items()}
        
        return weights

    def check_correlation(self, ticker, current_holdings=None):
        """
        Check if a ticker's correlation with existing holdings exceeds the threshold
        
        Args:
            ticker: The ticker to check
            current_holdings: Optional list of tickers to check against. If None, uses current portfolio
        
        Returns:
            bool: True if correlations are acceptable, False if too highly correlated
        """
        # If no holdings specified, get current invested holdings
        if current_holdings is None:
            current_holdings = [symbol.Value for symbol in self.Portfolio.Keys 
                              if self.Portfolio[symbol].Invested]
        
        # If no holdings, correlation check passes
        if not current_holdings:
            return True
            
        lookback = self.params['lookback_period']
        
        try:
            # Get historical data for ticker and holdings
            history = self.history([ticker] + current_holdings, 
                                 lookback, 
                                 Resolution.Daily)
            
            if history.empty:
                self.Debug(f"No historical data for correlation check: {ticker}")
                return False
                
            # Calculate price returns
            closes = history['close'].unstack(level=0)
            returns = closes.pct_change().dropna()
            
            # Calculate correlation matrix
            corr_matrix = returns.corr()
            
            # Ensure the ticker exists in the correlation matrix before accessing
            if ticker not in corr_matrix.columns:
                self.Debug(f"{ticker} not found in correlation matrix columns: {list(corr_matrix.columns)}")
                return False

            # Check correlations between ticker and holdings
            ticker_correlations = corr_matrix[ticker].drop(ticker, errors="ignore")
            max_corr = abs(ticker_correlations).max()
            
            self.Debug(f"\nCorrelation check for {ticker}:")
            for holding in current_holdings:
                corr = corr_matrix[ticker][holding]
                self.Debug(f"  {holding}: {corr:.2f}")
            self.Debug(f"Max correlation: {max_corr:.2f}")
            
            # Return True if max correlation is below threshold
            return max_corr <= self.params['max_correlation']
            
        except Exception as e:
            self.Debug(f"Error in correlation check for {ticker}: {str(e)}")
            return False

    def get_technical_signals(self, symbol):
        """Calculate technical indicators for filtering with debugging"""
        if not self.params['use_trend_filter']:
            self.Debug(f"{symbol} - Trend filter disabled. Returning True.")
            return True

        lookback = max(self.params['trend_lookback'], self.params['momentum_lookback']) + 5

        # Convert MemoizingEnumerable to a list
        history = list(self.History(symbol, lookback, Resolution.Daily))
        if not history or len(history) < self.params['trend_lookback']:
            self.Debug(f"{symbol} - Not enough historical data. History length: {len(history)}")
            return True

        # Convert history to DataFrame
        history_df = pd.DataFrame(
            [(bar.Time, bar.Close) for bar in history],
            columns=['time', 'close']
        ).set_index('time')
        #self.Debug(f"{symbol} - History DataFrame (last 5 rows):\n{history_df.tail()}")

        # Calculate daily returns from closing prices
        close_prices = history_df['close'].pct_change().dropna()
        #self.Debug(f"{symbol} - Daily returns (last 5 values):\n{close_prices.tail()}")

        # ----- Trend Indicator -----
        # Rolling simple moving average (SMA) of returns
        sma = close_prices.rolling(self.params['trend_lookback']).mean().dropna()
        if sma.empty:
            self.Debug(f"{symbol} - SMA series is empty. Not enough data for trend calculation.")
            trend_signal = False
        else:
            last_return = close_prices.iloc[-1]
            last_sma = sma.iloc[-1]
            trend_signal = last_return > last_sma
            self.Debug(f"{symbol} - Trend Indicator: Last return = {last_return:.4f}, "
                    f"SMA = {last_sma:.4f}, Trend Signal = {trend_signal}")

        # ----- Momentum Indicator -----
        # Calculate momentum on returns over momentum lookback period
        momentum = close_prices.pct_change(self.params['momentum_lookback']).dropna()
        if momentum.empty:
            self.Debug(f"{symbol} - Momentum series is empty. Not enough data for momentum calculation.")
            momentum_signal = False
        else:
            last_momentum = momentum.iloc[-1]
            momentum_signal = last_momentum > 0
            self.Debug(f"{symbol} - Momentum Indicator: Last momentum = {last_momentum:.4f}, "
                    f"Momentum Signal = {momentum_signal}")

        # ----- RSI Indicator -----
        # Compute RSI on the daily returns (note: typically RSI is calculated on prices)
        delta = close_prices.diff().dropna()
        gain = delta.where(delta > 0, 0).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        if rsi.empty:
            self.Debug(f"{symbol} - RSI series is empty. Not enough data for RSI calculation.")
            return False  # No RSI data available

        rsi_value = rsi.iloc[-1]
        rsi_signal = self.params['rsi_oversold'] < rsi_value < self.params['rsi_overbought']
        self.Debug(f"{symbol} - RSI Indicator: Last RSI value = {rsi_value:.2f}, "
                f"RSI Signal = {rsi_signal}")

        # Combine all signals
        final_signal = trend_signal and momentum_signal and rsi_signal
        self.Debug(f"{symbol} - Final Technical Signal: {final_signal} "
                f"(Trend: {trend_signal}, Momentum: {momentum_signal}, RSI: {rsi_signal})")

        return final_signal



    def OnData(self, data):
        days_until_exp = is_monthly_expiration_week(self.Time.date())

        if days_until_exp != 1:
            return

        forward_iv_values = {}
        sharpe_ratios = {}
        valid_tickers = []

        for symbol in self.symbols:
            forward_iv = self.get_forward_implied_volatility(symbol)
            sharpe = self.get_sharpe_ratio(symbol)
            
            # Only include tickers with positive Sharpe, acceptable correlation, and trending
            if sharpe is not None and sharpe > self.params['min_sharpe_ratio']:
                if self.check_correlation(symbol.Value):
                    if self.get_technical_signals(symbol.Value):
                        sharpe_ratios[symbol.Value] = sharpe
                        valid_tickers.append(symbol.Value)
                        if forward_iv is not None:
                            forward_iv_values[symbol.Value] = forward_iv
                    else:
                        self.Debug(f"Excluding {symbol.Value} because it is not trending")
                else:
                    self.Debug(f"Excluding {symbol.Value} due to high correlation")
                    
        self.Debug(f"Tickers with positive Sharpe ratio: {len(sharpe_ratios)} on {self.time}")
        self.Debug(f"\nValid tickers after trend/correlation checks: {len(valid_tickers)}")
        self.Debug(f"Excluded tickers: {set(s.Value for s in self.symbols) - set(valid_tickers)}")
        

        if forward_iv_values:
            # Categorize tickers based on IV percentiles
            low_iv, mid_iv, high_iv = self.categorize_by_iv(forward_iv_values)
            
            # Portfolio allocations for each category
            category_allocations = {
                'low_iv': self.params['low_iv_allocation'],    # low-IV assets (0-25th percentile)
                'mid_iv': self.params['mid_iv_allocation'],    # mid-IV assets (26-75th percentile)
                'high_iv': self.params['high_iv_allocation']    # high-IV assets (76-100th percentile)
            }

            # Track current holdings for correlation checks
            current_holdings = []
            
            # Calculate weights within each category based on Sharpe ratios
            for category, tickers in [('low_iv', low_iv), ('mid_iv', mid_iv), ('high_iv', high_iv)]:
                category_weight = category_allocations[category]
                ticker_weights = self.calculate_category_weights(tickers.keys(), sharpe_ratios)

                # Set holdings for each ticker
                for ticker, weight in ticker_weights.items():
                    # Final correlation check against current holdings
                    if self.check_correlation(ticker, current_holdings):
                        symbol = self.Securities[ticker].Symbol
                        allocation = category_weight * weight

                        # Apply position size limits
                        allocation = min(allocation, self.params['max_position_size'])
                        if allocation >= self.params['min_position_size']:
                            self.set_holdings(symbol, allocation)
                            current_holdings.append(ticker)
                            self.Log(f"{ticker}: Category={category}, IV={forward_iv_values.get(ticker, 'N/A')}, " +
                                   f"Sharpe={sharpe_ratios.get(ticker, 'N/A')}, Weight={allocation:.4f}")
                            self.Debug(f"{ticker}: Category={category}, IV={forward_iv_values.get(ticker, 'N/A')}, " +
                                   f"Sharpe={sharpe_ratios.get(ticker, 'N/A')}, Weight={allocation:.4f}")
                    else:
                        self.Debug(f"Skipping {ticker} due to high correlation with current holdings")
            
                    
                    self.Debug(f"{ticker}: Category={category}, IV={forward_iv_values.get(ticker, 'N/A')}, " +
                           f"Sharpe={sharpe_ratios.get(ticker, 'N/A')}, Weight={allocation:.4f}")

            # Liquidate positions in tickers that no longer meet criteria
            for symbol in self.symbols:
                if (symbol.Value not in valid_tickers and 
                    self.Portfolio[symbol].Invested):
                    self.Liquidate(symbol)
                    self.Log(f"Liquidating {symbol.Value} due to correlation/Sharpe criteria")
                    self.Debug(f"Liquidating {symbol.Value} due to correlation/Sharpe criteria")
    
    # Plot SPY benchmark
    def record_vars(self):             
        hist = self.History(self.MKT, 2, Resolution.Daily)['close'].unstack(level= 0).dropna() 
        self.spy.append(hist[self.MKT].iloc[-1])
        spy_perf = self.spy[-1] / self.spy[0] * self.InitCash
        self.Plot('Strategy Equity', 'SPY', spy_perf)