Overall Statistics
Total Trades                   113
Average Win                    3.05%
Average Loss                   -4.36%
Compounding Annual Return      1.092%
Drawdown                       41.900%
Expectancy                     0.032
Net Profit                     5.583%
Sharpe Ratio                   0.14
Probabilistic Sharpe Ratio     1.723%
Loss Rate                      39%
Win Rate                       61%
Profit-Loss Ratio              0.70
Alpha                          0.045
Beta                           -0.143
Annual Standard Deviation      0.169
Annual Variance                0.028
Information Ratio              -0.502
Tracking Error                 0.257
Treynor Ratio                  -0.164
Total Fees                     $2411.25
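
The ratios above are mutually consistent: LEAN reports expectancy as the win rate times the profit-loss ratio minus the loss rate. A back-of-the-envelope check from the rounded table values (illustration only):

win_rate, loss_rate = 0.61, 0.39
profit_loss_ratio = 3.05 / 4.36               # average win / |average loss| ~ 0.70
expectancy = win_rate * profit_loss_ratio - loss_rate
print(round(expectancy, 3))                   # ~0.037, near the reported 0.032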
'''
    Ostirion Predictive Power Score (Factor) Selector
    version 1.0
    Copyright (C) 2021  Ostirion

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
    
    Contact: www.ostirion.net/contact
'''

import pandas as pd
import numpy as np
import timeseriespps as tspps

from sklearn.model_selection import TimeSeriesSplit as tss
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.pipeline import Pipeline

from scipy.stats import zscore
from statistics import mode

import warnings
warnings.filterwarnings("ignore")

import re
from mlfinlab import fracdiff as fd
import pipegridmodels as pgm

from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import f1_score


class PPSSelector:
    
    def __init__(self, 
                 algorithm,
                 factors,
                 targets,
                 pasts=[3, 5, 10, 15, 22, 66],
                 futures=[5, 10, 15, 22],
                 emas=[5, 15, 22, 56],
                 std_cut_off=3,
                 training_period=1825):

        self.algorithm = algorithm
        self.training_period = training_period
        self.factors = factors
        self.targets = targets
        self.pasts = pasts
        self.futures = futures
        self.std_cut_off = std_cut_off
        self.emas = emas

        self.models = None
        self.is_trained = False
        self.threshold = None

        self.best_target = None
        self.best_predictors = None
        self.prediction_duration = None

    def get_pps_selection(self):

        factors = self.factors
        targets = self.targets
        algorithm = self.algorithm
        futures = self.futures
        pasts = self.pasts

        symbols = list(algorithm.ActiveSecurities.Keys)
        history = algorithm.History(symbols, self.training_period, Resolution.Daily)
        # Map security ID strings back to readable tickers:
        tickers = {str(symbol): str(symbol.ID) for symbol in symbols}
        inv_symbols = dict(map(reversed, tickers.items()))
    
        # Treatment of the price:
        price_history = history['close'].unstack(level=0)
        price_history.columns = [inv_symbols[col] for col in price_history.columns]
        price_history = pd.DataFrame(price_history)

        # Treatment of the volume:
        volume_history = history['volume'].unstack(level=0)
        volume_history.columns = ['VOL_'+inv_symbols[col] for col in volume_history.columns]
        volume_history = pd.DataFrame(volume_history)

        # Treatment of Fracdiffs:
        diff_price = fd.frac_diff(price_history, 0.3).dropna()
        diff_price.columns = ['FD_'+col for col in diff_price.columns]
        diff_price = pd.DataFrame(diff_price)

        # Join all
        price_history = price_history.join(volume_history).join(diff_price)

        # Valid factors:
        valid_prices = [value for value in price_history.columns if value in factors]
        valid_volumes = [value for value in volume_history.columns if value[4:] in factors]
        valid_diffs = list(diff_price.columns)
        valid_factors = valid_prices + valid_volumes + valid_diffs

        # Lagged prices, lagged returns and lagged EMAs as candidate features:
        for column in price_history[valid_factors]:
            for frame in pasts:
                price_history['P_' + str(frame) + "_" + column + '_PAST'] = price_history[column].shift(frame)
                price_history['R_' + str(frame) + "_" + column + '_PAST'] = price_history[column].pct_change(frame)
                for ema in self.emas:
                    price_history['EMA' + str(ema) + '_' + str(frame) + "_" + column + '_PAST'] = price_history[column].shift(frame).ewm(span=ema).mean()

        # Boolean direction targets: True when the price is higher 'frame'
        # days into the future. The dropna() leaves the tail rows NaN after
        # reindexing, so they are dropped below rather than mislabeled False.
        for column in price_history[targets]:
            for frame in futures:
                price_history['D_' + str(frame) + "_" + column + '_FUT'] = -price_history[column].pct_change(-frame).dropna() > 0
        
        price_history.dropna(inplace=True)

        pipe = [('1', StandardScaler()), ('2', MinMaxScaler())]
        all_predictors = pd.DataFrame(columns=['x', 'y', 'ppscore'])

        future_cols = [col for col in price_history.columns if 'FUT' in col and 'D_' in col]
        past_cols = [col for col in price_history.columns if 'PAST' in col]

        for column in future_cols:
            predictors_df = tspps.predictors(price_history[past_cols + [column]].dropna(),
                                             y=column,
                                             pipeline=pipe,
                                             time_series=True,
                                             cross_validation=tss(4))

            # DataFrame.append was removed in pandas 2.0; concat is equivalent here.
            all_predictors = pd.concat([all_predictors, predictors_df[['x', 'y', 'ppscore']]])

        sorted_predictors = all_predictors.sort_values('ppscore', ascending=False).reset_index(drop=True)
        
        try:
            sorted_predictors['std_ppscore'] = sorted_predictors[['ppscore']].apply(zscore)
        except Exception:
            algorithm.Debug("Zscore Error. Returning no Model. Retrying next slice.")
            return -1, -1

        pred_type = 'D_'
        good_predictors = sorted_predictors[sorted_predictors['std_ppscore'] > self.std_cut_off]
        good_predictors = good_predictors[good_predictors['y'].str.contains(pred_type)].reset_index(drop=True)
        good_targets = good_predictors.groupby('y')[['std_ppscore']].mean()
        good_targets.sort_values('std_ppscore', ascending=False, inplace=True)

        # Keep the best target (highest mean standardized PPS) and its
        # predictors; the prediction horizon in days is encoded in the name.
        INDEX = 0
        self.best_target = good_targets.iloc[INDEX].name
        self.best_predictors = list(good_predictors[good_predictors['y'] == self.best_target]['x'])
        self.prediction_duration = int(re.findall(r'\d+', self.best_target)[0])

        # Generate data:
        data = price_history[self.best_predictors + [self.best_target]]

        total_samples = len(data)
        X = data[self.best_predictors]
        y = data[self.best_target].astype(bool)

        self.models = [pgm.RfcPipeGrid().model, pgm.SVCPipeGrid().model, pgm.MlpPipeGrid().model]

        # Fit each grid-search model; abort this slice on any failure:
        for estimator in self.models:
            try:
                estimator.fit(X, y)
            except Exception:
                algorithm.Debug('Estimator Fit Error')
                return -1, -1

        probabilities = []
        thresh_values = np.arange(0, 1, 0.01)
        thresholds = []

        def to_labels(pos_probs, threshold):
            return (pos_probs >= threshold).astype('int')

        for model in self.models:
            pred = model.predict(X)[-1]
            proba = model.predict_proba(X)
            probabilities.append(proba[-1][1])

            # Tune a decision threshold on the in-sample probabilities:
            probs = proba[:, 1]
            scores_f1 = [f1_score(y, to_labels(probs, t), average='weighted') for t in thresh_values]
            scores_roc_auc = [roc_auc_score(y, to_labels(probs, t), average='weighted') for t in thresh_values]
            scores_accuracy = [accuracy_score(y, to_labels(probs, t)) for t in thresh_values]
            ix_f1 = np.argmax(scores_f1)
            ix_roc_auc = np.argmax(scores_roc_auc)
            ix_acc = np.argmax(scores_accuracy)
            all_scores = [thresh_values[ix_f1], thresh_values[ix_roc_auc], thresh_values[ix_acc]]
            # Only the F1-optimal threshold is used for the final decision:
            mean_thresh = all_scores[0]
            thresholds.append(mean_thresh)

        # Majority vote across the three thresholded models:
        predictions = []
        for i in range(len(self.models)):
            if probabilities[i] > thresholds[i]:
                predictions.append(1)
            else:
                predictions.append(0)

        prediction = mode(predictions)
        time = algorithm.Time.date()
        algorithm.Debug(str(time) + ' Predicted: ' + str(prediction))
        algorithm.Debug(str(pred))

        # The second value is a placeholder probability, not yet modeled:
        return prediction, 1
'''
    The MIT License (MIT)
    
    Copyright (c) 2021 Ostirion
    
    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:
    
    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.
    
    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE.
'''

from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier as rfc
from sklearn.neural_network import MLPClassifier as mlp
from sklearn.svm import SVC
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import StandardScaler, MinMaxScaler

class MlpPipeGrid:
    def __init__(self):
        model_type = mlp()
        scalers = [('scaler1', StandardScaler()),
                   ('scaler2', MinMaxScaler())]
        pipeline = Pipeline([*scalers, ('model', model_type)])

        search_grid = {'model__hidden_layer_sizes': [(50, 50, 50), (50, 100, 50), (100,)],
                       'model__activation': ['tanh', 'relu'],
                       'model__solver': ['sgd', 'adam'],
                       'model__alpha': [0.0001, 0.05],
                       'model__learning_rate': ['constant', 'adaptive']}

        scoring = 'f1_weighted'
        tscv = TimeSeriesSplit(n_splits=5)

        self.model = GridSearchCV(estimator=pipeline,
                                  param_grid=search_grid,
                                  cv=tscv,
                                  scoring=scoring)

class SVCPipeGrid:
    def __init__(self):
        model_type = SVC()
        scalers = [('scaler1', StandardScaler()),
                   ('scaler2', MinMaxScaler())]
        pipeline = Pipeline([*scalers, ('model', model_type)])

        search_grid = {'model__C': [1, 10, 100, 1000],
                       'model__gamma': [1, 0.1, 0.001, 0.0001],
                       'model__kernel': ['linear', 'rbf'],
                       'model__probability': [True]}

        scoring = 'f1_weighted'
        tscv = TimeSeriesSplit(n_splits=5)

        self.model = GridSearchCV(estimator=pipeline,
                                  param_grid=search_grid,
                                  cv=tscv,
                                  scoring=scoring)

class RfcPipeGrid:
    def __init__(self):
        model_type = rfc()
        scalers = [('scaler1', StandardScaler()),
                   ('scaler2', MinMaxScaler())]
        pipeline = Pipeline([*scalers, ('model', model_type)])

        # 'sqrt' replaces the deprecated 'auto', which was its alias for
        # classifiers in older scikit-learn versions:
        search_grid = {'model__n_estimators': [50, 100, 200],
                       'model__max_features': ['sqrt'],
                       'model__max_depth': [5, 10, None],
                       'model__min_samples_split': [5, 10],
                       'model__min_samples_leaf': [1, 2, 4],
                       'model__bootstrap': [True]}

        scoring = 'f1_weighted'
        tscv = TimeSeriesSplit(n_splits=5)

        self.model = GridSearchCV(estimator=pipeline,
                                  param_grid=search_grid,
                                  cv=tscv,
                                  scoring=scoring)
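
# The three wrappers expose the same interface: a GridSearchCV over a
# scaler-plus-model pipeline, available as .model. A minimal usage sketch on
# synthetic stand-in data (X and y below are not part of the strategy):
if __name__ == "__main__":
    import numpy as np

    rng = np.random.default_rng(0)
    X = rng.normal(size=(300, 4))              # stand-in feature matrix
    y = (X[:, 0] + rng.normal(size=300)) > 0   # stand-in binary target

    model = RfcPipeGrid().model                # unfitted GridSearchCV
    model.fit(X, y)                            # runs the time-series CV search
    print(model.best_params_)                  # winning hyper-parameters
    print(model.predict_proba(X)[-1, 1])       # P(True) for the latest row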
'''
    Ostirion Predictive Power Score (Factor) Selector Demonstration Algorithm
    version 1.0
    Copyright (C) 2021  Ostirion

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
    
    Contact: www.ostirion.net/contact
'''

from PPSSelector import PPSSelector

class PPSPredictor(QCAlgorithm):

    def Initialize(self):
        N_YEARS = 5
        test_length = timedelta(days=int(365 * N_YEARS) + 1)
        self.SetStartDate(datetime.today() - test_length)
        self.SetEndDate(datetime.today())
        self.SetCash(1000000)

        # Hourly and Daily resolutions are possible; minute data minimizes
        # 'stale data' warning messages but increases model run time.
        resolution = Resolution.Daily
        # Add the required symbols. Run in QuantConnect.
        # Use the "index" parameter to pick a factor list if parameter
        # testing is required; otherwise enter candidate factors in factors_0.

        factors_0 = ['XLK', 'XLY', 'XLB', 'XLV', 'XLP', 'XLI', 'XLU', 'XLF', 'XLE']
        factors_1 = ['TLT', 'QQQ', 'SPY']
        factors_2 = ['QQQ']
        
        factor_list = [factors_0, factors_1, factors_2]
        
        try:
            index = int(self.GetParameter("index"))
        except (TypeError, ValueError):
            self.Debug('No parameters found, using default.')
            index = 0

        factors = factor_list[index]
        targets = ['SPY']
        hedges = []
    
        symbols = [self.AddEquity(ticker, resolution).Symbol for ticker in factors + targets + hedges]
    
        self.SetBenchmark("SPY")
        self.UniverseSettings.Resolution = resolution
        self.SetBrokerageModel(AlphaStreamsBrokerageModel())
        self.AddAlpha(PPSAlphaModel(factors, 
                                    targets,
                                    hedges,
                                    resolution))

        self.SetExecution(ImmediateExecutionModel())
        self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel(lambda time: None))


class PPSAlphaModel(AlphaModel):
    """
    Predictive Power Score demostrator alpha model.
    """

    def __init__(self, factors, targets, hedges, resolution):

        self.Name = "PPSAlphaModel"

        # Factor, prediction target and instruments to exploit the predictions.
        self.factors = factors
        self.targets = targets
        self.hedges = hedges
        
        # Dynamically allow for hour/day/minute resolutions:
        self.resolution = resolution

        # Parameters for the machine learning model:
        # initial training and retraining window.
        self.selector = None
        self.training_period = 252 * 5  # Passed to History as a number of data points.
        self.std_cut_off = 3

        # Other operational parameters:
        self.time_slack = 10  # Minutes after market open.
        self.expected_gain = 0.05  # Fixed value; the model cannot predict this yet.

        # Insight directions, indexed by the model prediction:
        self.directions = [InsightDirection.Down,
                           InsightDirection.Up,
                           InsightDirection.Flat]

    def Update(self, algorithm, data):
        # Initialize every slice of data:
        insights = []
        
        if not data.HasData:
            return []

        # Obtain the market operating time:
        hours = algorithm.ActiveSecurities[self.targets[0]].Exchange.Hours

        # Positions are entered as the market opens:
        previous_day = algorithm.Time - timedelta(days=1)
        slack = timedelta(minutes=self.time_slack)
        # Computed for reference; the entry condition below uses a fixed
        # 10:00 check for minute/hour resolutions:
        operating_time = hours.GetNextMarketOpen(previous_day, False) + slack

        if (algorithm.Time.hour == 10 and algorithm.Time.minute == 0) or self.resolution == Resolution.Daily:
            for target in self.targets:
                # If invested, wait for the insight to decay:
                if algorithm.Securities[target].Invested:
                    return []
            
            self.selector = PPSSelector(algorithm,
                                         self.factors,
                                         self.targets,
                                         std_cut_off = self.std_cut_off,
                                         training_period=self.training_period)

            prediction, probability = self.selector.get_pps_selection()
            
            # Handle possible error models:
            if prediction == -1: return []
            tdelta = timedelta(days=self.selector.prediction_duration)
            insight_duration = hours.GetNextMarketClose(algorithm.Time + tdelta,
                                                        False) - algorithm.Time

            direction_instrument = self.directions[int(prediction)]

            # TODO: Alternative bet_size to be implemented in the future:
            # bet_size = (probability-0.5)*2

            bet_size = 1
            for target in self.targets:
                if not data.ContainsKey(target): continue
                insights.append(Insight(target,
                                        insight_duration,
                                        InsightType.Price,
                                        direction_instrument,
                                        self.expected_gain,
                                        probability,
                                        self.Name,
                                        bet_size))

            for hedge in self.hedges:
                if not data.ContainsKey(hedge): continue
                insights.append(Insight(hedge,
                                        insight_duration,
                                        InsightType.Price,
                                        direction_instrument,
                                        self.expected_gain,
                                        probability,
                                        self.Name,
                                        1-bet_size))

            return insights

        return []
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sns


def plot_df(df, color='blue', size=(16, 7), legend='Close Price', y_label='Price in USD'):
    plt.style.use('dark_background')
    plt.rcParams["figure.figsize"] = size
    ax = df.plot(color=color)
    plt.ylabel(y_label)
    x = 0.01
    y = 0.01
    plt.text(x, y, 'www.ostirion.net', fontsize=15, transform=ax.transAxes)
    plt.legend(ncol=max(1, int(len(df.columns) / 2)))
    # Apply the month-year format to the date axis:
    date_form = mdates.DateFormatter("%m-%Y")
    ax.xaxis.set_major_formatter(date_form)
    plt.xticks(rotation=45)
    plt.show()
    
def plot_corr_hm(df, title='Title', size=(16, 7), annot=True):
    corr = df.corr()
    plt.style.use('dark_background')
    plt.rcParams["figure.figsize"] = size
    mask = np.triu(np.ones_like(corr, dtype=bool))
    cmap = sns.color_palette("RdBu")
    ax = sns.heatmap(corr, mask=mask, vmax=.3, center=0, cmap=cmap, annot=annot,
                     square=True, linewidths=0, cbar_kws={"shrink": .5}, fmt='g')
    ax.set_title(title)
    plt.setp(ax.get_yticklabels(), rotation=0)
    plt.setp(ax.get_xticklabels(), rotation=90)
    plt.show()

def plot_cm(df, title='Title', size=(16, 7)):
    plt.style.use('dark_background')
    plt.rcParams["figure.figsize"] = size
    cmap = sns.color_palette("Blues")
    ax = sns.heatmap(df, cmap=cmap, annot=True, linewidths=0, cbar_kws={"shrink": .5}, fmt='g')
    ax.set_title(title)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.setp(ax.get_xticklabels(), rotation=0)
    plt.show()
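
# A quick usage sketch of these helpers on synthetic, hypothetical data:
if __name__ == "__main__":
    dates = pd.date_range('2021-01-01', periods=250, freq='B')
    prices = pd.DataFrame(np.random.randn(250, 2).cumsum(axis=0) + 100,
                          index=dates, columns=['SPY', 'QQQ'])

    plot_df(prices, y_label='Price in USD')
    plot_corr_hm(prices.pct_change().dropna(), title='Daily Return Correlation')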
'''
The MIT License (MIT)

Copyright (c) 2020 8080 Labs
Copyright (c) 2021 Ostirion

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''

from sklearn import tree
from sklearn import preprocessing
from sklearn.model_selection import cross_val_score
from sklearn.metrics import mean_absolute_error, f1_score

from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.pipeline import Pipeline

import pandas as pd
from pandas.api.types import (
    is_numeric_dtype,
    is_bool_dtype,
    is_object_dtype,
    is_categorical_dtype,
    is_string_dtype,
    is_datetime64_any_dtype,
    is_timedelta64_dtype,
)


NOT_SUPPORTED_ANYMORE = "NOT_SUPPORTED_ANYMORE"
TO_BE_CALCULATED = -1


def _calculate_model_cv_score_(
    df, target, feature, task, cross_validation, random_seed, **kwargs
):
    "Calculates the mean model score based on cross-validation"
    # Sources about the used methods:
    # https://scikit-learn.org/stable/modules/tree.html
    # https://scikit-learn.org/stable/modules/cross_validation.html
    # https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.cross_val_score.html
    metric = task["metric_key"]
    model = task["model"]
    # Shuffle the rows - this is important for cross-validation because
    # the cross-validation just takes the first n rows per fold. If there is
    # a strong pattern in the rows, e.g. 0,0,0,0,1,1,1,1, the first folds see
    # mostly 0s and the later ones mostly 1s. This shuffling is wrong for
    # time series, however, because it leaks future information into the
    # training folds, so it is skipped when the TIME_SERIES flag is set:
    if not TIME_SERIES:
        df = df.sample(frac=1, random_state=random_seed, replace=False)

    # preprocess target
    if task["type"] == "classification":
        label_encoder = preprocessing.LabelEncoder()
        df[target] = label_encoder.fit_transform(df[target])
        target_series = df[target]
    else:
        target_series = df[target]

    # preprocess feature
    if _dtype_represents_categories(df[feature]):
        one_hot_encoder = preprocessing.OneHotEncoder()
        array = df[feature].__array__()
        sparse_matrix = one_hot_encoder.fit_transform(array.reshape(-1, 1))
        feature_input = sparse_matrix
    else:
        # reshaping needed because there is only 1 feature
        feature_input = df[feature].values.reshape(-1, 1)

    # Cross-validation is stratifiedKFold for classification, KFold for regression
    # CV on one core (n_job=1; default) has shown to be fastest
    try:
        scores = cross_val_score(
            model, feature_input, target_series, cv=cross_validation, scoring=metric
        )
    except Exception:
        return 0

    return scores.mean()
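
# The leakage warned about above is easy to see in numbers. A sketch on
# synthetic data: the feature is a slow random walk and the label flips in
# time blocks, independently of the feature, so there is no real signal.
if __name__ == "__main__":
    import numpy as np
    from sklearn.model_selection import KFold, TimeSeriesSplit

    rng = np.random.default_rng(0)
    X = rng.normal(size=2000).cumsum().reshape(-1, 1)  # slowly-varying feature
    y = (np.arange(2000) // 250) % 2                   # label = time regime only

    model = tree.DecisionTreeClassifier(random_state=0)
    shuffled = cross_val_score(model, X, y, cv=KFold(5, shuffle=True, random_state=0))
    ordered = cross_val_score(model, X, y, cv=TimeSeriesSplit(5))
    # Shuffled folds let the tree memorize temporal neighbors (near-perfect
    # accuracy); ordered folds reveal there is no predictive power (~0.5).
    print(shuffled.mean(), ordered.mean())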

def _normalized_mae_score(model_mae, naive_mae):
    "Normalizes the model MAE score, given the baseline score"
    # # Value range of MAE is [0, infinity), 0 is best
    # 10, 5 ==> 0 because worse than naive
    # 10, 20 ==> 0.5
    # 5, 20 ==> 0.75 = 1 - (mae/base_mae)
    if model_mae > naive_mae:
        return 0
    else:
        return 1 - (model_mae / naive_mae)

def _mae_normalizer(df, y, model_score, **kwargs):
    "In case of MAE, calculates the baseline score for y and derives the PPS."
    df["naive"] = df[y].median()
    baseline_score = mean_absolute_error(df[y], df["naive"])  # true, pred

    ppscore = _normalized_mae_score(abs(model_score), baseline_score)
    return ppscore, baseline_score

def _normalized_f1_score(model_f1, baseline_f1):
    "Normalizes the model F1 score, given the baseline score"
    # # F1 ranges from 0 to 1
    # # 1 is best
    # 0.5, 0.7 ==> 0 because model is worse than naive baseline
    # 0.75, 0.5 ==> 0.5

    if model_f1 < baseline_f1:
        return 0
    else:
        scale_range = 1.0 - baseline_f1  # eg 0.3
        f1_diff = model_f1 - baseline_f1  # eg 0.1
        return f1_diff / scale_range  # 0.1/0.3 = 0.33

def _f1_normalizer(df, y, model_score, random_seed):
    "In case of F1, calculates the baseline score for y and derives the PPS."
    label_encoder = preprocessing.LabelEncoder()
    df["truth"] = label_encoder.fit_transform(df[y])
    df["most_common_value"] = df["truth"].value_counts().index[0]
    random = df["truth"].sample(frac=1, random_state=random_seed)

    baseline_score = max(
        f1_score(df["truth"], df["most_common_value"], average="weighted"),
        f1_score(df["truth"], random, average="weighted"),
    )

    ppscore = _normalized_f1_score(model_score, baseline_score)
    return ppscore, baseline_score

TIME_SERIES = False
VALID_CALCULATIONS = {
    "regression": {
        "type": "regression",
        "is_valid_score": True,
        "model_score": TO_BE_CALCULATED,
        "baseline_score": TO_BE_CALCULATED,
        "ppscore": TO_BE_CALCULATED,
        "metric_name": "mean absolute error",
        "metric_key": "neg_mean_absolute_error",
        "model": tree.DecisionTreeRegressor(),
        "score_normalizer": _mae_normalizer,
    },
    "classification": {
        "type": "classification",
        "is_valid_score": True,
        "model_score": TO_BE_CALCULATED,
        "baseline_score": TO_BE_CALCULATED,
        "ppscore": TO_BE_CALCULATED,
        "metric_name": "weighted F1",
        "metric_key": "f1_weighted",
        "model":  tree.DecisionTreeClassifier(),
        "score_normalizer": _f1_normalizer,
    },
    "predict_itself": {
        "type": "predict_itself",
        "is_valid_score": True,
        "model_score": 1,
        "baseline_score": 0,
        "ppscore": 1,
        "metric_name": None,
        "metric_key": None,
        "model": None,
        "score_normalizer": None,
    },
    "target_is_constant": {
        "type": "target_is_constant",
        "is_valid_score": True,
        "model_score": 1,
        "baseline_score": 1,
        "ppscore": 0,
        "metric_name": None,
        "metric_key": None,
        "model": None,
        "score_normalizer": None,
    },
    "target_is_id": {
        "type": "target_is_id",
        "is_valid_score": True,
        "model_score": 0,
        "baseline_score": 0,
        "ppscore": 0,
        "metric_name": None,
        "metric_key": None,
        "model": None,
        "score_normalizer": None,
    },
    "feature_is_id": {
        "type": "feature_is_id",
        "is_valid_score": True,
        "model_score": 0,
        "baseline_score": 0,
        "ppscore": 0,
        "metric_name": None,
        "metric_key": None,
        "model": None,
        "score_normalizer": None,
    },
}

INVALID_CALCULATIONS = [
    "target_is_datetime",
    "target_data_type_not_supported",
    "empty_dataframe_after_dropping_na",
    "unknown_error",
]

def _dtype_represents_categories(series) -> bool:
    "Determines if the dtype of the series represents categorical values"
    return (
        is_bool_dtype(series)
        or is_object_dtype(series)
        or is_string_dtype(series)
        or is_categorical_dtype(series)
    )

def _determine_case_and_prepare_df(df, x, y, sample=5_000, random_seed=123):
    "Returns str with the name of the determined case based on the columns x and y"
    if x == y:
        return df, "predict_itself"

    df = df[[x, y]]
    # IDEA: log.warning when values have been dropped
    df = df.dropna()

    if len(df) == 0:
        return df, "empty_dataframe_after_dropping_na"
        # IDEA: show warning
        # raise Exception(
        #     "After dropping missing values, there are no valid rows left"
        # )

    df = _maybe_sample(df, sample, random_seed=random_seed)

    if _feature_is_id(df, x):
        return df, "feature_is_id"

    category_count = df[y].value_counts().count()
    if category_count == 1:
        # it is helpful to separate this case in order to save unnecessary calculation time
        return df, "target_is_constant"
    if _dtype_represents_categories(df[y]) and (category_count == len(df[y])):
        # it is important to separate this case in order to save unnecessary calculation time
        return df, "target_is_id"

    if _dtype_represents_categories(df[y]):
        return df, "classification"
    if is_numeric_dtype(df[y]):
        # this check needs to be after is_bool_dtype (which is part of _dtype_represents_categories) because bool is considered numeric by pandas
        return df, "regression"

    if is_datetime64_any_dtype(df[y]) or is_timedelta64_dtype(df[y]):
        # IDEA: show warning
        # raise TypeError(
        #     f"The target column {y} has the dtype {df[y].dtype} which is not supported. A possible solution might be to convert {y} to a string column"
        # )
        return df, "target_is_datetime"

    # IDEA: show warning
    # raise Exception(
    #     f"Could not infer a valid task based on the target {y}. The dtype {df[y].dtype} is not yet supported"
    # )  # pragma: no cover
    return df, "target_data_type_not_supported"


def _feature_is_id(df, x):
    "Returns Boolean if the feature column x is an ID"
    if not _dtype_represents_categories(df[x]):
        return False

    category_count = df[x].value_counts().count()
    return category_count == len(df[x])

def _maybe_sample(df, sample, random_seed=None):
    """
    Maybe samples the rows of the given df to have at most `sample` rows
    If sample is `None` or falsy, there will be no sampling.
    If the df has fewer rows than the sample, there will be no sampling.
    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe that might be sampled
    sample : int or `None`
        Number of rows to be sampled
    random_seed : int or `None`
        Random seed that is forwarded to pandas.DataFrame.sample as `random_state`
    Returns
    -------
    pandas.DataFrame
        DataFrame after potential sampling
    """
    if sample and len(df) > sample:
        # this is a problem if x or y have more than sample=5000 categories
        # TODO: dont sample when the problem occurs and show warning
        df = df.sample(sample, random_state=random_seed, replace=False)
    return df

def _is_column_in_df(column, df):
    try:
        return column in df.columns
    except Exception:
        return False

def _score(
    df, x, y, task, sample, cross_validation, random_seed, invalid_score, catch_errors
):
    df, case_type = _determine_case_and_prepare_df(
        df, x, y, sample=sample, random_seed=random_seed
    )
    task = _get_task(case_type, invalid_score)

    if case_type in ["classification", "regression"]:
        model_score = _calculate_model_cv_score_(
            df,
            target=y,
            feature=x,
            task=task,
            cross_validation=cross_validation,
            random_seed=random_seed,
        )
        # IDEA: the baseline_scores do sometimes change significantly, e.g. for F1 and thus change the PPS
        # we might want to calculate the baseline_score 10 times and use the mean in order to have less variance
        ppscore, baseline_score = task["score_normalizer"](
            df, y, model_score, random_seed=random_seed
        )
    else:
        model_score = task["model_score"]
        baseline_score = task["baseline_score"]
        ppscore = task["ppscore"]

    return {
        "x": x,
        "y": y,
        "ppscore": ppscore,
        "case": case_type,
        "is_valid_score": task["is_valid_score"],
        "metric": task["metric_name"],
        "baseline_score": baseline_score,
        "model_score": abs(model_score),  # sklearn returns negative mae
        "model": task["model"],
    }

def score(
    df,
    x,
    y,
    pipeline=[],
    time_series=False,
    task=NOT_SUPPORTED_ANYMORE,
    sample=5_000,
    cross_validation=4,
    random_seed=123,
    invalid_score=0,
    catch_errors=True,
):
    """
    Calculate the Predictive Power Score (PPS) for "x predicts y"
    The score always ranges from 0 to 1 and is data-type agnostic.
    A score of 0 means that the column x cannot predict the column y better than a naive baseline model.
    A score of 1 means that the column x can perfectly predict the column y given the model.
    A score between 0 and 1 states the ratio of how much potential predictive power the model achieved compared to the baseline model.
    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe that contains the columns x and y
    x : str
        Name of the column x which acts as the feature
    y : str
        Name of the column y which acts as the target
    pipeline : list
        Optional list of (name, transformer) steps prepended to the internal
        decision tree, as in sklearn.pipeline.Pipeline
    time_series : bool
        If `True`, rows are not shuffled before cross-validation, so no future
        information leaks into earlier folds
    sample : int or `None`
        Number of rows for sampling. The sampling decreases the calculation time of the PPS.
        If `None` there will be no sampling.
    cross_validation : int
        Number of iterations during cross-validation. For example, if the number is 4,
        patterns can be detected when the same observation occurs at least 4 times.
        Increasing the number raises the required minimum of observations; below that
        limit sklearn will throw an error and the PPS cannot be calculated
    random_seed : int or `None`
        Random seed for the parts of the calculation that require random numbers, e.g. shuffling or sampling.
        If the value is set, the results will be reproducible. If the value is `None` a new random number is drawn at the start of each calculation.
    invalid_score : any
        The score that is returned when a calculation is invalid, e.g. because the data type was not supported.
    catch_errors : bool
        If `True` all errors will be caught and reported as `unknown_error`, which ensures convenience. If `False` errors will be raised. This is helpful for inspecting and debugging errors.
    Returns
    -------
    Dict
        A dict that contains multiple fields about the resulting PPS.
        The dict enables introspection into the calculations that have been performed under the hood
    """
    global TIME_SERIES
    TIME_SERIES = bool(time_series)
    
    global VALID_CALCULATIONS
    if pipeline:
        VALID_CALCULATIONS['regression']['model'] = Pipeline(pipeline + [('tree', tree.DecisionTreeRegressor())])
        VALID_CALCULATIONS['classification']['model'] = Pipeline(pipeline + [('tree', tree.DecisionTreeClassifier())])
    else:
        VALID_CALCULATIONS['regression']['model'] = tree.DecisionTreeRegressor()
        VALID_CALCULATIONS['classification']['model'] = tree.DecisionTreeClassifier()

    if not isinstance(df, pd.DataFrame):
        raise TypeError(
            f"The 'df' argument should be a pandas.DataFrame but you passed a {type(df)}\nPlease convert your input to a pandas.DataFrame"
        )
    if not _is_column_in_df(x, df):
        raise ValueError(
            f"The 'x' argument should be the name of a dataframe column but the variable that you passed is not a column in the given dataframe.\nPlease review the column name or your dataframe"
        )
    if len(df[[x]].columns) >= 2:
        raise AssertionError(
            f"The dataframe has {len(df[[x]].columns)} columns with the same column name {x}\nPlease adjust the dataframe and make sure that only 1 column has the name {x}"
        )
    if not _is_column_in_df(y, df):
        raise ValueError(
            f"The 'y' argument should be the name of a dataframe column but the variable that you passed is not a column in the given dataframe.\nPlease review the column name or your dataframe"
        )
    if len(df[[y]].columns) >= 2:
        raise AssertionError(
            f"The dataframe has {len(df[[y]].columns)} columns with the same column name {y}\nPlease adjust the dataframe and make sure that only 1 column has the name {y}"
        )
    if task is not NOT_SUPPORTED_ANYMORE:
        raise AttributeError(
            "The attribute 'task' is no longer supported because it led to confusion and inconsistencies.\nThe task of the model is now determined based on the data types of the columns. If you want to change the task please adjust the data type of the column.\nFor more details, please refer to the README"
        )

    if random_seed is None:
        from random import random

        random_seed = int(random() * 1000)

    try:
        return _score(
            df,
            x,
            y,
            task,
            sample,
            cross_validation,
            random_seed,
            invalid_score,
            catch_errors,
        )
    except Exception as exception:
        if catch_errors:
            case_type = "unknown_error"
            task = _get_task(case_type, invalid_score)
            return {
                "x": x,
                "y": y,
                "ppscore": task["ppscore"],
                "case": case_type,
                "is_valid_score": task["is_valid_score"],
                "metric": task["metric_name"],
                "baseline_score": task["baseline_score"],
                "model_score": task["model_score"],  # sklearn returns negative mae
                "model": task["model"],
            }
        else:
            raise exception
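
# A usage sketch of the forked score(): the scaling pipeline is prepended to
# the internal decision tree and rows are kept in time order. The data below
# is synthetic and hypothetical.
if __name__ == "__main__":
    import numpy as np
    from sklearn.model_selection import TimeSeriesSplit

    rng = np.random.default_rng(0)
    demo = pd.DataFrame({'feature': rng.normal(size=500)})
    demo['target'] = (demo['feature'] + rng.normal(size=500)) > 0  # noisy link

    result = score(demo, x='feature', y='target',
                   pipeline=[('1', StandardScaler()), ('2', MinMaxScaler())],
                   time_series=True,
                   cross_validation=TimeSeriesSplit(4))
    print(result['ppscore'], result['case'])  # boolean target -> classification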


def _get_task(case_type, invalid_score):
    if case_type in VALID_CALCULATIONS.keys():
        return VALID_CALCULATIONS[case_type]
    elif case_type in INVALID_CALCULATIONS:
        return {
            "type": case_type,
            "is_valid_score": False,
            "model_score": invalid_score,
            "baseline_score": invalid_score,
            "ppscore": invalid_score,
            "metric_name": None,
            "metric_key": None,
            "model": None,
            "score_normalizer": None,
        }
    raise Exception(f"case_type {case_type} is not supported")

def _format_list_of_dicts(scores, output, sorted):
    """
    Format list of score dicts `scores`
    - maybe sort by ppscore
    - maybe return pandas.Dataframe
    - output can be one of ["df", "list"]
    """
    if sorted:
        scores.sort(key=lambda item: item["ppscore"], reverse=True)

    if output == "df":
        df_columns = [
            "x",
            "y",
            "ppscore",
            "case",
            "is_valid_score",
            "metric",
            "baseline_score",
            "model_score",
            "model",
        ]
        data = {column: [score[column] for score in scores] for column in df_columns}
        scores = pd.DataFrame.from_dict(data)

    return scores

def predictors(df, y, output="df", pipeline = [], time_series=False, sorted=True, **kwargs):
    """
    Calculate the Predictive Power Score (PPS) of all the features in the dataframe
    against a target column
    Parameters
    ----------
    df : pandas.DataFrame
        The dataframe that contains the data
    y : str
        Name of the column y which acts as the target
    output: str - potential values: "df", "list"
        Control the type of the output. Either return a pandas.DataFrame (df) or a list with the score dicts
    sorted: bool
        Whether or not to sort the output dataframe/list by the ppscore
    pipeline: list
        List of (name, transformer) steps prepended to the internal decision tree
    time_series: bool
        If `True`, rows are not shuffled before cross-validation
    kwargs:
        Other keyword arguments that shall be forwarded to the pps.score method,
        e.g. `sample`, `cross_validation`, `random_seed`, `invalid_score`, `catch_errors`
    Returns
    -------
    pandas.DataFrame or list of Dict
        Either returns a tidy dataframe or a list of all the PPS dicts. This can be influenced
        by the output argument
    """
     
    if not isinstance(df, pd.DataFrame):
        raise TypeError(
            f"The 'df' argument should be a pandas.DataFrame but you passed a {type(df)}\nPlease convert your input to a pandas.DataFrame"
        )
    if not _is_column_in_df(y, df):
        raise ValueError(
            f"The 'y' argument should be the name of a dataframe column but the variable that you passed is not a column in the given dataframe.\nPlease review the column name or your dataframe"
        )
    if len(df[[y]].columns) >= 2:
        raise AssertionError(
            f"The dataframe has {len(df[[y]].columns)} columns with the same column name {y}\nPlease adjust the dataframe and make sure that only 1 column has the name {y}"
        )
    if output not in ["df", "list"]:
        raise ValueError(
            f"""The 'output' argument should be one of ["df", "list"] but you passed: {output}\nPlease adjust your input to one of the valid values"""
        )
    if sorted not in [True, False]:
        raise ValueError(
            f"""The 'sorted' argument should be one of [True, False] but you passed: {sorted}\nPlease adjust your input to one of the valid values"""
        )

    scores = [score(df, column, y, pipeline, time_series, **kwargs) for column in df if column != y]

    return _format_list_of_dicts(scores=scores, output=output, sorted=sorted)
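
# A condensed version of the call PPSSelector.get_pps_selection() makes:
# rank lagged-return features against a boolean forward-direction target.
# Column names and data below are hypothetical stand-ins.
if __name__ == "__main__":
    import numpy as np
    from sklearn.model_selection import TimeSeriesSplit

    rng = np.random.default_rng(1)
    prices = pd.Series(rng.normal(size=600).cumsum() + 100, name='SPY')
    demo = pd.DataFrame({'R_5_SPY_PAST': prices.pct_change(5),
                         'R_22_SPY_PAST': prices.pct_change(22),
                         'D_5_SPY_FUT': -prices.pct_change(-5).dropna() > 0}).dropna()

    ranking = predictors(demo, y='D_5_SPY_FUT',
                         pipeline=[('1', StandardScaler()), ('2', MinMaxScaler())],
                         time_series=True,
                         cross_validation=TimeSeriesSplit(4))
    print(ranking[['x', 'y', 'ppscore']])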

def matrix(df, output="df", sorted=False, **kwargs):
    """
    Calculate the Predictive Power Score (PPS) matrix for all columns in the dataframe
    Parameters
    ----------
    df : pandas.DataFrame
        The dataframe that contains the data
    output: str - potential values: "df", "list"
        Control the type of the output. Either return a pandas.DataFrame (df) or a list with the score dicts
    sorted: bool
        Whether or not to sort the output dataframe/list by the ppscore
    kwargs:
        Other keyword arguments that shall be forwarded to the pps.score method,
        e.g. `sample`, `cross_validation`, `random_seed`, `invalid_score`, `catch_errors`
    Returns
    -------
    pandas.DataFrame or list of Dict
        Either returns a tidy dataframe or a list of all the PPS dicts. This can be influenced
        by the output argument
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError(
            f"The 'df' argument should be a pandas.DataFrame but you passed a {type(df)}\nPlease convert your input to a pandas.DataFrame"
        )
    if output not in ["df", "list"]:
        raise ValueError(
            f"""The 'output' argument should be one of ["df", "list"] but you passed: {output}\nPlease adjust your input to one of the valid values"""
        )
    if sorted not in [True, False]:
        raise ValueError(
            f"""The 'sorted' argument should be one of [True, False] but you passed: {sorted}\nPlease adjust your input to one of the valid values"""
        )

    scores = [score(df, x, y, **kwargs) for x in df for y in df]

    return _format_list_of_dicts(scores=scores, output=output, sorted=sorted)
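
# For completeness, the matrix entry point on a small synthetic frame:
# 'a' and 'b' are strongly related while 'c' is independent noise, so the
# a<->b scores should stand well above the 'c' rows.
if __name__ == "__main__":
    import numpy as np

    rng = np.random.default_rng(2)
    demo = pd.DataFrame({'a': rng.normal(size=300)})
    demo['b'] = demo['a'] * 2 + rng.normal(size=300) * 0.1
    demo['c'] = rng.normal(size=300)

    pps = matrix(demo)
    print(pps[['x', 'y', 'ppscore']])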