Created with Highcharts 12.1.2 | Equity | Jan 2021, May 2021, Sep 2021, Jan 2022, May 2022, Sep 2022, Jan 2023, May 2023, Sep 2023, Jan 2024, May 2024, Sep 2024, Jan 2025 | axis ticks: 0, 10M, 20M | -10, -5, 0 | 0, 0.05, 0.1 | -0.5, -0.25, 0, 0.25 | 0, 100k, 200k | 0, 2M, 4M | 0, 25, 50
Overall Statistics
Total Orders
979
Average Win
1.29%
Average Loss
-0.26%
Compounding Annual Return
243.892%
Drawdown
23.700%
Expectancy
5.000
Start Equity
100000
End Equity
14017426.3
Net Profit
13917.426%
Sharpe Ratio
3.629
Sortino Ratio
3.742
Probabilistic Sharpe Ratio
99.982%
Loss Rate
1%
Win Rate
99%
Profit-Loss Ratio
5.05
Alpha
1.515
Beta
0.033
Annual Standard Deviation
0.418
Annual Variance
0.175
Information Ratio
3.301
Tracking Error
0.438
Treynor Ratio
45.99
Total Fees
$106753.70
Estimated Strategy Capacity
$12000.00
Lowest Capacity Asset
SVXY 32N73JS5UWN0M|SVXY V0H08FY38ZFP
Portfolio Turnover
0.36%
# region imports
from AlgorithmImports import *
import cvxopt as cvx
from scipy import special
from scipy.stats import gamma, invweibull, norm
# endregion

