Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -19.14 Tracking Error 0.129 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset |
# MIT License # Copyright (c) [2022] [https://github.com/AdamWLabs/tsfracdiff] # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE.
# region imports
from AlgorithmImports import *
# endregion
from tsfracdiff import FractionalDifferentiator


class MuscularOrangeCaribou(QCAlgorithm):
    """
    Demonstration algorithm: fractionally differentiates recent SPY minute
    closes during initialization and reports the estimated orders.
    """

    def Initialize(self):
        """ Configures the backtest window, cash and data, then runs the model step once. """
        self.SetStartDate(2010, 1, 1)
        self.SetEndDate(2010, 1, 5)
        self.SetCash(1000)
        self.AddEquity("SPY", Resolution.Minute)

        self.ModelStuff()

    def OnData(self, data: Slice):
        """ No trading logic; all of the work happens in `ModelStuff`. """
        pass

    def ModelStuff(self):
        """
        Pulls three days of minute history for every subscribed security,
        fractionally differentiates the close prices, inverts the transform
        and logs the estimated orders of differentiation.
        """
        df = self.History(self.Securities.Keys, timedelta(days=3), Resolution.Minute)

        # Without any rows there is no 'close' column to select below
        if df.empty:
            self.Debug("ModelStuff: history request returned no data.")
            return

        # Prices (one column per security)
        close = df.unstack(level=0)['close']

        # Fractionally differentiate
        fracDiff = FractionalDifferentiator()
        close_stationary = fracDiff.FitTransform( close )

        # Invert the transform if needed
        close = fracDiff.InverseTransform( close_stationary )

        # See the estimated orders
        self.Debug(f"Estimated fractional orders: {fracDiff.orders}")

        # Model Stuff

        return
from unit_root_tests import *
import pandas as pd
import numpy as np


