Created with Highcharts 12.1.2EquityJan 2016Jan…Jul 2016Jan 2017Jul 2017Jan 2018Jul 2018Jan 2019Jul 2019Jan 2020Jul 2020Jan 2021Jul 2021Jan 2022Jul 2022Jan 2023Jul 2023Jan 2024Jul 2024Jan 2025Jul 2025400k2,000k-10000.006-0.0320-0.82.40120M0320k123602.4
Overall Statistics
Total Orders
190
Average Win
1.45%
Average Loss
-1.94%
Compounding Annual Return
6.961%
Drawdown
10.100%
Expectancy
0.345
Start Equity
1000000
End Equity
1848760
Net Profit
84.876%
Sharpe Ratio
0.476
Sortino Ratio
0.313
Probabilistic Sharpe Ratio
38.835%
Loss Rate
23%
Win Rate
77%
Profit-Loss Ratio
0.75
Alpha
0.009
Beta
0.179
Annual Standard Deviation
0.052
Annual Variance
0.003
Information Ratio
-0.48
Tracking Error
0.129
Treynor Ratio
0.14
Total Fees
$0.00
Estimated Strategy Capacity
$36000000.00
Lowest Capacity Asset
SPX 32PB0JBNCVVJI|SPX 31
Portfolio Turnover
0.08%
# region imports
from AlgorithmImports import *

from sklearn.cluster import KMeans
# endregion

class IVRankClustersAlgorithm(QCAlgorithm):
    """Sell out-of-the-money SPX puts while implied volatility and strike availability are not high.

    Two helper indicators, `IVRank` and `StrikeAvailability`, each assign the
    current observation a label (0=Low, 1=Medium, 2=High). A short put is
    opened when both labels are below 2 and closed when both reach 2, when the
    contract is within 7 days of expiry, or when the index price falls to the
    strike.
    """

    def initialize(self):
        """Set the backtest window, subscriptions, indicators, and the daily rebalance."""
        self.set_start_date(2016, 1, 1)
        self.set_end_date(2025, 2, 14)
        self.set_cash(1_000_000)
        # Seed securities with their last known prices when they are added.
        self.set_security_initializer(
            BrokerageModelSecurityInitializer(
                self.brokerage_model,
                FuncSecuritySeeder(self.get_last_known_prices),
            )
        )
        self._index = self.add_index('SPX')
        # 22-day standard deviation of the index; used to size the strike offset.
        self._index.std = self.std(self._index.symbol, 22, Resolution.DAILY)
        self._option = self.add_index_option(self._index.symbol)
        self._option.set_filter(lambda universe: universe.include_weeklys().expiration(30, 90).strikes(-1, 1))
        self._option.iv_rank = IVRank()
        self._option.strike_availability = StrikeAvailability()
        self._option.contract = None
        # Rebalance one minute after the open on every trading day of the index.
        self.schedule.on(
            self.date_rules.every_day(self._index.symbol),
            self.time_rules.after_market_open(self._index.symbol, 1),
            self._rebalance,
        )
        self.set_warm_up(timedelta(365))

    def _rebalance(self):
        """Update both indicators, then manage the short put position."""
        option = self._option

        # Update the Strike Availability indicator from the full option chain.
        chain = self.option_chain(self._index.symbol, flatten=True).data_frame
        if chain.empty:
            return
        strike_ready = option.strike_availability.update(self.time, chain)
        if strike_ready:
            self.plot('Strike Availability', 'Value', option.strike_availability.value)
            self.plot('Strike Availability', 'Label', option.strike_availability.label)
        # The label only exists once the indicator is ready; reading it earlier
        # would fail, so treat "not ready" as "no label".
        strike_label = option.strike_availability.label if strike_ready else None

        # Update the IV Rank indicator from the filtered universe chain.
        universe_chain = self.current_slice.option_chains.get(option.symbol)
        if not universe_chain or not option.iv_rank.update(universe_chain) or self.is_warming_up:
            return
        self.plot('IV Rank', 'Value', option.iv_rank.value)
        self.plot('IV Rank', 'Label', option.iv_rank.label)
        iv_label = option.iv_rank.label

        if self.portfolio.invested:
            if iv_label == 2 and strike_label == 2:
                self.liquidate(tag='IV rank and strike availability is high!')
            # If the contract expires soon, liquidate.
            elif option.contract.id.date - self.time < timedelta(7):
                self.liquidate(tag='Expires within 7 days')
            # If the index has fallen to the strike (the put is ATM/ITM), liquidate.
            elif self._index.price <= option.contract.id.strike_price:
                self.liquidate(tag='ATM')
            return

        # Only open a position when both labels are low/moderate.
        if strike_label is None or iv_label >= 2 or strike_label >= 2:
            return

        # Candidate puts: closest expiry at least 30 days out, strike at or below spot.
        eligible_expiries = chain.expiry[chain.expiry - self.time >= timedelta(30)]
        if eligible_expiries.empty:
            return
        puts = chain[
            (chain.expiry == eligible_expiries.min())
            & (chain.right == OptionRight.PUT)
            & (chain.strike <= self._index.price)
        ].sort_values('strike')
        if puts.empty:
            return

        # Step down roughly 3 standard deviations, counted in 5-point strike
        # increments. Clamp to [1, len(puts)]: a step count of 0 would index
        # position -0 == 0, i.e. the lowest strike instead of the nearest one.
        steps = int(3 * self._index.std.current.value / 5)
        steps = max(1, min(steps, len(puts)))
        option.contract = puts.index[-steps]
        self.add_option_contract(option.contract)
        self.set_holdings(option.contract, -0.25)


