Overall Statistics
Total Orders
481
Average Win
1.34%
Average Loss
-2.50%
Compounding Annual Return
14.246%
Drawdown
41.500%
Expectancy
0.114
Start Equity
10000000
End Equity
16972104.34
Net Profit
69.721%
Sharpe Ratio
0.433
Sortino Ratio
0.42
Probabilistic Sharpe Ratio
13.868%
Loss Rate
28%
Win Rate
72%
Profit-Loss Ratio
0.54
Alpha
0.071
Beta
0.208
Annual Standard Deviation
0.199
Annual Variance
0.039
Information Ratio
0.072
Tracking Error
0.225
Treynor Ratio
0.414
Total Fees
$70034.38
Estimated Strategy Capacity
$0
Lowest Capacity Asset
LO 32OBGVM6LT9B8|CL YPCDYN02Y5KX
Portfolio Turnover
0.64%
# region imports
from AlgorithmImports import *
import json
# endregion

# Load the JSON from a file
def load_backtest_config(algo: "QCAlgorithm", json_path: str) -> "dict | None":
    """Read a JSON config file and return its ``backtest_setup`` section.

    Args:
        algo: Algorithm instance; only its ``debug`` method is used, to
            report problems.
        json_path: Path of the JSON file to read.

    Returns:
        The value stored under the top-level ``"backtest_setup"`` key, or
        ``None`` when the file is missing, is not valid JSON, or does not
        contain that key. Every failure is reported through ``algo.debug``.
    """
    try:
        with open(json_path, 'r', encoding='utf-8') as file:
            backtest_config = json.load(file)
    except FileNotFoundError:
        algo.debug(f"Error: Config file '{json_path}' not found.")
        return None
    except json.JSONDecodeError:
        algo.debug("Error: Failed to decode JSON config file.")
        return None

    # A syntactically valid file without the expected section is reported the
    # same way as the other failures instead of escaping as KeyError/TypeError.
    if not isinstance(backtest_config, dict) or "backtest_setup" not in backtest_config:
        algo.debug(f"Error: Config file '{json_path}' has no 'backtest_setup' section.")
        return None

    return backtest_config["backtest_setup"]
# region imports
from AlgorithmImports import *
from abc import abstractmethod, ABC
# endregion

class DeltaHedgeModule(ABC):
    """Abstract interface of a delta-hedging component bound to one algorithm."""

    def __init__(self, algo: QCAlgorithm) -> None:
        """Keep a reference to the algorithm the hedger operates on."""
        self._algo: QCAlgorithm = algo

    @abstractmethod
    def delta_hedge(self, symbol: Symbol) -> None:
        """Perform the hedge for ``symbol``; concrete subclasses supply the logic."""

    @abstractmethod
    def _get_delta_hedge_quantity(self, symbol: Symbol) -> float:
        """Return the quantity of ``symbol`` to trade; concrete subclasses supply the logic."""
# region imports
from AlgorithmImports import *
from delta_hedge_module import DeltaHedgeModule
import QuantLib as ql 
# endregion