class FractionalDifferentiator:
    """
    Estimates the minimum fractional order of differentiation required for
    stationarity and applies / inverts the corresponding transformation.
    """

    def __init__(self, maxOrderBound=1, significance=0.01, precision=0.01,
                 unitRootTest='PP', unitRootTestConfig=None):
        """
        Provides estimation of the minimum fractional order of differentiation required for stationarity
        and data transformations.

        The available stationarity/unit root tests are:
        -----------------------------------------------
            - 'PP'  : Phillips and Perron (1988) [default]
            - 'ADF' : Augmented Dickey-Fuller (Said & Dickey, 1984)

        Parameters:
        -----------
            maxOrderBound      (float) Maximum real-valued order to search in (0, maxOrderBound)
            significance       (float) Statistical significance level
            precision          (float) Precision of estimated order
            unitRootTest       (str)   Unit-root/stationarity tests: ['PP','ADF']
            unitRootTestConfig (dict)  Optional keyword arguments to pass to unit root tests

        Attributes:
        -----------
            orders (list) Estimated minimum orders of differentiation

        Raises:
        -------
            Exception if `unitRootTest` is not one of the supported tests.

        Example:
        --------
        ```
        # A pandas.DataFrame/np.array with potentially non-stationary time series
        df

        # Automatic stationary transformation with minimal information loss
        from tsfracdiff import FractionalDifferentiator
        fracDiff = FractionalDifferentiator()
        df = fracDiff.FitTransform(df)
        ```
        """
        self.maxOrderBound = maxOrderBound
        self.significance = significance
        self.precision = precision

        # Critical value checks: only the 1% / 5% / 10% levels map to a
        # critical-value label of the form '1%', '5%', '10%'.
        checkCV = self.significance in [0.01, 0.05, 0.1]
        cv_sig = (str(round(self.significance * 100)) + '%') if checkCV else None

        # Unit-root/Stationarity tests
        if unitRootTest == 'PP':
            self.UnitRootTest = PhillipsPerron(significance=significance, checkCV=checkCV, cv_sig=cv_sig)
        elif unitRootTest == 'ADF':
            self.UnitRootTest = ADFuller(significance=significance, checkCV=checkCV, cv_sig=cv_sig)
        else:
            raise Exception('Please specify a valid unit root test.')

        # Rebind to a merged copy instead of updating in place, so that the
        # overrides can never leak into a dict shared with other instances.
        self.UnitRootTest.config = { **self.UnitRootTest.config, **(unitRootTestConfig or {}) }

        # States
        self.isFitted = False
        self.orders = []
        self.lagData = None

    def Fit(self, df, parallel=True):
        """
        Estimates the minimum orders of differencing required for stationarity.

        Parameters:
        -----------
            df       (pandas.DataFrame/np.array) Raw data
            parallel (bool) Use multithreading if true (default). Requires `joblib`.

        Raises:
        -------
            Exception if `parallel` is true and `joblib` cannot be imported.
        """
        df = pd.DataFrame(df).sort_index()

        # Both code paths work column by column on pandas.Series
        columns = [ df.iloc[:,j] for j in range(df.shape[1]) ]

        # Estimate minimum order of differencing
        if parallel and columns:
            try:
                import multiprocessing
                from joblib import Parallel, delayed
            except ImportError as err:
                raise Exception('The module `joblib` is required for parallelization.') from err

            n_jobs = min(len(columns), multiprocessing.cpu_count())
            orders = Parallel(n_jobs=n_jobs, prefer='threads')(
                delayed(self._MinimumOrderSearch)(col, upperOrder=self.maxOrderBound, first_run=True)
                for col in columns
            )
        else:
            orders = [
                self._MinimumOrderSearch(col, upperOrder=self.maxOrderBound, first_run=True)
                for col in columns
            ]
        self.orders = list(orders)

        # Store lagged data for inverse-transformations: the first (K-1) raw
        # observations of each column, K being the number of memory weights.
        numLags = [ (len(self._GetMemoryWeights(order)) - 1) for order in self.orders ]
        self.lagData = [ col.head(lag) for col, lag in zip(columns, numLags) ]

        self.isFitted = True

        return

    def FitTransform(self, df, parallel=True):
        """
        Estimates the minimum orders and returns a fractionally differentiated dataframe.
        If the instance is already fitted, the stored orders are reused and no new fit is run.

        Parameters
        ----------
            df       (pandas.DataFrame/np.array) Raw data
            parallel (bool) Use multithreading if true (default). Requires `joblib`.

        Returns
        -------
            pandas.DataFrame of fractionally differentiated columns.
        """
        if not self.isFitted:
            self.Fit(df, parallel=parallel)
        fracDiffed = self.Transform(df)

        return fracDiffed

    def Transform(self, df):
        """
        Applies a fractional differentiation transformation based on estimated orders.

        Parameters
        ----------
            df (pandas.DataFrame/np.array) Raw data

        Returns
        -------
            pandas.DataFrame with one fractionally differentiated column per input column.

        Raises
        ------
            Exception if the model has not been fitted.
        """
        if not self.isFitted:
            raise Exception('Fit the model first.')

        df = pd.DataFrame(df).sort_index()
        fracDiffed = [
            self._FracDiff(df.iloc[:,j], order=self.orders[j])
            for j in range(df.shape[1])
        ]

        return pd.concat(fracDiffed, axis=1)

    def InverseTransform(self, fracDiffed):
        """
        Inverts the fractional differentiation transformation.

        The inversion is seeded with the leading raw observations stored by `Fit`,
        so the input should be the full dataframe as returned by `Transform` for
        the data the model was fitted on. `NaN` entries are dropped per column.

        Parameters
        ----------
            fracDiffed (pandas.DataFrame/np.array) Fractionally differentiated data

        Returns
        -------
            pandas.DataFrame of reconstructed values, including the stored seed observations.

        Raises
        ------
            Exception if the model has not been fitted.
        """
        if not self.isFitted:
            raise Exception('Fit the model first.')

        fracDiffed = pd.DataFrame(fracDiffed).sort_index()

        X = []
        for j in range(fracDiffed.shape[1]):
            memoryWeights = self._GetMemoryWeights(self.orders[j])
            numLags = len(memoryWeights) - 1
            # Weights on past values; the final weight (on the current value) is 1
            lagWeights = memoryWeights[:-1]

            # Initial values
            lagData = self.lagData[j]
            seed = np.ravel(lagData)

            # Differenced values
            X_tilde = fracDiffed.iloc[:,j].dropna()
            diffed = np.ravel(X_tilde.values)

            # Iteratively invert transformation into a preallocated buffer.
            # Explicit slice bounds keep the window empty when numLags == 0
            # (order 0), where a negative-index slice would select everything.
            X_vals = np.empty(len(seed) + len(diffed), dtype=float)
            X_vals[:len(seed)] = seed
            for t, x_tilde in enumerate(diffed, start=len(seed)):
                window = X_vals[max(t - numLags, 0):t]
                X_vals[t] = -np.sum( lagWeights * window ) + x_tilde

            idx = np.concatenate( (lagData.index.values, X_tilde.index.values) )
            X.append( pd.Series(X_vals, index=idx) )

        X = pd.concat(X, axis=1)
        X.columns = fracDiffed.columns

        return X

    def _GetMemoryWeights(self, order, memoryThreshold=1e-4):
        """
        Returns an array of memory weights for each time lag, ordered from the
        oldest lag to the current observation (whose weight is 1).

        Parameters:
        -----------
            order           (float) Order of fracdiff
            memoryThreshold (float) Minimum magnitude of weight significance

        Raises:
        -------
            ValueError if `memoryThreshold` is not positive (the weight
            recursion would never terminate).
        """
        if memoryThreshold <= 0:
            raise ValueError('`memoryThreshold` must be positive.')

        memoryWeights = [1,]
        k = 1
        while True:
            weight = -memoryWeights[-1] * ( order - k + 1 ) / k     # Iteratively generate next lag weight
            if abs(weight) < memoryThreshold:
                break
            memoryWeights.append(weight)
            k += 1
        return np.array(memoryWeights[::-1])

    def _FracDiff(self, ts, order=1, memoryWeights=None):
        """
        Differentiates a time series based on a real-valued order.

        Parameters:
        -----------
            ts            (pandas.Series) Univariate time series
            order         (float) Order of differentiation
            memoryWeights (array) Optional pre-computed weights

        Returns:
        --------
            The differentiated series without its first (K-1) observations,
            K being the number of memory weights.
        """
        if memoryWeights is None:
            memoryWeights = self._GetMemoryWeights(order)

        K = len(memoryWeights)
        fracDiffedSeries = ts.rolling(K).apply(lambda x: np.sum( x * memoryWeights ), raw=True)
        fracDiffedSeries = fracDiffedSeries.iloc[(K-1):]

        return fracDiffedSeries

    def _IsStationaryAt(self, ts, order):
        """ Runs the configured unit root test on `ts` differentiated at `order`. """
        fracDiffed = self._FracDiff(ts, order=order).dropna()
        return self.UnitRootTest.IsStationary( fracDiffed )

    def _MinimumOrderSearch(self, ts, lowerOrder=0, upperOrder=1, first_run=False):
        """
        Binary search algorithm for estimating the minimum order of differentiation required for stationarity.

        Parameters
        ----------
            ts         (pandas.Series) Univariate time series
            lowerOrder (float) Lower bound on order
            upperOrder (float) Upper bound on order
            first_run  (bool)  For testing endpoints of order bounds

        Returns
        -------
            The estimated order: the upper end of the final search interval, or
            one of the endpoints when the initial endpoint tests settle the search.
        """
        ## Convergence criteria
        if abs( upperOrder - lowerOrder ) <= self.precision:
            return upperOrder

        ## Initial run: Test endpoints
        if first_run:
            # Series is I(0); the upper endpoint does not need to be tested
            if self._IsStationaryAt(ts, lowerOrder):
                return lowerOrder

            # Series is I(k>>1)
            if not self._IsStationaryAt(ts, upperOrder):
                print('Warning: Time series is explosive. Increase upper bounds.')
                return upperOrder

        ## Binary Search: Test midpoints until the interval is within precision
        while abs( upperOrder - lowerOrder ) > self.precision:
            midOrder = ( lowerOrder + upperOrder ) / 2
            if self._IsStationaryAt(ts, midOrder):
                # Series is weakly stationary in [lowerOrder, midOrder]
                upperOrder = midOrder
            else:
                # Series is weakly stationary in [midOrder, upperOrder]
                lowerOrder = midOrder

        return upperOrder
