Overall Statistics
Total Trades
113
Average Win
3.21%
Average Loss
-4.15%
Compounding Annual Return
4.724%
Drawdown
41.900%
Expectancy
0.109
Net Profit
25.977%
Sharpe Ratio
0.319
Probabilistic Sharpe Ratio
4.641%
Loss Rate
38%
Win Rate
62%
Profit-Loss Ratio
0.77
Alpha
0.076
Beta
-0.147
Annual Standard Deviation
0.168
Annual Variance
0.028
Information Ratio
-0.384
Tracking Error
0.257
Treynor Ratio
-0.365
Total Fees
$2855.31
import pandas as pd
import numpy as np
import timeseriespps as tspps

from sklearn.model_selection import TimeSeriesSplit as tss
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.pipeline import Pipeline

from scipy.stats import zscore
from statistics import mode

import warnings
warnings.filterwarnings("ignore")

import re
from mlfinlab import fracdiff as fd
import pipegridmodels as pgm

from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import f1_score


class PPSSelector:
    
    def __init__(self, 
                 algorithm,
                 factors,
                 targets,
                 pasts=[3, 5, 10, 15, 22, 66],
                 futures=[5,10, 15, 22],
                 emas = [5, 15, 22, 56],
                 training_period=1825):

        self.algorithm = algorithm
        self.training_period = training_period
        self.factors = factors
        self.targets = targets
        self.pasts = pasts
        self.futures = futures
        self.emas = emas
        
        self.models = False
        self.is_trained = False
        self.threshold = False
        
        self.best_target = False
        self.best_predictors = False
        self.prediction_duration = False

    def get_pps_selection(self):
        
        factors = self.factors
        targets = self.targets
        algorithm = self.algorithm
        futures = self.futures
        pasts = self.pasts

        symbols = list(algorithm.ActiveSecurities.Keys)
        history = algorithm.History(symbols, self.training_period, Resolution.Daily)
        tickers = {str(symbol): str(symbol.ID) for symbol in symbols}
        inv_symbols = dict(map(reversed, tickers.items()))
    
        # Treatment of the price:
        price_history = history['close'].unstack(level=0)
        price_history.columns = [inv_symbols[col] for col in price_history.columns]
        price_history = pd.DataFrame(price_history)

        # Treatment of the volume:
        volume_history = history['volume'].unstack(level=0)
        volume_history.columns = ['VOL_'+inv_symbols[col] for col in volume_history.columns]
        volume_history = pd.DataFrame(volume_history)

        # Treatment of Fracdiffs:
        diff_price = fd.frac_diff(price_history, 0.3, ).dropna()
        diff_price.columns = ['FD_'+col for col in diff_price.columns]
        diff_price = pd.DataFrame(diff_price)

        # Join all
        price_history = price_history.join(volume_history).join(diff_price)

        # Valid factors:
        valid_prices = [value for value in price_history.columns if value in factors]
        valid_volumes = [value for value in volume_history.columns if value[4:] in factors]
        valid_difss = list(diff_price.columns)
        valid_factors = valid_prices + valid_volumes + valid_difss 
        
        for column in price_history[valid_factors]:
            for frame in pasts:
                price_history['P_'+str(frame)+"_"+column+'_PAST'] = price_history[column].shift(frame)
                price_history['R_'+str(frame)+"_"+column+'_PAST'] = price_history[column].pct_change(frame)
                for ema in self.emas:
                    price_history['EMA'+str(ema)+'_'+str(frame)+"_"+column+'_PAST'] = price_history[column].shift(frame).ewm(span=ema).mean()

        for column in price_history[targets]:
            for frame in futures:
                price_history['D_'+str(frame)+"_"+column+'_FUT'] = -price_history[column].pct_change(-frame).dropna() > 0
        
        price_history.dropna(inplace=True)

        pipe = [('1',StandardScaler()), ('2', MinMaxScaler())]
        all_predictors = pd.DataFrame(columns=['x','y','ppscore'])

        future_cols = [col for col in price_history.columns if 'FUT' in col and 'D_' in col]
        past_cols = [col for col in price_history.columns if 'PAST' in col]

        for column in future_cols:
                predictors_df = tspps.predictors(price_history[past_cols + [column]].dropna(),
                                           y=column,
                                           pipeline=pipe,
                                           time_series=True,
                                           cross_validation=tss(4))

                all_predictors = all_predictors.append(predictors_df[['x','y','ppscore']])

        sorted_predictors = all_predictors.sort_values('ppscore', ascending=False).reset_index(drop=True)
        
        try:
            sorted_predictors['std_ppscore'] = sorted_predictors[['ppscore']].apply(zscore)
        except:
            algorithm.Debug("Zscore Error")
            return -1, -1
        
        cut_off_devs = 3
        pred_type = 'D_'
        good_predictors = sorted_predictors[sorted_predictors['std_ppscore'] > cut_off_devs]
        good_predictors = good_predictors[ good_predictors['y'].str.contains(pred_type) ].reset_index(drop=True)
        good_targets = good_predictors.groupby('y')[['std_ppscore']].mean()
        good_targets.sort_values('std_ppscore', ascending=False, inplace=True)

        INDEX = 0
        self.best_target = good_targets.iloc[INDEX].name
        self.best_predictors = list(good_predictors[good_predictors['y'] == self.best_target]['x'])
        self.prediction_duration = int(re.findall(r'\d+', self.best_target)[0])
        
        # Generate data:
        data = price_history[self.best_predictors + [self.best_target]]

        total_samples = len(data)
        X = data[self.best_predictors]
        y = data[self.best_target].astype(bool)

        self.models = [pgm.RfcPipeGrid().model, pgm.SVCPipeGrid().model , pgm.MlpPipeGrid().model ]
        #self.models = [pgm.SVCPipeGrid().model]

        # Fit the random search model
        for estimator in self.models:
            try: estimator.fit(X, y)
            except:
                algorithm.Debug('Estimator Fit Error')
                return -1, -1

        probabilities = []
        thresh_values = np.arange(0, 1, 0.01)
        thresholds = []

        def to_labels(pos_probs, threshold):
                return (pos_probs >= threshold).astype('int')

        for model in self.models:
            pred = model.predict(X)[-1]
            proba = model.predict_proba(X)
            probabilities.append(proba[-1][1])
            
            # Model threshold:
            probs = proba[:, 1]
            # define thresholds
            
            scores_f1 = [f1_score(y, to_labels(probs, t), average='weighted') for t in thresh_values]
            scores_roc_auc = [roc_auc_score(y, to_labels(probs, t), average='weighted') for t in thresh_values]
            scores_accuracy = [accuracy_score(y, to_labels(probs, t)) for t in thresh_values]
            ix_f1 = np.argmax(scores_f1)
            ix_roc_auc = np.argmax(scores_roc_auc)
            ix_acc = np.argmax(scores_accuracy)
            all_scores = [thresh_values[ix_f1], thresh_values[ix_roc_auc], thresh_values[ix_acc] ]
            mean_thresh = np.mean([all_scores[0]])
            thresholds.append(mean_thresh)
            
        predictions = []
        for i in range(len(self.models)):
            if probabilities[i] > thresholds[i]: predictions.append(1)
            else: predictions.append(0)
        
        prediction = mode(predictions)
        time = algorithm.Time.date()
        algorithm.Debug(str(time)+' Predicted: '+str(prediction))
        algorithm.Debug(str( pred ))
        
        return prediction, 1
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier as rfc
from sklearn.neural_network import MLPClassifier as mlp
from sklearn.svm import SVC
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import StandardScaler, MinMaxScaler

