Overall Statistics |
Total Orders 977 Average Win 2.95% Average Loss -1.65% Compounding Annual Return 48.915% Drawdown 87.500% Expectancy 0.454 Start Equity 100000 End Equity 2422983.26 Net Profit 2322.983% Sharpe Ratio 0.922 Sortino Ratio 1.263 Probabilistic Sharpe Ratio 20.484% Loss Rate 48% Win Rate 52% Profit-Loss Ratio 1.79 Alpha 0.483 Beta 1.094 Annual Standard Deviation 0.62 Annual Variance 0.385 Information Ratio 0.821 Tracking Error 0.597 Treynor Ratio 0.523 Total Fees $0.00 Estimated Strategy Capacity $2600000.00 Lowest Capacity Asset JASMYUSD 2XR Portfolio Turnover 1.63% |
# region imports from AlgorithmImports import * import scipy.cluster.hierarchy as sch,random,numpy as np from scipy.cluster.hierarchy import linkage from scipy.spatial.distance import squareform # endregion class StrategicCryptoReserveAlgorithm(QCAlgorithm): def initialize(self) -> None: self.set_end_date(2025, 3, 1) self.set_start_date(self.end_date - timedelta(8*365)) # Getting all the Crypto pairs on Coinbase that have a # quote currency of USD and aren’t a stablecoin. self._market_pairs = [ x.key.symbol for x in self.symbol_properties_database.get_symbol_properties_list(Market.COINBASE) if (x.value.quote_currency == self.account_currency and # Account currency is USD x.value.market_ticker.split('-')[0] not in ['DAI', 'USDT', 'USDC']) # Remove stable coins ] # Add a Crypto universe that updates at the start of each month. self.time_rules.set_default_time_zone(TimeZones.UTC) date_rule = self.date_rules.month_start() self.universe_settings.schedule.on(date_rule) self.universe_settings.resolution = Resolution.DAILY self._universe_size = self.get_parameter('universe_size', 10) self._universe = self.add_universe(CryptoUniverse.coinbase(self._select_assets)) # Schedule rebalances. self.schedule.on(date_rule, self.time_rules.midnight, self._rebalance) # Create the HRP. self._hrp = HeirarchicalRiskParity(self, self.get_parameter('lookback_months', 12)*30) def _select_assets(self, data): selected = [c for c in data if str(c.symbol.id).split()[0] in self._market_pairs] selected = [c.symbol for c in sorted(selected, key=lambda c: c.volume_in_usd)[-self._universe_size:]] self.plot('Universe', 'Size', len(selected)) return selected def _rebalance(self): symbols = self._universe.selected if not symbols: return self.set_holdings([PortfolioTarget(symbol, 0.9*weight) for symbol, weight in self._hrp.weights(symbols).items()], True) class HeirarchicalRiskParity: def __init__(self, algorithm, lookback=365): self._algorithm = algorithm self._lookback = lookback def weights(self, symbols): # Step 1) Cluster assets based on daily returns. daily_returns = self._algorithm.history(symbols, self._lookback, Resolution.DAILY).close.unstack(0).pct_change()[1:] cov, corr = daily_returns.cov(), daily_returns.corr() distance = self._distance(corr) link = sch.linkage(squareform(distance), 'single') # Step 2) Quasi-diagonalization sort_ix = self._quasi_diagonalization(link) sort_ix = corr.index[sort_ix].tolist() # recover labels # Step 3) Recursive bisection return self._recursive_bisection(cov, sort_ix) def _distance(self, corr): # A distance matrix based on correlation, where 0<=d[i,j]<=1 # This is a proper distance metric return ((1 - corr) / 2.0) ** 0.5 # distance matrix def _quasi_diagonalization(self, link): # Sort clustered items by distance link = link.astype(int) sort_ix = pd.Series([link[-1, 0], link[-1, 1]]) num_items = link[-1, 3] # number of original items while sort_ix.max() >= num_items: sort_ix.index = range(0, sort_ix.shape[0] * 2, 2) # make space df0 = sort_ix[sort_ix >= num_items] # find clusters i = df0.index j = df0.values - num_items sort_ix[i] = link[j, 0] # item 1 df0 = pd.Series(link[j, 1], index=i+1) sort_ix = pd.concat([sort_ix, df0]) # item 2 sort_ix = sort_ix.sort_index() # re-sort sort_ix.index = range(sort_ix.shape[0]) # re-index return sort_ix.tolist() def _recursive_bisection(self, cov, sort_ix): # Compute HRP alloc w = pd.Series(1.0, index=sort_ix) cluster_items = [sort_ix] # initialize all items in one cluster while len(cluster_items) > 0: # Bi-section. Drop elements that one 1-element lists. For elements that are multi-element lists, split them into 2 lists. bisected_cluster_items = [] for i in cluster_items: if len(i) > 1: for j, k in ((0, len(i) / 2), (len(i) / 2, len(i))): bisected_cluster_items.append(i[int(j):int(k)]) cluster_items = bisected_cluster_items for i in range(0, len(cluster_items), 2): # parse in pairs cluster_items_0 = cluster_items[i] # cluster 1 cluster_items_1 = cluster_items[i+1] # cluster 2 c_var_0 = self._cluster_variance(cov, cluster_items_0) c_var_1 = self._cluster_variance(cov, cluster_items_1) alpha = 1 - c_var_0 / (c_var_0 + c_var_1) w[cluster_items_0] *= alpha # weight 1 w[cluster_items_1] *= 1 - alpha # weight 2 return w def _cluster_variance(self, cov, cluster_items): # Compute variance per cluster cluster_cov = cov.loc[cluster_items, cluster_items] # matrix slice weights = self._inverse_variance_weights(cluster_cov).reshape(-1, 1) return np.dot(np.dot(weights.T, cluster_cov), weights)[0, 0] def _inverse_variance_weights(self, cov, **kargs): # Compute the inverse-variance portfolio inverse_var = 1 / np.diag(cov) return inverse_var / inverse_var.sum()