Created with Highcharts 12.1.2Equity201020112012201320142015201620172018201920202021202220232024202520260320k0.751.35-32000.800.8-0.41.2020M040M060
Overall Statistics
Total Orders
835
Average Win
1.21%
Average Loss
-1.20%
Compounding Annual Return
7.121%
Drawdown
32.000%
Expectancy
0.226
Start Equity
100000
End Equity
284625.96
Net Profit
184.626%
Sharpe Ratio
0.345
Sortino Ratio
0.316
Probabilistic Sharpe Ratio
1.007%
Loss Rate
39%
Win Rate
61%
Profit-Loss Ratio
1.00
Alpha
0
Beta
0
Annual Standard Deviation
0.108
Annual Variance
0.012
Information Ratio
0.509
Tracking Error
0.108
Treynor Ratio
0
Total Fees
$5110.85
Estimated Strategy Capacity
$10000.00
Lowest Capacity Asset
SPY 32POP8D6MAAUE|SPY R735QTJ8XC9X
Portfolio Turnover
6.70%
# region imports
from AlgorithmImports import *
from datetime import timedelta
from pandas.core.frame import DataFrame
from vix_filter import VixFilter, VixFilterType
# endregion

class OptionTradeFunction(Enum):
    """Direction in which the option leg is opened.

    ``Options._trade_option`` turns BUY into a positive order quantity and
    any other member into a negative (short) quantity.
    """

    # Open a long position in the selected contract.
    BUY = 1
    # Open a short position in the selected contract.
    SELL = 2