class FuturesOptionsDeltaHedgeModule(DeltaHedgeModule):
    """Delta hedger that trades a futures contract against the future options held on it."""

    def __init__(
        self, 
        algo: QCAlgorithm
    ) -> None:
        """Bind the hedger to ``algo``."""
        super(FuturesOptionsDeltaHedgeModule, self).__init__(algo)

    def delta_hedge(self, symbol: Symbol) -> None:
        """Send a market order on ``symbol`` for the computed hedge adjustment.

        Nothing is ordered when the adjustment is zero.
        """
        delta_hedge_quantity: float = self._get_delta_hedge_quantity(symbol)
        if delta_hedge_quantity != 0:
            self._algo.market_order(symbol, delta_hedge_quantity)

    def _get_delta_hedge_quantity(self, symbol: Symbol) -> float:
        """Return how many units of ``symbol`` must be traded to reach the hedge target.

        The target is ``floor(sum(delta * held quantity) / ask price)`` taken over
        the invested future option contracts found in the current slice's option
        chain whose underlying is ``symbol``. The returned value is the target
        minus the quantity of ``symbol`` already held.

        Returns ``0.`` (after logging) when the current slice has no option chain
        for ``symbol`` or when ``symbol`` has no ask price yet.
        """
        chains: List[OptionChain] = [
            opt_chain for opt_chain in self._algo.current_slice.option_chains.values()
            if opt_chain.underlying.symbol == symbol
        ]
        if not chains:
            self._algo.log(f'DeltaHedgeModule._get_delta_hedge_quantity: No option chain found for {symbol}. Number of option chains present in the algorithm {self._algo.current_slice.option_chains.count}')
            return 0.
        
        chain: OptionChain = chains[0]

        contracts_in_portfolio: List[OptionContract] = [
            contract.value for contract in chain.contracts
            if self._algo.securities[contract.value.symbol].type == SecurityType.FUTURE_OPTION
            and self._algo.portfolio[contract.value.symbol].invested
        ]

        ask_price: float = self._algo.securities[symbol].ask_price
        if ask_price == 0:
            # Without a quote the division below would raise ZeroDivisionError;
            # skip this hedge instead, as the construction classes do for a zero price.
            self._algo.log(f'DeltaHedgeModule._get_delta_hedge_quantity: No ask price available for {symbol}.')
            return 0.

        # NOTE(review): the target divides the summed position delta by the ask
        # price and keeps the sign of the option delta - confirm this is the
        # intended hedge ratio.
        position_delta: float = sum(
            c.greeks.delta * self._algo.portfolio[c.symbol].quantity
            for c in contracts_in_portfolio
        )
        delta_hedge_quantity: float = np.floor(position_delta / ask_price)

        additional_quantity: float = delta_hedge_quantity - self._algo.portfolio[symbol].quantity
        
        return additional_quantity

    def black_model(
        self,
        option_price: float, 
        forward_price: float, 
        strike_price: float, 
        option_type: int,
        expiration_date: datetime, 
        calc_date: datetime,
        discount_factor: float = 1
        ) -> Tuple[float, float]:
        """Back out implied volatility and delta from an option price with QuantLib's Black formula.

        Args:
            option_price: Observed option price.
            forward_price: Forward price of the underlying.
            strike_price: Option strike.
            option_type: Option type value passed through to QuantLib.
            expiration_date: Option expiration.
            calc_date: Valuation date.
            discount_factor: Discount factor passed through to QuantLib.

        Returns:
            ``(implied_vol, delta)``, where ``implied_vol`` is the implied standard
            deviation divided by ``sqrt(t)`` with ``t = days to expiration / 360``.
        """
        implied_vol = ql.blackFormulaImpliedStdDev(option_type, strike_price, forward_price, option_price, discount_factor)
        strikepayoff = ql.PlainVanillaPayoff(option_type, strike_price)
        # the calculator is built with the un-scaled implied standard deviation
        black = ql.BlackCalculator(strikepayoff, forward_price, implied_vol, discount_factor)
        # NOTE(review): t is 0 (or negative) when calc_date is on/after the
        # expiration date, which makes the division below meaningless.
        t = (expiration_date - calc_date).days / 360
        implied_vol = implied_vol / np.sqrt(t)

        return implied_vol, black.delta(discount_factor * forward_price)
# region imports
from AlgorithmImports import *
# endregion

