Overall Statistics

| Statistic | Value |
| --- | --- |
| Total Orders | 12 |
| Average Win | 8.98% |
| Average Loss | -7.04% |
| Compounding Annual Return | 4.021% |
| Drawdown | 7.300% |
| Expectancy | 0.517 |
| Start Equity | 100000 |
| End Equity | 121814.88 |
| Net Profit | 21.815% |
| Sharpe Ratio | 0.317 |
| Sortino Ratio | 0.228 |
| Probabilistic Sharpe Ratio | 28.535% |
| Loss Rate | 33% |
| Win Rate | 67% |
| Profit-Loss Ratio | 1.28 |
| Alpha | 0.014 |
| Beta | -0.037 |
| Annual Standard Deviation | 0.035 |
| Annual Variance | 0.001 |
| Information Ratio | -0.406 |
| Tracking Error | 0.168 |
| Treynor Ratio | -0.298 |
| Total Fees | $125.70 |
| Estimated Strategy Capacity | $35000000.00 |
| Lowest Capacity Asset | SLV TI6HUUU1DDUT |
| Portfolio Turnover | 0.48% |
```python
#region imports
from AlgorithmImports import *

import ou_mle as ou
from OptimalStopping import OptimalStopping
from datetime import datetime
from collections import deque
#endregion


class Model:
    '''
    How to use Model:
    1. Call Model.update() in on_data (including during warmup)
    2. If Model.ready2_train, call Model.train()
        2.1. Retrain periodically
    3. Buy the portfolio if Model.is_enter()
    4. If bought, sell if Model.is_exit()
    '''
    def __init__(self):
        self._optimal_stopping = None
        self._alloc_b = -1
        self._time = deque(maxlen=252)  # RollingWindows aren't supported for datetimes
        self._close_a = deque(maxlen=252)
        self._close_b = deque(maxlen=252)
        # represents the value of holding $1 of stock A and -$alloc_b of stock B
        self._portfolio = None

    def update(self, time, close_a, close_b):
        '''
        Adds a new point of data to our model, which will be used
        in the future for training/retraining
        '''
        if self._portfolio is not None:
            self._portfolio.update(close_a, close_b)

        self._time.append(time)
        self._close_a.append(close_a)
        self._close_b.append(close_b)

    # @property exposes the method like a read-only field
    @property
    def ready2_train(self):
        '''
        returns True iff our model has enough data to train
        '''
        return len(self._close_a) == self._close_a.maxlen

    @property
    def is_ready(self):
        '''
        returns True iff our model is ready to provide signals
        '''
        return self._optimal_stopping is not None

    def train(self, r=.05, c=.05):
        '''
        Computes our OU and B-allocation coefficients
        '''
        if not self.ready2_train:
            return

        ts_a = np.array(self._close_a)
        ts_b = np.array(self._close_b)

        days = (self._time[-1] - self._time[0]).days
        dt = 1.0 / days

        theta, mu, sigma, self._alloc_b = self._argmax_B_alloc(ts_a, ts_b, dt)

        try:
            if self._optimal_stopping is None:
                self._optimal_stopping = OptimalStopping(theta, mu, sigma, r, c)
            else:
                self._optimal_stopping.update_fields(theta, mu, sigma, r, c)
        except Exception:
            # sometimes we get odd OU coefficients that lead to an unsolvable optimal stopping problem
            self._optimal_stopping = None

        self._portfolio = Portfolio(ts_a[-1], ts_b[-1], self._alloc_b)

    def allocation_b(self):
        return self._alloc_b

    def is_enter(self):
        '''
        Returns True if it is optimal to enter the pairs trade, False otherwise
        '''
        return self._portfolio.value() <= self._optimal_stopping.entry()

    def is_exit(self):
        '''
        Returns True if it is optimal to exit the pairs trade, False otherwise
        '''
        return self._portfolio.value() >= self._optimal_stopping.exit()

    def _compute_portfolio_values(self, ts_a, ts_b, alloc_b):
        '''
        Computes the portfolio values over time when holding $1 of stock A
        and -$alloc_b of stock B

        input:  ts_a - time series of price data of stock A
                ts_b - time series of price data of stock B

        returns: portfolio values of holding $1 of stock A and -$alloc_b of stock B
        '''
        ts_a = ts_a.copy()  # defensive programming
        ts_b = ts_b.copy()

        ts_a = ts_a / ts_a[0]
        ts_b = ts_b / ts_b[0]

        return ts_a - alloc_b * ts_b

    def _argmax_B_alloc(self, ts_a, ts_b, dt):
        '''
        Finds the $ allocation ratio to stock B that maximizes the log likelihood
        from the fit of portfolio values to an OU process

        input:  ts_a - time series of price data of stock A
                ts_b - time series of price data of stock B
                dt - time increment (1 / days(start date - end date))

        returns: θ*, µ*, σ*, B*
        '''
        theta = mu = sigma = alloc_b = 0
        max_log_likelihood = 0

        def compute_coefficients(x):
            portfolio_values = self._compute_portfolio_values(ts_a, ts_b, x)
            return ou.estimate_coefficients_MLE(portfolio_values, dt)

        vectorized = np.vectorize(compute_coefficients)
        linspace = np.linspace(.01, 1, 100)
        res = vectorized(linspace)
        index = res[3].argmax()

        return res[0][index], res[1][index], res[2][index], linspace[index]

    def __repr__(self):
        '''
        String representation of the OU coefficients of our model
        '''
        return f'θ: {self._optimal_stopping.theta:.2} μ: {self._optimal_stopping.mu:.2} σ: {self._optimal_stopping.sigma:.2}' \
            if self.is_ready else 'Not ready'


class Portfolio:
    '''
    Represents a portfolio holding $1 of stock A and -$alloc_b of stock B
    '''
    def __init__(self, price_a, price_b, alloc_b):
        self._init_price_a = price_a
        self._init_price_b = price_b
        self._curr_price_a = price_a
        self._curr_price_b = price_b
        self._alloc_b = alloc_b

    def update(self, new_price_a, new_price_b):
        self._curr_price_a = new_price_a
        self._curr_price_b = new_price_b

    def value(self):
        return self._curr_price_a / self._init_price_a - self._alloc_b * self._curr_price_b / self._init_price_b
```
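For readers who prefer the search in `_argmax_B_alloc` written out, the sketch below restates it in the docstrings' notation; β stands for the code's `alloc_b` and $\hat{\ell}$ for the average log-likelihood returned by `ou.estimate_coefficients_MLE`. This is a restatement of the code above, not an additional step:

$$
V_t(\beta) = \frac{A_t}{A_0} - \beta\,\frac{B_t}{B_0},
\qquad
\beta^{*} = \operatorname*{arg\,max}_{\beta \,\in\, \{0.01,\,0.02,\,\ldots,\,1.00\}} \hat{\ell}\big(\theta, \mu, \sigma \mid V(\beta),\, dt\big)
$$

where $A_t$ and $B_t$ are the prices of stocks A and B, and the grid of candidate allocations is `np.linspace(.01, 1, 100)`.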
```python
#region imports
from AlgorithmImports import *

from math import sqrt, exp
import scipy.integrate as si
import scipy.optimize as so
#endregion

# source for computation: https://arxiv.org/pdf/1411.5062.pdf


class OptimalStopping:
    '''
    Provides functions for computing the optimal entry and exit values for our pairs portfolio.

    Functions V and J (in the referenced paper) are used to calculate the exit and entry values, respectively.
    '''
    def __init__(self, theta, mu, sigma, r, c):
        '''
        theta, mu, sigma - Ornstein-Uhlenbeck coefficients
            (note we use self.theta for mean and self.mu for drift,
             while some sources use self.mu for mean and self.theta for drift)
        r - investor's subjective discount rate
        c - cost of trading
        '''
        self.theta = theta
        self.mu = mu
        self.sigma = sigma
        self._r = r
        self._c = c

        self._b_star = self._b()
        self._f_of_b = self._f(self._b_star)
        self._d_star = self._d()

    def update_fields(self, theta=None, mu=None, sigma=None, r=None, c=None):
        '''
        Update our OU coefficients
        '''
        if theta is not None:
            self.theta = theta
        if mu is not None:
            self.mu = mu
        if sigma is not None:
            self.sigma = sigma
        if r is not None:
            self._r = r
        if c is not None:
            self._c = c

        self._b_star = self._b()
        self._f_of_b = self._f(self._b_star)
        self._d_star = self._d()

    def entry(self):
        '''
        Optimal value at which to enter/buy the portfolio
        '''
        return self._d_star

    def exit(self):
        '''
        Optimal value at which to exit/liquidate the portfolio
        '''
        return self._b_star

    def _v(self, x):
        # equation 4.2, solution of the problem posed by 2.3
        if x < self._b_star:
            return (self._b_star - self._c) * self._f(x) / self._f_of_b
        else:
            return x - self._c

    def _f(self, x):
        # equation 3.3
        def integrand(u):
            return u ** (self._r / self.mu - 1) * exp(sqrt(2 * self.mu / self.sigma ** 2) * (x - self.theta) * u - u ** 2 / 2)
        return si.quad(integrand, 0, np.inf)[0]

    def _g(self, x):
        # equation 3.4
        def integrand(u):
            return u ** (self._r / self.mu - 1) * exp(sqrt(2 * self.mu / self.sigma ** 2) * (self.theta - x) * u - u ** 2 / 2)
        return si.quad(integrand, 0, np.inf)[0]

    def _b(self):
        # estimates b* using equation 4.3
        def func(b):
            return self._f(b) - (b - self._c) * self._prime(self._f, b)

        # finds the root of the function on the interval [0, 1]
        return so.brentq(func, 0, 1)

    def _d(self):
        # estimates d* using equation 4.11
        def func(d):
            return (self._g(d) * (self._prime(self._v, d) - 1)) - (self._prime(self._g, d) * (self._v(d) - d - self._c))

        # finds the root of the function on the interval [0, 1]
        return so.brentq(func, 0, 1)

    def _prime(self, f, x, h=1e-4):
        # given f, estimates f'(x) using the difference quotient formula
        # WARNING: LOWER h VALUES CAN LEAD TO WEIRD RESULTS
        return (f(x + h) - f(x)) / h
```
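For readers cross-referencing the paper (arXiv:1411.5062), the integrals evaluated by `_f` and `_g` and the root conditions solved by `_b` and `_d` are, restated directly from the code above (θ is the OU mean and µ the mean-reversion rate, matching the note in the docstring):

$$
F(x) = \int_0^{\infty} u^{\frac{r}{\mu}-1} \exp\!\left(\sqrt{\tfrac{2\mu}{\sigma^2}}\,(x-\theta)\,u - \tfrac{u^2}{2}\right) du,
\qquad
G(x) = \int_0^{\infty} u^{\frac{r}{\mu}-1} \exp\!\left(\sqrt{\tfrac{2\mu}{\sigma^2}}\,(\theta-x)\,u - \tfrac{u^2}{2}\right) du
$$

with the exit level $b^{*}$ and entry level $d^{*}$ taken as the roots in $[0, 1]$ of

$$
F(b) - (b - c)\,F'(b) = 0,
\qquad
G(d)\,\big(V'(d) - 1\big) - G'(d)\,\big(V(d) - d - c\big) = 0,
$$

where the derivatives are the forward difference quotients computed by `_prime`.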
```python
#region imports
from AlgorithmImports import *

from Model import Model
#endregion


class ModulatedMultidimensionalAtmosphericScrubbers(QCAlgorithm):

    def initialize(self):
        self.set_start_date(2015, 8, 15)
        self.set_end_date(2020, 8, 15)
        self.set_cash(100000)
        self.set_benchmark('SPY')

        self._a = self.add_equity('GLD', Resolution.DAILY).symbol
        self._b = self.add_equity('SLV', Resolution.DAILY).symbol

        self.set_warmup(252)

        self._model = Model()

        # retrain our model periodically
        self.train(self.date_rules.month_start('GLD'), self.time_rules.midnight, self._train_model)

    def on_data(self, data):
        self._model.update(self.time, data[self._a].close, data[self._b].close)

        if self.is_warming_up:
            return

        if not self._model.is_ready:
            return

        # if we aren't holding the portfolio and our model tells us to buy
        # the portfolio, we buy the portfolio
        if not self.portfolio.invested and self._model.is_enter():
            self.set_holdings(self._a, 1)
            self.set_holdings(self._b, -self._model.allocation_b())

        # if we are holding the portfolio and our model tells us to sell
        # the portfolio, we liquidate our holdings
        elif self.portfolio.invested and self._model.is_exit():
            self.liquidate()

    def _train_model(self):
        if not self._model.ready2_train:
            return

        # retrain quarterly
        if self.time.month % 3 != 1:
            return

        self._model.train()

        if not self._model.is_ready:
            self.liquidate()
            return

        self.log(self._model)
```
```python
#region imports
from AlgorithmImports import *
#endregion

# source for computation: https://arxiv.org/pdf/1411.5062.pdf

### IMPORTANT: PLEASE NOTE WE USE THETA FOR MEAN AND MU FOR DRIFT,
### WHILE OTHER SOURCES, E.G. WIKIPEDIA, USE MU FOR MEAN AND THETA FOR DRIFT

import math
from math import sqrt, exp, log  # exp(n) == e^n, log(n) == ln(n)
import scipy.optimize as so
import numpy as np


def __compute_log_likelihood(params, *args):
    '''
    Computes the average log likelihood; its negation is what scipy minimizes.
    See equation (2.2) in the linked paper.

    returns: the negated average log likelihood for the given parameters
    '''
    # functions passed into scipy's minimize() need to accept one parameter: a tuple
    # of values that scipy adjusts to minimize the value we return.
    # optionally, *args can be passed, which are values we don't change but still want
    # to use in our function (e.g. the measured heights in our sample, or the value pi)
    theta, mu, sigma = params
    X, dt = args
    n = len(X)

    sigma_tilde_squared = sigma ** 2 * (1 - exp(-2 * mu * dt)) / (2 * mu)

    summation_term = 0
    for i in range(1, len(X)):
        summation_term += (X[i] - X[i - 1] * exp(-mu * dt) - theta * (1 - exp(-mu * dt))) ** 2

    summation_term = -summation_term / (2 * n * sigma_tilde_squared)

    log_likelihood = (-log(2 * math.pi) / 2) + (-log(sqrt(sigma_tilde_squared))) + summation_term

    # since we want to maximize the average log likelihood, we return its negation
    # for scipy to minimize (scipy doesn't support maximize)
    return -log_likelihood


def estimate_coefficients_MLE(X, dt, tol=1e-4):
    '''
    Estimates the Ornstein-Uhlenbeck coefficients (θ, µ, σ) of the given array
    using the Maximum Likelihood Estimation method

    input:  X - array-like time series data to be fit as an OU process
            dt - time increment (1 / days(start date - end date))
            tol - tolerance for determination (smaller tolerance means higher precision)

    returns: θ, µ, σ, average log likelihood
    '''
    bounds = ((None, None), (1e-5, None), (1e-5, None))  # theta ∈ ℝ, mu > 0, sigma > 0
    # we need a strictly positive lower bound (1e-5) because scipy's bounds are inclusive of 0,
    # and mu = 0 or sigma = 0 causes a division-by-zero error
    theta_init = np.mean(X)
    initial_guess = (theta_init, 100, 100)  # initial guesses for theta, mu, sigma
    result = so.minimize(__compute_log_likelihood, initial_guess, args=(X, dt), bounds=bounds)
    # .x holds the optimized parameters, .fun holds the optimized (minimized) value
    theta, mu, sigma = result.x
    max_log_likelihood = -result.fun  # undo the negation from __compute_log_likelihood
    return theta, mu, sigma, max_log_likelihood
```
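As a quick sanity check of the estimator, the sketch below simulates an OU path with known coefficients and fits it with `estimate_coefficients_MLE`. The true values, seed, and Euler discretization are illustrative assumptions, not part of the strategy, and the snippet assumes an environment where `ou_mle` imports cleanly (i.e., `AlgorithmImports` resolves, as in QuantConnect research):

```python
import numpy as np
import ou_mle as ou

# illustrative (hypothetical) OU coefficients: theta = long-run mean, mu = mean-reversion speed
theta_true, mu_true, sigma_true = 0.5, 8.0, 0.3
dt = 1.0 / 252  # daily steps, roughly one trading year of data
rng = np.random.default_rng(7)

# Euler-Maruyama simulation of dX = mu * (theta - X) dt + sigma dW
x = np.empty(252)
x[0] = theta_true
for i in range(1, len(x)):
    x[i] = (x[i - 1]
            + mu_true * (theta_true - x[i - 1]) * dt
            + sigma_true * np.sqrt(dt) * rng.standard_normal())

theta, mu, sigma, avg_ll = ou.estimate_coefficients_MLE(x, dt)
print(f'theta={theta:.3f}  mu={mu:.3f}  sigma={sigma:.3f}  avg log-likelihood={avg_ll:.3f}')
```

On a single year of daily observations, expect θ and σ to be recovered more tightly than µ, which is worth keeping in mind when reading the coefficients the algorithm logs after each retrain.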