[Equity chart: backtest equity curve, Jul 2017 – Jul 2025]
Overall Statistics
Total Orders: 977
Average Win: 2.95%
Average Loss: -1.65%
Compounding Annual Return: 48.915%
Drawdown: 87.500%
Expectancy: 0.454
Start Equity: 100000
End Equity: 2422983.26
Net Profit: 2322.983%
Sharpe Ratio: 0.922
Sortino Ratio: 1.263
Probabilistic Sharpe Ratio: 20.484%
Loss Rate: 48%
Win Rate: 52%
Profit-Loss Ratio: 1.79
Alpha: 0.483
Beta: 1.094
Annual Standard Deviation: 0.62
Annual Variance: 0.385
Information Ratio: 0.821
Tracking Error: 0.597
Treynor Ratio: 0.523
Total Fees: $0.00
Estimated Strategy Capacity: $2600000.00
Lowest Capacity Asset: JASMYUSD 2XR
Portfolio Turnover: 1.63%
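
These statistics come from the QuantConnect algorithm below. At the start of each month it selects the Coinbase USD pairs with the greatest dollar volume (ten by default, stablecoins excluded) and rebalances into Hierarchical Risk Parity (HRP) weights estimated from the trailing twelve months of daily returns.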
# region imports
from AlgorithmImports import *

import numpy as np
import pandas as pd
import scipy.cluster.hierarchy as sch
from scipy.spatial.distance import squareform
# endregion

class StrategicCryptoReserveAlgorithm(QCAlgorithm):

    def initialize(self) -> None:
        self.set_end_date(2025, 3, 1)
        self.set_start_date(self.end_date - timedelta(8*365))
        # Get all the crypto pairs on Coinbase that are quoted in USD
        # and aren't stablecoins.
        self._market_pairs = [
            x.key.symbol 
            for x in self.symbol_properties_database.get_symbol_properties_list(Market.COINBASE) 
            if (x.value.quote_currency == self.account_currency and   # Account currency is USD.
                x.value.market_ticker.split('-')[0] not in ['DAI', 'USDT', 'USDC'])  # Exclude stablecoins.
        ]
        # Add a Crypto universe that updates at the start of each month.
        self.time_rules.set_default_time_zone(TimeZones.UTC)
        date_rule = self.date_rules.month_start()
        self.universe_settings.schedule.on(date_rule)
        self.universe_settings.resolution = Resolution.DAILY
        self._universe_size = self.get_parameter('universe_size', 10)
        self._universe = self.add_universe(CryptoUniverse.coinbase(self._select_assets))
        # Schedule rebalances.
        self.schedule.on(date_rule, self.time_rules.midnight, self._rebalance)
        # Create the Hierarchical Risk Parity (HRP) weighting model.
        self._hrp = HierarchicalRiskParity(self, self.get_parameter('lookback_months', 12)*30)

    def _select_assets(self, data):
        # Keep the tradable USD pairs, then select the ones with the
        # greatest USD volume.
        selected = [c for c in data if str(c.symbol.id).split()[0] in self._market_pairs]
        selected = [c.symbol for c in sorted(selected, key=lambda c: c.volume_in_usd)[-self._universe_size:]]
        self.plot('Universe', 'Size', len(selected))
        return selected

    def _rebalance(self):
        symbols = self._universe.selected
        if not symbols:
            return
        # Target 90% of each HRP weight (leaving a small cash buffer) and
        # liquidate any holdings that dropped out of the universe
        # (the second argument, liquidate_existing_holdings, is True).
        self.set_holdings([PortfolioTarget(symbol, 0.9*weight) for symbol, weight in self._hrp.weights(symbols).items()], True)


class HierarchicalRiskParity:

    def __init__(self, algorithm, lookback=365):
        self._algorithm = algorithm
        self._lookback = lookback

    def weights(self, symbols):
        # Step 1) Cluster assets based on daily returns.
        daily_returns = self._algorithm.history(symbols, self._lookback, Resolution.DAILY).close.unstack(0).pct_change()[1:]
        cov, corr = daily_returns.cov(), daily_returns.corr()
        distance = self._distance(corr)
        link = sch.linkage(squareform(distance), 'single')
        # Step 2) Quasi-diagonalization
        sort_ix = self._quasi_diagonalization(link)
        sort_ix = corr.index[sort_ix].tolist() # recover labels
        # Step 3) Recursive bisection
        return self._recursive_bisection(cov, sort_ix)

    def _distance(self, corr):
        # Map correlation to a proper distance metric:
        # d(i, j) = sqrt((1 - rho_ij) / 2), so 0 <= d[i, j] <= 1
        # (rho = 1 -> d = 0; rho = -1 -> d = 1).
        return ((1 - corr) / 2.0) ** 0.5

    def _quasi_diagonalization(self, link):
        # Sort clustered items by distance
        link = link.astype(int)
        sort_ix = pd.Series([link[-1, 0], link[-1, 1]])
        num_items = link[-1, 3] # number of original items
        while sort_ix.max() >= num_items:
            sort_ix.index = range(0, sort_ix.shape[0] * 2, 2) # make space
            df0 = sort_ix[sort_ix >= num_items] # find clusters
            i = df0.index
            j = df0.values - num_items
            sort_ix[i] = link[j, 0] # item 1
            df0 = pd.Series(link[j, 1], index=i+1)
            sort_ix = pd.concat([sort_ix, df0]) # item 2
            sort_ix = sort_ix.sort_index() # re-sort
            sort_ix.index = range(sort_ix.shape[0]) # re-index
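        # The result is the dendrogram's left-to-right leaf order,
        # equivalent to scipy.cluster.hierarchy.leaves_list(link).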
        return sort_ix.tolist()

    def _recursive_bisection(self, cov, sort_ix):
        # Compute HRP alloc
        w = pd.Series(1.0, index=sort_ix)
        cluster_items = [sort_ix] # initialize all items in one cluster
        while len(cluster_items) > 0:
            # Bisection: drop 1-element lists and split multi-element lists into two halves.
            bisected_cluster_items = []
            for i in cluster_items:
                if len(i) > 1:
                    half = len(i) // 2
                    for j, k in ((0, half), (half, len(i))):
                        bisected_cluster_items.append(i[j:k])
            cluster_items = bisected_cluster_items
            for i in range(0, len(cluster_items), 2): # parse in pairs
                cluster_items_0 = cluster_items[i] # cluster 1
                cluster_items_1 = cluster_items[i+1] # cluster 2
                c_var_0 = self._cluster_variance(cov, cluster_items_0)
                c_var_1 = self._cluster_variance(cov, cluster_items_1)
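                # Allocate the parent's weight in inverse proportion to
                # each half's variance: the lower-variance half gets the
                # larger share.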
                alpha = 1 - c_var_0 / (c_var_0 + c_var_1)
                w[cluster_items_0] *= alpha # weight 1
                w[cluster_items_1] *= 1 - alpha # weight 2
        return w

    def _cluster_variance(self, cov, cluster_items):
        # Compute variance per cluster
        cluster_cov = cov.loc[cluster_items, cluster_items] # matrix slice
        weights = self._inverse_variance_weights(cluster_cov).reshape(-1, 1)
        return np.dot(np.dot(weights.T, cluster_cov), weights)[0, 0]

    def _inverse_variance_weights(self, cov):
        # Compute the inverse-variance portfolio
        inverse_var = 1 / np.diag(cov)
        return inverse_var / inverse_var.sum()
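

The clustering and weighting logic can be sanity-checked outside of LEAN on synthetic data. The sketch below is hypothetical and not part of the algorithm: the tickers, the random return series, and the condensed `hrp_weights` helper are all made up for illustration. It builds the same correlation-based distance matrix and single-linkage tree as the class above, then prints the leaf order and the resulting HRP weights; the two correlated series end up adjacent in the ordering.

```python
import numpy as np
import pandas as pd
import scipy.cluster.hierarchy as sch
from scipy.spatial.distance import squareform

# Synthetic daily returns: AAA and BBB share a common factor,
# CCC is independent, DDD is independent and very noisy.
rng = np.random.default_rng(42)
common = rng.normal(0, 0.03, 500)
returns = pd.DataFrame({
    'AAA': common + rng.normal(0, 0.01, 500),
    'BBB': common + rng.normal(0, 0.01, 500),
    'CCC': rng.normal(0, 0.02, 500),
    'DDD': rng.normal(0, 0.06, 500),
})
cov, corr = returns.cov(), returns.corr()

# Steps 1 and 2: correlation distance, single-linkage tree, leaf order.
distance = ((1 - corr) / 2.0) ** 0.5
link = sch.linkage(squareform(distance), 'single')
order = corr.index[sch.leaves_list(link)].tolist()

# Step 3: condensed re-implementation of the recursive bisection above.
def hrp_weights(cov, order):
    def cluster_variance(items):
        sub = cov.loc[items, items].to_numpy()
        iv = 1 / np.diag(sub)   # inverse-variance weights within the cluster
        iv /= iv.sum()
        return iv @ sub @ iv
    w = pd.Series(1.0, index=order)
    clusters = [order]
    while clusters:
        # Drop 1-element clusters; split the rest into consecutive halves.
        clusters = [c[i:j] for c in clusters if len(c) > 1
                    for i, j in ((0, len(c) // 2), (len(c) // 2, len(c)))]
        for a, b in zip(clusters[::2], clusters[1::2]):
            alpha = 1 - cluster_variance(a) / (cluster_variance(a) + cluster_variance(b))
            w[a] *= alpha
            w[b] *= 1 - alpha
    return w

print(order)                   # AAA and BBB end up adjacent
weights = hrp_weights(cov, order)
print(weights, weights.sum())  # weights sum to 1; DDD typically gets the least
```

Because AAA and BBB load on the same factor, they merge first in the dendrogram and then split one cluster's weight between themselves, while the high-variance DDD series is penalized at every inverse-variance split.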