A subscriber asked me to make a video on topology in trading. I did this by modifying the code from the article below so that it works in QuantConnect (QC):

https://medium.com/@crisvelasquez/predicting-stock-market-crashes-with-topological-data-analysis-in-python-1dc4f18ca7ca

I ran it on a single stock with a rolling window so we can really see what the math is trying to accomplish. I'll leave you to read the article for the nitty-gritty, but in short, this indicator detects changes in market regime: it uses the Wasserstein distance to measure how different adjacent stretches of the return series are, so that meaningful changes stand out. Here is the QC code and a link to my YouTube!

# region imports
from AlgorithmImports import *
import numpy as np
from ripser import Rips
import persim
# endregion

class MuscularMagentaLion(QCAlgorithm):
    """Regime-switching SPY strategy driven by a topological indicator.

    On every hourly bar the most recent ``lookback * 5`` closes are turned
    into log returns.  For each pair of adjacent ``lookback``-long segments
    of those returns, the Wasserstein distance between the segments'
    dimension-0 persistence diagrams is computed.  The sum of those
    distances ("wd") is plotted and compared with ``threshold``:

    * flat  -> go 80% long;
    * long  and ``wd >= threshold`` -> flip to 80% short;
    * short and ``wd <= threshold`` -> flip to 80% long.
    """

    def Initialize(self):
        """Set up the backtest, the SPY subscription and the indicator state."""
        self.SetStartDate(2020, 1, 1)
        self.SetCash(100000)

        self.eq = self.AddEquity("SPY", Resolution.HOUR).Symbol

        # Length of each return segment, and the regime-change threshold
        # applied to the summed Wasserstein distance.
        self.lookback = 20
        self.threshold = 1

        # Only the dimension-0 diagram (dgm[0]) is ever read, and it does
        # not depend on maxdim, so higher-dimensional homology is skipped.
        self.rips = Rips(maxdim=0)

        window_length = self.lookback * 5
        self.close_window = RollingWindow[float](window_length)

        # Warm up for exactly as many bars as the rolling window holds.
        self.SetWarmup(window_length)

    def OnData(self, data: Slice):
        """Update the close window, recompute the indicator and rebalance.

        Args:
            data: The slice of market data for the current time step.
        """
        bar = data[self.eq] if data.ContainsKey(self.eq) else None
        if bar is None:
            return

        # Record the close during warm-up as well; otherwise the warm-up
        # bars are thrown away and the window only starts filling afterwards.
        self.close_window.Add(bar.close)

        if self.IsWarmingUp or not self.close_window.IsReady:
            return

        # RollingWindow iterates newest-first; reverse it so that prices are
        # chronological and the ratios below are genuine forward log returns.
        self.prices = np.array(list(self.close_window))[::-1]
        lgr = np.log(self.prices[1:] / self.prices[:-1])

        wasserstein_dists = self.compute_wasserstein_distances(
            lgr, self.lookback, self.rips
        )
        # Collapse the (n, 1) column to a plain float so that plotting and
        # the comparisons below never see a one-element array.  nansum
        # ignores slots that were left unfilled (NaN).
        wd = float(np.nansum(wasserstein_dists))

        self.Plot("wd", "wd", wd)

        holding = self.Portfolio[self.eq]
        if holding.is_short:
            if wd <= self.threshold:
                self.set_holdings(self.eq, 0.80, True)
        elif holding.is_long:
            if wd >= self.threshold:
                self.set_holdings(self.eq, -0.80, True)
        else:
            # Flat: open the initial long position.
            self.set_holdings(self.eq, 0.80)

    def compute_wasserstein_distances(self, log_returns, window_size, rips):
        """Compute Wasserstein distances between adjacent return segments.

        For every start index ``i`` the segments
        ``log_returns[i:i + window_size]`` and
        ``log_returns[i + window_size:i + 2 * window_size]`` are each fed to
        ``rips.fit_transform`` as a one-column point cloud, and the
        Wasserstein distance between the resulting diagrams at index 0 is
        stored.

        Args:
            log_returns: One-dimensional sequence of log returns.
            window_size: Number of returns per segment; must be >= 1.
            rips: Object exposing ``fit_transform(points)`` that returns a
                list of persistence diagrams (e.g. ``ripser.Rips``).

        Returns:
            Array of shape ``(n, 1)`` with
            ``n = len(log_returns) - 2 * window_size + 1``; an empty
            ``(0, 1)`` array when the input is too short for a single pair
            of segments.

        Raises:
            ValueError: If ``window_size`` is smaller than 1.
        """
        # Adapted from:
        # https://medium.com/@crisvelasquez/predicting-stock-market-crashes-with-topological-data-analysis-in-python-1dc4f18ca7ca

        if window_size < 1:
            raise ValueError("window_size must be at least 1")

        returns = np.asarray(log_returns, dtype=float)

        n = len(returns) - (2 * window_size) + 1
        if n <= 0:
            # Not enough data for even one pair of adjacent segments.
            return np.empty((0, 1))

        distances = np.full((n, 1), np.nan)

        for i in range(n):
            split = i + window_size
            # By construction of n both slices always hold exactly
            # window_size points, so no per-iteration length check is needed.
            segment1 = returns[i:split].reshape(-1, 1)
            segment2 = returns[split:split + window_size].reshape(-1, 1)

            dgm1 = rips.fit_transform(segment1)
            dgm2 = rips.fit_transform(segment2)
            # NOTE(review): the dimension-0 diagram presumably contains one
            # class with infinite death; confirm how persim.wasserstein
            # treats non-finite points in the installed version.
            distances[i] = persim.wasserstein(dgm1[0], dgm2[0], matching=False)

        return distances