class MlpPipeGrid:
    def __init__(self):
        model_type = mlp()
        scalers = [('scaler1', StandardScaler() ), 
                   ('scaler2', MinMaxScaler() ),]
        pipeline = Pipeline([*scalers, ('model', model_type )])

        search_grid = {'model__hidden_layer_sizes': [(50,50,50), (50,100,50), (100,)],
                    'model__activation': ['tanh', 'relu'],
                    'model__solver': ['sgd', 'adam'],
                    'model__alpha': [0.0001, 0.05],
                    'model__learning_rate': ['constant','adaptive'],
                    }
        
        scoring = 'f1_weighted'
        tscv = TimeSeriesSplit(n_splits=5)

        self.model =  GridSearchCV(estimator=pipeline,
                                   param_grid=search_grid,
                                   cv=tscv,
                                   scoring=scoring)

class SVCPipeGrid:
    def __init__(self):
        model_type = SVC()
        scalers = [('scaler1', StandardScaler() ), 
                   ('scaler2', MinMaxScaler() ),]
        pipeline = Pipeline([*scalers, ('model', model_type )])

        search_grid = {'model__C':[1,10,100,1000],
                       'model__gamma':[1,0.1,0.001,0.0001],
                       'model__kernel':['linear','rbf'],
                       'model__probability': [True]}
        
        scoring = 'f1_weighted'
        tscv = TimeSeriesSplit(n_splits=5)

        self.model = GridSearchCV(estimator=pipeline,
                                   param_grid=search_grid,
                                   cv=tscv,
                                   scoring=scoring)