class MaxLossVaRShortPut(QCAlgorithm):
    """Weekly short-put strategy struck at a fitted tail-loss (VaR) level.

    On every scheduled rebalance the algorithm, for each underlying:

    1. fits an inverse Weibull distribution to the rolling worst
       ``trade_period``-day return and takes its lower-tail quantile as the VaR
       (``get_strikes``),
    2. converts that VaR into a target put strike below the current price,
    3. sizes the position with the weights returned by ``get_weight``, and
    4. queues a sell request that ``on_data`` submits as a limit order as soon
       as a quote bar for the contract arrives.

    Assigned shares are sold immediately in ``on_assignment_order_event``.
    """

    def initialize(self):
        """Set the backtest window, cash, fill model, universe and schedule."""
        self.set_start_date(2021, 1, 1)
        self.set_end_date(2025, 1, 1)
        self.set_cash(100000)
        self.set_security_initializer(VolumeShareFillSecurityInitializer(self, 1))
        self.universe_settings.data_normalization_mode = DataNormalizationMode.RAW

        # We want to trade the 95% VaR.
        self._alpha = 0.95
        # Window (in daily bars) of the rolling minimum used to build the max-loss sample.
        self.lookback = 1000
        # Holding horizon in trading days; also bounds the option expiry searched for.
        self.trade_period = 5
        # Pending sell requests: option contract symbol -> number of contracts to short.
        self._orders = {}

        self.symbols = [self.add_equity(ticker).symbol for ticker in ["TQQQ", "SVXY", "TMF", "EDZ", "UGL", "UUP"]]
        # Rebalance weekly since we're trading the option expiring this week to avoid over-trading.
        self.schedule.on(
            self.date_rules.week_start(self.symbols[0]),
            self.time_rules.after_market_open(self.symbols[0], 1),
            self.rebalance
        )

    def rebalance(self):
        """Pick this week's put per underlying and queue the short order size.

        Nothing is ordered here; the sizes are stored in ``self._orders`` and
        submitted by ``on_data`` when a quote for the contract is available.
        """
        # Requests still pending from the previous rebalance were sized on old
        # data and never received a quote; drop them instead of carrying them forever.
        self._orders.clear()

        # Call the historical data to fit the distribution that models max loss.
        # The extra 252 bars leave roughly a year of rolling-minimum observations.
        bars = 252 + self.lookback + self.trade_period
        ret = self.history(self.symbols, bars, Resolution.DAILY).close.unstack(0).pct_change().dropna()
        # Obtain the position size and strike levels.
        strikes = self.get_strikes(ret)
        weights = self.get_weight(ret)

        latest_expiry = self.time + timedelta(self.trade_period + 1)
        # Short a put to earn credit in N% confidence that it will not be assigned.
        for symbol, target_strike in strikes.items():
            # Trade the week-expiring put to ensure short value and liquidity.
            puts = [
                x for x in self.option_chain(symbol)
                if x.right == OptionRight.PUT and x.expiry <= latest_expiry
            ]
            if not puts:
                continue
            expiry = max(x.expiry for x in puts)
            # Closest listed strike to the VaR-implied target on the chosen expiry.
            put = min(
                (x for x in puts if x.expiry == expiry),
                key=lambda x: abs(x.strike - target_strike)
            )
            # Request the contract data for trading.
            put_symbol = self.add_option_contract(put).symbol

            # Calculate the actual order size of the contract.
            strike = put_symbol.id.strike_price
            multiplier = self.securities[put_symbol].symbol_properties.contract_multiplier
            quantity = int(weights[symbol] * self.portfolio.total_portfolio_value / strike // multiplier)
            # Floor division turns a tiny negative solver weight into -1, which
            # would flip the order into a long put; only queue genuine short sizes.
            if quantity > 0:
                self._orders[put_symbol] = quantity

    def get_strikes(self, ret):
        """Return a target put strike per symbol from the fitted tail loss.

        Args:
            ret: DataFrame of daily returns, one column per symbol.

        Returns:
            dict mapping each symbol to ``(1 - |VaR|) * current price``. Symbols
            whose fitted quantile is not finite are omitted.
        """
        put_strikes = {}
        # Obtain the rolling max loss to fit the Inverse Weibull distribution to model catastrophic loss.
        period_ret = (1 + ret).rolling(self.trade_period).apply(np.prod, raw=True) - 1
        max_loss = period_ret.rolling(self.lookback).min().iloc[self.lookback + self.trade_period:]

        for symbol in max_loss.columns:
            # Fit Inverse Weibull distribution to obtain its parameters to get the VaR.
            shape, loc, scale = invweibull.fit(max_loss[symbol])

            # Lower-tail (1 - alpha) quantile of the fitted distribution, i.e.
            # loc + scale * (-ln(1 - alpha)) ** (-1 / shape).
            # The previous hand-written expression, `-np.log(p) ** (1 / shape)`,
            # raised a negative number to a fractional power (`**` binds tighter
            # than unary minus) and therefore evaluated to NaN.
            var_ = invweibull.ppf(1 - self._alpha, shape, loc=loc, scale=scale)
            if not np.isfinite(var_):
                continue
            put_strikes[symbol] = (1 - abs(var_)) * self.securities[symbol].price

        return put_strikes

    def get_weight(self, ret):
        """Return long-only weights from an iterative risk-budgeting QP.

        Each iteration linearises the per-asset risk-budget residuals around the
        current iterate, solves a regularised least-squares QP with CVXOPT and
        moves the iterate towards the solution with step ``1 / iters``.
        source: https://palomar.home.ece.ust.hk/papers/2015/FengPalomar-TSP2015%20-%20risk_parity_portfolio.pdf

        Args:
            ret: DataFrame of daily returns, one column per symbol.

        Returns:
            dict mapping each column of ``ret`` to a plain ``float`` weight. The
            QP constrains the weights to be non-negative and to sum to one.
        """
        R = ret.dropna().values
        n = R.shape[1]
        S = np.cov(R.T)
        mu = R.mean(axis=0).reshape(-1, 1)
        e = R.std(axis=0).reshape(-1, 1)

        # Equal risk budget. It must be its own array: the iterate below is
        # updated every pass and must not drag the budget along with it.
        budget = np.full((n, 1), 1. / n)
        w_k = budget.copy()
        tol = 0.0001
        max_iter = 20
        iters = 1
        fun_ = 1e7

        # Normal density at each asset's (1 - alpha) quantile; independent of the weights.
        tail_quantile = norm.ppf(1 - self._alpha, mu, e)
        tail_density = norm.pdf(tail_quantile, mu, e)

        # QP constraints: w >= 0 and sum(w) == 1.
        G = -cvx.matrix(np.eye(n))   # negative n x n identity matrix
        h = cvx.matrix(0.0, (n, 1))
        A1 = cvx.matrix(1.0, (1, n))
        b = cvx.matrix(1.0)

        while iters < max_iter:
            w = w_k
            wSw = w.T @ S @ w
            sigma = np.sqrt(wSw)

            A = np.empty((n, n))      # row i: gradient of residual i
            gw = np.empty((n, 1))     # entry i: value of residual i
            for i in range(n):
                # M keeps only row i of the covariance matrix.
                M = np.zeros(S.shape)
                M[i] = S[i]
                k_2 = tail_density[i]
                wMw = w.T @ M @ w

                delta_g = -mu[i] + k_2 * (wSw * (M + M.T) @ w - wMw * S @ w) / wSw**(3/2) \
                          + budget[i] * mu - k_2 * budget[i] * (S @ w) / sigma
                g = -mu[i] * w[i] + k_2 * wMw / sigma + budget[i] * mu.T @ w - k_2 * budget[i] * sigma

                A[i] = delta_g.flatten()
                gw[i] = g.item()

            # Least squares on the linearised residuals plus a small ridge term.
            Q = 2 * A.T @ A + 0.01 * np.eye(n)
            q = 2 * A.T @ gw - Q @ w

            opt = cvx.solvers.qp(cvx.matrix(Q), cvx.matrix(q), G, h, A1, b)
            w = np.asarray(opt['x'])
            fun = opt['primal objective']
            # Diminishing step towards the QP solution.
            w_k = w_k + (w - w_k) / iters

            if abs(fun - fun_) < tol:
                break

            iters += 1
            fun_ = fun

        # Return plain floats (the solver output is an (n, 1) array).
        return {symbol: float(weight) for symbol, weight in zip(ret.columns, w.ravel())}

    def on_data(self, slice):
        """Submit queued short-put limit orders once the contract has a quote."""
        # Order when there is a quote to be more realistic and likely to be filled.
        for symbol, size in list(self._orders.items()):
            bar = slice.quote_bars.get(symbol)
            if not bar:
                continue
            # Sell `size` contracts with the limit set at the quote bar's high.
            self.limit_order(symbol, -size, round(bar.high, 2))
            del self._orders[symbol]

    def on_assignment_order_event(self, assignment_event):
        """Sell the shares received from an assigned put right away."""
        # Liquidate the assigned underlyings to avoid volatility.
        multiplier = self.securities[assignment_event.symbol].symbol_properties.contract_multiplier
        self.market_order(
            assignment_event.symbol.underlying,
            -assignment_event.fill_quantity * multiplier,
            tag="liquidate assigned"
        )

class VolumeShareFillModel(FillModel):
    """Market-order fill model that caps each fill at a share of the quoted size.

    A market order larger than ``maximum_ratio`` times the quoted size on the
    side it trades against is filled in pieces; the unfilled remainder is
    tracked per order id between calls.
    """

    def __init__(self, algorithm: QCAlgorithm, maximum_ratio: float = 1):
        """Store the owning algorithm and the size cap.

        Args:
            algorithm: The algorithm that owns this model (kept for reference).
            maximum_ratio: Fraction of the quoted size that one fill may take.
        """
        self.algorithm = algorithm
        self.maximum_ratio = maximum_ratio
        # Unfilled absolute quantity per order id; an entry exists only while
        # the order is partially filled.
        self.absolute_remaining_by_order_id = {}

    def market_fill(self, asset, order):
        """Fill a market order, limited by the quoted size.

        Args:
            asset: The security being traded; its ``bid_size`` / ``ask_size`` are read.
            order: The market order to fill.

        Returns:
            The order event from the base model with ``fill_quantity`` and
            ``status`` overridden (FILLED or PARTIALLY_FILLED).
        """
        absolute_remaining = self.absolute_remaining_by_order_id.get(order.id, order.absolute_quantity)

        fill = super().market_fill(asset, order)
        # Cap the fill at `maximum_ratio` of the quoted size on the side being
        # hit: the bid size for sells, the ask size for buys.
        volume = asset.bid_size if order.quantity < 0 else asset.ask_size
        fill.fill_quantity = np.sign(order.quantity) * volume * self.maximum_ratio

        if abs(fill.fill_quantity) >= absolute_remaining:
            # Enough size is quoted: complete the order and forget its remainder.
            fill.fill_quantity = np.sign(order.quantity) * absolute_remaining
            fill.status = OrderStatus.FILLED
            self.absolute_remaining_by_order_id.pop(order.id, None)
        else:
            # Take what is quoted and remember what is still outstanding.
            fill.status = OrderStatus.PARTIALLY_FILLED
            self.absolute_remaining_by_order_id[order.id] = absolute_remaining - abs(fill.fill_quantity)

        return fill

class VolumeShareFillSecurityInitializer(BrokerageModelSecurityInitializer):
    """Security initializer that installs the volume-share fill and slippage models."""

    def __init__(self, algorithm: QCAlgorithm, fill_ratio: float = 1) -> None:
        """Build the base initializer and the fill model handed to every security.

        Args:
            algorithm: Algorithm supplying the brokerage model and last known prices.
            fill_ratio: Ratio forwarded to ``VolumeShareFillModel``.
        """
        brokerage_model = algorithm.brokerage_model
        seeder = FuncSecuritySeeder(algorithm.get_last_known_prices)
        super().__init__(brokerage_model, seeder)
        # One fill model instance, assigned to each security in `initialize`.
        self.fill_model = VolumeShareFillModel(algorithm, fill_ratio)

    def initialize(self, security: Security) -> None:
        """Apply the base initialization, then override fill and slippage models."""
        super().initialize(security)
        # Overrides go after the base call so they replace the brokerage defaults.
        security.set_fill_model(self.fill_model)
        slippage_model = VolumeShareSlippageModel(1, 0.5)
        security.set_slippage_model(slippage_model)