Created with Highcharts 12.1.2EquityJan 2019Jan…Jul 2019Jan 2020Jul 2020Jan 2021Jul 2021Jan 2022Jul 2022Jan 2023Jul 2023Jan 2024Jul 2024Jan 2025Jul 2025010M20M-500020510050M100M0100M200M050100
Overall Statistics
Total Orders
603
Average Win
2.72%
Average Loss
-2.04%
Compounding Annual Return
48.442%
Drawdown
50.300%
Expectancy
0.533
Start Equity
1000000
End Equity
10697773.81
Net Profit
969.777%
Sharpe Ratio
0.987
Sortino Ratio
1.062
Probabilistic Sharpe Ratio
36.153%
Loss Rate
34%
Win Rate
66%
Profit-Loss Ratio
1.33
Alpha
0.296
Beta
0.976
Annual Standard Deviation
0.405
Annual Variance
0.164
Information Ratio
0.789
Tracking Error
0.372
Treynor Ratio
0.41
Total Fees
$41465.63
Estimated Strategy Capacity
$8100000.00
Lowest Capacity Asset
MGC YONRSAUV1BQ5
Portfolio Turnover
15.96%
# region imports
from AlgorithmImports import *
from scipy.optimize import minimize
from hmmlearn.hmm import GMMHMM
# endregion

class DrawdownRegimeGoldHedgeAlgorithm(QCAlgorithm):
    def initialize(self) -> None:
        self.set_end_date(2025, 1, 1)
        self.set_start_date(self.end_date - timedelta(6*365))
        self.set_cash(1000000)
        self.set_security_initializer(BrokerageModelSecurityInitializer(self.brokerage_model, FuncSecuritySeeder(self.get_last_known_prices)))

        # Determine the lookback window (in weeks).
        self.history_lookback = self.get_parameter("history_lookback", 70)
        self.drawdown_lookback = self.get_parameter("drawdown_lookback", 35)
        self.investor_view = self.get_parameter("investor_view", 0.5)

        # Request SPY as market representative for trading.
        self.spy = self.add_equity("SPY", Resolution.MINUTE).symbol
        self.set_benchmark(self.spy)
        # Map the future by open interest to trade with the most liquid contract.
        self._future = self.add_future(Futures.Metals.MICRO_GOLD)

        # Schdeuled a weekly rebalance.
        self.schedule.on(self.date_rules.week_start(self.spy), self.time_rules.after_market_open(self.spy, 1), self.rebalance)
    
    def rebalance(self) -> None:
        # Get the drawdown as the input to the drawdown regime. Since we're rebalancing weekly, we resample to study weekly drawdown.
        history = self.history([self.spy, self._future.symbol], self.history_lookback*5, Resolution.DAILY).droplevel([0]).unstack(0).close.resample('W').last()
        drawdown = history.rolling(self.drawdown_lookback).apply(lambda a: (a.iloc[-1] - a.max()) / a.max()).dropna()
        try:
            # Initialize the HMM, then fit by the drawdown data, as we're interested in the downside risk regime.
            # McLachlan & Peel (2000) suggested 2-3 components are used in GMMs to capture the main distribution and the tail to balance between complexity and characteristics capture.
            # By studying the ACF and PACF plots, the 1-lag drawdown series is suitable to supplement as exogenous variable.
            inputs = np.concatenate([drawdown[[self.spy]].iloc[1:].values, drawdown[[self.spy]].diff().iloc[1:].values], axis=1)
            model = GMMHMM(n_components=2, n_mix=3, covariance_type='tied', n_iter=100, random_state=0).fit(inputs)
            # Obtain the current market regime.
            regime_probs = model.predict_proba(inputs)
            current_regime_prob = regime_probs[-1]
            regime = 0 if current_regime_prob[0] > current_regime_prob[1] else 1

            # Determine the regime number: the higher the coefficient, the larger the drawdown in this state.
            high_regime = 1 if model.means_[0][1][0] < model.means_[1][1][0] else 0
            # Check the transitional probability of the next regime being the high volatility regime.
            # Calculated by the probability of the current regime being 1/0, then multiplied by the posterior probabilities of each scenario.
            next_prob_zero = current_regime_prob @ model.transmat_[:, 0]
            next_prob_high = next_prob_zero if high_regime == 0 else 1 - next_prob_zero

            # Optimization per each regime, then weighted by the posterior probabilities.
            weights = self.position_sizing(drawdown.iloc[1:], regime_probs.argmax(axis=1), high_regime, next_prob_high)
            denominator = weights[1] / self._future.symbol_properties.contract_multiplier + weights[0]
            self.set_holdings([PortfolioTarget(self._future.mapped, weights[1] / self._future.symbol_properties.contract_multiplier / denominator),
                                PortfolioTarget(self.spy, weights[0] / denominator)],
                                liquidate_existing_holdings=True)
        except:
            pass

    def position_sizing(self, ret, regime_labels, high_regime, next_prob_high):
        # Identify the returns by regime.
        if high_regime == 1:
            high_ret = ret[regime_labels.astype(bool)]
            low_ret = ret[abs(regime_labels - 1).astype(bool)]
        else:
            low_ret = ret[regime_labels.astype(bool)]
            high_ret = ret[abs(regime_labels - 1).astype(bool)]

        weights = np.array([0, 0])
        # Separately calculate 2 sets of weight per regime, combine by the probabilities that currently in the regime.
        next_probs = np.array([next_prob_high, 1 - next_prob_high])
        for ret_, weight_ in zip([high_ret, low_ret], next_probs):
            # Parameters for the risk parity optimization.
            x0 = np.array([1/ret_.shape[1]] * ret_.shape[1])
            constraints = {"type": "eq", "fun": lambda w: np.sum(w) - 1}
            bounds = [(0, 1)] * ret_.shape[1]
            # Obtain weight by mean-risk-parity optimization.
            # Buy more Gold and less SPY if the current regime is easier to have large drawdown.
            # Fund will shift to hedge asset like gold to drive up its price.
            opt = minimize(lambda w: self.investor_view * (-next_probs[::-1] @ w) + (1 - self.investor_view) * (0.5 * (w.T @ np.cov(ret_.T) @ w) - x0 @ np.log(w)), 
                           x0=x0, constraints=constraints, bounds=bounds, method="SLSQP")
            weights = weights + (x0 if any(np.isnan(opt.x)) else opt.x) * weight_

        return np.nan_to_num(weights)