class RfcPipeGrid:
    def __init__(self):
        model_type = rfc()
        scalers = [('scaler1', StandardScaler() ), 
                   ('scaler2', MinMaxScaler() ),]
        pipeline = Pipeline([*scalers, ('model', model_type )])

        search_grid = { 'model__n_estimators': [50, 100, 200],
                        'model__max_features': ['auto'],
                        'model__max_depth': [5, 10, None],
                        'model__min_samples_split': [5, 10],
                        'model__min_samples_leaf': [1, 2, 4],
                        'model__bootstrap': [True]}
        
        scoring = 'f1_weighted'
        tscv = TimeSeriesSplit(n_splits=5)

        self.model = GridSearchCV(estimator=pipeline,
                                   param_grid=search_grid,
                                   cv=tscv,
                                   scoring=scoring)
from PPSSelector import PPSSelector

class PPSPredictor(QCAlgorithm):

    def Initialize(self):
        N_YEARS = 5
        test_length = timedelta(days= int(365 * N_YEARS)+1)
        self.SetStartDate(datetime.today() - test_length)
        self.SetEndDate(datetime.today())
        self.SetCash(1000000)

        # Hourly and Daily resolutions are posible, minute data minimizes
        # 'stale data' warning messages.
        resolution = Resolution.Daily
        # Add Required Symbols

        factors_0 = ['XLK', 'XLY', 'XLB', 'XLV', 'XLP', 'XLI', 'XLU', 'XLF', 'XLE'] #XLC Dropped
        factors_1 = ['TLT', 'QQQ', 'SPY']
        factors_2 = ['QQQ']
        
        factor_list = [factors_0, factors_1, factors_2]
        
        index = int(self.GetParameter("index"))
        
        factors = factor_list[index]
        targets = ['SPY']
        hedges = []
    
        symbols = [self.AddEquity(ticker, resolution).Symbol for ticker in factors + targets + hedges]
    
        self.SetBenchmark("SPY")
        self.UniverseSettings.Resolution = resolution
        self.SetBrokerageModel(AlphaStreamsBrokerageModel())
        self.AddAlpha(PPSAlphaModel(factors, 
                                    targets,
                                    hedges,
                                    resolution))
    
        self.SetExecution(ImmediateExecutionModel())
        self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel(lambda time: None))
        # self.SetRiskManagement(TrailingStopRiskManagementModel(0.025))


class PPSAlphaModel(AlphaModel):
    """
    """

    def __init__(self, factors, targets, hedges, resolution):

        self.Name = "PPSAlphaModel"
        # Factor, prediction target and instruments to exploit the predictions.
        self.factors = factors
        self.targets = targets
        self.hedges = hedges
        
        # Dynamically allow for hour/day/minute resolutions:
        self.resolution = resolution

        # Parameters for machine learning model:
        # ML model intial training and retraining:
        self.selector = False
        self.training_period = 252*5 # Now passed into history as data points

        # Other operational parameters:
        self.time_slack = 10 # Minutes after market open.
        self.expected_gain = 0.05 # Fixed value, the model is unable predict this.

        # Directions dictionary:
        self.directions = [InsightDirection.Down,
                           InsightDirection.Up,
                           InsightDirection.Flat]

    def Update(self, algorithm, data):
        # Initialize every slice of data:
        insights = []
        
        if not data.HasData:
            return []
            
        # Obtain the market operating time:
        hours = algorithm.ActiveSecurities[self.targets[0]].Exchange.Hours

        # Positions are entered as market opens:
        previous_day = algorithm.Time - timedelta(days=int(True))
        slack = timedelta(minutes=self.time_slack)
        operating_time = hours.GetNextMarketOpen(previous_day, False) + slack

        # If invested, wait for the insight to decay:
        if (algorithm.Time.hour == 10 and algorithm.Time.minute == 0) or self.resolution == Resolution.Daily:
            for target in self.targets:
                if algorithm.Securities[target].Invested: return[]
            
            self.selector = PPSSelector(algorithm,
                                         self.factors,
                                         self.targets,
                                         training_period=self.training_period)
            
            prediction, probability = self.selector.get_pps_selection()
            if prediction == -1: return []
            tdelta = timedelta(days=self.selector.prediction_duration)
            insight_duration = hours.GetNextMarketClose(algorithm.Time + tdelta,
                                                        False) - algorithm.Time
            
            direction_instrument = self.directions[int(prediction)]

            # TODO: Alternative bet_size to be implemented in the future:
            # Sorry for codetag.
            # bet_size = (probability-0.5)*2

            bet_size = 1
            for target in self.targets:
                if not data.ContainsKey(target): continue
                insights.append(Insight(target,
                                        insight_duration,
                                        InsightType.Price,
                                        direction_instrument,
                                        self.expected_gain,
                                        probability,
                                        self.Name,
                                        bet_size))

            for hedge in self.hedges:
                if not data.ContainsKey(hedge): continue
                insights.append(Insight(hedge,
                                        insight_duration,
                                        InsightType.Price,
                                        direction_instrument,
                                        self.expected_gain,
                                        probability,
                                        self.Name,
                                        1-bet_size))

            return insights

        return []
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sns


