| Overall Statistics | |
| --- | --- |
| Total Orders | 2340 |
| Average Win | 0.11% |
| Average Loss | -0.07% |
| Compounding Annual Return | 20.686% |
| Drawdown | 17.200% |
| Expectancy | 1.099 |
| Start Equity | 1000000 |
| End Equity | 2562943.50 |
| Net Profit | 156.294% |
| Sharpe Ratio | 0.837 |
| Sortino Ratio | 1.012 |
| Probabilistic Sharpe Ratio | 46.925% |
| Loss Rate | 19% |
| Win Rate | 81% |
| Profit-Loss Ratio | 1.59 |
| Alpha | 0.01 |
| Beta | 0.895 |
| Annual Standard Deviation | 0.14 |
| Annual Variance | 0.02 |
| Information Ratio | -0.044 |
| Tracking Error | 0.049 |
| Treynor Ratio | 0.131 |
| Total Fees | $2620.56 |
| Estimated Strategy Capacity | $25000000.00 |
| Lowest Capacity Asset | NOB R735QTJ8XC9X |
| Portfolio Turnover | 0.66% |
```python
# region imports
from AlgorithmImports import *

from universe import TopologicalGraphUniverseSelectionModel
# endregion

np.random.seed(0)


class TopologicalPortfolio(QCAlgorithm):
    def initialize(self) -> None:
        self.set_start_date(2020, 3, 25)
        self.set_end_date(2025, 3, 25)
        self.set_cash(1000000)

        # Benchmark against SPY to compare correlation and risk-adjusted return.
        spy = self.add_equity("SPY").symbol
        self.set_benchmark(spy)

        # Lookback window used to construct and analyze the topological structure.
        history_lookback = self.get_parameter("history_lookback", 150)
        # Period (in days) after which the topological complex is reconstructed.
        recalibrate_period = self.get_parameter("recalibrate_period", 125)

        # Construct the portfolio from the 200 largest SPY constituents by weight.
        self.universe_model = TopologicalGraphUniverseSelectionModel(
            spy,
            history_lookback,
            recalibrate_period,
            lambda u: [x.symbol for x in sorted(
                [x for x in u if x.weight],
                key=lambda x: x.weight,
                reverse=True
            )[:200]]
        )
        self.add_universe_selection(self.universe_model)

        # Schedule a daily portfolio rebalance shortly after the open.
        self.schedule.on(
            self.date_rules.every_day(spy),
            self.time_rules.at(9, 31),
            self.rebalance
        )

        # Set a warm-up period to warm up the universe selection.
        self.set_warm_up(timedelta(365))

    def rebalance(self) -> None:
        if self.universe_model.clustered_symbols:
            # Obtain the weight to invest in each constituent.
            weights = self.weight_distribution(self.universe_model.clustered_symbols)
            # Rebalance to the designated weights.
            self.set_holdings(
                [PortfolioTarget(symbol, weight) for symbol, weight in weights.items()],
                liquidate_existing_holdings=True
            )

    def weight_distribution(self, clustered_symbols):
        # Assign weights between and within giant and small clusters.
        # Note that we do not invest in outliers.
        weights = {}

        def assign_weights(nested_list, level=1):
            num_elements = len(nested_list)
            if num_elements == 0:
                return
            # Split the weight equally among the elements at this level; each
            # extra level of nesting halves the share a leaf receives.
            weight_per_element = 1 / num_elements
            for item in nested_list:
                if isinstance(item, list):
                    assign_weights(item, level + 1)
                else:
                    weights[item] = weights.get(item, 0) + weight_per_element / (2 ** (level - 1))

        # Calculate the overall weights and normalize them to sum to 1.
        assign_weights(clustered_symbols)
        return pd.Series(weights) / sum(weights.values())
```
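To make the weighting scheme concrete, here is a minimal standalone sketch of the same level-halving logic, with hypothetical ticker strings standing in for `Symbol` objects. Each nesting level splits its weight equally among its children, and leaves are discounted by `2 ** (level - 1)`:

```python
import pandas as pd


def weight_distribution(clustered_symbols):
    # Same logic as TopologicalPortfolio.weight_distribution, lifted out for illustration.
    weights = {}

    def assign_weights(nested_list, level=1):
        if not nested_list:
            return
        # Equal split among the elements at this level; every extra level of
        # nesting halves the share a leaf receives.
        weight_per_element = 1 / len(nested_list)
        for item in nested_list:
            if isinstance(item, list):
                assign_weights(item, level + 1)
            else:
                weights[item] = weights.get(item, 0) + weight_per_element / (2 ** (level - 1))

    assign_weights(clustered_symbols)
    return pd.Series(weights) / sum(weights.values())


# One giant cluster containing two Mapper nodes, plus a single-node cluster
# (hypothetical tickers in place of Symbol objects).
clusters = [[["AAA", "BBB"], ["CCC"]], [["DDD"]]]
print(weight_distribution(clusters))
# AAA and BBB each receive 1/6 of the portfolio; CCC and DDD each receive 1/3.
```

The raw per-symbol scores do not sum to one, because the `2 ** (level - 1)` discount ignores how many siblings exist at higher levels; the final division by `sum(weights.values())` performs the normalization.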
```python
# region imports
from AlgorithmImports import *

from Selection.ETFConstituentsUniverseSelectionModel import ETFConstituentsUniverseSelectionModel
import kmapper as km
from sklearn.cluster import DBSCAN
from sklearn.decomposition import PCA
from umap import UMAP
# endregion

np.random.seed(0)


class TopologicalGraphUniverseSelectionModel(ETFConstituentsUniverseSelectionModel):
    def __init__(self, etf_symbol: Symbol, lookback_window: int = 250, recalibration_period: int = None,
                 universe_filter_func: Callable[[list[ETFConstituentUniverse]], list[Symbol]] = None) -> None:
        self._symbol = etf_symbol
        self.lookback_window = lookback_window
        # Number of days between reconstructions of the topological structure.
        self.recalibration_period = recalibration_period
        self.clustered_symbols = None
        super().__init__(etf_symbol, None, universe_filter_func)

    def create_universes(self, algorithm: QCAlgorithm) -> list[Universe]:
        universe_list = super().create_universes(algorithm)
        # Schedule the initial graph construction for just after the next market
        # open, once the universe has been populated.
        next_open = algorithm.securities[self._symbol].exchange.hours.get_next_market_open(algorithm.time, False)
        algorithm.schedule.on(
            algorithm.date_rules.on([next_open]),
            algorithm.time_rules.at(9, 31),
            lambda: self.get_graph_symbols(algorithm)
        )
        return universe_list

    def get_graph_symbols(self, algorithm: QCAlgorithm) -> None:
        # Construct the simplicial complex and cluster its nodes.
        graph, symbol_list = self.construct_simplicial_complex(algorithm, self.lookback_window)
        if len(symbol_list) > 0:
            self.clustered_symbols = self.clustering_symbols(graph, symbol_list)
        # Set a scheduled event to reconstruct the topological structure.
        algorithm.schedule.on(
            algorithm.date_rules.on([algorithm.time + timedelta(self.recalibration_period)]),
            algorithm.time_rules.at(0, 1),
            lambda: self.get_graph_symbols(algorithm)
        )

    def construct_simplicial_complex(self, algorithm: QCAlgorithm, lookback_window: int) -> tuple[dict[str, object], list[Symbol]]:
        if not self.universe.selected:
            return {}, []

        # Obtain historical data to construct a graph of stock relationships.
        prices = algorithm.history(self.universe.selected, lookback_window, Resolution.DAILY).unstack(0).close
        # Calculate daily log returns, then transpose the data since we are relating stocks to each other.
        log_returns = np.log(prices / prices.shift(1)).dropna().T
        if log_returns.empty:
            return {}, []

        # Initialize the Mapper algorithm.
        mapper = km.KeplerMapper()
        # Project the data onto a low-dimensional lens via two transformations, PCA then UMAP.
        # PCA: retains most of the variance while denoising, and is fast.
        # UMAP: handles non-linear relationships well and preserves both local and global structure.
        # MDS and Isomap are not included due to their potential sensitivity to noise and outliers in financial data.
        projected_data = mapper.fit_transform(
            log_returns,
            projection=[PCA(n_components=0.8, random_state=1), UMAP(n_components=1, random_state=1, n_jobs=-1)]
        )
        # Cluster the data with DBSCAN since it handles noise well.
        # We use the correlation distance to cluster and form the portfolio.
        graph = mapper.map(projected_data, log_returns, clusterer=DBSCAN(metric='correlation', n_jobs=-1))
        return graph, prices.columns

    def clustering_symbols(self, graph: dict[str, object], symbol_list: list[Symbol]) -> list[list[object]]:
        # Treat each connected structure as a giant cluster.
        linked_clusters = []
        for x, y in graph['links'].items():
            isin = False
            for i in range(len(linked_clusters)):
                if x in linked_clusters[i] or any(node in linked_clusters[i] for node in y):
                    linked_clusters[i] = list(set(linked_clusters[i] + [x] + y))
                    isin = True
            if isin:
                continue
            linked_clusters.append([x] + y)
        # Nodes without any links form their own single-node clusters.
        linked_clusters += [[x] for x in graph['nodes'] if x not in [z for y in linked_clusters for z in y]]
        # Convert each node into the list of symbols of its member stocks.
        return [[list(symbol_list[graph['nodes'][x]]) for x in linked_cluster] for linked_cluster in linked_clusters]
```
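Since `clustering_symbols` consumes the raw `kmapper` output, it may help to see what that graph looks like. The following self-contained sketch, assuming `kmapper`, `scikit-learn`, and `umap-learn` are installed, runs the same PCA-to-UMAP-to-DBSCAN pipeline on synthetic return data (the `market`, `group_a`, and `group_b` arrays are fabricated purely for illustration) and prints the two dictionaries the model walks:

```python
import numpy as np
import kmapper as km
from sklearn.cluster import DBSCAN
from sklearn.decomposition import PCA
from umap import UMAP

rng = np.random.default_rng(0)
# Synthetic daily log returns: 30 hypothetical stocks (rows) over 150 days,
# split into two internally correlated groups so DBSCAN has structure to find.
market = rng.normal(0, 0.01, 150)
group_a = market + rng.normal(0, 0.002, size=(15, 150))
group_b = -market + rng.normal(0, 0.002, size=(15, 150))
log_returns = np.vstack([group_a, group_b])

mapper = km.KeplerMapper(verbose=0)
# Same two-stage lens as construct_simplicial_complex: PCA to denoise, UMAP for non-linear structure.
lens = mapper.fit_transform(
    log_returns,
    projection=[PCA(n_components=0.8, random_state=1), UMAP(n_components=1, random_state=1, n_jobs=-1)]
)
graph = mapper.map(lens, log_returns, clusterer=DBSCAN(metric='correlation', n_jobs=-1))

# graph['nodes'] maps each node name to the row indices of its member stocks;
# graph['links'] maps each node name to the list of node names it overlaps with.
# clustering_symbols walks exactly these two dicts to merge linked nodes into clusters.
print({name: members for name, members in list(graph['nodes'].items())[:3]})
print({name: links for name, links in list(graph['links'].items())[:3]})
```

In the live model, the member indices point into `prices.columns`, so each node resolves to a list of `Symbol` objects rather than the row indices printed here.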