class ImpliedVolatilityIndicator(PythonIndicator):
    """Indicator whose value is the implied volatility of a selected call on the mapped future.

    A call option is subscribed on the currently mapped contract of the
    continuous future and its implied volatility is read from the current
    slice's option chain on every update.
    """

    def __init__(
        self, 
        name: str, 
        algo: QCAlgorithm,
        continuous_future: Future, 
        moneyness: float,
        target_expiry: int, # NOTE not used yet
    ) -> None:
        """Create the indicator.

        Args:
            name: Indicator name.
            algo: Algorithm used for option lookup, subscription and logging.
            continuous_future: Continuous future whose mapped contract is the underlying.
            moneyness: Half-width of the accepted ``underlying price / strike`` band around 1.
            target_expiry: Accepted but currently ignored.
        """
        super().__init__()
        
        self._algo: QCAlgorithm = algo
        self._selected_option: Optional[Option] = None
        self._moneyness: float = moneyness
        self._continuous_future: Future = continuous_future

        self.name: str = name
        self.value: float = 0.

    @property
    def mapped(self) -> Symbol:
        """Symbol of the contract the continuous future is currently mapped to."""
        return self._continuous_future.mapped

    def update(self, input: TradeBar) -> bool:
        """Refresh the selected option if needed and update ``value``.

        Args:
            input: Latest bar; only its type is checked.

        Returns:
            ``True`` when an option is currently selected, ``False`` otherwise.

        Raises:
            TypeError: If ``input`` is not a ``TradeBar``.
        """
        if not isinstance(input, TradeBar):
            raise TypeError('ImpliedVolatilityIndicator.update: input must be a TradeBar')

        # (re)select when nothing is selected yet or the selected option has expired
        if self._selected_option is None or self._selected_option.expiry < self._algo.time:
            # NOTE could pass input.symbol instead of self.mapped inside the function but for some reason mapped symbol is not rolled yet here (only in this class)
            self._selected_option = self._find_atm_option()

        if self._selected_option is not None:
            self.value = self._get_implied_volatility()

        return self._selected_option is not None

    def _find_atm_option(self) -> "Optional[Option]":
        """Subscribe and return a call option on the mapped future, or ``None``.

        Among the contracts with the nearest expiry whose
        ``underlying ask price / strike`` ratio lies within ``1 +/- moneyness``,
        the call with the highest strike is chosen. ``None`` is returned (after
        logging) when the option list is empty, when no contract passes the
        filter, or when the chosen strike has no call.
        """
        option_contract_symbols: List[Symbol] = self._algo.option_chain_provider.get_option_contract_list(self.mapped, self._algo.time)

        if len(option_contract_symbols) == 0:
            self._algo.log(f'ImpliedVolatilityIndicator._find_atm_option: No options in option chain found for: {self.mapped}')
            return None

        expiry: datetime.datetime = min(map(lambda x: x.id.date, option_contract_symbols))
        underlying_price: float = self._algo.securities[self.mapped].ask_price

        contracts: List[Symbol] = [
            x for x in option_contract_symbols 
            if x.id.strike_price != 0 
            and x.id.date == expiry
            and (1 - self._moneyness) <= abs(underlying_price / x.id.strike_price) <= (1 + self._moneyness)
        ]
        
        if len(contracts) == 0:
            self._algo.log(f'ImpliedVolatilityIndicator._find_atm_option: No options filtered for: {self.mapped}')
            return None

        strike: float = max(map(lambda x: x.id.strike_price, contracts))

        # The highest strike may be listed for puts only; a default keeps
        # next() from raising StopIteration in that case.
        atm_option: Optional[Symbol] = next(
            (
                x for x in contracts
                if x.id.option_right == OptionRight.CALL and x.id.strike_price == strike
            ),
            None
        )
        if atm_option is None:
            self._algo.log(f'ImpliedVolatilityIndicator._find_atm_option: No call option at strike {strike} for: {self.mapped}')
            return None

        selected_option: Option = self._algo.add_future_option_contract(atm_option)

        return selected_option
        
    def _get_implied_volatility(self) -> float:
        """Return the selected option's implied volatility from the current slice.

        Returns ``0.`` (after logging) when the slice has no option chain for the
        mapped future or the chain does not contain the selected option.
        """
        chains: List[OptionChain] = [
            opt_chain for opt_chain in self._algo.current_slice.option_chains.values()
            if opt_chain.underlying.symbol == self.mapped
        ]
        if not chains:
            self._algo.log(f'ImpliedVolatilityIndicator._get_implied_volatility: No option chain found for {self.mapped}. Number of option chains present in the algorithm {self._algo.current_slice.option_chains.count}')
            return 0.
        
        chain: OptionChain = chains[0]

        contracts: List[OptionContract] = [
            contract.value for contract in chain.contracts
            if self._algo.securities[contract.value.symbol].type == SecurityType.FUTURE_OPTION
            and contract.value.symbol == self._selected_option.symbol
        ]
        if len(contracts) == 0:
            self._algo.log(f'ImpliedVolatilityIndicator._get_implied_volatility: No options filtered for future: {self.mapped}, and option {self._selected_option.symbol}')
            return 0.

        return contracts[0].implied_volatility