def plot_df(df, color='blue', size=(16, 7), legend='Close Price', y_label='Price in USD'):
    plt.style.use('dark_background')
    plt.rcParams["figure.figsize"] = size
    ax = df.plot()
    plt.ylabel(y_label)
    x = 0.01
    y = 0.01
    plt.text(x, y, 'www.ostirion.net', fontsize=15, transform=ax.transAxes)
    plt.legend(ncol=int(len(df.columns) / 2))
    date_form = mdates.DateFormatter("%m-%Y")
    plt.xticks(rotation=45);
    plt.show()
    
def plot_corr_hm(df, title='Title', size=(16, 7), annot = True):
    corr = df.corr()
    plt.style.use('dark_background')
    plt.rcParams["figure.figsize"] = size
    mask = np.triu(np.ones_like(corr, dtype=bool))
    cmap = sns.color_palette("RdBu")
    ax = sns.heatmap(corr, mask=mask, vmax=.3, center=0, cmap=cmap, annot=annot,
                     square=True, linewidths=0, cbar_kws={"shrink": .5}, fmt='g')
    ax.set_title(title)
    plt.setp(ax.get_yticklabels(), rotation=0);
    plt.setp(ax.get_xticklabels(), rotation=90);
    plt.show()

def plot_cm(df, title='Title', size=(16,7)):
    plt.style.use('dark_background')
    plt.rcParams["figure.figsize"] = size
    cmap = sns.color_palette("Blues")
    ax = sns.heatmap(df, cmap=cmap, annot=True, linewidths=0, cbar_kws={"shrink": .5}, fmt='g')
    ax.set_title(title)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.setp(ax.get_xticklabels(), rotation=0);
# Import Modified PPS module:

from sklearn import tree
from sklearn import preprocessing
from sklearn.model_selection import cross_val_score
from sklearn.metrics import mean_absolute_error, f1_score

from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.pipeline import Pipeline

import pandas as pd
from pandas.api.types import (
    is_numeric_dtype,
    is_bool_dtype,
    is_object_dtype,
    is_categorical_dtype,
    is_string_dtype,
    is_datetime64_any_dtype,
    is_timedelta64_dtype,
)


NOT_SUPPORTED_ANYMORE = "NOT_SUPPORTED_ANYMORE"
TO_BE_CALCULATED = -1


def _calculate_model_cv_score_(
    df, target, feature, task, cross_validation, random_seed, **kwargs
):
    "Calculates the mean model score based on cross-validation"
    # Sources about the used methods:
    # https://scikit-learn.org/stable/modules/tree.html
    # https://scikit-learn.org/stable/modules/cross_validation.html
    # https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.cross_val_score.html
    metric = task["metric_key"]
    model = task["model"]
    # shuffle the rows - this is important for cross-validation
    # because the cross-validation just takes the first n lines
    # if there is a strong pattern in the rows eg 0,0,0,0,1,1,1,1
    # then this will lead to problems because the first cv sees mostly 0 and the later 1
    # this approach might be wrong for timeseries because it might leak information
    # Fixing the time_series:
    if not TIME_SERIES:
        df = df.sample(frac=1, random_state=random_seed, replace=False)

    # preprocess target
    if task["type"] == "classification":
        label_encoder = preprocessing.LabelEncoder()
        df[target] = label_encoder.fit_transform(df[target])
        target_series = df[target]
    else:
        target_series = df[target]

    # preprocess feature
    if _dtype_represents_categories(df[feature]):
        one_hot_encoder = preprocessing.OneHotEncoder()
        array = df[feature].__array__()
        sparse_matrix = one_hot_encoder.fit_transform(array.reshape(-1, 1))
        feature_input = sparse_matrix
    else:
        # reshaping needed because there is only 1 feature
        feature_input = df[feature].values.reshape(-1, 1)

    # Cross-validation is stratifiedKFold for classification, KFold for regression
    # CV on one core (n_job=1; default) has shown to be fastest
    try:
        scores = cross_val_score(
        model, feature_input, target_series, cv=cross_validation, scoring=metric
        )
    except:
        return 0

    return scores.mean()