class Options(QCAlgorithm):
    """Equity rotation with a VIX-gated single-contract option overlay.

    Once per day (on the first ``on_data`` call while the market asset's
    exchange is open) the algorithm:

    * holds either the market asset or the safe asset, chosen by the equity
      VIX filter, and
    * opens or liquidates one option position on the market asset, chosen by
      the option VIX filter.

    All tunables are the class-level attributes below.
    """

    # --- equity leg -------------------------------------------------------
    # Measure used to pick between the market and the safe asset.
    _equity_vix_filter_type: VixFilterType = VixFilterType.VIX_RATIO
    # Threshold the measure is compared against.
    _equity_vix_filter_threshold: float = 1
    # compare(value, threshold): True => hold the market asset, False => hold
    # the safe asset. With the defaults: VIX_RATIO > 1 => trade SPY.
    _equity_vix_filter_condition = staticmethod(lambda x, y: x > y) # meaning VIX_RATIO > 1 => trade SPY
    _market_asset_ticker: str = 'SPY'
    _safe_asset_ticker: str = 'IEF'

    # --- option leg -------------------------------------------------------
    # Measure used to decide whether the option position may be open.
    _option_vix_filter_type: VixFilterType = VixFilterType.VIX_RANK
    _option_vix_filter_threshold: float = 0.5
    # compare(value, threshold): True => open/keep the option trade, False =>
    # liquidate options. With the defaults: VIX_RANK < 0.5 => trade options.
    _option_vix_filter_condition = staticmethod(lambda x, y: x < y) # meaning VIX_RANK < 0.5 => trade option strategy
    
    # Minimum number of calendar days to expiry when selecting a contract.
    _DTE: int = 30
    # Minimum out-of-the-money distance, as a fraction of the underlying price.
    _OTM: float = 0.01
    # NOTE(review): not referenced anywhere in the visible code;
    # VixFilter._get_VIX_rank uses its own default lookback of 150.
    _vix_rank_lookback: int = 150
    # Fraction handed to both the equity and the option sizing logic.
    _percentage_traded: float = 1.
    
    # Which side of the chain to select from, and whether to buy or sell it.
    _option_right: OptionRight = OptionRight.PUT
    _option_trade_function: OptionTradeFunction = OptionTradeFunction.SELL

    def initialize(self) -> None:
        """Set up cash, dates, subscriptions, VIX filters and reality models.

        Subscribes the market and safe equities at minute resolution, builds
        the two ``VixFilter`` instances from the class-level configuration,
        and installs the brokerage model and the custom security initializer.
        """
        self.init_cash: int = 100_000

        self.set_start_date(2010, 1, 1)
        self.set_cash(self.init_cash)

        self._equity: Equity = self.add_equity(self._market_asset_ticker, Resolution.MINUTE)
        # Raw prices so the underlying price is comparable with option strikes.
        self._equity.set_data_normalization_mode(DataNormalizationMode.RAW)
        # Holds [first benchmark close, latest benchmark close]; see on_end_of_day.
        self._benchmark_values: List[float] = []

        self._market: Symbol = self._equity.symbol

        # volatility filters
        self._equity_vf: VixFilter = VixFilter(
            self,
            self._equity_vix_filter_type,
            self._equity_vix_filter_threshold,
            Options._equity_vix_filter_condition
        )

        self._option_vf: VixFilter = VixFilter(
            self,
            self._option_vix_filter_type,
            self._option_vix_filter_threshold,
            Options._option_vix_filter_condition
        )

        # option storage
        self._contract: Optional[Symbol] = None
        self._contracts_added: Set = set()

        # safe asset
        self._safe_asset: Symbol = self.add_equity(self._safe_asset_ticker, Resolution.MINUTE).symbol

        # The brokerage model has to be selected BEFORE the security
        # initializer is built: the initializer receives `self.brokerage_model`
        # as a constructor argument, so building it first would bind it to
        # whatever model was active before the Interactive Brokers model was
        # chosen.
        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)

        # get last known price after subscribing option contract
        self.set_security_initializer(
            CustomSecurityInitializer(self.brokerage_model, FuncSecuritySeeder(self.get_last_known_prices))
        )
        self.settings.minimum_order_margin_portfolio_percentage = 0
        self.settings.daily_precise_end_time = False

        # Day-of-month of the last daily run; -1 means "never ran".
        self._recent_day: int = -1

    def on_securities_changed(self, changes: SecurityChanges) -> None:
        """Give every newly added equity a leverage of 2.

        Args:
            changes: Securities added to / removed from the algorithm; only
                the added ones are inspected.
        """
        new_equities = (
            added for added in changes.added_securities
            if added.type == SecurityType.EQUITY
        )
        for equity in new_equities:
            equity.set_leverage(2)

    def on_data(self, slice: Slice) -> None:
        """Run the equity and option strategies at most once per day.

        Nothing happens during warm-up or while the market asset's exchange
        is closed. The first eligible call of a day claims that day; if the
        VIX filters are not ready at that moment the day is skipped.

        Args:
            slice: Current data slice (not read directly here).
        """
        if self.is_warming_up or not self.is_market_open(self._market):
            return

        # once a day execution
        today: int = self.time.day
        if today == self._recent_day:
            return
        self._recent_day = today

        if any(not vf.is_ready() for vf in (self._equity_vf, self._option_vf)):
            return

        self._equity_vf.plot('Equity VIX Filter')
        self._option_vf.plot('Option VIX Filter')

        # equity leg first, then the option overlay
        self._equity_strategy(self._percentage_traded, self._equity_vf)
        self._option_strategy(self._percentage_traded, self._option_vf)

    def _equity_strategy(self, percentage_traded: float, vix_filter: VixFilter) -> None:
        """Hold the market asset when the filter signals, otherwise the safe asset.

        Args:
            percentage_traded: Target portfolio weight for the chosen equity.
            vix_filter: Filter whose signal picks the asset and whose value is
                written into the order tag.
        """
        signal_on: bool = vix_filter.trade_signal()
        target: Symbol = self._market if signal_on else self._safe_asset

        held_equities: List[Symbol] = []
        for holding in self.portfolio:
            if holding.value.invested and holding.value.type == SecurityType.EQUITY:
                held_equities.append(holding.key)

        filter_name, filter_value = vix_filter.get_value()
        order_tag: str = f'{filter_name}: {filter_value}'

        # Switching assets: close every equity position before opening the new one.
        if target not in held_equities:
            for symbol in held_equities:
                self.liquidate(symbol, tag=order_tag)

        if self.portfolio[target].invested:
            return
        self.set_holdings(target, percentage_traded, tag=order_tag)

    def _option_strategy(self, percentage_traded: float, vix_filter: VixFilter) -> None:
        """Maintain the option overlay: roll the contract, then open or close the trade.

        Args:
            percentage_traded: Fraction forwarded to the option sizing logic.
            vix_filter: Filter deciding whether an option position may be open;
                its value is written into the order tag.
        """
        # Drop the tracked contract once it is within one day of expiry.
        tracked = self._contract
        if tracked is not None and self.time + timedelta(days=1) >= tracked.id.date:
            self.remove_option_contract(tracked)
            self._contract = None

        # Pick (and subscribe) a replacement when nothing is tracked.
        if self._contract is None:
            self._contract = self._filter_options(self._option_right, self._DTE, self._OTM)

        contract = self._contract
        if contract is None or not self.current_slice.contains_key(contract):
            # No contract, or no data for it in the current slice yet.
            return

        signal_on: bool = vix_filter.trade_signal()
        filter_name, filter_value = vix_filter.get_value()
        tag: str = f'{filter_name}: {filter_value}'

        open_options: List[Symbol] = [
            holding.key
            for holding in self.portfolio
            if holding.value.invested and holding.value.type == SecurityType.OPTION
        ]

        if not signal_on:
            # liquidate options
            for symbol in open_options:
                self.liquidate(symbol, tag=tag)
            return

        # Signal is on: open a position only when no option is held.
        if not open_options:
            self._trade_option(contract, percentage_traded, tag)

    def _trade_option(self, contract: Symbol, percentage_traded: float, tag: str) -> None:
        """Send a market order for ``contract`` unless it is already held.

        The order is long when ``_option_trade_function`` is BUY and short
        otherwise. Size comes from ``_get_amount_as_fraction_of_portfolio``
        (``_get_amount_as_fraction_of_cash`` is the cash-based alternative
        available on this class).

        Args:
            contract: Option contract to trade; a falsy value is ignored.
            percentage_traded: Fraction of the portfolio used for sizing.
            tag: Text attached to the order.
        """
        if not contract or self.portfolio[contract].invested:
            return

        if self._option_trade_function == OptionTradeFunction.BUY:
            direction: int = 1
        else:
            direction = -1

        size: float = self._get_amount_as_fraction_of_portfolio(contract, percentage_traded)
        self.market_order(contract, direction * size, tag=tag)

    def _filter_options(self, option_right: OptionRight, dte: int, moneyness: float) -> Optional[Symbol]:
        """Select and subscribe the nearest qualifying OTM contract on the market asset.

        Candidates have the requested right, expire at least ``dte`` days from
        now, and are at least ``moneyness`` out of the money. Among those, the
        earliest expiry is taken and, within it, the strike closest to the
        underlying price.

        Args:
            option_right: ``OptionRight.CALL`` selects calls; anything else
                selects puts.
            dte: Minimum number of days until expiry.
            moneyness: Minimum OTM distance as a fraction of the underlying
                price (e.g. 0.01 for 1%).

        Returns:
            The selected contract symbol, or ``None`` when the underlying has
            no price yet or no contract qualifies.
        """
        underlying_price: float = self.securities[self._market].price
        if underlying_price <= 0:
            # Without a price the strike bound is meaningless (for calls it
            # would degenerate to "any strike"), so do not select anything.
            self.log(f'No price available for {self._market}; cannot filter options')
            return None

        contracts: List[Symbol] = self.option_chain_provider.get_option_contract_list(self._market, self.time)
        earliest_expiry = self.time + timedelta(days=dte)
        wants_call: bool = option_right == OptionRight.CALL

        if wants_call:
            strike_floor: float = underlying_price * (1 + moneyness)
            options: List[Symbol] = [
                c for c in contracts
                if c.id.option_right == OptionRight.CALL
                and c.id.strike_price >= strike_floor
                and c.id.date >= earliest_expiry
            ]
        else:
            strike_cap: float = underlying_price * (1 - moneyness)
            options = [
                c for c in contracts
                if c.id.option_right == OptionRight.PUT
                and c.id.strike_price <= strike_cap
                and c.id.date >= earliest_expiry
            ]

        if not options:
            self.log(f'Cannot find filtered options for {self._market}')
            return None

        expiry = min(c.id.date for c in options)
        same_expiry: List[Symbol] = [c for c in options if c.id.date == expiry]

        # Closest-to-the-money strike: lowest qualifying strike for calls,
        # highest qualifying strike for puts.
        pick = min if wants_call else max
        contract: Symbol = pick(same_expiry, key=lambda c: c.id.strike_price)

        # prevent multiple subscriptions
        if contract not in self._contracts_added:
            self._contracts_added.add(contract)
            option = self.add_option_contract(contract, Resolution.MINUTE)
            option.is_tradable = True

        return contract

    def on_end_of_day(self, symbol: Symbol) -> None:
        """Plot a buy-and-hold benchmark of the market asset on 'Strategy Equity'.

        The benchmark is ``init_cash`` scaled by the ratio of the latest daily
        close to the first daily close seen by this handler.

        Args:
            symbol: Security whose trading day just ended. Only the market
                asset triggers any work.
        """
        # LEAN calls this handler once per security each day; reacting to the
        # market asset only avoids one history request and one duplicate plot
        # point for every other subscribed symbol.
        if symbol != self._equity.symbol:
            return

        # print benchmark in main equity plot
        mkt_price_df: DataFrame = self.history(self._equity.symbol, 2, Resolution.DAILY)
        if mkt_price_df.empty:
            return

        # Single-symbol request, so the last row of 'close' is the latest
        # close; convert to a plain float instead of carrying a pandas object.
        benchmark_price: float = float(mkt_price_df['close'].iloc[-1])

        values: List[float] = self._benchmark_values
        if not values:
            # First observation becomes the fixed base of the benchmark.
            values.append(benchmark_price)
        if len(values) < 2:
            values.append(benchmark_price)
        else:
            values[-1] = benchmark_price

        benchmark_perf: float = self.init_cash * (values[-1] / values[0])
        self.plot('Strategy Equity', self._equity.symbol, benchmark_perf)

    def _get_amount_as_fraction_of_portfolio(self, option_symbol: Symbol, fraction: float) -> float:
        """Return how many contracts give ``fraction`` of portfolio value in underlying notional.

        Args:
            option_symbol: Option contract being sized.
            fraction: Share of total portfolio value to cover.

        Returns:
            Contract count (not rounded): target notional divided by
            ``contract_multiplier * underlying price``.
        """
        underlying_price: float = self.securities[option_symbol.underlying].price
        per_contract_notional: float = self.securities[option_symbol].contract_multiplier * underlying_price
        budget: float = self.portfolio.total_portfolio_value * fraction
        return budget / per_contract_notional

    def _get_amount_as_fraction_of_cash(self, option_symbol: Symbol, fraction: float) -> float:
        """Return how many contracts give ``fraction`` of portfolio cash in underlying notional.

        Args:
            option_symbol: Option contract being sized.
            fraction: Share of ``portfolio.cash`` to cover.

        Returns:
            Contract count (not rounded): target notional divided by
            ``contract_multiplier * underlying price``.
        """
        budget: float = self.portfolio.cash * fraction
        underlying_price: float = self.securities[option_symbol.underlying].price
        per_contract_notional: float = self.securities[option_symbol].contract_multiplier * underlying_price
        return budget / per_contract_notional

