Overall Statistics

| Total Trades               | 36      |
| Average Win                | 0.05%   |
| Average Loss               | -0.05%  |
| Compounding Annual Return  | 1.619%  |
| Drawdown                   | 0.300%  |
| Expectancy                 | 0.119   |
| Net Profit                 | 0.114%  |
| Sharpe Ratio               | 1.474   |
| Probabilistic Sharpe Ratio | 54.867% |
| Loss Rate                  | 44%     |
| Win Rate                   | 56%     |
| Profit-Loss Ratio          | 1.01    |
| Alpha                      | -0.032  |
| Beta                       | 0.08    |
| Annual Standard Deviation  | 0.011   |
| Annual Variance            | 0       |
| Information Ratio          | -8.995  |
| Tracking Error             | 0.065   |
| Treynor Ratio              | 0.202   |
| Total Fees                 | $36.00  |
import json
import pandas as pd
import numpy as np
from io import StringIO
from numpy.fft import fft, ifft
import numba
from talib.abstract import (
    DEMA, EMA, MIDPRICE, SMA, T3, TEMA, TRIMA, WMA,
    ADX, ADXR, AROONOSC, BOP, CMO, DX, MFI, MINUS_DM, MOM, ROC, RSI, TRIX,
    WILLR, ATR, NATR, BBANDS, AROON, STOCHRSI, HT_TRENDLINE, AD, OBV,
    HT_DCPERIOD, HT_DCPHASE, HT_TRENDMODE, TRANGE, AVGPRICE, MEDPRICE,
    TYPPRICE, WCLPRICE, ULTOSC, MAMA, SAR, SAREXT, APO, MACD, ADOSC,
    HT_PHASOR, HT_SINE, STOCHF, STOCH
)
import mlfinlab as ml
from pipelines import TripleBarierLabeling, OutlierStdRemove
from model_loader import deserialize_random_forest
from statsmodels.tsa.stattools import adfuller
from method_timer import time_method


class CalibratedResistanceAtmosphericScrubbers(QCAlgorithm):

    periods = [5, 30, 60, 300, 480, 2400, 12000, 96000]
    std_outlier = 10
    volatility_lookback = 50
    volatility_scaler = 1
    tb_triplebar_num_days = 3
    tb_triplebar_pt_sl = [1, 1]
    tb_triplebar_min_ret = 0.003
    rand_state = 3

    def Initialize(self):
        # date, equity, brokerage and benchmark
        self.SetStartDate(2016, 7, 7)
        self.SetEndDate(2016, 8, 1)  # 2020, 5, 15
        self.SetCash(100000)
        self.spy = self.AddEquity("SPY", Resolution.Minute, fillDataForward=True)
        self.spy.SetDataNormalizationMode(DataNormalizationMode.Adjusted)
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage,
                               AccountType.Cash)
        self.Settings.FreePortfolioValuePercentage = 0.5
        self.SetBenchmark("SPY")

        # OHLCV init
        self.open = pd.Series()
        self.high = pd.Series()
        self.low = pd.Series()
        self.close = pd.Series()
        self.volume = pd.Series()

        # warm-up period
        self.lookback = 96100
        self.SetWarmUp(self.lookback)

        # ML model
        self.model = self.load_model(
            "https://github.com/MislavSag/trademl/blob/master/trademl/modeling/random_forest/rf_model.json?raw=true")
        self.model_features = pd.read_csv(
            StringIO(self.Download('https://raw.githubusercontent.com/MislavSag/trademl/master/trademl/modeling/random_forest/feature_names.csv')),
            sep=',', index_col=[0])
        self.model_features = self.model_features.squeeze()
        self.min_d = pd.read_csv(
            StringIO(self.Download('https://raw.githubusercontent.com/MislavSag/trademl/master/trademl/modeling/random_forest/min_d.csv')),
            sep=';', names=['feature', 'value'])
        self.min_d = self.min_d[1:]
        self.min_d = self.min_d.loc[self.min_d['feature'].isin(self.model_features)]
        self.min_d.set_index(self.min_d['feature'], inplace=True)
        self.stationary_cols = self.min_d['feature'].loc[self.min_d['value'] > 0]
        self.min_d = self.min_d['value'].loc[self.min_d['value'] > 0]

        # timezone
        self.SetTimeZone("Europe/Zagreb")

    def load_model(self, url):
        model = deserialize_random_forest(json.loads(self.Download(url)))
        self.Log("Successfully loaded model")
        return model

    @time_method
    def OnData(self, data):
        '''OnData event is the primary entry point for your algorithm.
        Each new data point will be pumped in here.

        Arguments:
            data: Slice object keyed by symbol containing the stock data
        '''
        # if there is no bar data (only stock splits, dividends etc.), return
        if "SPY" not in data.Bars:
            return

        ### GET HISTORICAL OHLCV DATA
        open_ = data["SPY"].Open
        high_ = data["SPY"].High
        low_ = data["SPY"].Low
        close_ = data["SPY"].Close
        volume_ = data["SPY"].Volume
        self.open = self.open.append(pd.Series([open_], index=[self.Time]))[-self.lookback:]
        self.high = self.high.append(pd.Series([high_], index=[self.Time]))[-self.lookback:]
        self.low = self.low.append(pd.Series([low_], index=[self.Time]))[-self.lookback:]
        self.close = self.close.append(pd.Series([close_], index=[self.Time]))[-self.lookback:]
        self.volume = self.volume.append(pd.Series([volume_], index=[self.Time]))[-self.lookback:]

        # continue only if warm-up has finished
        if self.IsWarmingUp:
            return

        ### CALCULATE EVENTS, I.E. WHEN TO TRADE
        close_stationary = self.frac_diff_ffd(self.close.values, self.min_d.loc['close'])
        close_stationary = pd.Series(close_stationary, index=self.close.index)
        close_stationary = close_stationary.dropna()
        daily_vol = ml.util.get_daily_vol(self.close, lookback=50)
        cusum_events = ml.filters.cusum_filter(self.close, threshold=daily_vol.mean() * 1)
        if cusum_events.empty:
            return
        if cusum_events[-1] == self.Time:
            self.Debug(self.Time)

            # create pandas data frame
            df = pd.DataFrame({'open': self.open,
                               'high': self.high,
                               'low': self.low,
                               'close': self.close,
                               'volume': self.volume},
                              index=self.close.index)

            ### ADD FEATURES TO OHLCV
            # add technical indicators
            df = self.add_technical_indicators(df, self.periods)
            df.columns = [cl[0] if isinstance(cl, tuple) else cl for cl in df.columns]
            # add ohlc transformations
            df['high_low'] = df['high'] - df['low']
            df['close_open'] = df['close'] - df['open']
            # simple momentum
            df['mom1'] = df['close'].pct_change(periods=1)
            df['mom2'] = df['close'].pct_change(periods=2)
            df['mom3'] = df['close'].pct_change(periods=3)
            df['mom4'] = df['close'].pct_change(periods=4)
            df['mom5'] = df['close'].pct_change(periods=5)
            # volatility
            df['volatility_60'] = np.log(df['close']).diff().rolling(
                window=60, min_periods=60, center=False).std()
            df['volatility_30'] = np.log(df['close']).diff().rolling(
                window=30, min_periods=30, center=False).std()
            df['volatility_15'] = np.log(df['close']).diff().rolling(
                window=15, min_periods=15, center=False).std()
            df['volatility_10'] = np.log(df['close']).diff().rolling(
                window=10, min_periods=10, center=False).std()
            df['volatility_5'] = np.log(df['close']).diff().rolling(
                window=5, min_periods=5, center=False).std()
            # serial correlation (takes time)
            # window_autocorr = 50
            # df['autocorr_1'] = np.log(df['close']).diff().rolling(
            #     window=window_autocorr, min_periods=window_autocorr,
            #     center=False).apply(lambda x: x.autocorr(lag=1), raw=False)
            # df['autocorr_2'] = np.log(df['close']).diff().rolling(
            #     window=window_autocorr, min_periods=window_autocorr,
            #     center=False).apply(lambda x: x.autocorr(lag=2), raw=False)
            # df['autocorr_3'] = np.log(df['close']).diff().rolling(
            #     window=window_autocorr, min_periods=window_autocorr,
            #     center=False).apply(lambda x: x.autocorr(lag=3), raw=False)
            # df['autocorr_4'] = np.log(df['close']).diff().rolling(
            #     window=window_autocorr, min_periods=window_autocorr,
            #     center=False).apply(lambda x: x.autocorr(lag=4), raw=False)
            # df['autocorr_5'] = np.log(df['close']).diff().rolling(
            #     window=window_autocorr, min_periods=window_autocorr,
            #     center=False).apply(lambda x: x.autocorr(lag=5), raw=False)
            # skewness
            df['skew_60'] = np.log(df['close']).diff().rolling(
                window=60, min_periods=60, center=False).skew()
            df['skew_30'] = np.log(df['close']).diff().rolling(
                window=30, min_periods=30, center=False).skew()
            df['skew_15'] = np.log(df['close']).diff().rolling(
                window=15, min_periods=15, center=False).skew()
            df['skew_10'] = np.log(df['close']).diff().rolling(
                window=10, min_periods=10, center=False).skew()
            df['skew_5'] = np.log(df['close']).diff().rolling(
                window=5, min_periods=5, center=False).skew()
            # kurtosis
            df['kurtosis_60'] = np.log(df['close']).diff().rolling(
                window=60, min_periods=60, center=False).kurt()
            df['kurtosis_30'] = np.log(df['close']).diff().rolling(
                window=30, min_periods=30, center=False).kurt()
            df['kurtosis_15'] = np.log(df['close']).diff().rolling(
                window=15, min_periods=15, center=False).kurt()
            df['kurtosis_10'] = np.log(df['close']).diff().rolling(
                window=10, min_periods=10, center=False).kurt()
            df['kurtosis_5'] = np.log(df['close']).diff().rolling(
                window=5, min_periods=5, center=False).kurt()

            ### KEEP ONLY COLUMNS NEEDED FOR THE MODEL
            df = df[self.model_features.to_list()]

            ### REMOVE NAN VALUES
            df = df.dropna()

            ### MAKE SERIES STATIONARY
            df = self.unstat_cols_to_stat(df, self.min_d, self.stationary_cols)

            ### PREDICTIONS
            # prediction = np.random.choice([0, 1], replace=True, p=[.5, .5])
            prediction = self.model.predict(df.iloc[[-1], :])
            if self.Securities["SPY"].Invested and prediction == -1:
                self.Liquidate("SPY")
            elif not self.Securities["SPY"].Invested and prediction == 1:
                self.SetHoldings("SPY", .5)

    def OnOrderEvent(self, orderEvent):
        pass

    @time_method
    def add_ind(self, ohlcv, f, n, periods):
        """
        Add technical indicator to pd.DataFrame.

        Parameters
        ----------
        f : function
            Function from the ta-lib package.
        n : str
            Name prefix.

        Returns
        -------
        pd.DataFrame
        """
        ind = pd.concat([f(ohlcv, p).rename(n + str(p)) for p in periods], axis=1)
        return ind

    @time_method
    def add_ind_df(self, ohlcv, f, n, periods):
        """
        Add technical indicator to pd.DataFrame when the indicator has
        multiple outputs.

        Parameters
        ----------
        f : function
            Function from the ta-lib package.
        n : str
            Name prefix.

        Returns
        -------
        pd.DataFrame
        """
        ind = [f(ohlcv, p).add_prefix((f._Function__namestr + '_' + str(p) + '_'))
               for p in periods]
        # ind = [f(ohlcv, p).
        #        set_axis((f._Function__namestr + '_' +
        #                  pd.Series(f.output_names) + '_' + str(p)), axis=1)
        #        for p in periods]
        ind = pd.concat(ind, axis=1)
        return ind

    @time_method
    def add_technical_indicators(self, data, periods):
        """Add technical indicators as features.

        Arguments:
            data {pd.DataFrame} -- Pandas data frame with OHLC data
            periods {list} -- List that contains periods as arguments.

        Returns:
            pd.DataFrame -- Pandas data frame with additional indicators
        """
        # add technical indicators for various periods when the indicator has
        # one output
        indsList = [DEMA, EMA, MIDPRICE, SMA, T3,  # MIDPOINT
                    TEMA, TRIMA, WMA,  # KAMA memory intensive!
                    ADX, ADXR, AROONOSC, BOP, CMO, DX, MFI, MINUS_DM, MOM,
                    ROC, RSI, TRIX, WILLR,  # CCI doesn't work (probably)
                    ATR, NATR]
        inds = [self.add_ind(data, f, f._Function__name.decode('ascii'), periods)
                for f in indsList]
        inds = pd.concat(inds, axis=1)
        data = pd.concat([data, inds], axis=1)

        # add technical indicators for various periods when the indicator has
        # multiple outputs
        indsList = [BBANDS, AROON, STOCHRSI]
        inds = [self.add_ind_df(data, f, f._Function__name.decode('ascii'), periods)
                for f in indsList]
        inds = pd.concat(inds, axis=1)
        data = pd.concat([data, inds], axis=1)

        # add technical indicators with no arguments
        indsList = [HT_TRENDLINE, AD, OBV, HT_DCPERIOD, HT_DCPHASE,
                    HT_TRENDMODE, TRANGE, AVGPRICE, MEDPRICE, TYPPRICE,
                    WCLPRICE, ULTOSC]
        inds = [f(data).rename(f._Function__name.decode('ascii')) for f in indsList]
        inds = pd.concat(inds, axis=1)
        data = pd.concat([data, inds], axis=1)

        # add other indicators
        data[['MAMA', 'FAMA']] = MAMA(data)  # MAVP doesn't work
        data[['MAMA_25', 'FAMA_25']] = MAMA(data, fastlimit=0.25, slowlimit=0.02)
        data[['MAMA_5', 'FAMA_5']] = MAMA(data, fastlimit=0.5, slowlimit=0.05)
        data['SAR'] = SAR(data)
        data['SAR_1'] = SAR(data, acceleration=0.01, maximum=0.01)
        data['SAR_2'] = SAR(data, acceleration=0.02, maximum=0.02)
        data['SAREXT'] = SAREXT(data)
        startvalue, offsetonreverse, accelerationinitlong, accelerationlong,\
            accelerationmaxlong, accelerationinitshort, accelerationshort,\
            accelerationmaxshort = np.random.uniform(low=0.01, high=0.4, size=8)
        data['SAREXT_rand'] = SAREXT(data,
                                     startvalue=startvalue,
                                     offsetonreverse=offsetonreverse,
                                     accelerationinitlong=accelerationinitlong,
                                     accelerationlong=accelerationlong,
                                     accelerationmaxlong=accelerationmaxlong,
                                     accelerationinitshort=accelerationinitshort,
                                     accelerationshort=accelerationshort,
                                     accelerationmaxshort=accelerationmaxshort)
        data['APO'] = APO(data)
        data['APO_1'] = APO(data, fastperiod=24, slowperiod=52, matype=0)
        data['APO_2'] = APO(data, fastperiod=50, slowperiod=100, matype=0)
        data['APO_3'] = APO(data, fastperiod=100, slowperiod=200, matype=0)
        data['APO_4'] = APO(data, fastperiod=200, slowperiod=400, matype=0)
        data['APO_5'] = APO(data, fastperiod=12000, slowperiod=24000, matype=0)
        data['ADOSC'] = ADOSC(data)
        data[['MACD', 'MACDSIGNAL', 'MACDHIST']] = MACD(data)
        data[['inphase', 'quadrature']] = HT_PHASOR(data)
        data[['sine', 'leadsine']] = HT_SINE(data)
        data[['fastk', 'fastd']] = STOCHF(data)
        data[['fastk_20', 'fastd_20']] = STOCHF(data, fastk_period=20,
                                                fastd_period=9, fastd_matype=0)
        data[['fastk_200', 'fastd_200']] = STOCHF(data, fastk_period=200,
                                                  fastd_period=80, fastd_matype=0)
        data[['fastk_3600', 'fastd_3600']] = STOCHF(data, fastk_period=3600,
                                                    fastd_period=400, fastd_matype=0)
        data[['slowk', 'slowd']] = STOCH(data)
        data[['slowk_30', 'slowd_30']] = STOCH(data, fastk_period=30,
                                               slowk_period=15, slowk_matype=0,
                                               slowd_period=9, slowd_matype=0)
        return data

    @time_method
    def get_weights(self, d, size):
        """Expanding window fractional difference weights."""
        w = [1.0]
        for k in range(1, size):
            w_ = -w[-1] / k * (d - k + 1)
            w.append(w_)
        w = np.array(w[::-1]).reshape(-1, 1)
        return w

    @time_method
    # @numba.njit
    def get_weights_ffd(self, d, thres, lim=99999):
        """Fixed-width window fractional difference weights.
        Set lim to be large if you want to stop only at thres.
        Set thres to zero if you want to ignore it.
        """
        w = [1.0]
        k = 1
        for i in range(1, lim):
            w_ = -w[-1] / k * (d - k + 1)
            if abs(w_) < thres:
                break
            w.append(w_)
            k += 1
        w = np.array(w[::-1]).reshape(-1, 1)
        return w

    @time_method
    def frac_diff_ffd(self, x, d, thres=1e-4, lim=None):
        assert isinstance(x, np.ndarray)
        assert x.ndim == 1
        if lim is None:
            lim = len(x)
        w, out = self._frac_diff_ffd(x, d, lim, thres=thres)
        # print(f'weights is shape {w.shape}')
        return out

    @time_method
    # @numba.njit
    def _frac_diff_ffd(self, x, d, lim, thres=1e-4):
        """d is any positive real"""
        w = self.get_weights_ffd(d, thres, lim)
        width = len(w) - 1
        output = []
        output.extend([np.nan] * width)  # the first few entries *were* zero, should be nan?
        for i in range(width, len(x)):
            output.append(np.dot(w.T, x[i - width: i + 1])[0])
        return w, np.array(output)

    @time_method
    def fast_frac_diff(self, x, d):
        """Expanding window version using the FFT form."""
        assert isinstance(x, np.ndarray)
        T = len(x)
        np2 = int(2 ** np.ceil(np.log2(2 * T - 1)))
        k = np.arange(1, T)
        b = (1,) + tuple(np.cumprod((k - d - 1) / k))
        z = (0,) * (np2 - T)
        z1 = b + z
        z2 = tuple(x) + z
        dx = ifft(fft(z1) * fft(z2))
        return np.real(dx[0:T])

    @time_method
    def test_frac_diff_ffd_equals_original_impl(self, d=3):
        from .prado_orig import fracDiff_FFD_original_impl
        import pandas as pd

        x = np.random.randn(100)
        a = self.frac_diff_ffd(x, d, thres=1e-4)
        b = fracDiff_FFD_original_impl(pd.DataFrame(x), d, thres=1e-4)
        assert np.allclose(a, b)
        # return locals()

    @time_method
    def test_fast_frac_diff_equals_fracDiff_original_impl(self, d=3):
        from .prado_orig import fracDiff_original_impl
        import pandas as pd

        x = np.random.randn(100)
        a = self.fast_frac_diff(x, d)
        b = fracDiff_original_impl(pd.DataFrame(x), d, thres=None)
        b = b.values
        assert a.shape == b.shape
        assert np.allclose(a, b)
        # return locals()

    @time_method
    def min_ffd_value(self, unstationary_series, d_domain, pvalue_threshold=0.05):
        """
        Source: Chapter 5, AFML (section 5.5, page 83); minimal value of d
        which makes a pandas series stationary.

        References:
            https://www.wiley.com/en-us/Advances+in+Financial+Machine+Learning-p-9781119482086
            https://wwwf.imperial.ac.uk/~ejm/M3S8/Problems/hosking81.pdf

        Constant width window (new solution).
        Note 1: thres determines the cut-off weight for the window.
        Note 2: diff_amt can be any positive fractional, not necessarily
            bounded to [0, 1].

        :param unstationary_series: (pd.Series)
        :param d_domain: (np.array) numpy linspace; possible d values
        :param pvalue_threshold: (float) ADF p-value threshold above which the series is nonstationary
        :return: (float) minimum value of d which makes the series stationary
        """
        d_min = None
        for d_i in d_domain:
            # resample series to daily frequency
            df1 = unstationary_series.resample('1D').last()
            df1.dropna(inplace=True)
            df1 = df1.squeeze()
            # fracDiff for d
            df2 = self.frac_diff_ffd(df1.values, d=d_i, thres=1e-4, lim=None)
            df2 = pd.Series(df2, index=df1.index).dropna()
            # ADF test
            df2 = adfuller(df2.squeeze(), maxlag=1, regression='c', autolag=None)
            # if the p-value is at or below the threshold, stop and return d
            if df2[1] <= pvalue_threshold:
                d_min = d_i
                break
        return d_min

    @time_method
    def unstat_cols_to_stat(self, data, min_d, stationaryCols):
        """
        Convert nonstationary columns to stationary.

        :param data: (pd.DataFrame) Pandas DF with nonstationary columns.
        :return: (pd.DataFrame) Pandas DF with stationary columns.
        """
        # make spy stationary
        dataStationary = data[stationaryCols].loc[:, min_d > 0]
        diff_amt_args = min_d[min_d > 0].to_list()
        for i, col in enumerate(dataStationary.columns):
            dataStationary[col] = self.frac_diff_ffd(dataStationary[col].values,
                                                     diff_amt_args[i])
        # add the stationary spy columns back to spy
        columnsToChange = data[stationaryCols].loc[:, min_d > 0].columns
        data[columnsToChange] = dataStationary
        data.dropna(inplace=True)
        return data
import time
from functools import wraps


def time_method(func):
    @wraps(func)
    def timed(*args, **kw):
        time_thresh = 1  # time taken is printed only if greater than this number of seconds
        ts = time.time()
        result = func(*args, **kw)
        te = time.time()
        if te - ts > time_thresh:
            algo = args[0]
            algo.Debug("%r took %2.2f seconds to run." % (func.__name__, te - ts))
        return result
    return timed
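Note that the decorator assumes the first positional argument of the wrapped callable exposes a Debug method, which holds for QCAlgorithm instance methods. A minimal sketch with a hypothetical stand-in class (not part of the project) shows the intended use:

# Hypothetical stand-in for QCAlgorithm, only to demonstrate time_method.
class FakeAlgo:
    def Debug(self, msg):
        print(msg)

    @time_method
    def slow_method(self, n):
        time.sleep(n)  # simulate work slower than time_thresh
        return n


# FakeAlgo().slow_method(2) prints something like:
# 'slow_method' took 2.00 seconds to run.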
import numpy as np
import pandas as pd
import mlfinlab as ml


class CalibratedResistanceAtmosphericScrubbers(QCAlgorithm):

    # mirrors the attributes in the main algorithm; needed for self.lookback below
    periods = [5, 30, 60, 300, 480, 2400, 12000, 96000]
    volatility_lookback = 50

    def Initialize(self):
        self.SetStartDate(2019, 1, 1)  # Set Start Date
        self.SetEndDate(2019, 3, 1)
        self.SetCash(100000)  # Set Strategy Cash
        self.spy = self.AddEquity("SPY", Resolution.Minute)
        self.spy.SetDataNormalizationMode(DataNormalizationMode.Adjusted)  # Raw, SplitAdjusted, TotalReturn
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Cash)

        # init OHLCV arrays
        self.open = np.array([])
        self.high = np.array([])
        self.low = np.array([])
        self.close = np.array([])
        self.volume = np.array([])
        self.lookback = max(self.periods)
        self.SetWarmUp(self.lookback * 2)

    def OnData(self, data):
        '''OnData event is the primary entry point for your algorithm.
        Each new data point will be pumped in here.

        Arguments:
            data: Slice object keyed by symbol containing the stock data
        '''
        if "SPY" not in data.Bars:
            return

        open_ = data["SPY"].Open
        high_ = data["SPY"].High
        low_ = data["SPY"].Low
        close_ = data["SPY"].Close
        volume_ = data["SPY"].Volume
        self.open = np.append(self.open, open_)[-self.lookback*2:]
        self.high = np.append(self.high, high_)[-self.lookback*2:]
        self.low = np.append(self.low, low_)[-self.lookback*2:]
        self.close = np.append(self.close, close_)[-self.lookback*2:]
        self.volume = np.append(self.volume, volume_)[-self.lookback*2:]
        self.time = self.Time

        if self.IsWarmingUp:
            return

        df = pd.DataFrame({'open': self.open,
                           'high': self.high,
                           'low': self.low,
                           'close': self.close,
                           'volume': self.volume})
        # TODO: here I should somehow create a datetime index for df covering
        # all collected close prices (see the sketch below)

        # compute volatility - that's the function I need to apply on every step
        daily_vol = ml.util.get_daily_vol(self.close, lookback=self.volatility_lookback)
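One way to resolve the TODO above: track timestamps in parallel with the price arrays and rebuild an indexed series on each bar, since ml.util.get_daily_vol expects a pd.Series with a DatetimeIndex. A sketch, with self.times as a new, assumed attribute (not in the original scratch code):

# Sketch: build a DatetimeIndex'd close series from parallel arrays.
import pandas as pd


def indexed_close(times, closes):
    """Return a close-price series indexed by the collected timestamps."""
    return pd.Series(closes, index=pd.DatetimeIndex(times))


# Inside Initialize() one would add:
#     self.times = []
# Inside OnData(), next to the np.append calls:
#     self.times = (self.times + [self.Time])[-self.lookback*2:]
# and then the volatility call becomes:
#     daily_vol = ml.util.get_daily_vol(indexed_close(self.times, self.close),
#                                       lookback=self.volatility_lookback)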
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
import mlfinlab as ml


class TripleBarierLabeling(BaseEstimator, TransformerMixin):

    def __init__(self, close_name='close', volatility_lookback=50,
                 volatility_scaler=1, triplebar_num_days=3,
                 triplebar_pt_sl=[1, 1], triplebar_min_ret=0.003,
                 num_threads=1):
        # hyperparameters for all functions
        self.close_name = close_name
        self.volatility_lookback = volatility_lookback
        self.volatility_scaler = volatility_scaler
        self.triplebar_num_days = triplebar_num_days
        self.triplebar_pt_sl = triplebar_pt_sl
        self.triplebar_min_ret = triplebar_min_ret
        self.num_threads = num_threads

    def fit(self, X, y=None):
        # extract close series
        close = X.loc[:, self.close_name]

        # compute volatility
        daily_vol = ml.util.get_daily_vol(
            close,
            lookback=self.volatility_lookback)

        # apply symmetric CUSUM filter and get timestamps for events
        cusum_events = ml.filters.cusum_filter(
            close,
            threshold=daily_vol.mean() * self.volatility_scaler)

        # compute vertical barrier
        vertical_barriers = ml.labeling.add_vertical_barrier(
            t_events=cusum_events,
            close=close,
            num_days=self.triplebar_num_days)

        # triple barrier events
        triple_barrier_events = ml.labeling.get_events(
            close=close,
            t_events=cusum_events,
            pt_sl=self.triplebar_pt_sl,
            target=daily_vol,
            min_ret=self.triplebar_min_ret,
            num_threads=self.num_threads,
            vertical_barrier_times=vertical_barriers)

        # labels
        labels = ml.labeling.get_bins(triple_barrier_events, close)
        labels = ml.labeling.drop_labels(labels)

        # merge labels and triple barrier events
        self.triple_barrier_info = pd.concat([triple_barrier_events.t1, labels], axis=1)
        self.triple_barrier_info.dropna(inplace=True)

        return self

    def transform(self, X, y=None):
        # subsample
        X = X.reindex(self.triple_barrier_info.index)
        return X


class OutlierStdRemove(BaseEstimator, TransformerMixin):

    def __init__(self, std_threshold):
        self.std_threshold = std_threshold

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X = X[X.apply(lambda x: np.abs(x - x.mean()) / x.std() < self.std_threshold).
              all(axis=1)]
        return X


### TESTS
# DATA_PATH = 'C:/Users/Mislav/algoAItrader/data/spy_with_vix.h5'
# df = pd.read_hdf(DATA_PATH, start=0, stop=4000)

# ### HYPERPARAMETERS
# std_outlier = 10
# tb_volatility_lookback = 50
# tb_volatility_scaler = 1
# tb_triplebar_num_days = 3
# tb_triplebar_pt_sl = [1, 1]
# tb_triplebar_min_ret = 0.003

# # triple barrier alone
# triple_barrier_pipe = TripleBarierLabeling(
#     close_name='close_orig',
#     volatility_lookback=tb_volatility_lookback,
#     volatility_scaler=tb_volatility_scaler,
#     triplebar_num_days=tb_triplebar_num_days,
#     triplebar_pt_sl=tb_triplebar_pt_sl,
#     triplebar_min_ret=tb_triplebar_min_ret,
#     num_threads=1
# )
# tb_fit = triple_barrier_pipe.fit(df)
# tb_fit.triple_barrier_info
# X = triple_barrier_pipe.transform(df)

# pipeline = Pipeline([
#     ('remove_outlier', OutlierStdRemove(10)),
#     ('triple_barrier_labeling', TripleBarierLabeling(close_name='close_orig')),
# ])
# pipe_out = pipeline.fit_transform(df)
import numpy as np
from sklearn.tree import DecisionTreeClassifier
from sklearn.tree._tree import Tree
from sklearn.ensemble import RandomForestClassifier


def serialize_tree(tree):
    serialized_tree = tree.__getstate__()
    dtypes = serialized_tree['nodes'].dtype
    serialized_tree['nodes'] = serialized_tree['nodes'].tolist()
    serialized_tree['values'] = serialized_tree['values'].tolist()
    return serialized_tree, dtypes


def deserialize_tree(tree_dict, n_features, n_classes, n_outputs):
    tree_dict['nodes'] = [tuple(lst) for lst in tree_dict['nodes']]
    names = ['left_child', 'right_child', 'feature', 'threshold', 'impurity',
             'n_node_samples', 'weighted_n_node_samples']
    tree_dict['nodes'] = np.array(
        tree_dict['nodes'],
        dtype=np.dtype({'names': names, 'formats': tree_dict['nodes_dtype']}))
    tree_dict['values'] = np.array(tree_dict['values'])
    tree = Tree(n_features, np.array([n_classes], dtype=np.intp), n_outputs)
    tree.__setstate__(tree_dict)
    return tree


def serialize_decision_tree(model):
    tree, dtypes = serialize_tree(model.tree_)
    serialized_model = {
        'meta': 'decision-tree',
        'feature_importances_': model.feature_importances_.tolist(),
        'max_features_': model.max_features_,
        'n_classes_': int(model.n_classes_),
        'n_features_': model.n_features_,
        'n_outputs_': model.n_outputs_,
        'tree_': tree,
        'classes_': model.classes_.tolist(),
        'params': model.get_params()
    }
    tree_dtypes = []
    for i in range(0, len(dtypes)):
        tree_dtypes.append(dtypes[i].str)
    serialized_model['tree_']['nodes_dtype'] = tree_dtypes
    return serialized_model


def deserialize_decision_tree(model_dict):
    deserialized_model = DecisionTreeClassifier(**model_dict['params'])
    deserialized_model.classes_ = np.array(model_dict['classes_'])
    deserialized_model.max_features_ = model_dict['max_features_']
    deserialized_model.n_classes_ = model_dict['n_classes_']
    deserialized_model.n_features_ = model_dict['n_features_']
    deserialized_model.n_outputs_ = model_dict['n_outputs_']
    tree = deserialize_tree(model_dict['tree_'],
                            model_dict['n_features_'],
                            model_dict['n_classes_'],
                            model_dict['n_outputs_'])
    deserialized_model.tree_ = tree
    return deserialized_model


def serialize_random_forest(model):
    serialized_model = {
        'meta': 'rf',
        'max_depth': model.max_depth,
        'min_samples_split': model.min_samples_split,
        'min_samples_leaf': model.min_samples_leaf,
        'min_weight_fraction_leaf': model.min_weight_fraction_leaf,
        'max_features': model.max_features,
        'max_leaf_nodes': model.max_leaf_nodes,
        'min_impurity_decrease': model.min_impurity_decrease,
        'min_impurity_split': model.min_impurity_split,
        'n_features_': model.n_features_,
        'n_outputs_': model.n_outputs_,
        'classes_': model.classes_.tolist(),
        'estimators_': [serialize_decision_tree(decision_tree)
                        for decision_tree in model.estimators_],
        'params': model.get_params()
    }
    if 'oob_score_' in model.__dict__:
        serialized_model['oob_score_'] = model.oob_score_
    if 'oob_decision_function_' in model.__dict__:
        serialized_model['oob_decision_function_'] = model.oob_decision_function_.tolist()
    if isinstance(model.n_classes_, int):
        serialized_model['n_classes_'] = model.n_classes_
    else:
        serialized_model['n_classes_'] = model.n_classes_.tolist()
    return serialized_model


def deserialize_random_forest(model_dict):
    model = RandomForestClassifier(**model_dict['params'])
    estimators = [deserialize_decision_tree(decision_tree)
                  for decision_tree in model_dict['estimators_']]
    model.estimators_ = np.array(estimators)
    model.classes_ = np.array(model_dict['classes_'])
    model.n_features_ = model_dict['n_features_']
    model.n_outputs_ = model_dict['n_outputs_']
    model.max_depth = model_dict['max_depth']
    model.min_samples_split = model_dict['min_samples_split']
    model.min_samples_leaf = model_dict['min_samples_leaf']
    model.min_weight_fraction_leaf = model_dict['min_weight_fraction_leaf']
    model.max_features = model_dict['max_features']
    model.max_leaf_nodes = model_dict['max_leaf_nodes']
    model.min_impurity_decrease = model_dict['min_impurity_decrease']
    model.min_impurity_split = model_dict['min_impurity_split']

    if 'oob_score_' in model_dict:
        model.oob_score_ = model_dict['oob_score_']
    if 'oob_decision_function_' in model_dict:
        model.oob_decision_function_ = model_dict['oob_decision_function_']

    if isinstance(model_dict['n_classes_'], list):
        model.n_classes_ = np.array(model_dict['n_classes_'])
    else:
        model.n_classes_ = model_dict['n_classes_']

    return model
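A round-trip sanity check for the serializer (a sketch, not part of the project; it assumes a scikit-learn version contemporary with this code, since attributes such as n_features_ and min_impurity_split were removed in later releases):

# Sketch: fit a small forest, serialize to a JSON string (as in rf_model.json),
# deserialize it back, and compare predictions.
import json
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

X, y = make_classification(n_samples=200, n_features=5, random_state=3)
rf = RandomForestClassifier(n_estimators=10, random_state=3).fit(X, y)

payload = json.dumps(serialize_random_forest(rf))       # what would be stored on disk
rf_restored = deserialize_random_forest(json.loads(payload))

assert np.array_equal(rf.predict(X), rf_restored.predict(X))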