def _normalized_mae_score(model_mae, naive_mae):
    "Normalizes the model MAE score, given the baseline score"
    # # Value range of MAE is [0, infinity), 0 is best
    # 10, 5 ==> 0 because worse than naive
    # 10, 20 ==> 0.5
    # 5, 20 ==> 0.75 = 1 - (mae/base_mae)
    if model_mae > naive_mae:
        return 0
    else:
        return 1 - (model_mae / naive_mae)


def _mae_normalizer(df, y, model_score, **kwargs):
    "In case of MAE, calculates the baseline score for y and derives the PPS."
    df["naive"] = df[y].median()
    baseline_score = mean_absolute_error(df[y], df["naive"])  # true, pred

    ppscore = _normalized_mae_score(abs(model_score), baseline_score)
    return ppscore, baseline_score


def _normalized_f1_score(model_f1, baseline_f1):
    "Normalizes the model F1 score, given the baseline score"
    # # F1 ranges from 0 to 1
    # # 1 is best
    # 0.5, 0.7 ==> 0 because model is worse than naive baseline
    # 0.75, 0.5 ==> 0.5
    #
    if model_f1 < baseline_f1:
        return 0
    else:
        scale_range = 1.0 - baseline_f1  # eg 0.3
        f1_diff = model_f1 - baseline_f1  # eg 0.1
        return f1_diff / scale_range  # 0.1/0.3 = 0.33


def _f1_normalizer(df, y, model_score, random_seed):
    "In case of F1, calculates the baseline score for y and derives the PPS."
    label_encoder = preprocessing.LabelEncoder()
    df["truth"] = label_encoder.fit_transform(df[y])
    df["most_common_value"] = df["truth"].value_counts().index[0]
    random = df["truth"].sample(frac=1, random_state=random_seed)

    baseline_score = max(
        f1_score(df["truth"], df["most_common_value"], average="weighted"),
        f1_score(df["truth"], random, average="weighted"),
    )

    ppscore = _normalized_f1_score(model_score, baseline_score)
    return ppscore, baseline_score

TIME_SERIES = False
VALID_CALCULATIONS = {
    "regression": {
        "type": "regression",
        "is_valid_score": True,
        "model_score": TO_BE_CALCULATED,
        "baseline_score": TO_BE_CALCULATED,
        "ppscore": TO_BE_CALCULATED,
        "metric_name": "mean absolute error",
        "metric_key": "neg_mean_absolute_error",
        "model": tree.DecisionTreeRegressor(),
        "score_normalizer": _mae_normalizer,
    },
    "classification": {
        "type": "classification",
        "is_valid_score": True,
        "model_score": TO_BE_CALCULATED,
        "baseline_score": TO_BE_CALCULATED,
        "ppscore": TO_BE_CALCULATED,
        "metric_name": "weighted F1",
        "metric_key": "f1_weighted",
        "model":  tree.DecisionTreeClassifier(),
        "score_normalizer": _f1_normalizer,
    },
    "predict_itself": {
        "type": "predict_itself",
        "is_valid_score": True,
        "model_score": 1,
        "baseline_score": 0,
        "ppscore": 1,
        "metric_name": None,
        "metric_key": None,
        "model": None,
        "score_normalizer": None,
    },
    "target_is_constant": {
        "type": "target_is_constant",
        "is_valid_score": True,
        "model_score": 1,
        "baseline_score": 1,
        "ppscore": 0,
        "metric_name": None,
        "metric_key": None,
        "model": None,
        "score_normalizer": None,
    },
    "target_is_id": {
        "type": "target_is_id",
        "is_valid_score": True,
        "model_score": 0,
        "baseline_score": 0,
        "ppscore": 0,
        "metric_name": None,
        "metric_key": None,
        "model": None,
        "score_normalizer": None,
    },
    "feature_is_id": {
        "type": "feature_is_id",
        "is_valid_score": True,
        "model_score": 0,
        "baseline_score": 0,
        "ppscore": 0,
        "metric_name": None,
        "metric_key": None,
        "model": None,
        "score_normalizer": None,
    },
}

INVALID_CALCULATIONS = [
    "target_is_datetime",
    "target_data_type_not_supported",
    "empty_dataframe_after_dropping_na",
    "unknown_error",
]


def _dtype_represents_categories(series) -> bool:
    "Determines if the dtype of the series represents categorical values"
    return (
        is_bool_dtype(series)
        or is_object_dtype(series)
        or is_string_dtype(series)
        or is_categorical_dtype(series)
    )