class CustomSecurityInitializer(BrokerageModelSecurityInitializer):
    """Brokerage-model initializer that additionally configures option securities."""

    def __init__(self, brokerage_model: IBrokerageModel, security_seeder: ISecuritySeeder) -> None:
        """Pass the brokerage model and the seeder straight to the base initializer."""
        super().__init__(brokerage_model, security_seeder)

    def initialize(self, security: Security) -> None:
        """Run the base initialization, then override the option-specific models.

        Args:
            security: Security being added to the algorithm.
        """
        super().initialize(security)

        if security.type != SecurityType.OPTION:
            return

        # overwrite the price model and the assignment model for options
        security.price_model = OptionPriceModels.black_scholes()
        security.set_option_assignment_model(NullOptionAssignmentModel())
# region imports
from AlgorithmImports import *
from enum import Enum
# endregion

class VixFilterType(Enum):
    """Volatility measure a ``VixFilter`` compares against its threshold."""

    # Price of the "VVIX" CBOE series.
    VVIX = 1
    # Position of the current "VIX" price inside its lookback low/high range.
    VIX_RANK = 2
    # "VIX" price divided by "VIX3M" price.
    VIX_RATIO = 3

class VixFilter():
    def __init__(
        self,
        algo: QCAlgorithm,
        vix_filter_type: VixFilterType,
        vix_filter_value_threshold: float,
        compare_fn: Callable,
    ) -> None:
        """Store the configuration and subscribe to the data the filter needs.

        Args:
            algo: Algorithm used for subscriptions, prices, history and plots.
            vix_filter_type: Which volatility measure to evaluate.
            vix_filter_value_threshold: Value the measure is compared against.
            compare_fn: Called as ``compare_fn(measure, threshold)``; a truthy
                result means "trade".
        """
        self._compare_fn: Callable = compare_fn
        self._vix_filter_value_threshold: float = vix_filter_value_threshold
        self._vix_filter_type: VixFilterType = vix_filter_type
        self._algo: QCAlgorithm = algo

        # Must come last: the subscription step reads the algorithm and the
        # filter type set above.
        self._signal_assets: List[Symbol] = self._subscribe_assets()

    def is_ready(self) -> bool:
        return all(
            self._algo.securities.contains_key(sa) and self._algo.securities[sa].get_last_data() and self._algo.securities[sa].price != 0 for sa in self._signal_assets
        )

    def _subscribe_assets(self) -> List[Symbol]:
        """Subscribe to the daily CBOE series required by the configured filter type.

        Returns:
            The subscribed symbols, in the order the signal calculations index
            them (for VIX_RATIO: "VIX" first, "VIX3M" second). An empty list
            is returned for a filter type that has no mapping.
        """
        tickers_by_type = {
            VixFilterType.VVIX: ("VVIX",),
            VixFilterType.VIX_RANK: ("VIX",),
            VixFilterType.VIX_RATIO: ("VIX", "VIX3M"),
        }
        # Default to "no tickers" so the result is always a real list.
        tickers = tickers_by_type.get(self._vix_filter_type, ())

        return [
            self._algo.add_data(CBOE, ticker, Resolution.DAILY).symbol
            for ticker in tickers
        ]

    def trade_signal(self) -> bool:
        """Evaluate the configured measure against the threshold.

        Returns:
            True when ``compare_fn(measure, threshold)`` is truthy; False when
            it is falsy or the filter type is not one of the handled kinds.
        """
        kind = self._vix_filter_type

        if kind == VixFilterType.VVIX:
            measure: float = self._algo.securities[self._signal_assets[0]].price
        elif kind == VixFilterType.VIX_RANK:
            measure = self._get_VIX_rank(self._signal_assets[0])
        elif kind == VixFilterType.VIX_RATIO:
            vix_price: float = self._algo.securities[self._signal_assets[0]].price
            vix_3m_price: float = self._algo.securities[self._signal_assets[1]].price
            measure = vix_price / vix_3m_price
        else:
            return False

        return True if self._compare_fn(measure, self._vix_filter_value_threshold) else False

    def _get_VIX_rank(self, vix: Symbol, lookback: int = 150) -> float:
        """Locate the current price of ``vix`` inside its recent low/high range.

        Args:
            vix: Symbol of the subscribed CBOE series.
            lookback: Number of daily bars requested from history.

        Returns:
            ``(price - lowest low) / (highest high - lowest low)`` over the
            requested history.
        """
        bars: DataFrame = self._algo.history(CBOE, vix, lookback, Resolution.DAILY)

        current_price: float = self._algo.securities[vix].price
        lowest: float = min(bars["low"])
        highest: float = max(bars["high"])

        return (current_price - lowest) / (highest - lowest)
    
    def get_value(self) -> Tuple[str, float]:
        """Return the filter type's name together with its current measure.

        Returns:
            ``(name, value)`` where ``name`` is the enum member name and
            ``value`` is the VVIX price, the VIX rank, or the VIX / VIX3M
            price ratio, depending on the filter type.

        Raises:
            ValueError: If the filter type is not one of the handled kinds
                (previously this surfaced as an unbound-variable error).
        """
        if self._vix_filter_type == VixFilterType.VVIX:
            value: float = self._algo.securities[self._signal_assets[0]].price
        elif self._vix_filter_type == VixFilterType.VIX_RANK:
            value = self._get_VIX_rank(self._signal_assets[0])
        elif self._vix_filter_type == VixFilterType.VIX_RATIO:
            value = (
                self._algo.securities[self._signal_assets[0]].price
                / self._algo.securities[self._signal_assets[1]].price
            )
        else:
            raise ValueError(f'Unsupported VIX filter type: {self._vix_filter_type!r}')

        return self._vix_filter_type.name, value

    def plot(self, chart_name: str) -> None:
        """Plot the current filter value on ``chart_name``.

        The series is named after the filter type, as returned by
        ``get_value``.

        Args:
            chart_name: Name of the chart to plot on.
        """
        # get_value() yields (series name, value), i.e. the remaining plot arguments.
        self._algo.plot(chart_name, *self.get_value())