# region imports
from AlgorithmImports import *
from collections import deque
from typing import Optional
# endregion

class RealizedVolatilityIndicator(PythonIndicator):
    """Rolling standard deviation of bar-to-bar log returns.

    Returns are accumulated until the time between the first bar's ``time`` and
    the latest bar's ``end_time`` reaches ``daily_period`` days. The number of
    returns collected at that moment becomes the fixed window length. ``value``
    is the population standard deviation of the returns in the window; no
    annualization factor is applied.
    """

    def __init__(
        self,
        algo: QCAlgorithm, 
        symbol: Symbol, 
        name: str, 
        daily_period: int,
        auto_updates: bool, 
        resolution: Resolution = Resolution.DAILY
    ) -> None:
        """Create the indicator.

        Args:
            algo: Algorithm used to register the indicator when ``auto_updates`` is set.
            symbol: Symbol the indicator is registered for when ``auto_updates`` is set.
            name: Indicator name.
            daily_period: Number of days the first window has to span.
            auto_updates: Register the indicator for automatic updates.
            resolution: Resolution of the automatic updates; unused otherwise.
        """
        super().__init__()

        self._daily_period: int = daily_period
        self._auto_updates: bool = auto_updates

        self.name: str = name
        self.value: float = 0.

        self._first_bar: Optional[TradeBar] = None
        self._recent_bar: Optional[TradeBar] = None
        self._return_values = deque()
        self._rolling_sum: float = 0.
        self._rolling_sum_of_squares: float = 0.
        # window length, fixed the first time the indicator becomes ready
        self._n: Optional[int] = None
        
        # register indicator for automatic updates
        if auto_updates:
            algo.register_indicator(symbol, self, resolution)

    @property
    def is_auto_updated(self) -> bool:
        """Whether the indicator was registered for automatic updates."""
        return self._auto_updates

    def update(self, input: TradeBar) -> bool:
        """Feed one bar into the indicator.

        Args:
            input: Latest bar; its ``close`` is compared with the previous bar's.

        Returns:
            ``True`` once the window length has been fixed, i.e. ``value`` has
            been computed at least once.

        Raises:
            TypeError: If ``input`` is not a ``TradeBar``.
        """
        if not isinstance(input, TradeBar):
            raise TypeError('RealizedVolatilityIndicator.update: input must be a TradeBar')
        
        if self._first_bar is None:
            # store first bar
            self._first_bar = input
        else:
            log_return: float = np.log(input.close / self._recent_bar.close)
            self._return_values.append(log_return)
            
            # update rolling sums
            self._rolling_sum += log_return
            self._rolling_sum_of_squares += log_return ** 2

            is_ready: bool = (input.end_time - self._first_bar.time).days >= self._daily_period
            if is_ready:
                # store number of bars
                if self._n is None:
                    self._n = len(self._return_values)
                
                mean_value_1: float = self._rolling_sum / self._n
                mean_value_2: float = self._rolling_sum_of_squares / self._n

                # adjust rolling sums: drop the oldest return so the next
                # append brings the window back to n elements
                removed_return: float = self._return_values.popleft()
                self._rolling_sum -= removed_return
                self._rolling_sum_of_squares -= removed_return ** 2

                # E[x^2] - E[x]^2 from running sums can dip marginally below
                # zero through floating-point cancellation; clamp so the square
                # root does not turn into NaN.
                variance: float = mean_value_2 - (mean_value_1 ** 2)
                self.value = np.sqrt(max(variance, 0.))
        
        self._recent_bar = input
        
        return self._n is not None
