#### Version 3.3.1

```python
from QuantConnect.DataSource import *
from AlgorithmImports import *
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
import statsmodels.api as sm
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.metrics import mean_squared_error
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.decomposition import PCA
from math import sqrt
from IPython.display import display
from scipy.stats import rankdata, ttest_ind, jarque_bera, mannwhitneyu, pearsonr, spearmanr, chisquare
from pytz import timezone


class DatasetAnalyzer:
    """
    A class to analyze datasets listed on the QC data market.
    """
    def __init__(self, dataset, dataset_tickers, universe, linked_dataset, security_type, factor_by_name,
                 datasets_for_correlation_test, sparse_data, dataset_start_date, in_sample_end_date,
                 out_of_sample_end_date, return_prediction_period=1, marker_size=3):
        """
        Input:
         - dataset
            Class type of the dataset to analyze
         - dataset_tickers
            List of tickers the dataset links to. For linked datasets => equity tickers; for unlinked datasets => the dataset ticker ("REG" for Regalytics).
         - universe
            A list of tickers to use when analyzing the relationship between the dataset and security returns. If this list is empty, SPY is used as the default benchmark security.
         - linked_dataset
            Boolean to distinguish if the dataset is linked or unlinked to securities.
         - security_type
            Security type enum that describes the `universe` tickers.
         - factor_by_name
            Dictionary of `Factor`s to analyze, keyed by the factor name. The dictionary keys should match the column names that are returned when performing a history request on the dataset.
         - datasets_for_correlation_test
            List of other dataset class names to analyze the correlation with the `dataset`.
         - sparse_data
            Boolean to represent if the `dataset` is sparse.
         - dataset_start_date
            Start date of the dataset. Retrievable from the dataset listing.
         - in_sample_end_date
            Date to mark the end of the in-sample period.
         - out_of_sample_end_date
            Date to mark the end of the out-of-sample period.
         - return_prediction_period
            Number of days positions would be held for.
        """
        self.qb = QuantBook()
        self.dataset = dataset
        self.dataset_tickers = dataset_tickers
        self.factor_by_name = factor_by_name
        self.datasets_for_correlation_test = datasets_for_correlation_test
        self.sparse_data = sparse_data
        self.dataset_start_date = dataset_start_date
        self.in_sample_end_date = in_sample_end_date
        self.out_of_sample_end_date = out_of_sample_end_date
        self.return_prediction_period = return_prediction_period
        self.transformed_dataset_history = pd.DataFrame()
        self.hypothesis_test_p_value = {}
        self.marker_size = marker_size

        self.security_symbols = []
        self.dataset_symbols = []

        if not linked_dataset:
            if len(universe) == 0:
                # Subscribe to SPY as the default benchmark security
                security = self.qb.AddEquity("SPY", Resolution.Daily)
                self.security_timezone = security.Exchange.TimeZone
                self.security_symbols.append(security.Symbol)
            else:
                # Subscribe to the universe constituents
                for ticker in universe:
                    security = self.qb.AddSecurity(security_type, ticker, Resolution.Daily)
                    self.security_timezone = security.Exchange.TimeZone
                    self.security_symbols.append(security.Symbol)

            # Subscribe to the dataset
            for ticker in dataset_tickers:
                dataset_subscription = self.qb.AddData(dataset, ticker)
                self.dataset_timezone = dataset_subscription.Exchange.TimeZone
                self.dataset_symbols.append(dataset_subscription.Symbol)
        else:
            for ticker in dataset_tickers:
                # Subscribe to the universe price data
                security = self.qb.AddSecurity(security_type, ticker, Resolution.Daily)
                self.security_timezone = security.Exchange.TimeZone
                self.security_symbols.append(security.Symbol)

                # Subscribe to the dataset
                dataset_subscription = self.qb.AddData(dataset, security.Symbol)
                self.dataset_timezone = dataset_subscription.Exchange.TimeZone
                self.dataset_symbols.append(dataset_subscription.Symbol)
""" # Request historical price data self.price_history = self.qb.History(self.security_symbols, self.dataset_start_date, self.out_of_sample_end_date).close.unstack(level=0) # Calculate historical returns self.return_history = self.price_history.pct_change(self.return_prediction_period).shift(-self.return_prediction_period).dropna() # Request historical dataset data self.dataset_history = self.qb.History(self.dataset_symbols, self.dataset_start_date, self.out_of_sample_end_date) factors = list(self.factor_by_name.keys()) # Reformat DataFrame self.dataset_history = self.dataset_history[factors].unstack(level=0) # Show the raw data display(self.dataset_history.head()) self.transformed_dataset_history = pd.DataFrame() for factor in factors: df = self.factor_by_name[factor].value_function(self.dataset_history[factor], self.return_history.index, self.dataset_timezone, self.security_timezone) df.columns = pd.MultiIndex.from_tuples([(factor, col) for col in df.columns]) self.transformed_dataset_history = pd.concat([self.transformed_dataset_history, df], axis=1) # Drop nan values that could be created by `pd.concat` in the line above self.transformed_dataset_history.dropna(inplace=True) self.return_sub_history = self.return_history.reindex(self.transformed_dataset_history.index) # Use a scatter plot if the factor values are sparse, otherwise use a line chart self.dataset_plotting_mode = 'markers' if self.sparse_data else 'lines' # Align all timestamps self.price_history = self.price_history.reindex(self.return_history.index) def _round_to_midnight(self, time): """ Rounds the given time to midnight. If the time is at or before 9:30am, it's rounded down. Otherwise, it's rounded up. Input: - time Time to be rounded. Returns the rounded time. """ if isinstance(time, date): return time.date() if time.hour == time.minute == time.second == 0: return pd.Timestamp(time.date()).date() # If the timestamp is before market open, round down to previous midnight if time.hour < 9 or time.hour ==9 and time.minute <= 30: return pd.Timestamp(time.date()).date() # Round up to next midnight return pd.Timestamp(time.date() + timedelta(days=1)).date() def plot_data_shape(self): """ Displays a plot of security returns in conjunction with the factor values after the value function has been applied. Only the first security in the universe is used to create the plots. 
""" num_rows = 1 + len(self.factor_by_name) # Create Plotly figure titles = ["Returns"] for factor_name, factor in self.factor_by_name.items(): titles.append(f"{factor.printable_name} Factor Values") fig = make_subplots(rows=num_rows, cols=1, shared_xaxes=True, subplot_titles=tuple(titles)) current_row = 1 for equity_symbol_index, security_symbol in enumerate(self.security_symbols[:1]): # Plot security returns returns = self.return_history[security_symbol] returns = returns[returns.index >= self.transformed_dataset_history.index[0]] fig.append_trace(go.Scatter(x=returns.index, y = returns, mode = 'lines'), row=current_row, col=1) current_row += 1 for factor in self.factor_by_name.keys(): # Plot factor values dataset_symbol = self.dataset_symbols[equity_symbol_index] factor_values = self.transformed_dataset_history[factor][dataset_symbol] printable_name = self.factor_by_name[factor].printable_name fig.append_trace(go.Scatter(x=factor_values.index, y = factor_values, mode = self.dataset_plotting_mode, marker=dict(size=self.marker_size)), row=current_row, col=1) current_row += 1 fig.update_layout(title_text=f"Time Series of Security Returns and Factor Values for {str(security_symbol)}", margin=dict(l=0, r=0, b=0), showlegend=False, height = (current_row-1) * 200) fig.update_xaxes(title="Date", row=current_row-1, col=1) # Show the plot fig.show() def measure_correlation_and_regressions(self): """ Displays a scatter plot to show the impact of each factor on the universe returns. A OLS regression line is included in the plot. """ for factor, specs in self.factor_by_name.items(): all_returns = np.array([]) all_factor_values = np.array([]) for equity_symbol_index, security_symbol in enumerate(self.security_symbols): # y-axis equity_returns = self.return_sub_history[security_symbol] # x-axis dataset_symbol = self.dataset_symbols[equity_symbol_index] factor_values = self.transformed_dataset_history[factor][dataset_symbol] all_returns = np.append(all_returns, equity_returns.values) all_factor_values = np.append(all_factor_values, factor_values.values) all_factor_values = pd.Series(all_factor_values) all_returns = pd.Series(all_returns) self.hypothesis_test_p_value[factor] = self._get_p_value(all_factor_values, all_returns, specs.data_type, 'continuous') # Define a method to write well-formated statistical properties in the annotation def convert_to_legend_format(value): rounded = round(value, 5) if rounded != 0: return str(rounded) return "{:e}".format(value) # Scientific notation # Fit a regression model model = LinearRegression() reg = model.fit(sm.add_constant(all_factor_values.values), all_returns) # Save results corr = convert_to_legend_format(all_factor_values.corr(all_returns)) r_square = convert_to_legend_format(model.score(sm.add_constant(all_factor_values.values), all_returns)) alpha = convert_to_legend_format(reg.intercept_) beta = convert_to_legend_format(reg.coef_[1]) p_value = convert_to_legend_format(self.hypothesis_test_p_value[factor]) my_df = pd.concat([all_returns, all_factor_values], axis=1) fig = px.scatter(my_df, x=1, y=0, trendline="ols", render_mode="svg", size_max=self.marker_size) printable_factor_name = self.factor_by_name[factor].printable_name fig.update_layout(title=f"Correlation Between {printable_factor_name} Factor and Returns", xaxis_title=f"{printable_factor_name} Factor Values", yaxis_title=f"{self.return_prediction_period} Day Future Returns", margin=dict(l=0, r=0, b=0), legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01), legend_bgcolor= 'rgba(255, 255, 
255, 0.5)', annotations=[ go.layout.Annotation( text=f'Correlation: {corr}<br>R-square: {r_square}<br>Alpha: {alpha}<br>Beta: {beta}<br>P-value: {p_value}', align='left', showarrow=False, xref='paper', yref='paper', x=1, y=1, bordercolor='black', borderwidth=1, bgcolor='white', opacity=0.75 ) ]) fig.update_traces(marker=dict(size=self.marker_size)) fig.show() output_string = f"Correlation: {corr}, " + \ f"R-Square: {r_square}, " + \ f"Alpha: {alpha}, " + \ f"Beta: {beta}, " + \ f"P-value: {p_value}.\n\n\n" print(output_string) def calculate_statistics(self): """ Displays a DataFrame of the following statistics: mean, std dev, skewness, kurtosis, & Jarque Bera test P-value. """ statistic_df = pd.DataFrame() for factor in self.factor_by_name.keys(): printable_factor_name = self.factor_by_name[factor].printable_name # Gather factor values for all the securities all_factor_values = pd.Series(self.transformed_dataset_history[factor].values.flatten()) statistic_df.loc['Mean', printable_factor_name] = all_factor_values.mean() statistic_df.loc['Standard deviation', printable_factor_name] = all_factor_values.std() statistic_df.loc['Skewness', printable_factor_name] = all_factor_values.skew() statistic_df.loc['Kurtosis', printable_factor_name] = all_factor_values.kurt() if self.factor_by_name[factor].data_type == 'continuous': p_value = jarque_bera(all_factor_values).pvalue else: p_value = 'N/A' statistic_df.loc['Jarque Bera test P-value', printable_factor_name] = p_value display(statistic_df) def calculate_other_dataset_correlations(self): """ Displays two DataFrames: - The first DataFrame shows the correlation between each of the factors in the `dataset` - The second DataFrame shows the correlation each of the factors in the `dataset` and their respective correlation with the factors of the datasets in the `datasets_for_correlation_test` list. 
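    # Reading the table above: a Jarque-Bera p-value below the significance level (0.05 by
    # default in `_get_p_value` below) rejects the hypothesis that the factor values are
    # normally distributed; the test is only run for factors marked 'continuous'.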
""" # Factor correlation within the main dataset factor_df = pd.DataFrame() for factor in self.factor_by_name.keys(): factor_df[factor] = pd.Series(self.transformed_dataset_history[factor].values.flatten()) display(factor_df.corr()) # For linked securities, select the columns of the securities that are present in both datasets # For unlinked securities, duplicate the columns # To get one correlation value, move all of the columns in the DataFrames into one column before using `corr` def value_func(df): return df.apply(lambda row: row, axis=1) other_dataset_factor_by_class = { QuiverWallStreetBets: [OtherDatasetFactor('rank', 'discrete', SecurityType.Equity, True, value_func), OtherDatasetFactor('sentiment', 'continuous', SecurityType.Equity, True, value_func)], QuiverQuantTwitterFollowers: [OtherDatasetFactor('followers', 'discrete', SecurityType.Equity, True, value_func)], USTreasuryYieldCurveRate: [OtherDatasetFactor('onemonth', 'continuous', "USTYCR", False, value_func)] } results = pd.DataFrame() for dataset_class in self.datasets_for_correlation_test: for other_dataset_factor in other_dataset_factor_by_class[dataset_class]: dataset_class_str = str(dataset_class).split("'")[-2].split(".")[-1] start_date = self.transformed_dataset_history.index[0] end_date = self.transformed_dataset_history.index[-1] if other_dataset_factor.link == SecurityType.Equity: matching_symbols = [] other_dataset_df = pd.DataFrame() for symbol in self.security_symbols: # Subscribe to the dataset and request history dataset_symbol = self.qb.AddData(dataset_class, symbol).Symbol history = self.qb.History(dataset_symbol, start_date, end_date) history = history.loc[dataset_symbol][[other_dataset_factor.factor_name]].fillna(method='ffill') # Apply value function history = other_dataset_factor.value_func(history) matching_symbols.append(symbol) other_dataset_df[dataset_symbol] = history[other_dataset_factor.factor_name] for factor in self.factor_by_name.keys(): equity_symbol_df = pd.DataFrame() dataset_symbols = [] for symbol in matching_symbols: idx = self.security_symbols.index(symbol) dataset_symbols.append(self.dataset_symbols[idx]) aligned_transformed_dataset = self.transformed_dataset_history[factor][dataset_symbols].reindex(other_dataset_df.index) corr = pd.Series(aligned_transformed_dataset.values.flatten('F')).corr(pd.Series(other_dataset_df.values.flatten('F'))) printable_factor_name = self.factor_by_name[factor].printable_name results.loc[f"{dataset_class_str}.{other_dataset_factor.factor_name}", f'Correlation with {printable_factor_name}'] = corr hypothesis_test_p_value = self._get_p_value(aligned_transformed_dataset.values.flatten('F'), # .fillna(0) other_dataset_df.values.flatten('F'), # .fillna(0) self.factor_by_name[factor].data_type, other_dataset_factor.data_type) results.loc[f"{dataset_class_str}.{other_dataset_factor.factor_name}", f'P-value on Correlation with {printable_factor_name}'] = hypothesis_test_p_value else: try: dataset_symbol = self.qb.AddData(dataset_class, other_dataset_factor.link).Symbol history = self.qb.History(dataset_symbol, start_date, end_date) history = history.loc[dataset_symbol][other_dataset_factor.factor_name].fillna(method='ffill') except: continue history.index = history.index.map(self._round_to_midnight) history = pd.DataFrame(history) for i in range(len(self.security_symbols) - 1): history[f'column_{i}'] = history[history.columns[0]] for factor in self.factor_by_name.keys(): printable_factor_name = self.factor_by_name[factor].printable_name set_a = 
pd.Series(self.transformed_dataset_history[factor].reindex(history.index).values.flatten('F')) set_b = pd.Series(history.values.flatten('F')) results.loc[f"{dataset_class_str}.{other_dataset_factor.factor_name}", f'Correlation with {printable_factor_name}'] = set_a.corr(set_b) hypothesis_test_p_value = self._get_p_value(set_a.fillna(0), set_b.fillna(0), self.factor_by_name[factor].data_type, other_dataset_factor.data_type) results.loc[f"{dataset_class_str}.{other_dataset_factor.factor_name}", f'P-value on Correlation with {printable_factor_name}'] = hypothesis_test_p_value display(results) def _get_p_value(self, dataset_a, dataset_b, data_a_type, data_b_type, significance=0.05): """ Gets the p-value of two lists, considering the "type" of each list (continuous or discrete). Input: - dataset_a First list of values. - dataset_b Second list of values. - data_a_type Type of `dataset_a` ('continuous' or 'discrete'). - data_b_type Type of `dataset_b` ('continuous' or 'discrete'). - significance Level of significance to use for the statistical tests. Returns the p-value that results after applying the correct statistical test. """ np.seterr(divide='ignore') if isinstance(dataset_a, pd.Series) and isinstance(dataset_b, pd.Series): merge = pd.concat([dataset_a, dataset_b], axis=1).dropna(axis=0) dataset_a = merge.iloc[:, 0] dataset_b = merge.iloc[:, 1] else: merge = np.concatenate([dataset_a.reshape(-1, 1), dataset_b.reshape(-1, 1)], axis=1) merge = merge[~np.isnan(merge).any(axis=1), :] dataset_a = merge[:, 0] dataset_b = merge[:, 1] if data_b_type == 'continuous': if jarque_bera(dataset_b).pvalue < significance: if data_a_type == "continuous": return ttest_ind(dataset_a, dataset_b).pvalue return pearsonr(dataset_a, dataset_b)[1] # discrete data type else: if data_a_type == "continuous": return mannwhitneyu(dataset_a, dataset_b).pvalue return spearmanr(dataset_a, dataset_b)[1] # discrete data type return chisquare(dataset_a, dataset_b)[1] def run_logistic_regression(self): """ Runs the logistic regression test. A line plot of results is diplayed, along with a DataFrame containing the model's accuracy. """ self.run_binary_ml_model(LogisticRegression(class_weight = 'balanced'), 'Accuracy', True, model_name='Logistic Regression') def produce_return_predictions(self): """ Runs the linear regression test. Two line plots are diplayed: - The first line plot shows the actual and predicted daily returns of the universe - The second line plot shows the actual and predicted equity curve of the universe A DataFrame containing the model's accuracy is also presented. 
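    # Test selection in `_get_p_value`, as implemented above:
    #   `dataset_b` continuous, Jarque-Bera rejects normality -> t-test ('continuous' a) or Pearson ('discrete' a)
    #   `dataset_b` continuous, normality not rejected        -> Mann-Whitney U ('continuous' a) or Spearman ('discrete' a)
    #   `dataset_b` discrete                                  -> chi-square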
""" results = pd.DataFrame() fig = go.Figure() # Gather in-sample data x = self.transformed_dataset_history.loc[:self.in_sample_end_date.date()] y = self.return_sub_history.loc[self.return_sub_history.index <= self.in_sample_end_date].mean(axis=1) x = sm.add_constant(x.values) # Fit model to in-sample data model = LinearRegression() reg = model.fit(x, y.values) results.loc['Alpha', 'value'] = reg.intercept_ #for i in range(1, len(reg.coef_)): # results.loc[f'Beta_{i}', 'value'] = reg.coef_[i] # Calculate in-sample mean squared error in_sample_predictions = model.predict(x) results.loc['In-sample MSE', 'value'] = mean_squared_error(y, in_sample_predictions) # Plot in-sample actual y-values fig.add_traces(go.Scatter(x=y.index, y = y.values, mode = 'markers', marker=dict(size=self.marker_size), name=f"In-sample Actual Returns")) # Plot in-sample predicted y-values fig.add_traces(go.Scatter(x=y.index, y = in_sample_predictions, mode = 'markers', marker=dict(size=self.marker_size), name=f"In-sample Predicted Returns")) # Gather out-of-sample-data x = self.transformed_dataset_history.loc[self.in_sample_end_date:] y = self.return_sub_history.loc[self.return_sub_history.index >= self.in_sample_end_date].mean(axis=1) x = sm.add_constant(x.values, has_constant='add') # Calculate out-of-sample mean squared error out_of_sample_predictions = model.predict(x) results.loc['Out-of-sample MSE', 'value'] = mean_squared_error(y.values, out_of_sample_predictions) # Plot out-of-sample actual y-values fig.add_traces(go.Scatter(x=y.index, y = y.values, mode = 'markers', marker=dict(size=self.marker_size), name=f"Out-of-sample Actual Returns")) # Plot out-of-sample predicted y-values fig.add_traces(go.Scatter(x=y.index, y = out_of_sample_predictions, mode = 'markers', marker=dict(size=self.marker_size), name=f"Out-of-sample Predicted Returns")) # Update layout fig.update_layout(title_text=f"Actual and Predicted Daily Returns From a Multiple Linear Regression Model", margin=dict(l=0, r=0, b=0), legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01), legend_bgcolor= 'rgba(255, 255, 255, 0.5)') fig.update_yaxes(title=f"{self.return_prediction_period} Day Future Returns") fig.update_xaxes(title="Date") # Show the figure fig.show() # Plot prediction uncertainty fig = go.Figure() mean_return = (self.return_history.loc[self.transformed_dataset_history.index].mean(axis=1) + 1) idx = pd.date_range(mean_return.index[0], mean_return.index[-1]) mean_return = mean_return.reindex(idx).fillna(1) mean_return = mean_return.cumprod() fig.add_traces(go.Scatter(x=mean_return.index, y = mean_return.values, mode = 'lines', name=f"Actual Equity Curve", line=dict(width=6))) in_sample_returns = (in_sample_predictions + 1) * mean_return.reindex(self.return_sub_history.index)[:len(in_sample_predictions)].shift(1).fillna(1) out_of_sample_returns = (out_of_sample_predictions + 1) * mean_return.reindex(self.return_sub_history.index)[-len(out_of_sample_predictions)-1:].shift(1)[1:] fig.add_traces(go.Scatter(x=in_sample_returns.index, y = in_sample_returns.values, mode = self.dataset_plotting_mode, marker=dict(size=self.marker_size), name=f"Predicted In-Sample Equity Curve")) fig.add_traces(go.Scatter(x=out_of_sample_returns.index, y = out_of_sample_returns.values, mode = self.dataset_plotting_mode, marker=dict(size=self.marker_size), name=f"Predicted Out-Of-Sample Equity Curve")) last_equity_value = mean_return.reindex(self.return_sub_history.index)[len(in_sample_predictions)] avg_daily_return = 
self.return_history.mean(axis=1).loc[:self.in_sample_end_date.date()].mean() std_equity = self.return_history.mean(axis=1).loc[:self.in_sample_end_date.date()].std() for curve_std in [2, -2]: cone_points = [] for period in range(1, len(self.return_history.loc[self.in_sample_end_date.date():])+1):#len(out_of_sample_predictions)+1): cone_point = last_equity_value + (avg_daily_return * period + sqrt(period)*(curve_std*std_equity)) cone_points.append(cone_point) # plot cone sign = "-" if curve_std < 0 else "+" fig.add_traces(go.Scatter(x=out_of_sample_returns.index, y=cone_points, mode='lines', name=f'{sign}{abs(curve_std)} STD Prediction Range')) # Update layout fig.update_layout(title_text=f"Equity Curve of Multiple Linear Regression Model", margin=dict(l=0, r=0, b=0), legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01), legend_bgcolor= 'rgba(255, 255, 255, 0.5)') fig.update_yaxes(title=f"Equity") fig.update_xaxes(title="Date") # Show the prediction cone fig.show() # Display results display(results) def find_most_informative_factors(self): """ Displays a DataFrame that shows the percentage each factor contributes to the first component after performing PCA. A PCA is performed for each security in the universe. """ # Source: https://stackoverflow.com/questions/50796024/feature-variable-importance-after-a-pca-analysis if len(self.factor_by_name) == 1: print("Principal component analysis is only available when analyzing multiple factors.") return results = pd.DataFrame() for symbol_index, symbol in enumerate(self.security_symbols): dataset_symbol = self.dataset_symbols[symbol_index] data = pd.DataFrame() for factor in self.factor_by_name.keys(): data[factor] = self.transformed_dataset_history[factor][dataset_symbol] pca = PCA(n_components='mle') # Standardize if the data isn't already fit to a normal distribution #if standardize: # data = (data - data.mean()) / data.std() # Perform PCA dimensionality reduction pca.fit(data) contributions_to_first_component = pca.explained_variance_ratio_.dot(pca.components_) factor_importance_pct = contributions_to_first_component / sum(contributions_to_first_component) for i, factor in enumerate(data.columns): printable_factor_name = self.factor_by_name[factor].printable_name results.loc[str(symbol), printable_factor_name] = factor_importance_pct[i] display(results) def run_linear_regression(self, display_df=True, model_name=None): """ Trains a multiple linear regression model on the in-sample period and tests it's performance on the out-of-sample period. Input: - display_df Boolean to indicate if the results DataFrame should be displayed - model_name String describing the name of the model (Linear Regression). Only used if `display_df` == False. Returns the results of the regression model if not displaying the result DataFrame. 
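    # Note on the importance calculation above: `explained_variance_ratio_.dot(pca.components_)`
    # weights each factor's loading on every retained component by that component's share of the
    # explained variance, and the result is normalized to sum to 1 before being displayed.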
""" results = pd.DataFrame() # Gather in-sample data inclusive = [factor for factor, pvalue in self.hypothesis_test_p_value.items() if pvalue < 0.05] x = self.transformed_dataset_history.loc[:self.in_sample_end_date.date(), inclusive] #x = self.transformed_dataset_history.loc[:self.in_sample_end_date.date()] y = self.return_sub_history.loc[self.return_sub_history.index <= self.in_sample_end_date].mean(axis=1) x = sm.add_constant(x.values) # Fit model to in-sample data model = LinearRegression() reg = model.fit(x, y.values) # Calculate in-sample mean squared error in_sample_predictions = model.predict(x) in_sample_score = mean_squared_error(y, in_sample_predictions) results.loc['In-sample MSE', 'value'] = in_sample_score # Gather out-of-sample-data x = self.transformed_dataset_history.loc[self.in_sample_end_date:, inclusive] y = self.return_sub_history.loc[self.return_sub_history.index >= self.in_sample_end_date].mean(axis=1) x = sm.add_constant(x.values) # Calculate out-of-sample mean squared error out_of_sample_predictions = model.predict(x) out_of_sample_score = mean_squared_error(y.values, out_of_sample_predictions) results.loc['Out-of-sample MSE', 'value'] = out_of_sample_score if display_df: display(results) return return {'model_name': 'Linear Regression', 'metric': 'MSE', 'In-sample': in_sample_score, 'Out-of-sample': out_of_sample_score} def run_svm_classifier(self, display_df=True, model_name=None): """ Trains a support vector machine classifier model to predict the direction of the next day given the factor values of the current day. Input: - display_df Boolean to indicate if the results DataFrame should be displayed - model_name String describing the name of the model (Linear Regression). Only used if `display_df` == False. Returns the results of the model if not displaying the result DataFrame. """ return self.run_binary_ml_model(SVC(class_weight = 'balanced'), 'Accuracy', display_df=display_df, model_name=model_name) def run_decision_tree_classifier(self, display_df=True, model_name=None): """ Trains a decision tree classifier model to predict the direction of the next day given the factor values of the current day. Input: - display_df Boolean to indicate if the results DataFrame should be displayed - model_name String describing the name of the model (Linear Regression). Only used if `display_df` == False. Returns the results of the model if not displaying the result DataFrame. """ return self.run_binary_ml_model(DecisionTreeClassifier(max_depth=3, class_weight = 'balanced'), 'Accuracy', display_df=display_df, model_name=model_name) def run_random_forest_classifier(self, display_df=True, model_name=None): """ Trains a random forest classifier model to predict the direction of the next day given the factor values of the current day. Input: - display_df Boolean to indicate if the results DataFrame should be displayed - model_name String describing the name of the model (Linear Regression). Only used if `display_df` == False. Returns the results of the model if not displaying the result DataFrame. """ return self.run_binary_ml_model(RandomForestClassifier(max_depth=3, random_state = 1990, class_weight = 'balanced'), 'Accuracy', display_df=display_df, model_name=model_name) def run_binary_ml_model(self, model, score_name, plot=False, display_df=True, model_name=None): """ Trains a machine learning model to predict the direction of the next day given the factor values of the current day. Input: - model Class of the ML model to use. 
- score_name Name of the metric used to evaluate the model - plot Boolean to indicator if the actual values and the model's predictions should be displayed - display_df Boolean to indicate if the results DataFrame should be displayed - model_name String describing the name of the model (Linear Regression). Only used if `display_df` == False. Returns the results of the model if not displaying the result DataFrame. """ # Make the binany price return series binary_price_return = self.return_sub_history.mean(axis=1).copy() binary_price_return[binary_price_return <= 0] = -1 binary_price_return[binary_price_return > 0] = 1 results = pd.DataFrame() # Gather in-sample data inclusive = [factor for factor, pvalue in self.hypothesis_test_p_value.items() if pvalue < 0.05] x = self.transformed_dataset_history.loc[:self.in_sample_end_date.date(), inclusive] #x = self.transformed_dataset_history.loc[:self.in_sample_end_date.date()] y = binary_price_return.loc[binary_price_return.index <= self.in_sample_end_date] x = sm.add_constant(x.values) # Fit model to in-sample data model.fit(x, y) # Calculate in-sample mean squared error in_sample_predictions = model.predict(x) in_sample_score = model.score(x, y) results.loc[f'In-sample {score_name}', 'value'] = in_sample_score if plot: # For logistic regression titles = ['Predicted Price Movement Direction From a Logistic Regression Model (Up=1, Down=-1)', 'Accuracy of Logistic Regression Model Predictions (Correct=1, Incorrect=-1)'] fig = make_subplots(rows=2, cols=1, shared_xaxes=True, subplot_titles=tuple(titles)) fig.append_trace(go.Bar(x=y.index, y=in_sample_predictions), row=1, col=1) in_sample_accuracy = (y == in_sample_predictions).apply(lambda x: 1 if x else -1) fig.append_trace(go.Bar(x=in_sample_accuracy.index, y=in_sample_accuracy.values), row=2, col=1) # Gather out-of-sample-data x = self.transformed_dataset_history.loc[self.in_sample_end_date:, inclusive] y = binary_price_return.loc[binary_price_return.index >= self.in_sample_end_date] x = sm.add_constant(x.values) # Calculate out-of-sample mean squared error out_of_sample_predictions = model.predict(x) out_of_sample_score = model.score(x, y) results.loc[f'Out-of-sample {score_name}', 'value'] = out_of_sample_score if plot: # For logistic regression fig.append_trace(go.Bar(x=y.index, y=out_of_sample_predictions), row=1, col=1) out_of_sample_accuracy = (y == out_of_sample_predictions).apply(lambda x: 1 if x else -1) fig.append_trace(go.Bar(x=out_of_sample_accuracy.index, y=out_of_sample_accuracy.values), row=2, col=1) fig.update_layout(margin=dict(l=0, r=0, b=0), showlegend=False, plot_bgcolor='rgba(0,0,0,0)') fig.update_xaxes(title="Date") # Display figure fig.show() if display_df: # Display results display(results) return return {'model_name': model_name, 'metric': score_name, 'In-sample': in_sample_score, 'Out-of-sample': out_of_sample_score} def display_ml_model_summary(self): """ Displays a DataFrame showing the scores of each machine learning model in the in-sample and out-of-sample periods. 
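    # When `display_df` is False, each `run_*` method returns a summary dict that
    # `display_ml_model_summary` below aggregates, e.g. (illustrative values):
    #   {'model_name': 'Decision Tree', 'metric': 'Accuracy', 'In-sample': 0.61, 'Out-of-sample': 0.52}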
""" model_results = [ self.run_linear_regression(False, 'Linear Regression'), self.run_svm_classifier(False, 'Support Vector Machine'), self.run_decision_tree_classifier(False, 'Decision Tree'), self.run_random_forest_classifier(False, 'Random Forest') ] result_df = pd.DataFrame() for result in model_results: model_name = result['model_name'] metric = result['metric'] for result_type in ['In-sample', 'Out-of-sample']: result_df.loc[f"{model_name} {metric}", result_type] = result[result_type] display(result_df) class Factor: def __init__(self, value_function, data_type, printable_name, standardize=True): self.value_function = value_function self.data_type = data_type self.printable_name = printable_name self.standardize = standardize class OtherDatasetFactor: def __init__(self, factor_name, data_type, link, sparse, value_func): self.factor_name = factor_name self.data_type = data_type self.link = link self.sparse = sparse self.value_func = value_func