def _determine_case_and_prepare_df(df, x, y, sample=5_000, random_seed=123):
    "Returns str with the name of the determined case based on the columns x and y"
    if x == y:
        return df, "predict_itself"

    df = df[[x, y]]
    # IDEA: log.warning when values have been dropped
    df = df.dropna()

    if len(df) == 0:
        return df, "empty_dataframe_after_dropping_na"
        # IDEA: show warning
        # raise Exception(
        #     "After dropping missing values, there are no valid rows left"
        # )

    df = _maybe_sample(df, sample, random_seed=random_seed)

    if _feature_is_id(df, x):
        return df, "feature_is_id"

    category_count = df[y].value_counts().count()
    if category_count == 1:
        # it is helpful to separate this case in order to save unnecessary calculation time
        return df, "target_is_constant"
    if _dtype_represents_categories(df[y]) and (category_count == len(df[y])):
        # it is important to separate this case in order to save unnecessary calculation time
        return df, "target_is_id"

    if _dtype_represents_categories(df[y]):
        return df, "classification"
    if is_numeric_dtype(df[y]):
        # this check needs to be after is_bool_dtype (which is part of _dtype_represents_categories) because bool is considered numeric by pandas
        return df, "regression"

    if is_datetime64_any_dtype(df[y]) or is_timedelta64_dtype(df[y]):
        # IDEA: show warning
        # raise TypeError(
        #     f"The target column {y} has the dtype {df[y].dtype} which is not supported. A possible solution might be to convert {y} to a string column"
        # )
        return df, "target_is_datetime"

    # IDEA: show warning
    # raise Exception(
    #     f"Could not infer a valid task based on the target {y}. The dtype {df[y].dtype} is not yet supported"
    # )  # pragma: no cover
    return df, "target_data_type_not_supported"


def _feature_is_id(df, x):
    "Returns Boolean if the feature column x is an ID"
    if not _dtype_represents_categories(df[x]):
        return False

    category_count = df[x].value_counts().count()
    return category_count == len(df[x])


def _maybe_sample(df, sample, random_seed=None):
    """
    Maybe samples the rows of the given df to have at most `sample` rows
    If sample is `None` or falsy, there will be no sampling.
    If the df has fewer rows than the sample, there will be no sampling.
    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe that might be sampled
    sample : int or `None`
        Number of rows to be sampled
    random_seed : int or `None`
        Random seed that is forwarded to pandas.DataFrame.sample as `random_state`
    Returns
    -------
    pandas.DataFrame
        DataFrame after potential sampling
    """
    if sample and len(df) > sample:
        # this is a problem if x or y have more than sample=5000 categories
        # TODO: dont sample when the problem occurs and show warning
        df = df.sample(sample, random_state=random_seed, replace=False)
    return df


def _is_column_in_df(column, df):
    try:
        return column in df.columns
    except:
        return False


def _score(
    df, x, y, task, sample, cross_validation, random_seed, invalid_score, catch_errors
):
    df, case_type = _determine_case_and_prepare_df(
        df, x, y, sample=sample, random_seed=random_seed
    )
    task = _get_task(case_type, invalid_score)

    if case_type in ["classification", "regression"]:
        model_score = _calculate_model_cv_score_(
            df,
            target=y,
            feature=x,
            task=task,
            cross_validation=cross_validation,
            random_seed=random_seed,
        )
        # IDEA: the baseline_scores do sometimes change significantly, e.g. for F1 and thus change the PPS
        # we might want to calculate the baseline_score 10 times and use the mean in order to have less variance
        ppscore, baseline_score = task["score_normalizer"](
            df, y, model_score, random_seed=random_seed
        )
    else:
        model_score = task["model_score"]
        baseline_score = task["baseline_score"]
        ppscore = task["ppscore"]

    return {
        "x": x,
        "y": y,
        "ppscore": ppscore,
        "case": case_type,
        "is_valid_score": task["is_valid_score"],
        "metric": task["metric_name"],
        "baseline_score": baseline_score,
        "model_score": abs(model_score),  # sklearn returns negative mae
        "model": task["model"],
    }


