Overall Statistics |
Total Trades 1254 Average Win 0.07% Average Loss -0.06% Compounding Annual Return -0.037% Drawdown 2.600% Expectancy -0.003 Net Profit -0.147% Sharpe Ratio -0.017 Probabilistic Sharpe Ratio 0.727% Loss Rate 53% Win Rate 47% Profit-Loss Ratio 1.13 Alpha -0 Beta -0.003 Annual Standard Deviation 0.011 Annual Variance 0 Information Ratio 0.158 Tracking Error 0.061 Treynor Ratio 0.075 Total Fees $0.00 Estimated Strategy Capacity $72000000.00 Lowest Capacity Asset AUDUSD 8G Portfolio Turnover 14.21% |
#region imports from AlgorithmImports import * from arch.unitroot.cointegration import engle_granger from pykalman import KalmanFilter from scipy.optimize import minimize from statsmodels.tsa.vector_ar.vecm import VECM #endregion class KalmanFilterStatisticalArbitrageAlphaModel(AlphaModel): def __init__(self, lookback = 30, recalibrateInterval = timedelta(7)): self.lookback = lookback self.recalibrateInterval = recalibrateInterval self.symbol_data = {} self.kalmanFilter = None self.currentMean = None self.currentCov = None self.threshold = None self.trading_weight = {} self.state = 0 self.rebalance_time = datetime.min def Update(self, algorithm, data): insights = [] if algorithm.Time > self.rebalance_time: self.Recalibrate() self.rebalance_time = algorithm.Time + self.recalibrateInterval if not self.trading_weight: return insights # We take the latest cached price as the next time point input (does not matter if no new data point) data = [np.log(algorithm.Securities[symbol].Close) * self.trading_weight[symbol] for symbol in self.symbol_data.keys()] spread = np.product(data) if not spread: return insights # If all pairs got consolidated data and updated their daily price, we update the Kalman Filter if all([data.updated for data in self.symbol_data.values()]): # Update the Kalman Filter with the spread (self.currentMean, self.currentCov) = self.kalmanFilter.filter_update(filtered_state_mean = self.currentMean, filtered_state_covariance = self.currentCov, observation = spread) # reset the flag for data in self.symbol_data.values(): data.updated = False # Obtain the normalized spread normalized_spread = spread - self.currentMean # Mean-reversion if normalized_spread < -self.threshold and self.state != 1: for symbol, weight in self.trading_weight.items(): if algorithm.IsMarketOpen(symbol): insights.append( Insight.Price(symbol, timedelta(365), InsightDirection.Up, weight=weight)) self.state = 1 elif normalized_spread > self.threshold and self.state != -1: for symbol, weight in self.trading_weight.items(): if algorithm.IsMarketOpen(symbol): insights.append( Insight.Price(symbol, timedelta(365), InsightDirection.Down, weight=weight)) self.state = -1 # Out of position if spread converged elif (self.state == 1 and normalized_spread > 0) or (self.state == -1 and normalized_spread < 0): algorithm.Insights.Cancel(list(self.symbol_data.keys())) self.state = 0 return insights def Recalibrate(self): # Get log price series of all signaled assets log_price = np.log( pd.DataFrame({symbol: data.Price for symbol, data in self.symbol_data.items() if data.IsReady})) if log_price.empty: return # Get the weighted spread across different cointegration subspaces weighted_spread, weights, beta = self.GetSpreads(log_price) # Set up the Kalman Filter with the weighted spread series, and obtain the adjusted mean series mean_series = self.SetKalmanFilter(weighted_spread) # Obtain the normalized spread series, the first 20 in-sample will be discarded. normalized_spread = (weighted_spread.iloc[20:] - mean_series) # Set the threshold of price divergence to optimize profit self.SetTradingThreshold(normalized_spread) # Set the normalize trading weight weights = self.GetTradingWeight(beta, weights) for symbol, weight in zip(log_price.columns, weights): self.trading_weight[symbol] = weight def GetSpreads(self, logPriceDf): # Initialize a VECM model following the unit test parameters, then fit to our data. # We allow 3 AR difference, and no deterministic term. vecm_result = VECM(logPriceDf, k_ar_diff=3, coint_rank=logPriceDf.shape[1]-1, deterministic='n').fit() # Obtain the Beta attribute. This is the cointegration subspaces' unit vectors. beta = vecm_result.beta # get the spread of different cointegration subspaces. spread = logPriceDf @ beta # Optimize the distribution across cointegration subspaces and return the weighted spread return self.OptimizeSpreads(spread, beta) def OptimizeSpreads(self, spread, beta): # We set the weight on each vector is between -1 and 1. While overall sum is 0. x0 = np.array([-1**i / beta.shape[1] for i in range(beta.shape[1])]) bounds = tuple((-1, 1) for i in range(beta.shape[1])) constraints = [{'type': 'eq', 'fun': lambda x: np.sum(x)}] # Optimize the Portmanteau statistics opt = minimize(lambda w: ((w.T @ np.cov(spread.T, spread.shift(1).fillna(0).T)[spread.shape[1]:, :spread.shape[1]] @ w)\ / (w.T @ np.cov(spread.T) @ w))**2, x0=x0, bounds=bounds, constraints=constraints, method="SLSQP") # Normalize the result opt.x = opt.x / np.sum(abs(opt.x)) # Return the weighted spread series return spread @ opt.x, opt.x, beta def SetKalmanFilter(self, weighted_spread): # Initialize a Kalman Filter. Using the first 20 data points to optimize its initial state. # We assume the market has no regime change so that the transitional matrix and observation matrix is [1]. self.kalmanFilter = KalmanFilter(transition_matrices = [1], observation_matrices = [1], initial_state_mean = weighted_spread.iloc[:20].mean(), observation_covariance = weighted_spread.iloc[:20].var(), em_vars=['transition_covariance', 'initial_state_covariance']) self.kalmanFilter = self.kalmanFilter.em(weighted_spread.iloc[:20], n_iter=5) (filtered_state_means, filtered_state_covariances) = self.kalmanFilter.filter(weighted_spread.iloc[:20]) # Obtain the current Mean and Covariance Matrix expectations. self.currentMean = filtered_state_means[-1, :] self.currentCov = filtered_state_covariances[-1, :] # Initialize a mean series for spread normalization using the Kalman Filter's results. mean_series = np.array([None]*(weighted_spread.shape[0]-20)) # Roll over the Kalman Filter to obtain the mean series. for i in range(20, weighted_spread.shape[0]): (self.currentMean, self.currentCov) = self.kalmanFilter.filter_update(filtered_state_mean = self.currentMean, filtered_state_covariance = self.currentCov, observation = weighted_spread.iloc[i]) mean_series[i-20] = float(self.currentMean) return mean_series def SetTradingThreshold(self, normalized_spread): # Initialize 20 set levels for testing. s0 = np.linspace(0, max(normalized_spread), 20) # Calculate the profit levels using the 20 set levels. f_bar = np.array([None] * 20) for i in range(20): f_bar[i] = len(normalized_spread.values[normalized_spread.values > s0[i]]) \ / normalized_spread.shape[0] # Set trading frequency matrix. D = np.zeros((19, 20)) for i in range(D.shape[0]): D[i, i] = 1 D[i, i+1] = -1 # Set level of lambda. l = 1.0 # Obtain the normalized profit level. f_star = np.linalg.inv(np.eye(20) + l * D.T @ D) @ f_bar.reshape(-1, 1) s_star = [f_star[i] * s0[i] for i in range(20)] self.threshold = s0[s_star.index(max(s_star))] def GetTradingWeight(self, beta, weights): trading_weight = beta @ weights return trading_weight / np.sum(abs(trading_weight)) def OnSecuritiesChanged(self, algorithm, changes): for removed in changes.RemovedSecurities: symbolData = self.symbol_data.pop(removed.Symbol, None) if symbolData: symbolData.Dispose() for added in changes.AddedSecurities: symbol = added.Symbol if symbol not in self.symbol_data and added.Type == SecurityType.Forex: self.symbol_data[symbol] = SymbolData(algorithm, symbol, self.lookback) class SymbolData: def __init__(self, algorithm, symbol, lookback): self.algorithm = algorithm self.symbol = symbol self.lookback = lookback self.updated = False # To store the historical daily log return self.window = RollingWindow[IndicatorDataPoint](lookback) # Use daily log return to predict cointegrating vector self.consolidator = QuoteBarConsolidator(timedelta(hours=1)) self.price = Identity(f"{symbol} Price") self.price.Updated += self.OnUpdate # Subscribe the consolidator and indicator to data for automatic update algorithm.RegisterIndicator(symbol, self.price, self.consolidator) algorithm.SubscriptionManager.AddConsolidator(symbol, self.consolidator) # historical warm-up on the log return indicator history = algorithm.History[QuoteBar](self.symbol, self.lookback, Resolution.Hour) for bar in history: self.consolidator.Update(bar) def OnUpdate(self, sender, updated): self.window.Add(IndicatorDataPoint(updated.EndTime, updated.Value)) self.updated = True def Dispose(self): self.price.Updated -= self.OnUpdate self.price.Reset() self.window.Reset() self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.consolidator) @property def IsReady(self): return self.window.IsReady @property def Price(self): return pd.Series( data = [x.Value for x in self.window], index = [x.EndTime for x in self.window])[::-1]
# region imports from AlgorithmImports import * from alpha import KalmanFilterStatisticalArbitrageAlphaModel # endregion class KalmanFilterStatisticalArbitrageAlgorithm(QCAlgorithm): def Initialize(self): self.SetStartDate(2019, 1, 1) self.SetEndDate(2023, 1, 1) self.SetCash(100000) self.SetBrokerageModel(BrokerageName.OandaBrokerage, AccountType.Margin) self.UniverseSettings.Resolution = Resolution.Minute # We focus on major forex pairs symbols = [ Symbol.Create(pair, SecurityType.Forex, Market.Oanda) for pair in ["AUDUSD", "EURUSD", "GBPUSD", "USDCAD", "USDCHF", "USDJPY"] ] self.SetUniverseSelection(ManualUniverseSelectionModel(symbols)) # A custom alpha model for Kalman Filter prediction and statistical arbitrage signaling self.AddAlpha(KalmanFilterStatisticalArbitrageAlphaModel()) # Use the insight weights for sizing, set a very long rebalance period to avoid constant rebalancing self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel(Expiry.EndOfYear))