# region imports
from AlgorithmImports import *
from enum import Enum
from dataclasses import dataclass
from option_construction.future_options_straddle import FutureOptionsStraddle
from option_construction.future_options_call import FutureOptionsCall
from delta_hedge.futures_options_delta_hedge_module import FuturesOptionsDeltaHedgeModule
from indicator.realized_volatility_indicator import RealizedVolatilityIndicator
from indicator.implied_volatility_indicator import ImpliedVolatilityIndicator
# endregion

class FinecoFuturesOptionStrategy(QCAlgorithm):
    """Sells an option construction on WTI crude oil futures and delta hedges it daily.

    Two daily scheduled events drive the strategy: the delta hedge runs 10
    minutes after the future's market open and the construction rebalance runs
    30 minutes after it. Realized and implied volatility are plotted together.
    """

    def initialize(self) -> None:
        """Subscribe the future, build the strategy components and schedule the daily events."""
        self.set_start_date(2021, 1, 1)
        self.set_cash(10_000_000)
        
        self._moneyness: float = 0.01

        # traded futures
        self._future: Future = self.add_future(
            Futures.Energy.CRUDE_OIL_WTI,
            extended_market_hours=True,
            data_mapping_mode=DataMappingMode.LAST_TRADING_DAY, #OPEN_INTEREST
            data_normalization_mode=DataNormalizationMode.BACKWARDS_RATIO,
            contract_depth_offset=0, 
            resolution=Resolution.MINUTE
        )
        self._future.set_filter(0, 60)

        # delta hedge module
        self._delta_hedge_module: FuturesOptionsDeltaHedgeModule = FuturesOptionsDeltaHedgeModule(self)

        # option construction
        # self._opt_construction: FutureOptionsConstruction = FutureOptionsCall(
        self._opt_construction: FutureOptionsConstruction = FutureOptionsStraddle(
            self, 
            self._future, 
            self._moneyness
        )
        
        # realized volatility indicator
        vol_period: int = 21
        self._realized_vol_indicator: RealizedVolatilityIndicator = RealizedVolatilityIndicator(
            self,
            self._future.symbol, 
            'RealizedVolatility',
            daily_period = vol_period,
            auto_updates = False,
            resolution = Resolution.HOUR # only relevant with auto_updates = True
        )
        
        # implied volatility indicator
        self._implied_vol_indicator: ImpliedVolatilityIndicator = ImpliedVolatilityIndicator(
            'ImpliedVolatility',
            self,
            self._future,
            self._moneyness,
            21 # NOTE not used yet
        )
        self.register_indicator(self._future.symbol, self._implied_vol_indicator, Resolution.DAILY)

        # charting
        # TODO move to charting class
        self._vol_chart_name: str = "Implied vs Realized Volatility"
        vol_chart: Chart = Chart(self._vol_chart_name)
        vol_chart.add_series(Series("Realized Volatility", SeriesType.LINE, '%'))
        vol_chart.add_series(Series("Implied Volatility", SeriesType.LINE, '%'))
        self.add_chart(vol_chart)

        # get last known price after subscribing option contract
        # NOTE(review): the initializer is installed after add_future above -
        # confirm the future itself does not need to go through it.
        self.set_security_initializer(CustomSecurityInitializer(self.brokerage_model, FuncSecuritySeeder(self.get_last_known_prices)))
        self.settings.daily_precise_end_time = True

        self.schedule.on(
            self.date_rules.every_day(self._future.symbol),
            # self.time_rules.at(13, 0),
            self.time_rules.after_market_open(self._future.symbol, 30),
            self._rebalance_construction
        )
        
        self.schedule.on(
            self.date_rules.every_day(self._future.symbol),
            # self.time_rules.at(14, 0),
            self.time_rules.after_market_open(self._future.symbol, 10),
            self._delta_hedge
        )

    def on_data(self, slice: Slice) -> None:
        """Unused; all work happens in the scheduled events."""
        pass

    def _rebalance_construction(self) -> None:
        """Rebalance the option construction and plot the non-zero volatility values (in percent)."""
        self._opt_construction.rebalance_construction()
        
        # plot volatility indicators
        if self._realized_vol_indicator.value != 0:
            self.plot(self._vol_chart_name, "Realized Volatility", self._realized_vol_indicator.value * 100)
        
        if self._implied_vol_indicator.value != 0:
            self.plot(self._vol_chart_name, "Implied Volatility", self._implied_vol_indicator.value * 100)

    def _delta_hedge(self) -> None:
        """Update the realized volatility indicator manually if required, then delta hedge the mapped contract."""
        # update indicator manually
        if not self._realized_vol_indicator.is_auto_updated:
            bar = self.current_slice.bars.get(self._future.symbol)
            if bar is not None:
                self._realized_vol_indicator.update(bar)
            else:
                # The indicator raises TypeError for anything but a TradeBar, so a
                # slice without a bar for the future must not reach it.
                self.log(f'FinecoFuturesOptionStrategy._delta_hedge: No trade bar for {self._future.symbol} in the current slice; realized volatility not updated.')

        self._delta_hedge_module.delta_hedge(self._future.mapped)
    
    # TODO move this to construction class?
    def on_symbol_changed_events(self, symbol_changed_events) -> None:
        """Search for a new option construction when a symbol change matching the mapped contract arrives."""
        # track when the continuous contract switches from one contract to the next
        for symbol, _ in symbol_changed_events.items():
            if self._opt_construction.mapped.value == symbol.value:
                self._opt_construction.find_construction()