def score(
    df,
    x,
    y,
    pipeline = [],
    time_series = False,
    task=NOT_SUPPORTED_ANYMORE,
    sample=5_000,
    cross_validation=4,
    random_seed=123,
    invalid_score=0,
    catch_errors=True,
):
    """
    Calculate the Predictive Power Score (PPS) for "x predicts y"
    The score always ranges from 0 to 1 and is data-type agnostic.
    A score of 0 means that the column x cannot predict the column y better than a naive baseline model.
    A score of 1 means that the column x can perfectly predict the column y given the model.
    A score between 0 and 1 states the ratio of how much potential predictive power the model achieved compared to the baseline model.
    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe that contains the columns x and y
    x : str
        Name of the column x which acts as the feature
    y : str
        Name of the column y which acts as the target
    sample : int or `None`
        Number of rows for sampling. The sampling decreases the calculation time of the PPS.
        If `None` there will be no sampling.
    cross_validation : int
        Number of iterations during cross-validation. This has the following implications:
        For example, if the number is 4, then it is possible to detect patterns when there are at least 4 times the same observation. If the limit is increased, the required minimum observations also increase. This is important, because this is the limit when sklearn will throw an error and the PPS cannot be calculated
    random_seed : int or `None`
        Random seed for the parts of the calculation that require random numbers, e.g. shuffling or sampling.
        If the value is set, the results will be reproducible. If the value is `None` a new random number is drawn at the start of each calculation.
    invalid_score : any
        The score that is returned when a calculation is invalid, e.g. because the data type was not supported.
    catch_errors : bool
        If `True` all errors will be catched and reported as `unknown_error` which ensures convenience. If `False` errors will be raised. This is helpful for inspecting and debugging errors.
    Returns
    -------
    Dict
        A dict that contains multiple fields about the resulting PPS.
        The dict enables introspection into the calculations that have been performed under the hood
    """
    global TIME_SERIES
    if time_series: TIME_SERIES = True
    else: TIME_SERIES = False
    
    global VALID_CALCULATIONS
    if pipeline:
        VALID_CALCULATIONS['regression']['model'] = Pipeline(pipeline + [('tree', tree.DecisionTreeRegressor())] )
        VALID_CALCULATIONS['classification']['model'] = Pipeline( pipeline + [('tree', tree.DecisionTreeClassifier())] )
    else:
        VALID_CALCULATIONS['regression']['model'] = tree.DecisionTreeRegressor()
        VALID_CALCULATIONS['classification']['model'] = tree.DecisionTreeClassifier()

    if not isinstance(df, pd.DataFrame):
        raise TypeError(
            f"The 'df' argument should be a pandas.DataFrame but you passed a {type(df)}\nPlease convert your input to a pandas.DataFrame"
        )
    if not _is_column_in_df(x, df):
        raise ValueError(
            f"The 'x' argument should be the name of a dataframe column but the variable that you passed is not a column in the given dataframe.\nPlease review the column name or your dataframe"
        )
    if len(df[[x]].columns) >= 2:
        raise AssertionError(
            f"The dataframe has {len(df[[x]].columns)} columns with the same column name {x}\nPlease adjust the dataframe and make sure that only 1 column has the name {x}"
        )
    if not _is_column_in_df(y, df):
        raise ValueError(
            f"The 'y' argument should be the name of a dataframe column but the variable that you passed is not a column in the given dataframe.\nPlease review the column name or your dataframe"
        )
    if len(df[[y]].columns) >= 2:
        raise AssertionError(
            f"The dataframe has {len(df[[y]].columns)} columns with the same column name {y}\nPlease adjust the dataframe and make sure that only 1 column has the name {y}"
        )
    if task is not NOT_SUPPORTED_ANYMORE:
        raise AttributeError(
            "The attribute 'task' is no longer supported because it led to confusion and inconsistencies.\nThe task of the model is now determined based on the data types of the columns. If you want to change the task please adjust the data type of the column.\nFor more details, please refer to the README"
        )

    if random_seed is None:
        from random import random

        random_seed = int(random() * 1000)

    try:
        return _score(
            df,
            x,
            y,
            task,
            sample,
            cross_validation,
            random_seed,
            invalid_score,
            catch_errors,
        )
    except Exception as exception:
        if catch_errors:
            case_type = "unknown_error"
            task = _get_task(case_type, invalid_score)
            return {
                "x": x,
                "y": y,
                "ppscore": task["ppscore"],
                "case": case_type,
                "is_valid_score": task["is_valid_score"],
                "metric": task["metric_name"],
                "baseline_score": task["baseline_score"],
                "model_score": task["model_score"],  # sklearn returns negative mae
                "model": task["model"],
            }
        else:
            raise exception


def _get_task(case_type, invalid_score):
    if case_type in VALID_CALCULATIONS.keys():
        return VALID_CALCULATIONS[case_type]
    elif case_type in INVALID_CALCULATIONS:
        return {
            "type": case_type,
            "is_valid_score": False,
            "model_score": invalid_score,
            "baseline_score": invalid_score,
            "ppscore": invalid_score,
            "metric_name": None,
            "metric_key": None,
            "model": None,
            "score_normalizer": None,
        }
    raise Exception(f"case_type {case_type} is not supported")


