Overall Statistics |
Total Trades 113 Average Win 3.05% Average Loss -4.36% Compounding Annual Return 1.092% Drawdown 41.900% Expectancy 0.032 Net Profit 5.583% Sharpe Ratio 0.14 Probabilistic Sharpe Ratio 1.723% Loss Rate 39% Win Rate 61% Profit-Loss Ratio 0.70 Alpha 0.045 Beta -0.143 Annual Standard Deviation 0.169 Annual Variance 0.028 Information Ratio -0.502 Tracking Error 0.257 Treynor Ratio -0.164 Total Fees $2411.25 |
''' Ostirion Predictive Power Score (Factor) Selector version 1.0 Copyright (C) 2021 Ostirion This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Contact: www.ostirion.net/contact ''' import pandas as pd import numpy as np import timeseriespps as tspps from sklearn.model_selection import TimeSeriesSplit as tss from sklearn.preprocessing import StandardScaler, MinMaxScaler from sklearn.pipeline import Pipeline from scipy.stats import zscore from statistics import mode import warnings warnings.filterwarnings("ignore") import re from mlfinlab import fracdiff as fd import pipegridmodels as pgm from sklearn.metrics import accuracy_score from sklearn.metrics import roc_auc_score from sklearn.metrics import f1_score class PPSSelector: def __init__(self, algorithm, factors, targets, pasts=[3, 5, 10, 15, 22, 66], futures=[5,10, 15, 22], emas = [5, 15, 22, 56], std_cut_off = 3, training_period=1825): self.algorithm = algorithm self.training_period = training_period self.factors = factors self.targets = targets self.pasts = pasts self.futures = futures self.std_cut_off = std_cut_off self.emas = emas self.models = False self.is_trained = False self.threshold = False self.best_target = False self.best_predictors = False self.prediction_duration = False def get_pps_selection(self): factors = self.factors targets = self.targets algorithm = self.algorithm futures = self.futures pasts = self.pasts symbols = list(algorithm.ActiveSecurities.Keys) history = algorithm.History(symbols, self.training_period, Resolution.Daily) tickers = {str(symbol): str(symbol.ID) for symbol in symbols} inv_symbols = dict(map(reversed, tickers.items())) # Treatment of the price: price_history = history['close'].unstack(level=0) price_history.columns = [inv_symbols[col] for col in price_history.columns] price_history = pd.DataFrame(price_history) # Treatment of the volume: volume_history = history['volume'].unstack(level=0) volume_history.columns = ['VOL_'+inv_symbols[col] for col in volume_history.columns] volume_history = pd.DataFrame(volume_history) # Treatment of Fracdiffs: diff_price = fd.frac_diff(price_history, 0.3, ).dropna() diff_price.columns = ['FD_'+col for col in diff_price.columns] diff_price = pd.DataFrame(diff_price) # Join all price_history = price_history.join(volume_history).join(diff_price) # Valid factors: valid_prices = [value for value in price_history.columns if value in factors] valid_volumes = [value for value in volume_history.columns if value[4:] in factors] valid_difss = list(diff_price.columns) valid_factors = valid_prices + valid_volumes + valid_difss for column in price_history[valid_factors]: for frame in pasts: price_history['P_'+str(frame)+"_"+column+'_PAST'] = price_history[column].shift(frame) price_history['R_'+str(frame)+"_"+column+'_PAST'] = price_history[column].pct_change(frame) for ema in self.emas: price_history['EMA'+str(ema)+'_'+str(frame)+"_"+column+'_PAST'] = price_history[column].shift(frame).ewm(span=ema).mean() for column in price_history[targets]: for frame in futures: price_history['D_'+str(frame)+"_"+column+'_FUT'] = -price_history[column].pct_change(-frame).dropna() > 0 price_history.dropna(inplace=True) pipe = [('1',StandardScaler()), ('2', MinMaxScaler())] all_predictors = pd.DataFrame(columns=['x','y','ppscore']) future_cols = [col for col in price_history.columns if 'FUT' in col and 'D_' in col] past_cols = [col for col in price_history.columns if 'PAST' in col] for column in future_cols: predictors_df = tspps.predictors(price_history[past_cols + [column]].dropna(), y=column, pipeline=pipe, time_series=True, cross_validation=tss(4)) all_predictors = all_predictors.append(predictors_df[['x','y','ppscore']]) sorted_predictors = all_predictors.sort_values('ppscore', ascending=False).reset_index(drop=True) try: sorted_predictors['std_ppscore'] = sorted_predictors[['ppscore']].apply(zscore) except: algorithm.Debug("Zscore Error. Returning no Model. Retrying next slice.") return -1, -1 pred_type = 'D_' good_predictors = sorted_predictors[sorted_predictors['std_ppscore'] > self.std_cut_off] good_predictors = good_predictors[ good_predictors['y'].str.contains(pred_type) ].reset_index(drop=True) good_targets = good_predictors.groupby('y')[['std_ppscore']].mean() good_targets.sort_values('std_ppscore', ascending=False, inplace=True) INDEX = 0 self.best_target = good_targets.iloc[INDEX].name self.best_predictors = list(good_predictors[good_predictors['y'] == self.best_target]['x']) self.prediction_duration = int(re.findall(r'\d+', self.best_target)[0]) # Generate data: data = price_history[self.best_predictors + [self.best_target]] total_samples = len(data) X = data[self.best_predictors] y = data[self.best_target].astype(bool) self.models = [pgm.RfcPipeGrid().model, pgm.SVCPipeGrid().model , pgm.MlpPipeGrid().model ] #self.models = [pgm.SVCPipeGrid().model] # Fit the random search model for estimator in self.models: try: estimator.fit(X, y) except: algorithm.Debug('Estimator Fit Error') return -1, -1 probabilities = [] thresh_values = np.arange(0, 1, 0.01) thresholds = [] def to_labels(pos_probs, threshold): return (pos_probs >= threshold).astype('int') for model in self.models: pred = model.predict(X)[-1] proba = model.predict_proba(X) probabilities.append(proba[-1][1]) # Model threshold: probs = proba[:, 1] # define thresholds scores_f1 = [f1_score(y, to_labels(probs, t), average='weighted') for t in thresh_values] scores_roc_auc = [roc_auc_score(y, to_labels(probs, t), average='weighted') for t in thresh_values] scores_accuracy = [accuracy_score(y, to_labels(probs, t)) for t in thresh_values] ix_f1 = np.argmax(scores_f1) ix_roc_auc = np.argmax(scores_roc_auc) ix_acc = np.argmax(scores_accuracy) all_scores = [thresh_values[ix_f1], thresh_values[ix_roc_auc], thresh_values[ix_acc] ] mean_thresh = np.mean([all_scores[0]]) thresholds.append(mean_thresh) predictions = [] for i in range(len(self.models)): if probabilities[i] > thresholds[i]: predictions.append(1) else: predictions.append(0) prediction = mode(predictions) time = algorithm.Time.date() algorithm.Debug(str(time)+' Predicted: '+str(prediction)) algorithm.Debug(str( pred )) return prediction, 1
''' The MIT License (MIT) Copyright (c) 2021 Ostirion Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' from sklearn.pipeline import Pipeline from sklearn.model_selection import GridSearchCV from sklearn.ensemble import RandomForestClassifier as rfc from sklearn.neural_network import MLPClassifier as mlp from sklearn.svm import SVC from sklearn.model_selection import TimeSeriesSplit from sklearn.preprocessing import StandardScaler, MinMaxScaler class MlpPipeGrid: def __init__(self): model_type = mlp() scalers = [('scaler1', StandardScaler() ), ('scaler2', MinMaxScaler() ),] pipeline = Pipeline([*scalers, ('model', model_type )]) search_grid = {'model__hidden_layer_sizes': [(50,50,50), (50,100,50), (100,)], 'model__activation': ['tanh', 'relu'], 'model__solver': ['sgd', 'adam'], 'model__alpha': [0.0001, 0.05], 'model__learning_rate': ['constant','adaptive'], } scoring = 'f1_weighted' tscv = TimeSeriesSplit(n_splits=5) self.model = GridSearchCV(estimator=pipeline, param_grid=search_grid, cv=tscv, scoring=scoring) class SVCPipeGrid: def __init__(self): model_type = SVC() scalers = [('scaler1', StandardScaler() ), ('scaler2', MinMaxScaler() ),] pipeline = Pipeline([*scalers, ('model', model_type )]) search_grid = {'model__C':[1,10,100,1000], 'model__gamma':[1,0.1,0.001,0.0001], 'model__kernel':['linear','rbf'], 'model__probability': [True]} scoring = 'f1_weighted' tscv = TimeSeriesSplit(n_splits=5) self.model = GridSearchCV(estimator=pipeline, param_grid=search_grid, cv=tscv, scoring=scoring) class RfcPipeGrid: def __init__(self): model_type = rfc() scalers = [('scaler1', StandardScaler() ), ('scaler2', MinMaxScaler() ),] pipeline = Pipeline([*scalers, ('model', model_type )]) search_grid = { 'model__n_estimators': [50, 100, 200], 'model__max_features': ['auto'], 'model__max_depth': [5, 10, None], 'model__min_samples_split': [5, 10], 'model__min_samples_leaf': [1, 2, 4], 'model__bootstrap': [True]} scoring = 'f1_weighted' tscv = TimeSeriesSplit(n_splits=5) self.model = GridSearchCV(estimator=pipeline, param_grid=search_grid, cv=tscv, scoring=scoring)
''' Ostirion Predictive Power Score (Factor) Selector Demostration Algorithm version 1.0 Copyright (C) 2021 Ostirion This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Contact: www.ostirion.net/contact ''' from PPSSelector import PPSSelector class PPSPredictor(QCAlgorithm): def Initialize(self): N_YEARS = 5 test_length = timedelta(days= int(365 * N_YEARS)+1) self.SetStartDate(datetime.today() - test_length) self.SetEndDate(datetime.today()) self.SetCash(1000000) # Hourly and Daily resolutions are posible, minute data minimizes # 'stale data' warning messages and increases model run time. resolution = Resolution.Daily # Add Required Symbols: # Run in Quantconnect. # Using index of the factor lists if parameter testing is required. # Enter candidate factors in factors_0 otherwise. factors_0 = ['XLK', 'XLY', 'XLB', 'XLV', 'XLP', 'XLI', 'XLU', 'XLF', 'XLE'] factors_1 = ['TLT', 'QQQ', 'SPY'] factors_2 = ['QQQ'] factor_list = [factors_0, factors_1, factors_2] try: index = int(self.GetParameter("index")) except: self.Debug('No parameters found, using default.') index = int(0) factors = factor_list[index] targets = ['SPY'] hedges = [] symbols = [self.AddEquity(ticker, resolution).Symbol for ticker in factors + targets + hedges] self.SetBenchmark("SPY") self.UniverseSettings.Resolution = resolution self.SetBrokerageModel(AlphaStreamsBrokerageModel()) self.AddAlpha(PPSAlphaModel(factors, targets, hedges, resolution)) self.SetExecution(ImmediateExecutionModel()) self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel(lambda time: None)) class PPSAlphaModel(AlphaModel): """ Predictive Power Score demostrator alpha model. """ def __init__(self, factors, targets, hedges, resolution): self.Name = "PPSAlphaModel" # Factor, prediction target and instruments to exploit the predictions. self.factors = factors self.targets = targets self.hedges = hedges # Dynamically allow for hour/day/minute resolutions: self.resolution = resolution # Parameters for machine learning model: # ML model intial training and retraining: self.selector = False self.training_period = 252*5 # Now passed into history as data points self.std_cut_off = 3 # Other operational parameters: self.time_slack = 10 # Minutes after market open. self.expected_gain = 0.05 # Fixed value, the model is unable predict this, yet. # Directions dictionary: self.directions = [InsightDirection.Down, InsightDirection.Up, InsightDirection.Flat] def Update(self, algorithm, data): # Initialize every slice of data: insights = [] if not data.HasData: return [] # Obtain the market operating time: hours = algorithm.ActiveSecurities[self.targets[0]].Exchange.Hours # Positions are entered as market opens: previous_day = algorithm.Time - timedelta(days=int(True)) slack = timedelta(minutes=self.time_slack) operating_time = hours.GetNextMarketOpen(previous_day, False) + slack if (algorithm.Time.hour == 10 and algorithm.Time.minute == 0) or self.resolution == Resolution.Daily: for target in self.targets: # If invested, wait for the insight to decay: if algorithm.Securities[target].Invested: return[] self.selector = PPSSelector(algorithm, self.factors, self.targets, std_cut_off = self.std_cut_off, training_period=self.training_period) prediction, probability = self.selector.get_pps_selection() # Handle possible error models: if prediction == -1: return [] tdelta = timedelta(days=self.selector.prediction_duration) insight_duration = hours.GetNextMarketClose(algorithm.Time + tdelta, False) - algorithm.Time direction_instrument = self.directions[int(prediction)] # TODO: Alternative bet_size to be implemented in the future: # bet_size = (probability-0.5)*2 # TODO: remove codetags. bet_size = 1 for target in self.targets: if not data.ContainsKey(target): continue insights.append(Insight(target, insight_duration, InsightType.Price, direction_instrument, self.expected_gain, probability, self.Name, bet_size)) for hedge in self.hedges: if not data.ContainsKey(hedge): continue insights.append(Insight(hedge, insight_duration, InsightType.Price, direction_instrument, self.expected_gain, probability, self.Name, 1-bet_size)) return insights return []
import matplotlib.dates as mdates import matplotlib.pyplot as plt import pandas as pd import numpy as np import seaborn as sns def plot_df(df, color='blue', size=(16, 7), legend='Close Price', y_label='Price in USD'): plt.style.use('dark_background') plt.rcParams["figure.figsize"] = size ax = df.plot() plt.ylabel(y_label) x = 0.01 y = 0.01 plt.text(x, y, 'www.ostirion.net', fontsize=15, transform=ax.transAxes) plt.legend(ncol=int(len(df.columns) / 2)) date_form = mdates.DateFormatter("%m-%Y") plt.xticks(rotation=45); plt.show() def plot_corr_hm(df, title='Title', size=(16, 7), annot = True): corr = df.corr() plt.style.use('dark_background') plt.rcParams["figure.figsize"] = size mask = np.triu(np.ones_like(corr, dtype=bool)) cmap = sns.color_palette("RdBu") ax = sns.heatmap(corr, mask=mask, vmax=.3, center=0, cmap=cmap, annot=annot, square=True, linewidths=0, cbar_kws={"shrink": .5}, fmt='g') ax.set_title(title) plt.setp(ax.get_yticklabels(), rotation=0); plt.setp(ax.get_xticklabels(), rotation=90); plt.show() def plot_cm(df, title='Title', size=(16,7)): plt.style.use('dark_background') plt.rcParams["figure.figsize"] = size cmap = sns.color_palette("Blues") ax = sns.heatmap(df, cmap=cmap, annot=True, linewidths=0, cbar_kws={"shrink": .5}, fmt='g') ax.set_title(title) plt.xlabel('Predicted') plt.ylabel('True') plt.setp(ax.get_xticklabels(), rotation=0);
''' The MIT License (MIT) Copyright (c) 2020 8080 Labs Copyright (c) 2021 Ostirion Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' from sklearn import tree from sklearn import preprocessing from sklearn.model_selection import cross_val_score from sklearn.metrics import mean_absolute_error, f1_score from sklearn.preprocessing import StandardScaler, MinMaxScaler from sklearn.pipeline import Pipeline import pandas as pd from pandas.api.types import ( is_numeric_dtype, is_bool_dtype, is_object_dtype, is_categorical_dtype, is_string_dtype, is_datetime64_any_dtype, is_timedelta64_dtype, ) NOT_SUPPORTED_ANYMORE = "NOT_SUPPORTED_ANYMORE" TO_BE_CALCULATED = -1 def _calculate_model_cv_score_( df, target, feature, task, cross_validation, random_seed, **kwargs ): "Calculates the mean model score based on cross-validation" # Sources about the used methods: # https://scikit-learn.org/stable/modules/tree.html # https://scikit-learn.org/stable/modules/cross_validation.html # https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.cross_val_score.html metric = task["metric_key"] model = task["model"] # shuffle the rows - this is important for cross-validation # because the cross-validation just takes the first n lines # if there is a strong pattern in the rows eg 0,0,0,0,1,1,1,1 # then this will lead to problems because the first cv sees mostly 0 and the later 1 # this approach might be wrong for timeseries because it might leak information # Fixing the time_series: if not TIME_SERIES: df = df.sample(frac=1, random_state=random_seed, replace=False) # preprocess target if task["type"] == "classification": label_encoder = preprocessing.LabelEncoder() df[target] = label_encoder.fit_transform(df[target]) target_series = df[target] else: target_series = df[target] # preprocess feature if _dtype_represents_categories(df[feature]): one_hot_encoder = preprocessing.OneHotEncoder() array = df[feature].__array__() sparse_matrix = one_hot_encoder.fit_transform(array.reshape(-1, 1)) feature_input = sparse_matrix else: # reshaping needed because there is only 1 feature feature_input = df[feature].values.reshape(-1, 1) # Cross-validation is stratifiedKFold for classification, KFold for regression # CV on one core (n_job=1; default) has shown to be fastest try: scores = cross_val_score( model, feature_input, target_series, cv=cross_validation, scoring=metric ) except: return 0 return scores.mean() def _normalized_mae_score(model_mae, naive_mae): "Normalizes the model MAE score, given the baseline score" # # Value range of MAE is [0, infinity), 0 is best # 10, 5 ==> 0 because worse than naive # 10, 20 ==> 0.5 # 5, 20 ==> 0.75 = 1 - (mae/base_mae) if model_mae > naive_mae: return 0 else: return 1 - (model_mae / naive_mae) def _mae_normalizer(df, y, model_score, **kwargs): "In case of MAE, calculates the baseline score for y and derives the PPS." df["naive"] = df[y].median() baseline_score = mean_absolute_error(df[y], df["naive"]) # true, pred ppscore = _normalized_mae_score(abs(model_score), baseline_score) return ppscore, baseline_score def _normalized_f1_score(model_f1, baseline_f1): "Normalizes the model F1 score, given the baseline score" # # F1 ranges from 0 to 1 # # 1 is best # 0.5, 0.7 ==> 0 because model is worse than naive baseline # 0.75, 0.5 ==> 0.5 if model_f1 < baseline_f1: return 0 else: scale_range = 1.0 - baseline_f1 # eg 0.3 f1_diff = model_f1 - baseline_f1 # eg 0.1 return f1_diff / scale_range # 0.1/0.3 = 0.33 def _f1_normalizer(df, y, model_score, random_seed): "In case of F1, calculates the baseline score for y and derives the PPS." label_encoder = preprocessing.LabelEncoder() df["truth"] = label_encoder.fit_transform(df[y]) df["most_common_value"] = df["truth"].value_counts().index[0] random = df["truth"].sample(frac=1, random_state=random_seed) baseline_score = max( f1_score(df["truth"], df["most_common_value"], average="weighted"), f1_score(df["truth"], random, average="weighted"), ) ppscore = _normalized_f1_score(model_score, baseline_score) return ppscore, baseline_score TIME_SERIES = False VALID_CALCULATIONS = { "regression": { "type": "regression", "is_valid_score": True, "model_score": TO_BE_CALCULATED, "baseline_score": TO_BE_CALCULATED, "ppscore": TO_BE_CALCULATED, "metric_name": "mean absolute error", "metric_key": "neg_mean_absolute_error", "model": tree.DecisionTreeRegressor(), "score_normalizer": _mae_normalizer, }, "classification": { "type": "classification", "is_valid_score": True, "model_score": TO_BE_CALCULATED, "baseline_score": TO_BE_CALCULATED, "ppscore": TO_BE_CALCULATED, "metric_name": "weighted F1", "metric_key": "f1_weighted", "model": tree.DecisionTreeClassifier(), "score_normalizer": _f1_normalizer, }, "predict_itself": { "type": "predict_itself", "is_valid_score": True, "model_score": 1, "baseline_score": 0, "ppscore": 1, "metric_name": None, "metric_key": None, "model": None, "score_normalizer": None, }, "target_is_constant": { "type": "target_is_constant", "is_valid_score": True, "model_score": 1, "baseline_score": 1, "ppscore": 0, "metric_name": None, "metric_key": None, "model": None, "score_normalizer": None, }, "target_is_id": { "type": "target_is_id", "is_valid_score": True, "model_score": 0, "baseline_score": 0, "ppscore": 0, "metric_name": None, "metric_key": None, "model": None, "score_normalizer": None, }, "feature_is_id": { "type": "feature_is_id", "is_valid_score": True, "model_score": 0, "baseline_score": 0, "ppscore": 0, "metric_name": None, "metric_key": None, "model": None, "score_normalizer": None, }, } INVALID_CALCULATIONS = [ "target_is_datetime", "target_data_type_not_supported", "empty_dataframe_after_dropping_na", "unknown_error", ] def _dtype_represents_categories(series) -> bool: "Determines if the dtype of the series represents categorical values" return ( is_bool_dtype(series) or is_object_dtype(series) or is_string_dtype(series) or is_categorical_dtype(series) ) def _determine_case_and_prepare_df(df, x, y, sample=5_000, random_seed=123): "Returns str with the name of the determined case based on the columns x and y" if x == y: return df, "predict_itself" df = df[[x, y]] # IDEA: log.warning when values have been dropped df = df.dropna() if len(df) == 0: return df, "empty_dataframe_after_dropping_na" # IDEA: show warning # raise Exception( # "After dropping missing values, there are no valid rows left" # ) df = _maybe_sample(df, sample, random_seed=random_seed) if _feature_is_id(df, x): return df, "feature_is_id" category_count = df[y].value_counts().count() if category_count == 1: # it is helpful to separate this case in order to save unnecessary calculation time return df, "target_is_constant" if _dtype_represents_categories(df[y]) and (category_count == len(df[y])): # it is important to separate this case in order to save unnecessary calculation time return df, "target_is_id" if _dtype_represents_categories(df[y]): return df, "classification" if is_numeric_dtype(df[y]): # this check needs to be after is_bool_dtype (which is part of _dtype_represents_categories) because bool is considered numeric by pandas return df, "regression" if is_datetime64_any_dtype(df[y]) or is_timedelta64_dtype(df[y]): # IDEA: show warning # raise TypeError( # f"The target column {y} has the dtype {df[y].dtype} which is not supported. A possible solution might be to convert {y} to a string column" # ) return df, "target_is_datetime" # IDEA: show warning # raise Exception( # f"Could not infer a valid task based on the target {y}. The dtype {df[y].dtype} is not yet supported" # ) # pragma: no cover return df, "target_data_type_not_supported" def _feature_is_id(df, x): "Returns Boolean if the feature column x is an ID" if not _dtype_represents_categories(df[x]): return False category_count = df[x].value_counts().count() return category_count == len(df[x]) def _maybe_sample(df, sample, random_seed=None): """ Maybe samples the rows of the given df to have at most `sample` rows If sample is `None` or falsy, there will be no sampling. If the df has fewer rows than the sample, there will be no sampling. Parameters ---------- df : pandas.DataFrame Dataframe that might be sampled sample : int or `None` Number of rows to be sampled random_seed : int or `None` Random seed that is forwarded to pandas.DataFrame.sample as `random_state` Returns ------- pandas.DataFrame DataFrame after potential sampling """ if sample and len(df) > sample: # this is a problem if x or y have more than sample=5000 categories # TODO: dont sample when the problem occurs and show warning df = df.sample(sample, random_state=random_seed, replace=False) return df def _is_column_in_df(column, df): try: return column in df.columns except: return False def _score( df, x, y, task, sample, cross_validation, random_seed, invalid_score, catch_errors ): df, case_type = _determine_case_and_prepare_df( df, x, y, sample=sample, random_seed=random_seed ) task = _get_task(case_type, invalid_score) if case_type in ["classification", "regression"]: model_score = _calculate_model_cv_score_( df, target=y, feature=x, task=task, cross_validation=cross_validation, random_seed=random_seed, ) # IDEA: the baseline_scores do sometimes change significantly, e.g. for F1 and thus change the PPS # we might want to calculate the baseline_score 10 times and use the mean in order to have less variance ppscore, baseline_score = task["score_normalizer"]( df, y, model_score, random_seed=random_seed ) else: model_score = task["model_score"] baseline_score = task["baseline_score"] ppscore = task["ppscore"] return { "x": x, "y": y, "ppscore": ppscore, "case": case_type, "is_valid_score": task["is_valid_score"], "metric": task["metric_name"], "baseline_score": baseline_score, "model_score": abs(model_score), # sklearn returns negative mae "model": task["model"], } def score( df, x, y, pipeline = [], time_series = False, task=NOT_SUPPORTED_ANYMORE, sample=5_000, cross_validation=4, random_seed=123, invalid_score=0, catch_errors=True, ): """ Calculate the Predictive Power Score (PPS) for "x predicts y" The score always ranges from 0 to 1 and is data-type agnostic. A score of 0 means that the column x cannot predict the column y better than a naive baseline model. A score of 1 means that the column x can perfectly predict the column y given the model. A score between 0 and 1 states the ratio of how much potential predictive power the model achieved compared to the baseline model. Parameters ---------- df : pandas.DataFrame Dataframe that contains the columns x and y x : str Name of the column x which acts as the feature y : str Name of the column y which acts as the target sample : int or `None` Number of rows for sampling. The sampling decreases the calculation time of the PPS. If `None` there will be no sampling. cross_validation : int Number of iterations during cross-validation. This has the following implications: For example, if the number is 4, then it is possible to detect patterns when there are at least 4 times the same observation. If the limit is increased, the required minimum observations also increase. This is important, because this is the limit when sklearn will throw an error and the PPS cannot be calculated random_seed : int or `None` Random seed for the parts of the calculation that require random numbers, e.g. shuffling or sampling. If the value is set, the results will be reproducible. If the value is `None` a new random number is drawn at the start of each calculation. invalid_score : any The score that is returned when a calculation is invalid, e.g. because the data type was not supported. catch_errors : bool If `True` all errors will be catched and reported as `unknown_error` which ensures convenience. If `False` errors will be raised. This is helpful for inspecting and debugging errors. Returns ------- Dict A dict that contains multiple fields about the resulting PPS. The dict enables introspection into the calculations that have been performed under the hood """ global TIME_SERIES if time_series: TIME_SERIES = True else: TIME_SERIES = False global VALID_CALCULATIONS if pipeline: VALID_CALCULATIONS['regression']['model'] = Pipeline(pipeline + [('tree', tree.DecisionTreeRegressor())] ) VALID_CALCULATIONS['classification']['model'] = Pipeline( pipeline + [('tree', tree.DecisionTreeClassifier())] ) else: VALID_CALCULATIONS['regression']['model'] = tree.DecisionTreeRegressor() VALID_CALCULATIONS['classification']['model'] = tree.DecisionTreeClassifier() if not isinstance(df, pd.DataFrame): raise TypeError( f"The 'df' argument should be a pandas.DataFrame but you passed a {type(df)}\nPlease convert your input to a pandas.DataFrame" ) if not _is_column_in_df(x, df): raise ValueError( f"The 'x' argument should be the name of a dataframe column but the variable that you passed is not a column in the given dataframe.\nPlease review the column name or your dataframe" ) if len(df[[x]].columns) >= 2: raise AssertionError( f"The dataframe has {len(df[[x]].columns)} columns with the same column name {x}\nPlease adjust the dataframe and make sure that only 1 column has the name {x}" ) if not _is_column_in_df(y, df): raise ValueError( f"The 'y' argument should be the name of a dataframe column but the variable that you passed is not a column in the given dataframe.\nPlease review the column name or your dataframe" ) if len(df[[y]].columns) >= 2: raise AssertionError( f"The dataframe has {len(df[[y]].columns)} columns with the same column name {y}\nPlease adjust the dataframe and make sure that only 1 column has the name {y}" ) if task is not NOT_SUPPORTED_ANYMORE: raise AttributeError( "The attribute 'task' is no longer supported because it led to confusion and inconsistencies.\nThe task of the model is now determined based on the data types of the columns. If you want to change the task please adjust the data type of the column.\nFor more details, please refer to the README" ) if random_seed is None: from random import random random_seed = int(random() * 1000) try: return _score( df, x, y, task, sample, cross_validation, random_seed, invalid_score, catch_errors, ) except Exception as exception: if catch_errors: case_type = "unknown_error" task = _get_task(case_type, invalid_score) return { "x": x, "y": y, "ppscore": task["ppscore"], "case": case_type, "is_valid_score": task["is_valid_score"], "metric": task["metric_name"], "baseline_score": task["baseline_score"], "model_score": task["model_score"], # sklearn returns negative mae "model": task["model"], } else: raise exception def _get_task(case_type, invalid_score): if case_type in VALID_CALCULATIONS.keys(): return VALID_CALCULATIONS[case_type] elif case_type in INVALID_CALCULATIONS: return { "type": case_type, "is_valid_score": False, "model_score": invalid_score, "baseline_score": invalid_score, "ppscore": invalid_score, "metric_name": None, "metric_key": None, "model": None, "score_normalizer": None, } raise Exception(f"case_type {case_type} is not supported") def _format_list_of_dicts(scores, output, sorted): """ Format list of score dicts `scores` - maybe sort by ppscore - maybe return pandas.Dataframe - output can be one of ["df", "list"] """ if sorted: scores.sort(key=lambda item: item["ppscore"], reverse=True) if output == "df": df_columns = [ "x", "y", "ppscore", "case", "is_valid_score", "metric", "baseline_score", "model_score", "model", ] data = {column: [score[column] for score in scores] for column in df_columns} scores = pd.DataFrame.from_dict(data) return scores def predictors(df, y, output="df", pipeline = [], time_series=False, sorted=True, **kwargs): """ Calculate the Predictive Power Score (PPS) of all the features in the dataframe against a target column Parameters ---------- df : pandas.DataFrame The dataframe that contains the data y : str Name of the column y which acts as the target output: str - potential values: "df", "list" Control the type of the output. Either return a pandas.DataFrame (df) or a list with the score dicts sorted: bool Whether or not to sort the output dataframe/list by the ppscore pipeline: list list of transformstions to be included in the processing of the data kwargs: Other key-word arguments that shall be forwarded to the pps.score method, e.g. `sample, `cross_validation, `random_seed, `invalid_score`, `catch_errors` Returns ------- pandas.DataFrame or list of Dict Either returns a tidy dataframe or a list of all the PPS dicts. This can be influenced by the output argument """ if not isinstance(df, pd.DataFrame): raise TypeError( f"The 'df' argument should be a pandas.DataFrame but you passed a {type(df)}\nPlease convert your input to a pandas.DataFrame" ) if not _is_column_in_df(y, df): raise ValueError( f"The 'y' argument should be the name of a dataframe column but the variable that you passed is not a column in the given dataframe.\nPlease review the column name or your dataframe" ) if len(df[[y]].columns) >= 2: raise AssertionError( f"The dataframe has {len(df[[y]].columns)} columns with the same column name {y}\nPlease adjust the dataframe and make sure that only 1 column has the name {y}" ) if not output in ["df", "list"]: raise ValueError( f"""The 'output' argument should be one of ["df", "list"] but you passed: {output}\nPlease adjust your input to one of the valid values""" ) if not sorted in [True, False]: raise ValueError( f"""The 'sorted' argument should be one of [True, False] but you passed: {sorted}\nPlease adjust your input to one of the valid values""" ) scores = [score(df, column, y, pipeline, time_series, **kwargs) for column in df if column != y] return _format_list_of_dicts(scores=scores, output=output, sorted=sorted) def matrix(df, output="df", sorted=False, **kwargs): """ Calculate the Predictive Power Score (PPS) matrix for all columns in the dataframe Parameters ---------- df : pandas.DataFrame The dataframe that contains the data output: str - potential values: "df", "list" Control the type of the output. Either return a pandas.DataFrame (df) or a list with the score dicts sorted: bool Whether or not to sort the output dataframe/list by the ppscore kwargs: Other key-word arguments that shall be forwarded to the pps.score method, e.g. `sample, `cross_validation, `random_seed, `invalid_score`, `catch_errors` Returns ------- pandas.DataFrame or list of Dict Either returns a tidy dataframe or a list of all the PPS dicts. This can be influenced by the output argument """ if not isinstance(df, pd.DataFrame): raise TypeError( f"The 'df' argument should be a pandas.DataFrame but you passed a {type(df)}\nPlease convert your input to a pandas.DataFrame" ) if not output in ["df", "list"]: raise ValueError( f"""The 'output' argument should be one of ["df", "list"] but you passed: {output}\nPlease adjust your input to one of the valid values""" ) if not sorted in [True, False]: raise ValueError( f"""The 'sorted' argument should be one of [True, False] but you passed: {sorted}\nPlease adjust your input to one of the valid values""" ) scores = [score(df, x, y, **kwargs) for x in df for y in df] return _format_list_of_dicts(scores=scores, output=output, sorted=sorted)