Overall Statistics
Total Trades
1254
Average Win
0.07%
Average Loss
-0.06%
Compounding Annual Return
-0.037%
Drawdown
2.600%
Expectancy
-0.003
Net Profit
-0.147%
Sharpe Ratio
-0.017
Probabilistic Sharpe Ratio
0.727%
Loss Rate
53%
Win Rate
47%
Profit-Loss Ratio
1.13
Alpha
-0
Beta
-0.003
Annual Standard Deviation
0.011
Annual Variance
0
Information Ratio
0.158
Tracking Error
0.061
Treynor Ratio
0.075
Total Fees
$0.00
Estimated Strategy Capacity
$72000000.00
Lowest Capacity Asset
AUDUSD 8G
Portfolio Turnover
14.21%
#region imports
from AlgorithmImports import *
from arch.unitroot.cointegration import engle_granger
from pykalman import KalmanFilter
from scipy.optimize import minimize
from statsmodels.tsa.vector_ar.vecm import VECM
#endregion

class KalmanFilterStatisticalArbitrageAlphaModel(AlphaModel):

    def __init__(self, lookback = 30, recalibrateInterval = timedelta(7)):
        self.lookback = lookback
        self.recalibrateInterval = recalibrateInterval

        self.symbol_data = {}
        self.kalmanFilter = None
        self.currentMean = None
        self.currentCov = None
        self.threshold = None
        self.trading_weight = {}
        self.state = 0

        self.rebalance_time = datetime.min

    def Update(self, algorithm, data):
        insights = []

        if algorithm.Time > self.rebalance_time:
            self.Recalibrate()
            self.rebalance_time = algorithm.Time + self.recalibrateInterval

        if not self.trading_weight: return insights
        
        # We take the latest cached price as the next time point input (does not matter if no new data point)
        data = [np.log(algorithm.Securities[symbol].Close) * self.trading_weight[symbol] 
            for symbol in self.symbol_data.keys()]
        spread = np.product(data)

        if not spread: return insights
            
        # If all pairs got consolidated data and updated their daily price, we update the Kalman Filter
        if all([data.updated for data in self.symbol_data.values()]):
            # Update the Kalman Filter with the spread
            (self.currentMean, self.currentCov) = self.kalmanFilter.filter_update(filtered_state_mean = self.currentMean,
                                                                                  filtered_state_covariance = self.currentCov,
                                                                                  observation = spread)
            # reset the flag
            for data in self.symbol_data.values():
                data.updated = False

        # Obtain the normalized spread
        normalized_spread = spread - self.currentMean
        
        # Mean-reversion
        if normalized_spread < -self.threshold and self.state != 1:
            for symbol, weight in self.trading_weight.items():
                if algorithm.IsMarketOpen(symbol):
                    insights.append(
                        Insight.Price(symbol, timedelta(365), InsightDirection.Up, weight=weight))
            self.state = 1
            
        elif normalized_spread > self.threshold and self.state != -1:
            for symbol, weight in self.trading_weight.items():
                if algorithm.IsMarketOpen(symbol):
                    insights.append(
                        Insight.Price(symbol, timedelta(365), InsightDirection.Down, weight=weight))
            self.state = -1
                
        # Out of position if spread converged
        elif (self.state == 1 and normalized_spread > 0) or (self.state == -1 and normalized_spread < 0):
            algorithm.Insights.Cancel(list(self.symbol_data.keys()))
            self.state = 0
    
        return insights

    def Recalibrate(self):
        # Get log price series of all signaled assets
        log_price = np.log(
            pd.DataFrame({symbol: data.Price for symbol, data in self.symbol_data.items() if data.IsReady}))
        
        if log_price.empty: return

        # Get the weighted spread across different cointegration subspaces
        weighted_spread, weights, beta = self.GetSpreads(log_price)
        
        # Set up the Kalman Filter with the weighted spread series, and obtain the adjusted mean series
        mean_series = self.SetKalmanFilter(weighted_spread)

        # Obtain the normalized spread series, the first 20 in-sample will be discarded.
        normalized_spread = (weighted_spread.iloc[20:] - mean_series)

        # Set the threshold of price divergence to optimize profit
        self.SetTradingThreshold(normalized_spread)

        # Set the normalize trading weight
        weights = self.GetTradingWeight(beta, weights)
        for symbol, weight in zip(log_price.columns, weights):
            self.trading_weight[symbol] = weight

    def GetSpreads(self, logPriceDf):
        # Initialize a VECM model following the unit test parameters, then fit to our data.
        # We allow 3 AR difference, and no deterministic term.
        vecm_result = VECM(logPriceDf, k_ar_diff=3, coint_rank=logPriceDf.shape[1]-1, deterministic='n').fit()
        # Obtain the Beta attribute. This is the cointegration subspaces' unit vectors.
        beta = vecm_result.beta
        # get the spread of different cointegration subspaces.
        spread = logPriceDf @ beta
        # Optimize the distribution across cointegration subspaces and return the weighted spread
        return self.OptimizeSpreads(spread, beta)

    def OptimizeSpreads(self, spread, beta):
        # We set the weight on each vector is between -1 and 1. While overall sum is 0.
        x0 = np.array([-1**i / beta.shape[1] for i in range(beta.shape[1])])
        bounds = tuple((-1, 1) for i in range(beta.shape[1]))
        constraints = [{'type': 'eq', 'fun': lambda x: np.sum(x)}]
        
        # Optimize the Portmanteau statistics
        opt = minimize(lambda w: ((w.T @ np.cov(spread.T, spread.shift(1).fillna(0).T)[spread.shape[1]:, :spread.shape[1]] @ w)\
                                 / (w.T @ np.cov(spread.T) @ w))**2,
                        x0=x0,
                        bounds=bounds,
                        constraints=constraints,
                        method="SLSQP")
        
        # Normalize the result
        opt.x = opt.x / np.sum(abs(opt.x))
        # Return the weighted spread series
        return spread @ opt.x, opt.x, beta

    def SetKalmanFilter(self, weighted_spread):
        # Initialize a Kalman Filter. Using the first 20 data points to optimize its initial state. 
        # We assume the market has no regime change so that the transitional matrix and observation matrix is [1].
        self.kalmanFilter = KalmanFilter(transition_matrices = [1],
                            observation_matrices = [1],
                            initial_state_mean = weighted_spread.iloc[:20].mean(),
                            observation_covariance = weighted_spread.iloc[:20].var(),
                            em_vars=['transition_covariance', 'initial_state_covariance'])
        self.kalmanFilter = self.kalmanFilter.em(weighted_spread.iloc[:20], n_iter=5)
        (filtered_state_means, filtered_state_covariances) = self.kalmanFilter.filter(weighted_spread.iloc[:20])
        
        # Obtain the current Mean and Covariance Matrix expectations.
        self.currentMean = filtered_state_means[-1, :]
        self.currentCov = filtered_state_covariances[-1, :]
        
        # Initialize a mean series for spread normalization using the Kalman Filter's results.
        mean_series = np.array([None]*(weighted_spread.shape[0]-20))
        
        # Roll over the Kalman Filter to obtain the mean series.
        for i in range(20, weighted_spread.shape[0]):
            (self.currentMean, self.currentCov) = self.kalmanFilter.filter_update(filtered_state_mean = self.currentMean,
                                                                    filtered_state_covariance = self.currentCov,
                                                                    observation = weighted_spread.iloc[i])
            mean_series[i-20] = float(self.currentMean)

        return mean_series

    def SetTradingThreshold(self, normalized_spread):
        # Initialize 20 set levels for testing.
        s0 = np.linspace(0, max(normalized_spread), 20)
        
        # Calculate the profit levels using the 20 set levels.
        f_bar = np.array([None] * 20)
        for i in range(20):
            f_bar[i] = len(normalized_spread.values[normalized_spread.values > s0[i]]) \
                / normalized_spread.shape[0]
            
        # Set trading frequency matrix.
        D = np.zeros((19, 20))
        for i in range(D.shape[0]):
            D[i, i] = 1
            D[i, i+1] = -1
            
        # Set level of lambda.
        l = 1.0
        
        # Obtain the normalized profit level.
        f_star = np.linalg.inv(np.eye(20) + l * D.T @ D) @ f_bar.reshape(-1, 1)
        s_star = [f_star[i] * s0[i] for i in range(20)]
        self.threshold = s0[s_star.index(max(s_star))]

    def GetTradingWeight(self, beta, weights):
        trading_weight = beta @ weights
        return trading_weight / np.sum(abs(trading_weight))

    def OnSecuritiesChanged(self, algorithm, changes):
        for removed in changes.RemovedSecurities:
            symbolData = self.symbol_data.pop(removed.Symbol, None)
            if symbolData:
                symbolData.Dispose()

        for added in changes.AddedSecurities:
            symbol = added.Symbol
            if symbol not in self.symbol_data and added.Type == SecurityType.Forex:
                self.symbol_data[symbol] = SymbolData(algorithm, symbol, self.lookback)
    