def _format_list_of_dicts(scores, output, sorted):
    """
    Format list of score dicts `scores`
    - maybe sort by ppscore
    - maybe return pandas.Dataframe
    - output can be one of ["df", "list"]
    """
    if sorted:
        scores.sort(key=lambda item: item["ppscore"], reverse=True)

    if output == "df":
        df_columns = [
            "x",
            "y",
            "ppscore",
            "case",
            "is_valid_score",
            "metric",
            "baseline_score",
            "model_score",
            "model",
        ]
        data = {column: [score[column] for score in scores] for column in df_columns}
        scores = pd.DataFrame.from_dict(data)

    return scores


def predictors(df, y, output="df", pipeline = [], time_series=False, sorted=True, **kwargs):
    """
    Calculate the Predictive Power Score (PPS) of all the features in the dataframe
    against a target column
    Parameters
    ----------
    df : pandas.DataFrame
        The dataframe that contains the data
    y : str
        Name of the column y which acts as the target
    output: str - potential values: "df", "list"
        Control the type of the output. Either return a pandas.DataFrame (df) or a list with the score dicts
    sorted: bool
        Whether or not to sort the output dataframe/list by the ppscore
    pipeline: list
        list of transformstions to be included in the processing of the data
    kwargs:
        Other key-word arguments that shall be forwarded to the pps.score method,
        e.g. `sample, `cross_validation, `random_seed, `invalid_score`, `catch_errors`
    Returns
    -------
    pandas.DataFrame or list of Dict
        Either returns a tidy dataframe or a list of all the PPS dicts. This can be influenced
        by the output argument
    """
     
    if not isinstance(df, pd.DataFrame):
        raise TypeError(
            f"The 'df' argument should be a pandas.DataFrame but you passed a {type(df)}\nPlease convert your input to a pandas.DataFrame"
        )
    if not _is_column_in_df(y, df):
        raise ValueError(
            f"The 'y' argument should be the name of a dataframe column but the variable that you passed is not a column in the given dataframe.\nPlease review the column name or your dataframe"
        )
    if len(df[[y]].columns) >= 2:
        raise AssertionError(
            f"The dataframe has {len(df[[y]].columns)} columns with the same column name {y}\nPlease adjust the dataframe and make sure that only 1 column has the name {y}"
        )
    if not output in ["df", "list"]:
        raise ValueError(
            f"""The 'output' argument should be one of ["df", "list"] but you passed: {output}\nPlease adjust your input to one of the valid values"""
        )
    if not sorted in [True, False]:
        raise ValueError(
            f"""The 'sorted' argument should be one of [True, False] but you passed: {sorted}\nPlease adjust your input to one of the valid values"""
        )

    scores = [score(df, column, y, pipeline, time_series, **kwargs) for column in df if column != y]

    return _format_list_of_dicts(scores=scores, output=output, sorted=sorted)


def matrix(df, output="df", sorted=False, **kwargs):
    """
    Calculate the Predictive Power Score (PPS) matrix for all columns in the dataframe
    Parameters
    ----------
    df : pandas.DataFrame
        The dataframe that contains the data
    output: str - potential values: "df", "list"
        Control the type of the output. Either return a pandas.DataFrame (df) or a list with the score dicts
    sorted: bool
        Whether or not to sort the output dataframe/list by the ppscore
    kwargs:
        Other key-word arguments that shall be forwarded to the pps.score method,
        e.g. `sample, `cross_validation, `random_seed, `invalid_score`, `catch_errors`
    Returns
    -------
    pandas.DataFrame or list of Dict
        Either returns a tidy dataframe or a list of all the PPS dicts. This can be influenced
        by the output argument
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError(
            f"The 'df' argument should be a pandas.DataFrame but you passed a {type(df)}\nPlease convert your input to a pandas.DataFrame"
        )
    if not output in ["df", "list"]:
        raise ValueError(
            f"""The 'output' argument should be one of ["df", "list"] but you passed: {output}\nPlease adjust your input to one of the valid values"""
        )
    if not sorted in [True, False]:
        raise ValueError(
            f"""The 'sorted' argument should be one of [True, False] but you passed: {sorted}\nPlease adjust your input to one of the valid values"""
        )

    scores = [score(df, x, y, **kwargs) for x in df for y in df]

    return _format_list_of_dicts(scores=scores, output=output, sorted=sorted)