import re

import arch
from arch.unitroot import PhillipsPerron as PP
from arch.unitroot import ADF

## TODO: Ng and Perron (2001)?


def _ParseVersion(version):
    """
    Returns the leading numeric components of a dotted version string as a
    tuple of ints, e.g. '4.17' -> (4, 17). Parsing stops at the first
    component that does not start with a digit.
    """
    parts = []
    for piece in str(version).split('.'):
        digits = re.match(r'\d+', piece)
        if digits is None:
            break
        parts.append(int(digits.group()))
    return tuple(parts)


class _UnitRootTest:
    """
    Shared state and decision rule for the unit root test wrappers.

    Parameters:
    -----------
        config       (dict)  Test options; merged over the class defaults
        significance (float) Statistical significance level
        checkCV      (bool)  Also compare the statistic against a critical value
        cv_sig       (str)   Key of the critical value to use, e.g. '1%'
    """

    _defaultConfig = {}

    def __init__(self, config=None, significance=0.01, checkCV=False, cv_sig=None):
        # Always build a fresh dict so that instances never share (or mutate)
        # a common default or the caller's own dict.
        self.config = { **self._defaultConfig, **(config or {}) }
        self.significance = significance
        self.checkCV = checkCV
        self.cv_sig = cv_sig

    def HypothesisTest(self, pval, cv, stat):
        """
        Null Hypothesis: Time series has a unit root
        Alt Hypothesis:  Time series does not have a unit root

        Returns True when the null is rejected: the p-value is below the
        significance level or, if `checkCV` is set, the statistic is below
        the selected critical value (0 when that key is missing from `cv`).
        """
        rejectByPValue = pval < self.significance
        rejectByCV = self.checkCV and (stat < cv.get(self.cv_sig, 0))
        return bool(rejectByPValue or rejectByCV)