class SymbolData:

    def __init__(self, algorithm, symbol, lookback):
        self.algorithm = algorithm
        self.symbol = symbol
        self.lookback = lookback
        self.updated = False

        # To store the historical daily log return
        self.window = RollingWindow[IndicatorDataPoint](lookback)

        # Use daily log return to predict cointegrating vector
        self.consolidator = QuoteBarConsolidator(timedelta(hours=1))
        self.price = Identity(f"{symbol} Price")
        self.price.Updated += self.OnUpdate

        # Subscribe the consolidator and indicator to data for automatic update
        algorithm.RegisterIndicator(symbol, self.price, self.consolidator)
        algorithm.SubscriptionManager.AddConsolidator(symbol, self.consolidator)

        # historical warm-up on the log return indicator
        history = algorithm.History[QuoteBar](self.symbol, self.lookback, Resolution.Hour)
        for bar in history:
            self.consolidator.Update(bar)

    def OnUpdate(self, sender, updated):
        self.window.Add(IndicatorDataPoint(updated.EndTime, updated.Value))
        self.updated = True

    def Dispose(self):
        self.price.Updated -= self.OnUpdate
        self.price.Reset()
        self.window.Reset()
        self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.consolidator)

    @property
    def IsReady(self):
        return self.window.IsReady

    @property
    def Price(self):
        return pd.Series(
            data = [x.Value for x in self.window],
            index = [x.EndTime for x in self.window])[::-1]
# region imports
from AlgorithmImports import *
from alpha import KalmanFilterStatisticalArbitrageAlphaModel
# endregion

class KalmanFilterStatisticalArbitrageAlgorithm(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2019, 1, 1)
        self.SetEndDate(2023, 1, 1)
        self.SetCash(100000)

        self.SetBrokerageModel(BrokerageName.OandaBrokerage, AccountType.Margin)

        self.UniverseSettings.Resolution = Resolution.Minute

        # We focus on major forex pairs
        symbols = [ Symbol.Create(pair, SecurityType.Forex, Market.Oanda) for pair in
            ["AUDUSD", "EURUSD", "GBPUSD", "USDCAD", "USDCHF", "USDJPY"] ]
        self.SetUniverseSelection(ManualUniverseSelectionModel(symbols))

        # A custom alpha model for Kalman Filter prediction and statistical arbitrage signaling
        self.AddAlpha(KalmanFilterStatisticalArbitrageAlphaModel())

        # Use the insight weights for sizing, set a very long rebalance period to avoid constant rebalancing
        self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel(Expiry.EndOfYear))