class CustomSecurityInitializer(BrokerageModelSecurityInitializer):
    """Brokerage-model security initializer that also replaces the price model of options."""

    def __init__(self, brokerage_model: IBrokerageModel, security_seeder: ISecuritySeeder) -> None:
        """Hand the brokerage model and the seeder straight to the base initializer."""
        super().__init__(brokerage_model, security_seeder)

    def initialize(self, security: Security) -> None:
        """Run the base initialization, then set a Crank-Nicolson finite-difference price model on ``SecurityType.OPTION`` securities."""
        super().initialize(security)

        # every other security type keeps whatever the base initializer configured
        # NOTE(review): only SecurityType.OPTION is matched, while the rest of this
        # file works with SecurityType.FUTURE_OPTION contracts - confirm that
        # future options are meant to be left out.
        if security.type != SecurityType.OPTION:
            return

        # overwrite the price model
        security.price_model = OptionPriceModels.crank_nicolson_fd()
# region imports
from AlgorithmImports import *
from option_construction.future_options_construction import FutureOptionsConstruction
# endregion

class FutureOptionsCall(FutureOptionsConstruction):
    """Option construction made of a single short call on the mapped future."""

    def __init__(
        self, 
        algo: QCAlgorithm, 
        continuous_future: Future, 
        moneyness: float
    ) -> None:
        """Create the construction.

        Args:
            algo: Algorithm used for data access, subscriptions, orders and logging.
            continuous_future: Continuous future whose mapped contract is the underlying.
            moneyness: Half-width of the accepted ``underlying price / strike`` band around 1.
        """
        super(FutureOptionsCall, self).__init__(algo, continuous_future)
        self._moneyness: float = moneyness
        
        self._call: Optional[Option] = None

    @property
    def _has_construction(self) -> bool:
        """Whether a call is currently selected."""
        return self._call is not None
    
    @property
    def _construction_quantity(self) -> float:
        """Portfolio quantity held in the selected call."""
        return self._algo.portfolio[self._call.symbol].quantity
    
    def find_construction(self) -> None:
        """Drop the current call and subscribe a new one on the mapped future.

        Among the contracts with the nearest expiry whose
        ``underlying ask price / strike`` ratio lies within ``1 +/- moneyness``,
        the call with the highest strike is chosen. When the option list is
        empty, no contract passes the filter, or the chosen strike has no call,
        the reason is logged and no call stays selected.
        """
        self._close_construction_position()

        option_contract_symbols: List[Symbol] = self._algo.option_chain_provider.get_option_contract_list(self.mapped, self._algo.time)

        if len(option_contract_symbols) == 0:
            self._algo.log(f'FutureOptionsCall.find_construction: No options in option chain found for: {self.mapped}')
            return

        expiry: datetime.datetime = min(map(lambda x: x.id.date, option_contract_symbols))
        underlying_price: float = self._algo.securities[option_contract_symbols[0].underlying].ask_price

        contracts: List[Symbol] = [
            x for x in option_contract_symbols 
            if x.id.strike_price != 0 
            and x.id.date == expiry
            and (1 - self._moneyness) <= abs(underlying_price / x.id.strike_price) <= (1 + self._moneyness)
        ]
        
        if len(contracts) == 0:
            self._algo.log(f'FutureOptionsCall.find_construction: No options filtered for: {self.mapped}')
            return

        strike: float = max(map(lambda x: x.id.strike_price, contracts))

        # The highest strike may be listed for puts only; a default keeps
        # next() from raising StopIteration in that case.
        atm_call: Optional[Symbol] = next(
            (
                x for x in contracts
                if x.id.option_right == OptionRight.CALL and x.id.strike_price == strike
            ),
            None
        )
        if atm_call is None:
            self._algo.log(f'FutureOptionsCall.find_construction: No call option at strike {strike} for: {self.mapped}')
            return

        self._call = self._algo.add_future_option_contract(atm_call)

    def rebalance_construction(self) -> None:
        """Open the position when flat, close it once the call has expired, otherwise hold."""
        if not self._has_construction:
            return

        if self._construction_quantity == 0:
            # construction is initialized but not traded yet
            self._open_construction_position()
        elif self._call.expiry < self._algo.time:
            # construction is traded and expired
            self._close_construction_position()
        # otherwise: hold position

    def _open_construction_position(self) -> None:
        """Sell calls sized as portfolio value // (contract multiplier * underlying ask price).

        Nothing is sold while the underlying has no ask price.
        """
        underlying_price: float = self._algo.securities[self.mapped].ask_price
        if underlying_price != 0:
            options_quantity: int = self._algo.portfolio.total_portfolio_value // (self._call.contract_multiplier * underlying_price)
            self._algo.sell(self._call.symbol, options_quantity)

    def _close_construction_position(self) -> None:
        """Remove the selected call's subscription, if any, and clear the selection."""
        if self._has_construction:
            # unsubscribe options and close underlying position
            self._algo.remove_option_contract(self._call.symbol)
            
        self._call = None