class PhillipsPerron(_UnitRootTest):
    """
    Unit root testing via Phillips and Perron (1988). This test is robust to
    serial correlation and heteroskedasticity.

    References:
    -----------
    Phillips, P. C. B., & Perron, P. (1988). Testing for a unit root in time series regression.
    Biometrika, 75(2), 335–346. https://doi.org/10.1093/biomet/75.2.335
    """

    _defaultConfig = { 'trend' : 'n', 'test_type' : 'tau' }

    def IsStationary(self, ts):
        """
        Performs a unit root test.
        """
        testResults = PP(ts, trend=self.config['trend'], test_type=self.config['test_type'])
        pval, cv, stat = testResults.pvalue, testResults.critical_values, testResults.stat

        return self.HypothesisTest(pval, cv, stat)


class ADFuller(_UnitRootTest):
    """
    Unit root testing via Said and Dickey (1984). This test assumes a parametric
    ARMA structure to correct for serial correlation but assumes the errors are homoskedastic.

    References:
    -----------
    Said E. Said, & Dickey, D. A. (1984). Testing for Unit Roots in Autoregressive-Moving Average
    Models of Unknown Order. Biometrika, 71(3), 599–607. https://doi.org/10.2307/2336570
    """

    _defaultConfig = { 'trend' : 'n', 'method' : 'AIC' }

    def __init__(self, config=None, significance=0.01, checkCV=False, cv_sig=None):
        super().__init__(config=config, significance=significance, checkCV=checkCV, cv_sig=cv_sig)

        ## Compatability workaround //
        #   arch <= 4.17 uses capital letters but newer versions use lowercase.
        #   Versions are compared numerically; an unparseable version string
        #   is treated as a newer release.
        archVersion = _ParseVersion(arch.__version__)
        if (not archVersion) or (archVersion > (4, 17)):
            if self.config.get('method') == 'AIC':
                self.config['method'] = 'aic'
            elif self.config.get('method') == 'BIC':
                self.config['method'] = 'bic'

    def IsStationary(self, ts):
        """
        Performs a unit root test.
        """
        testResults = ADF(ts, trend=self.config['trend'], method=self.config['method'])
        pval, cv, stat = testResults.pvalue, testResults.critical_values, testResults.stat

        return self.HypothesisTest(pval, cv, stat)