class StrikeAvailability:
    """Rate of change of (unique strikes / underlying price), bucketed by k-means.

    Attributes:
        is_ready: True once the rate-of-change window holds `lookback` values.
        value: Latest rate-of-change value, or None before the indicator is ready.
        label: Cluster rank of the latest value (0=Low, 1=Medium, 2=High),
            or None before the indicator is ready.
    """

    def __init__(self, lookback=252, period=10):
        """Create the indicator.

        Args:
            lookback: Number of rate-of-change values kept for clustering.
            period: Period of the underlying `RateOfChange` indicator.
        """
        self._roc = RateOfChange(period)
        # Resize the indicator's own window so it retains `lookback` values.
        self._roc.window.size = lookback
        self._roc.window.reset()
        # Define the public state up front so it can be read before readiness.
        self.is_ready = False
        self.value = None
        self.label = None

    def update(self, t, chain):
        """Feed one option-chain snapshot into the indicator.

        Args:
            t: Timestamp of the sample.
            chain: DataFrame with `strike` and `underlyinglastprice` columns.

        Returns:
            True when `value` and `label` were refreshed by this call;
            False when the indicator is not ready or the sample was skipped.
        """
        if chain.empty:
            return False
        underlying_price = chain.underlyinglastprice.iloc[0]
        # Skip samples with a missing or non-positive price: the ratio would be
        # non-finite and must not be fed into the indicator.
        if not underlying_price > 0:
            return False
        self._roc.update(t, len(chain.strike.unique()) / underlying_price)
        self.is_ready = self._roc.window.is_ready
        if self.is_ready:
            self.value = self._roc.current.value
            # Reverse the window so the newest value is the last sample.
            samples = np.array([x.value for x in self._roc.window][::-1]).reshape(-1, 1)
            kmeans = KMeans(n_clusters=3, random_state=0).fit(samples)
            # Rank the clusters by their centers so that 0=Low, 1=Medium, 2=High.
            rank_by_cluster = {
                cluster: rank
                for rank, cluster in enumerate(np.argsort(kmeans.cluster_centers_.ravel()))
            }
            # Save the label of the current (last) value.
            self.label = rank_by_cluster[kmeans.labels_[-1]]
        return self.is_ready


class IVRank:
    """Implied-volatility rank of near-the-money contracts, with a k-means label.

    Each update aggregates the implied volatility (IV) of the contracts whose
    strike is closest to the underlying price, at the nearest expiry that is at
    least `min_expiry` days away.

    Attributes:
        is_ready: True once `lookback` aggregated IVs have been collected.
        value: (IV - trailing min) / (trailing max - trailing min), or None
            before the indicator is ready.
        label: Cluster rank of the latest aggregated IV within the trailing
            IV history (0=Low, 1=Medium, 2=High), or None before readiness.
            Note that the clustering runs on the raw aggregated IVs, not on
            the rank values.
    """

    def __init__(self, lookback=252, min_expiry=30):
        """Create the indicator.

        Args:
            lookback: Number of aggregated IV samples in the trailing window.
            min_expiry: Minimum days to expiry for contracts to be considered.
        """
        self._min_iv = Minimum(lookback)
        self._max_iv = Maximum(lookback)
        self._min_expiry = timedelta(min_expiry)
        self._history = RollingWindow[float](lookback)
        # Define the public state up front so it can be read before readiness.
        self.is_ready = False
        self.value = None
        self.label = None

    def update(self, chain):
        """Feed one option chain into the indicator.

        Args:
            chain: Iterable of contracts exposing `end_time`; each contract
                provides `id.date`, `id.strike_price`, `underlying_last_price`
                and `implied_volatility`.

        Returns:
            True when `value` and `label` were refreshed by this call;
            False when no contract expires late enough or the lookback window
            is not full yet.
        """
        # Select contracts to use in the aggregation.
        #  1) Contracts with the closest expiry at least `min_expiry` away.
        cutoff = chain.end_time + self._min_expiry
        expiries = [c.id.date for c in chain if c.id.date >= cutoff]
        if not expiries:
            return False
        expiry = min(expiries)
        contracts = [c for c in chain if c.id.date == expiry]
        #  2) Contracts whose strike is closest to the underlying price.
        distances = [abs(c.underlying_last_price - c.id.strike_price) for c in contracts]
        closest = min(distances)
        atm_ivs = [
            c.implied_volatility
            for c, distance in zip(contracts, distances)
            if distance == closest
        ]
        # Aggregate the IVs of the selected contracts.
        agg_iv = float(np.median(atm_ivs))
        self._history.add(agg_iv)
        # Calculate the IV Rank and determine if it's high, medium, or low.
        self._min_iv.update(chain.end_time, agg_iv)
        self.is_ready = self._max_iv.update(chain.end_time, agg_iv)
        if self.is_ready:
            low = self._min_iv.current.value
            iv_range = self._max_iv.current.value - low
            # A flat IV history has no range; report rank 0 instead of dividing by zero.
            self.value = float((agg_iv - low) / iv_range) if iv_range else 0.0
            # Cluster the trailing aggregated IVs into high, medium, and low
            # groups; the window is reversed so the newest sample is last.
            samples = np.array(list(self._history)[::-1]).reshape(-1, 1)
            kmeans = KMeans(n_clusters=3, random_state=0).fit(samples)
            # Rank the clusters by their centers so that 0=Low, 1=Medium, 2=High.
            rank_by_cluster = {
                cluster: rank
                for rank, cluster in enumerate(np.argsort(kmeans.cluster_centers_.ravel()))
            }
            # Save the label of the current (last) value.
            self.label = rank_by_cluster[kmeans.labels_[-1]]
        return self.is_ready