# region imports
from AlgorithmImports import *
from abc import abstractmethod, ABC
# endregion

class FutureOptionsConstruction(ABC):
    """Abstract base of option constructions built on a continuous future."""

    def __init__(
        self, 
        algo: QCAlgorithm, 
        continuous_future: Future
    ) -> None:
        """Store the algorithm and the continuous future the construction is built on."""
        self._algo = algo
        self._continuous_future: Future = continuous_future
    
    @property
    def mapped(self) -> Symbol:
        """Symbol of the contract the continuous future is currently mapped to."""
        return self._continuous_future.mapped

    # Declared as an abstract property because the concrete constructions
    # implement it with @property and read it as an attribute.
    @property
    @abstractmethod
    def _has_construction(self) -> bool:
        """Whether the construction currently has its option(s) selected."""
        ...

    @abstractmethod
    def find_construction(self) -> None:
        """Select the option(s) the construction is made of."""
        ...

    @abstractmethod
    def rebalance_construction(self) -> None:
        """Open, hold or close the construction's position."""
        ...
# region imports
from AlgorithmImports import *
from option_construction.future_options_construction import FutureOptionsConstruction
# endregion

class FutureOptionsStraddle(FutureOptionsConstruction):
    """Option construction made of a short call and a short put at the same strike and expiry."""

    def __init__(
        self, 
        algo: QCAlgorithm, 
        continuous_future: Future, 
        moneyness: float
    ) -> None:
        """Create the construction.

        Args:
            algo: Algorithm used for data access, subscriptions, orders and logging.
            continuous_future: Continuous future whose mapped contract is the underlying.
            moneyness: Half-width of the accepted ``underlying price / strike`` band around 1.
        """
        super(FutureOptionsStraddle, self).__init__(algo, continuous_future)
        self._moneyness: float = moneyness
        
        # straddle
        self._call: Optional[Option] = None
        self._put: Optional[Option] = None

    @property
    def _has_construction(self) -> bool:
        """Whether both legs are currently selected."""
        return self._call is not None and self._put is not None
    
    @property
    def _construction_quantity(self) -> float:
        """Portfolio quantity held in the call leg."""
        return self._algo.portfolio[self._call.symbol].quantity

    def find_construction(self) -> None:
        """Drop the current straddle and subscribe a new one on the mapped future.

        Among the contracts with the nearest expiry whose
        ``underlying ask price / strike`` ratio lies within ``1 +/- moneyness``,
        the call and the put with the highest strike are chosen. When the option
        list is empty, no contract passes the filter, or one of the two legs is
        missing at the chosen strike, the reason is logged and no leg stays
        selected.
        """
        self._close_construction_position()

        option_contract_symbols: List[Symbol] = self._algo.option_chain_provider.get_option_contract_list(self.mapped, self._algo.time)

        if len(option_contract_symbols) == 0:
            self._algo.log(f'FutureOptionsStraddle.find_construction: No options in option chain found for: {self.mapped}')
            return

        expiry: datetime.datetime = min(map(lambda x: x.id.date, option_contract_symbols))
        underlying_price: float = self._algo.securities[option_contract_symbols[0].underlying].ask_price

        contracts: List[Symbol] = [
            x for x in option_contract_symbols
            if x.id.date == expiry
            and x.id.strike_price != 0
            and (1 - self._moneyness) <= abs(underlying_price / x.id.strike_price) <= (1 + self._moneyness)
        ]
        
        if len(contracts) == 0:
            self._algo.log(f'FutureOptionsStraddle.find_construction: No options filtered for: {self.mapped}')
            return

        strike: float = max(map(lambda x: x.id.strike_price, contracts))

        # Either right may be missing at the highest strike; defaults keep
        # next() from raising StopIteration in that case.
        at_strike: List[Symbol] = [x for x in contracts if x.id.strike_price == strike]
        atm_call: Optional[Symbol] = next((x for x in at_strike if x.id.option_right == OptionRight.CALL), None)
        atm_put: Optional[Symbol] = next((x for x in at_strike if x.id.option_right == OptionRight.PUT), None)

        if atm_call is None or atm_put is None:
            self._algo.log(f'FutureOptionsStraddle.find_construction: No call/put pair at strike {strike} for: {self.mapped}')
            return

        self._call = self._algo.add_future_option_contract(atm_call)
        self._put = self._algo.add_future_option_contract(atm_put)

    def rebalance_construction(self) -> None:
        """Open the position when flat, close it once the call leg has expired, otherwise hold."""
        if not self._has_construction:
            return

        if self._construction_quantity == 0:
            # construction is initialized but not traded yet
            self._open_construction_position()
        elif self._call.expiry < self._algo.time:
            # construction is traded and expired
            self._close_construction_position()
        # otherwise: hold position

    def _open_construction_position(self) -> None:
        """Sell both legs, each sized as portfolio value // (contract multiplier * underlying ask price).

        Nothing is sold while the underlying has no ask price.
        """
        underlying_price: float = self._algo.securities[self.mapped].ask_price
        if underlying_price != 0:
            options_quantity: int = self._algo.portfolio.total_portfolio_value // (self._call.contract_multiplier * underlying_price)
            self._algo.sell(self._call.symbol, options_quantity)
            self._algo.sell(self._put.symbol, options_quantity)

    def _close_construction_position(self) -> None:
        """Remove both option subscriptions, flatten the underlying position and clear the selection."""
        if self._has_construction:
            # unsubscribe options and close underlying position
            for c in [self._call, self._put]:
                self._algo.remove_option_contract(c.symbol)
            
            underlying_symbol: Symbol = self._call.underlying.symbol
            underlying_quantity: float = self._algo.portfolio[underlying_symbol].quantity
            # do not submit an order when there is nothing to flatten,
            # the same way the delta hedge skips a zero quantity
            if underlying_quantity != 0:
                self._algo.market_order(underlying_symbol, -underlying_quantity)
        
        self._call = None
        self._put = None