Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -0.413 Tracking Error 0.154 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset Portfolio Turnover 0% |
from QuantConnect.DataSource import *
from AlgorithmImports import *
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
import statsmodels.api as sm
from scipy.stats.mstats import winsorize
from sklearn.linear_model import LinearRegression, LogisticRegression, ElasticNetCV, LassoCV, RidgeCV
from sklearn.metrics import mean_squared_error
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.decomposition import PCA
from math import sqrt
from IPython.display import display, HTML, IFrame
from scipy.stats import rankdata, ttest_ind, jarque_bera, mannwhitneyu, pearsonr, spearmanr, chisquare, shapiro
import numbers
from pytz import timezone
from factor import *


class DatasetAnalyzer:
    """
    A class to analyze datasets listed on the QC data market.
    """

    def __init__(self, dataset, dataset_tickers, universe, factors, sparse_data, dataset_start_date,
                 in_sample_end_date, out_of_sample_end_date, label_function=None,
                 return_prediction_period=1, marker_size=3):
        """
        Retrieves historical price data for the universe securities and historical data for the
        factors under analysis. The first 5 rows of the raw dataset history DataFrame is displayed,
        then the value functions for each of the factors are applied.

        If the dataset history request fails or returns no rows, a message is printed and the
        constructor returns early, leaving the remaining attributes unset.

        Input:
         - dataset
            Class type of the dataset to analyze
         - dataset_tickers
            Either a ManualUniverse or ETFUniverse object that matches the `universe` (if the dataset
            is linked), otherwise a list of tickers of dataset links (for example: ['REG'] for Regalytics)
         - universe
            A ManualUniverse or ETFUniverse object containing list of tickers to use when analyzing
            the relationship between the dataset and security returns.
         - factors
            A list of Factors to analyze within the dataset
         - sparse_data
            Boolean to represent if the `dataset` is sparse.
         - dataset_start_date
            Start date of the dataset. Retrievable from the dataset listing.
         - in_sample_end_date
            Date to mark the end of the in-sample period.
         - out_of_sample_end_date
            Date to mark the end of the out-of-sample period.
         - label_function
            Optional callable. When provided it is called with the raw price history DataFrame and
            its return value is used as the labels; otherwise forward returns are used.
         - return_prediction_period
            Number of days positions would be held for (the target variable)
         - marker_size
            Size of markers in plots
        """
        self.qb = QuantBook()
        self.dataset = dataset
        self.factors = factors
        self.sparse_data = sparse_data
        self.dataset_start_date = dataset_start_date
        self.in_sample_end_date = in_sample_end_date
        self.out_of_sample_end_date = out_of_sample_end_date
        self.return_prediction_period = return_prediction_period
        # The dataset is "linked" when its subscriptions are keyed by the universe securities
        self.linked_dataset = universe == dataset_tickers
        self.transformed_dataset_history = pd.DataFrame()
        self.hypothesis_test_p_value = {}
        self.marker_size = marker_size

        # Subscribe to universe of securities
        symbols, self.security_timezone = universe.get_symbols(self.qb)

        # Request historical price data (one row per calendar date, using the last open of the day)
        self.raw_history = self.qb.History(symbols, self.dataset_start_date, self.out_of_sample_end_date)
        self.price_history = self.raw_history.open.unstack(level=0)
        self.price_history = self.price_history.groupby(self.price_history.index.date).last()
        self.price_history.index = [datetime.combine(date, datetime.min.time()) for date in self.price_history.index]

        # Only consider securities with historical data
        self.security_symbols = [x for x in symbols if x in self.price_history.columns]
        self.dataset_symbols = []

        # Subscribe to dataset
        links = self.security_symbols if self.linked_dataset else dataset_tickers
        for link in links:
            dataset_subscription = self.qb.AddData(dataset, link)
            self.dataset_timezone = dataset_subscription.Exchange.TimeZone
            self.dataset_symbols.append(dataset_subscription.Symbol)

        # Calculate historical returns: the return earned over the *next* `return_prediction_period` rows
        period = self.return_prediction_period
        self.return_history = self.price_history.pct_change(period).shift(-period).iloc[:-period]

        # Create labels
        if label_function is None:
            self.labels = self.return_history.copy()
        else:
            self.labels = label_function(self.raw_history)

        # Request historical dataset data
        dataset_str = str(self.dataset).split("'")[-2].split(".")[-1]
        try:
            dataset_hist = [
                self.qb.History(symbol, self.dataset_start_date, self.out_of_sample_end_date)
                for symbol in self.dataset_symbols
            ]
            self.dataset_history = pd.concat(dataset_hist)
        except Exception:
            print(f"You don't have a subscription for {dataset_str}. Add it to your organization on the Pricing page.")
            return
        if self.dataset_history.empty:
            print(f'No historical data was available for the {dataset_str} dataset.')
            return

        factor_names = [factor.name for factor in factors]

        # Reformat DataFrame
        self.dataset_history = self._process_nested_dataset(
            self.dataset_history[~self.dataset_history.index.duplicated(keep="last")], factor_names)
        self.dataset_history = self.dataset_history.groupby(self.dataset_history.index.date).last()
        self.dataset_history.index = [datetime.combine(date, datetime.min.time()) for date in self.dataset_history.index]

        # Show the raw data
        display(self.dataset_history.dropna(how='all').head().dropna(axis=1, how='all'))

        # Transform raw factor values using the value function defined by the client
        for factor in factors:
            value_function = self._default_value_function if factor.value_function is None else factor.value_function
            df = value_function(self.dataset_history[factor.name], self.dataset_history.index,
                                self.dataset_timezone, self.security_timezone)
            df.columns = pd.MultiIndex.from_tuples([(factor.name, col) for col in df.columns])
            self.transformed_dataset_history = pd.concat([self.transformed_dataset_history, df], axis=1)

        # Timestamp of adjusted factor values should be when the factor value was acted upon
        #   factor_value_raw_timestamp => target_return_period
        #   -Sunday 12am    => Monday open to Tuesday open
        #   -Monday 12am    => Monday open to Tuesday open (timestamp: Tuesday 12am)
        #   -Tuesday 12am   => Tuesday open to Wednesday open (timestamp: Wednesday 12am)
        #   -Wednesday 12am => Wednesday open to Thursday open (timestamp: Thursday 12am)
        #   -Thursday 12am  => Thursday open to Friday open (timestamp: Friday 12am)
        #   -Friday 12am    => Friday open to Monday open (timestamp: Saturday 12am)
        #   -Saturday 12am  => Monday open to Tuesday open (timestamp: Tuesday 12am)
        # In this ^ case, adjusted factor timestamps should match the timestamp of the bar where the trade was opened

        ## Remove securities that don't have data for the dataset
        indices_to_remove = []
        for i, dataset_symbol in enumerate(self.dataset_symbols):
            if not all([dataset_symbol in self.transformed_dataset_history[factor.name].columns for factor in factors]):
                indices_to_remove.append(i)
                for factor in factors:
                    if dataset_symbol in self.transformed_dataset_history[factor.name].columns:
                        self.transformed_dataset_history.drop(
                            pd.MultiIndex.from_tuples([(factor.name, dataset_symbol)]), axis=1, inplace=True)
        # Delete from the back so the remaining indices stay valid
        for index in indices_to_remove[::-1]:
            del self.dataset_symbols[index]
            # `security_symbols` is only parallel to `dataset_symbols` for linked datasets; for an
            # unlinked dataset the same index would point at an unrelated security.
            if self.linked_dataset:
                del self.security_symbols[index]
        self.price_history = self.price_history.loc[:, self.security_symbols]

        self.return_sub_history = self.return_history.reindex(self.transformed_dataset_history.index)

        # Use a scatter plot if the factor values are sparse, otherwise use a line chart
        self.dataset_plotting_mode = 'markers' if self.sparse_data else 'lines'

        # Align all timestamps
        self.price_history = self.price_history.reindex(self.return_sub_history.index)

    def _process_nested_dataset(self, df, factor_names):
        """
        Reshapes the raw dataset history into a DataFrame with one column per (factor, symbol) pair.

        If every column of `df` can be converted with `pd.to_numeric`, the factor columns are simply
        selected and the first index level is unstacked. Otherwise each cell is treated as an object
        (or a list of objects, in which case the last element is used) and the factor is read from
        it as an attribute; cells where that fails become NaN.

        Input:
         - df
            Raw dataset history DataFrame
         - factor_names
            List of factor names (column names, or attribute names for nested objects)

        Returns the reshaped DataFrame.
        """
        # Local import: only this method needs it
        from operator import attrgetter

        def _is_dataframe_all_numbers(frame):
            try:
                frame.apply(pd.to_numeric)
                return True
            except Exception:
                return False

        if _is_dataframe_all_numbers(df):
            return df[factor_names].unstack(level=0)

        df = df.unstack(level=0).swaplevel(axis=1)
        symbols = [col[0] for col in df.columns]
        new_df = pd.DataFrame()
        for factor in factor_names:
            # attrgetter resolves (possibly dotted) attribute names without evaluating arbitrary code
            get_factor = attrgetter(factor)
            row = {}
            for j, symbol in enumerate(symbols):
                values = []
                for i in range(df.shape[0]):
                    cell = df.iloc[i, j]
                    if isinstance(cell, List):
                        # Keep only the most recent entry; written back so later factors see it too
                        cell = cell[-1]
                        df.iloc[i, j] = cell
                    try:
                        values.append(cell if pd.isnull(cell) else get_factor(cell))
                    except Exception:
                        values.append(np.nan)
                row[symbol] = values
            new_df = pd.concat([new_df, pd.DataFrame(row)], axis=1)
        new_df.columns = pd.MultiIndex.from_tuples(
            [(symbol, factor) for factor in factor_names for symbol in symbols])
        new_df.index = df.index
        return new_df.swaplevel(axis=1)

    def _default_value_function(self, df, index, dataset_timezone, security_timezone):
        """
        This function transforms the dataset's raw data into a numerical value. The timestamps of the
        factor values returned from this method should match the timestamps of the bar where the
        trade was opened in response to the factor value.

        The input DataFrame is not modified.

        Input:
         - df
            DataFrame of factor values for each security in the universe
         - index
            The timestamps of when the security traded
         - dataset_timezone
            Timezone of the dataset
         - security_timezone
            Timezone of the security

        Returns a DataFrame of adjusted numerical factor values.
        """
        # Work on a copy so re-indexing below never leaks into the caller's DataFrame
        df = df.copy()

        # Match timezones
        if dataset_timezone != security_timezone:
            source_tz = timezone(str(dataset_timezone))
            target_tz = timezone(str(security_timezone))
            df.index = df.index.map(lambda time: time.replace(tzinfo=source_tz).astimezone(target_tz))

        result_df = pd.DataFrame(columns=df.columns)

        # If tz-aware index, remove tz-aware (so we can compare the indices in the snippet that comes after)
        df.index = pd.to_datetime(df.index, utc=True).tz_convert(None)

        # Move dataset index forward by 1 day (since we open our trade on the day after we receive the factor value)
        df.index = df.index + timedelta(1)

        # Snap each shifted timestamp forward onto the first timestamp in `index` at or after it
        for i in df.index:
            adjusted_index_options = index[index >= i]
            if len(adjusted_index_options) == 0:
                continue
            adjusted_index = adjusted_index_options[0]
            row = df.loc[i]
            row.name = adjusted_index
            result_df.loc[adjusted_index] = row

        # Drop duplicate indices
        result_df = result_df[~result_df.index.duplicated(keep='last')]

        # Align factor values with `index`
        result_df = result_df.reindex(index)

        # Drop rows that have only NaN values
        result_df = result_df.dropna(axis=0, how='all')
        return result_df

    def plot_data_shape(self, num_securities=10, y_axis_title='', subplot_title_extension=''):
        """
        Displays a time series plot for each factor using the values returned from the value function.
        For linked datasets, the first `num_securities` are selected to have their factor values plotted.

        Input:
         - num_securities
            Number of securities to plot factor values for. (Used for linked datasets)
         - y_axis_title
            Y axis title of each subplot
         - subplot_title_extension
            A string to add onto the end of the factor names to make them more understandable
        """
        # Create Plotly figure
        titles = []
        for factor in self.factors:
            title = factor.printable_name
            if subplot_title_extension != '':
                title += f' {subplot_title_extension}'
            titles.append(title)
        fig = make_subplots(rows=len(self.factors), cols=1, shared_xaxes=False, vertical_spacing=0.15,
                            subplot_titles=tuple(titles))

        # The symbols to plot are the same for every factor
        if self.linked_dataset:
            plotted_count = len(self.security_symbols[:num_securities])
            dataset_symbols = [self.dataset_symbols[i] for i in range(plotted_count)]
        else:
            dataset_symbols = [self.dataset_symbols[0]]

        current_row = 1
        for factor in self.factors:
            for dataset_symbol in dataset_symbols:
                factor_values = self.transformed_dataset_history[factor.name][dataset_symbol]
                fig.append_trace(go.Scatter(x=factor_values.index,
                                            y=factor_values,
                                            mode=self.dataset_plotting_mode,
                                            marker=dict(size=self.marker_size),
                                            name=str(dataset_symbol)),
                                 row=current_row, col=1)
            current_row += 1

        fig.update_layout(title_text=f"Factor Values Over Time",
                          margin=dict(l=0, r=0, b=0),
                          showlegend=False,
                          height=(current_row-1) * 300)
        for i, factor in enumerate(self.factors):
            fig['layout'][f'yaxis{i+1}']['title'] = y_axis_title
            fig['layout'][f'xaxis{i+1}']['range'] = [self.transformed_dataset_history.index[0],
                                                     self.transformed_dataset_history.index[-1]]

        # Show the plot
        fig.show()

    def _convert_to_legend_format(self, value):
        """
        A helper method to display well-formatted values in statistical plots.

        Input:
         - value
            The value to be rounded or put into scientific notation

        Returns the value rounded to 5 decimals as a string, or in scientific notation when the
        rounded value is 0.
        """
        rounded = round(value, 5)
        if rounded != 0:
            return str(rounded)
        return "{:e}".format(value)  # Scientific notation

    @staticmethod
    def _fit_ols(factor_values, labels):
        """
        Fits an OLS regression of `labels` on `factor_values` plus an intercept.

        Rows containing NaN or infinite values are discarded first.

        Input:
         - factor_values
            Array of regressors (1 or more columns)
         - labels
            Series of target values aligned with `factor_values`

        Returns the fitted statsmodels results, or None when no usable rows remain.
        """
        x = sm.add_constant(factor_values, has_constant='add')
        temp_df = pd.DataFrame(x)
        temp_df["labels"] = labels.values.reshape(-1, 1)
        temp_df = temp_df.replace([np.inf, -np.inf], np.nan).dropna()
        if temp_df.empty:
            return None
        return sm.OLS(temp_df.iloc[:, -1], temp_df.iloc[:, :-1]).fit()

    def measure_significance(self):
        """
        Displays R square, adjusted R square, t-test p-value, and F-test p-value. Each one is selected
        when appropriate, depending on the universe size and number of factors.

        Securities for which a regression cannot be fitted are recorded with an R square of 0 and a
        p-value of 1.
        """
        adj_r_squares = []
        f_pvalues = []
        r_squares_by_factor = {factor: [] for factor in self.factors}
        t_pvalues_by_factor = {factor: [] for factor in self.factors}
        multiple_factors = len(self.factors) > 1

        # For each security, gather the regression results
        for equity_symbol_index, security_symbol in enumerate(self.security_symbols):
            dataset_symbol = self.dataset_symbols[equity_symbol_index if self.linked_dataset else 0]

            # x-axis = factor values
            factor_values = self.transformed_dataset_history.iloc[
                :, self.transformed_dataset_history.columns.get_level_values(1) == dataset_symbol]

            # y-axis = returns
            labels = self.labels[security_symbol]

            # Align time stamps (incase there are NaN values)
            intersect_index = sorted(set(labels.dropna().index).intersection(factor_values.dropna().index))
            factor_values = factor_values.reindex(intersect_index)
            labels = labels.reindex(intersect_index)

            # Fit the model using each factor individually
            for i, factor in enumerate(self.factors):
                column = factor_values[factor_values.columns[i]]
                results = None if column.empty else self._fit_ols(column.values, labels)
                if results is None:
                    r_squares_by_factor[factor].append(0)
                    t_pvalues_by_factor[factor].append(1)
                    continue
                r_squares_by_factor[factor].append(results.rsquared)
                # Position 0 is the intercept, position 1 is the factor
                t_pvalues_by_factor[factor].append(results.pvalues.iloc[1])

            if multiple_factors:
                # Fit the model using all the factors
                results = None if factor_values.empty else self._fit_ols(factor_values.values, labels)
                if results is None:
                    # One scalar per security keeps these lists plottable and comparable with max()
                    adj_r_squares.append(0)
                    f_pvalues.append(1)
                    continue
                adj_r_squares.append(results.rsquared_adj)
                f_pvalues.append(results.f_pvalue)

        # Shared x-axis upper bound across all charts
        max_x = max(adj_r_squares, default=0) if multiple_factors else 0
        for r_squares in r_squares_by_factor.values():
            max_x = max(max_x, max(r_squares, default=0))

        if multiple_factors:
            # Plot the results
            fig = go.Figure()
            fig.add_trace(go.Scatter(x=adj_r_squares, y=f_pvalues, mode='markers', showlegend=False,
                                     marker=dict(symbol='circle', opacity=0.7, color='white', size=8,
                                                 line=dict(width=1))))
            fig.add_trace(go.Histogram2d(x=adj_r_squares, y=f_pvalues, colorscale='YlGnBu',
                                         xbins=dict(start=0., end=1., size=0.05), autobinx=False,
                                         ybins=dict(start=0., end=1., size=0.05)))
            fig.update_layout(
                title=f'All Factors',
                margin=dict(l=0, r=0, b=0),
                xaxis=dict(range=[-0.05, max_x + 0.05], title="Adjusted R<sup>2</sup>"),
                yaxis=dict(range=[-0.05, 1.05], title='F-Statistic P-Value'),
                height=500,
                width=500,
                hovermode='closest',
            )
            fig.show()

        for factor in self.factors:
            fig = go.Figure()
            r_squares = r_squares_by_factor[factor]
            t_pvalues = t_pvalues_by_factor[factor]
            fig.add_trace(go.Scatter(x=r_squares, y=t_pvalues, mode='markers', showlegend=False,
                                     marker=dict(symbol='circle', opacity=0.7, color='white', size=8,
                                                 line=dict(width=1))))
            fig.add_trace(go.Histogram2d(x=r_squares, y=t_pvalues, colorscale='YlGnBu',
                                         xbins=dict(start=0., end=1., size=0.05), autobinx=False,
                                         ybins=dict(start=0., end=1., size=0.05)))
            fig.update_layout(
                title=f'{factor.printable_name} Factor',
                margin=dict(l=0, r=0, b=0),
                xaxis=dict(range=[-0.05, max_x + 0.05], title="R<sup>2</sup>"),
                yaxis=dict(range=[-0.05, 1.05], title='T-Test P-Value'),
                height=500,
                width=500,
                hovermode='closest',
            )
            fig.show()

    def calculate_statistics(self, winsorize_limits=(0.01, 0.01)):
        """
        Displays a DataFrame of the following statistics: mean, std dev, skewness, kurtosis, &
        normality test P-value.

        The normality test is Jarque-Bera when there are more than 2000 values and Shapiro-Wilk
        otherwise; it is reported as 'N/A' for factors whose `data_type` is not 'continuous'.

        Input:
         - winsorize_limits
            Limits to exclude the top x% and bottom y% of outliers from the calculations.
        """
        statistic_df = pd.DataFrame()
        for factor in self.factors:
            # Gather factor values for all the securities
            all_factor_values = pd.Series(self.transformed_dataset_history[factor.name].values.flatten()).dropna()

            # Remove outliers
            all_factor_values = pd.Series(winsorize(all_factor_values, limits=winsorize_limits))

            statistic_df.loc['Mean', factor.printable_name] = all_factor_values.mean()
            statistic_df.loc['Standard deviation', factor.printable_name] = all_factor_values.std()
            statistic_df.loc['Skewness', factor.printable_name] = all_factor_values.skew()
            statistic_df.loc['Kurtosis', factor.printable_name] = all_factor_values.kurt()
            if factor.data_type == 'continuous':
                if len(all_factor_values) > 2000:
                    p_value = jarque_bera(all_factor_values).pvalue
                else:
                    p_value = shapiro(all_factor_values).pvalue
            else:
                p_value = 'N/A'
            statistic_df.loc['Normality test P-value', factor.printable_name] = p_value
        statistic_df.index.names = ['Universe Statistic']
        display(statistic_df)

    def _update_correlation_results(self, results, df, factor, dataset_class_str, other_dataset_factor):
        """
        A helper method to update correlation calculation results.

        Input:
         - results
            The DataFrame containing all of the correlation results
         - df
            A DataFrame containing data from two datasets that needs the correlation calculated
            (columns 'this_dataset' and 'other_dataset')
         - factor
            The first factor we're calculating the correlation on
         - dataset_class_str
            The name of the dataset the second factor is from
         - other_dataset_factor
            The second factor we're calculating the correlation on

        Returns the correlation results DataFrame
        """
        # Calculate correlation
        corr = df['this_dataset'].corr(df['other_dataset'])
        hypothesis_test_p_value = self._get_p_value(df['this_dataset'], df['other_dataset'],
                                                    factor.data_type, other_dataset_factor.data_type)
        corr = '{:,.4f}'.format(corr)
        hypothesis_test_p_value = '{:,.4f}'.format(hypothesis_test_p_value)
        results.loc[f"{dataset_class_str}.{other_dataset_factor.factor_name}", f'{factor.printable_name}'] = f"{corr} ({hypothesis_test_p_value})"
        return results

    def calculate_factor_correlations(self, other_dataset_factor_by_class):
        """
        Displays a DataFrame to show
         - The correlation between each of the factors in the `dataset`
         - The correlation each of the factors in the `dataset` and their respective correlation with
           the factors of the datasets in the `other_dataset_factor_by_class` dictionary.

        Input:
         - other_dataset_factor_by_class
            A dictionary (key: dataset class, value: a list of OtherDatasetFactor objects) that
            contains the factors of other datasets we want to calculate the correlation with
        """
        results = pd.DataFrame()

        # Factor correlation within the main dataset
        for factor_1 in self.factors:
            for factor_2 in self.factors:
                df = pd.DataFrame({
                    'this_dataset': self.transformed_dataset_history[factor_1.name].values.flatten('F'),
                    'other_dataset': self.transformed_dataset_history[factor_2.name].values.flatten('F')
                }).dropna(axis=0, how='any')

                # Calculate correlation
                corr = df['this_dataset'].corr(df['other_dataset'])
                hypothesis_test_p_value = self._get_p_value(df['this_dataset'], df['other_dataset'],
                                                            factor_1.data_type, factor_2.data_type)
                corr = '{:,.4f}'.format(corr)
                hypothesis_test_p_value = '{:,.4f}'.format(hypothesis_test_p_value)
                results.loc[f"{factor_2.printable_name}", f'{factor_1.printable_name}'] = f"{corr} ({hypothesis_test_p_value})"

        # Calculate correlation of the factors with factors from other datasets
        #  For linked securities, select the columns of the securities that are present in both datasets
        #  For unlinked securities, duplicate the columns
        #  To get one correlation value, move all of the columns in the DataFrames into one column before using `corr`
        for dataset_class in other_dataset_factor_by_class:
            dataset_class_str = str(dataset_class).split("'")[-2].split(".")[-1]

            # Select one of the factors from the other datasets
            for other_dataset_factor in other_dataset_factor_by_class[dataset_class]:
                # Get start and end dates of correlation period
                start_date = self.transformed_dataset_history.index[0]
                end_date = self.transformed_dataset_history.index[-1]

                if other_dataset_factor.link == SecurityType.Equity:
                    # Gather `other_dataset_factor` data for each security in the universe
                    other_dataset_symbols = []
                    other_dataset_timezone = None
                    for symbol in self.security_symbols:
                        # Subscribe to the other dataset factor
                        other_dataset_subscription = self.qb.AddData(dataset_class, symbol)
                        other_dataset_timezone = other_dataset_subscription.Exchange.TimeZone
                        other_dataset_symbols.append(other_dataset_subscription.Symbol)

                    # Get historical data for the `other_dataset_factor`
                    try:
                        raw_other_history = self.qb.History(other_dataset_symbols, start_date, end_date)
                    except Exception:
                        print(f'You don\'t have a subscription to the {dataset_class_str} dataset. Add it to your organization on the Pricing page.')
                        continue
                    if raw_other_history.empty:
                        print(f'No historical data was available for the {dataset_class_str} dataset.')
                        continue
                    raw_other_history = raw_other_history[other_dataset_factor.factor_name]
                    raw_other_history = raw_other_history[~raw_other_history.index.duplicated(keep='last')].unstack(level=0)

                    for factor in self.factors:
                        # Get this dataset history
                        this_dataset_history = self.transformed_dataset_history[factor.name]

                        # Apply value function. Always start from a copy of the raw history so the
                        # transformation is applied exactly once per factor.
                        other_dataset_history = other_dataset_factor.value_function(
                            raw_other_history.copy(), this_dataset_history.index,
                            other_dataset_timezone, self.dataset_timezone)

                        # Align indices (incase timestamps were removed from the value function)
                        intersect_index = sorted(set(this_dataset_history.index).intersection(other_dataset_history.index))
                        aligned_this_dataset_history = this_dataset_history.loc[intersect_index]
                        aligned_other_dataset_history = other_dataset_history.loc[intersect_index]

                        if self.linked_dataset:
                            # Get a lists of symbols for securities that are in both datasets
                            selected_this_dataset_symbols = []
                            selected_other_dataset_symbols = []
                            for i, other_dataset_symbol in enumerate(other_dataset_symbols):
                                if other_dataset_symbol not in aligned_other_dataset_history.columns:
                                    continue
                                if self.dataset_symbols[i] not in aligned_this_dataset_history.columns:
                                    continue
                                selected_this_dataset_symbols.append(self.dataset_symbols[i])
                                selected_other_dataset_symbols.append(other_dataset_symbol)

                            # Make a DataFrame of both histories so we can align the timestamps and drop rows with nan values
                            data = {
                                'this_dataset': aligned_this_dataset_history[selected_this_dataset_symbols].values.flatten('F'),
                                'other_dataset': aligned_other_dataset_history[selected_other_dataset_symbols].values.flatten('F')
                            }
                            df = pd.DataFrame(data).dropna(axis=0, how='any')
                        else:
                            # Get a lists of symbols for securities that are in the 'other' dataset
                            selected_other_dataset_symbols = [
                                other_dataset_symbol for other_dataset_symbol in other_dataset_symbols
                                if other_dataset_symbol in aligned_other_dataset_history.columns
                            ]
                            # Duplicate this dataset's column so both sides flatten to the same length
                            for i in range(1, len(selected_other_dataset_symbols)):
                                aligned_this_dataset_history[f"column_{i}"] = aligned_this_dataset_history[aligned_this_dataset_history.columns[0]]

                            # Make a DataFrame of both histories so we can align the timestamps and drop rows with nan values
                            df = pd.DataFrame({
                                'this_dataset': aligned_this_dataset_history.values.flatten('F'),
                                'other_dataset': aligned_other_dataset_history[selected_other_dataset_symbols].values.flatten('F')
                            }).dropna(axis=0, how='any')

                        results = self._update_correlation_results(results, df, factor, dataset_class_str, other_dataset_factor)

                else:
                    # In this case, the dataset isn't linked to a security
                    # Subscribe to the other dataset
                    other_dataset_subscription = self.qb.AddData(dataset_class, other_dataset_factor.link)
                    other_dataset_symbol = other_dataset_subscription.Symbol
                    other_dataset_timezone = other_dataset_subscription.Exchange.TimeZone

                    # Gather historical data of the other dataset
                    try:
                        raw_other_history = self.qb.History(other_dataset_symbol, start_date, end_date)
                    except Exception:
                        print(f'You don\'t have a subscription to the {dataset_class_str} dataset. Add it to your organization on the Pricing page.')
                        continue
                    if raw_other_history.empty:
                        print(f'No historical data was available for the {dataset_class_str} dataset.')
                        continue
                    raw_other_history = raw_other_history.loc[other_dataset_symbol][[other_dataset_factor.factor_name]]

                    for factor in self.factors:
                        # Get this dataset history
                        this_dataset_history = self.transformed_dataset_history[factor.name]

                        # Apply value function (on a copy of the raw history, once per factor)
                        other_dataset_history = other_dataset_factor.value_function(
                            raw_other_history.copy(), this_dataset_history.index,
                            other_dataset_timezone, self.dataset_timezone)

                        # Align indices (incase timestamps were removed from the value function)
                        intersect_index = sorted(set(this_dataset_history.index).intersection(other_dataset_history.index))
                        aligned_this_dataset_history = this_dataset_history.reindex(intersect_index)
                        aligned_other_dataset_history = other_dataset_history.reindex(intersect_index)

                        # Make duplicate columns of `aligned_other_dataset_history` so the number of columns matches `aligned_this_dataset_history`
                        for i in range(1, len(aligned_this_dataset_history.columns)):
                            aligned_other_dataset_history[f"column_{i}"] = aligned_other_dataset_history[aligned_other_dataset_history.columns[0]]

                        # Make a DataFrame of both histories so we can align the timestamps and drop rows nan values
                        df = pd.DataFrame({
                            'this_dataset': aligned_this_dataset_history.values.flatten('F'),
                            'other_dataset': aligned_other_dataset_history.values.flatten('F')
                        }).dropna(axis=0, how='any')

                        results = self._update_correlation_results(results, df, factor, dataset_class_str, other_dataset_factor)

        results.columns = pd.MultiIndex.from_tuples(
            [('Factor Correlation Coefficient (P-Value)', col) for col in results.columns])
        display(results)

    def _get_p_value(self, dataset_a, dataset_b, data_a_type, data_b_type, significance=0.05):
        """
        Gets the p-value of the correlation between two lists of values.

        Rows where either value is NaN or infinite are dropped. `dataset_b` is then tested for
        normality with the Jarque-Bera test: if normality is not rejected at the `significance`
        level the Pearson p-value is returned, otherwise the Spearman p-value.

        Input:
         - dataset_a
            First list of values.
         - dataset_b
            Second list of values.
         - data_a_type
            Type of `dataset_a` ('continuous' or 'discrete'). Currently not used by the test selection.
         - data_b_type
            Type of `dataset_b` ('continuous' or 'discrete'). Currently not used by the test selection.
         - significance
            Level of significance to use for the normality test.

        Returns the p-value that results after applying the selected statistical test.
        """
        np.seterr(divide='ignore')
        pandas_types = (pd.Series, pd.DataFrame)
        if isinstance(dataset_a, pandas_types) and isinstance(dataset_b, pandas_types):
            merge = pd.concat([dataset_a, dataset_b], axis=1).replace([np.inf, -np.inf], np.nan).dropna(axis=0)
            dataset_a = merge.iloc[:, 0]
            dataset_b = merge.iloc[:, 1]
        else:
            merge = np.concatenate([dataset_a.reshape(-1, 1), dataset_b.reshape(-1, 1)], axis=1)
            merge = merge[np.isfinite(merge).all(axis=1), :]
            dataset_a = merge[:, 0]
            dataset_b = merge[:, 1]

        # The Jarque-Bera null hypothesis is that the sample IS normally distributed, so a p-value
        # at or above the significance level means normality was not rejected.
        if jarque_bera(dataset_b).pvalue >= significance:
            return pearsonr(dataset_a, dataset_b)[1]  # null hypothesis is that there is no linear relationship between the datasets
        return spearmanr(dataset_a, dataset_b)[1]  # null hypothesis is that two sets of data are uncorrelated

    def calculate_factor_importance(self, standardize=True):
        """
        Displays a box plot showing the relative importance of each factor in determining future
        returns for the securities in the universe, and a histogram showing the accuracy score of
        regularization.

        Input:
         - standardize
            A boolean to represent if the data should be standardized before applying dimensionality reduction
        """
        if len(self.factors) == 1:
            print("Factor importance analysis is only available when analyzing multiple factors.")
            return

        models = [LassoCV, RidgeCV, ElasticNetCV]
        titles = [str(model.__name__) for model in models]
        fig = make_subplots(rows=len(models), cols=1, subplot_titles=tuple(titles), vertical_spacing=0.1,
                            shared_yaxes='all')
        current_row = 1
        results_by_model = {}
        scores_by_model = {}

        # Generate factor importance plots
        for model in models:
            results_by_model[model] = np.ndarray(shape=(0, len(self.factors)))
            scores_by_model[model] = pd.DataFrame(columns=["Score"])
            for symbol_index, security_symbol in enumerate(self.security_symbols):
                dataset_symbol = self.dataset_symbols[symbol_index if self.linked_dataset else 0]
                factor_importance_pct, factor_names, model_score = self._get_factor_importance_pct(
                    dataset_symbol, security_symbol, model, standardize)
                if factor_importance_pct is None:
                    continue

                # Append row to results ndarray
                results_by_model[model] = np.vstack([results_by_model[model], factor_importance_pct])
                scores_by_model[model].loc[security_symbol] = np.array(model_score)

            for i, factor in enumerate(self.factors):
                fig.append_trace(go.Box(y=results_by_model[model][:, i], name=factor.printable_name,
                                        line_width=6, showlegend=False),
                                 row=current_row, col=1)
            current_row += 1

        # Update layout
        for i in range(1, len(models)+1):
            fig['layout'][f'yaxis{i}']['title'] = 'Importance (%)'
        fig.update_layout(title_text=f"Distribution of Explanatory Power of Each Factor for Each Security in the Universe",
                          yaxis_range=[0, 1],
                          showlegend=False,
                          margin=dict(l=0, r=0, b=0),
                          height=(current_row-1) * 250)
        fig.show()

        # Generate the accuracy (variance explained) score plot
        fig = go.Figure()
        annotation_text = ""
        for i, model in enumerate(models):
            fig.add_trace(go.Histogram(
                x=scores_by_model[model].values.flatten(),
                name=str(model.__name__)))
            scores = scores_by_model[model].values
            minimum = self._convert_to_legend_format(scores.min())
            maximum = self._convert_to_legend_format(scores.max())
            mean = self._convert_to_legend_format(scores.mean())
            std = self._convert_to_legend_format(scores.std())
            if i > 0:
                annotation_text += "<br>"
            annotation_text += f"{str(model.__name__)}<br>-Minimum: {minimum}<br>-Maximum: {maximum}<br>-Mean: {mean}<br>-Standard deviation: {std}<br>"
        fig.update_layout(title=f"Model Accuracy<br><span style='font-size: 12px'>Distribution of R<sup>2</sup> Values From Applying Each Model to Each Security</span>",
                          legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01, bgcolor='rgba(255,255,255,0.5)'),
                          margin=dict(l=0, r=0, b=0),
                          xaxis_title="R<sup>2</sup>",
                          yaxis_title='Count',
                          height=500,
                          bargap=0.1,
                          annotations=[
                              go.layout.Annotation(
                                  text=annotation_text,
                                  align='left',
                                  showarrow=False,
                                  xref='paper',
                                  yref='paper',
                                  x=1,
                                  y=1,
                                  bordercolor='black',
                                  borderwidth=1,
                                  bgcolor='white',
                                  opacity=0.75
                              )]
                          )
        fig.show()

    def _get_factor_importance_pct(self, dataset_symbol, security_symbol, model, standardize):
        """
        A helper method calculate how much influence each factor has on the target values

        Input:
         - dataset_symbol
            The symbol of the dataset to use
         - security_symbol
            The symbol of the security used to fit the model
         - model
            The regression model class to use (instantiated with no arguments)
         - standardize
            A boolean to represent if the data should be standardized before applying dimensionality reduction

        Returns the percentage of influence each factor has on target values, the associated factor
        names, and the accuracy score of the model. Returns (None, None, None) when the model cannot
        be fitted (a constant factor while standardizing, fewer than two usable factors, or no rows
        left after aligning with the returns).
        """
        data = pd.DataFrame()
        for factor in self.factors:
            data[factor.name] = self.transformed_dataset_history[factor.name][dataset_symbol]

        # Drop rows with NaN values
        data = data.dropna(axis=0, how='any')
        factor_names = data.columns

        # Standardize data
        if standardize:
            # A zero standard deviation would turn the whole column into NaN/inf
            if 0 in data.std().values:
                return None, None, None
            data = (data - data.mean()) / data.std()

        # Drop columns that are just NaNs
        data.dropna(axis=1, how='all', inplace=True)
        if data.shape[1] <= 1:
            return None, None, None

        return_ = self.return_history[security_symbol].reindex(data.index)
        data = pd.concat([data, return_], axis=1).dropna()
        if data.empty:
            # Nothing to fit once rows without a return are removed
            return None, None, None

        features = data.iloc[:, :-1]
        target = data.iloc[:, -1]
        model_ = model()
        model_.fit(features, target)

        abs_coef = abs(model_.coef_)
        total = np.sum(abs_coef)
        # When every coefficient is 0 there is nothing to normalize by
        factor_importance_pct = abs_coef if total == 0 else abs_coef / total

        model_score = model_.score(features, target)
        return factor_importance_pct, factor_names, model_score

    def run_ml_models(self, regression_models, classifier_models, negative_return_label=-1, positive_return_label=1):
        """
        Trains machine learning models to predict the magnitude/direction of the next day given the factor values of the current day.

        Input:
         - model
            Instance of the SKLearn classification model to use.
         - negative_return_label
            Label to use when there was a negative daily return (-1 means the model will take 100% short exposure when it predicts a down day)
         - positive_return_label
            Label to use when there was a negative daily return (1 means the model will take 100% long exposure when it predicts an up day)

        Displays the results of the models, including equity curve, accuracy, and exposure.
        """
        all_returns_by_model = {}
        all_predictions_by_model = {}
        scores_by_model = {}
        computed_symbols_by_model = {}
        models = regression_models + classifier_models
        for model_idx, model in enumerate(models):
            is_regression_model = model_idx < len(regression_models)
            scores_by_model[model] = {'in-sample': np.array([]), 'out-of-sample': np.array([])}
            computed_symbols_by_model[model] = []
            all_returns_by_model[model] = pd.DataFrame()
            all_predictions_by_model[model] = pd.DataFrame()
            for equity_symbol_index, security_symbol in enumerate(self.security_symbols):
                # Gather factor values
                dataset_symbol = self.dataset_symbols[equity_symbol_index if self.linked_dataset else 0]
                factor_values = pd.DataFrame()
                for factor in self.factors:
                    factor_values[factor.name] = self.transformed_dataset_history[factor.name][dataset_symbol]

                # Gather labels
                label = self.labels[security_symbol].copy() #self.return_sub_history[security_symbol].copy()
                if not is_regression_model:
                    label[label <= 0] = negative_return_label
                    label[label > 0] = positive_return_label

                # Align time stamps (incase there are NaN values)
                intersect_index = sorted(list(set(label.dropna().index).intersection(set(factor_values.dropna().index))))
                factor_values = factor_values.reindex(intersect_index)
                label = label.reindex(intersect_index)

                x = factor_values.loc[:self.in_sample_end_date.date()]
                if x.empty:
                    continue
                y = label.loc[label.index <= self.in_sample_end_date]
                if y.empty:
                    continue

                # Fit model to in-sample data
                model.fit(x, y)

                # Run model on in-sample data
                in_sample_predictions = pd.Series(model.predict(x), index=y.index)
                if is_regression_model:
                    # Convert to binary predictions and labels
                    in_sample_predictions.loc[in_sample_predictions <= 0] = negative_return_label
                    in_sample_predictions.loc[in_sample_predictions > 0] = positive_return_label
                    y.loc[y <= 0] = negative_return_label
                    y.loc[y > 0] = positive_return_label
                    in_sample_score = (in_sample_predictions == y).mean()
                else:
                    in_sample_score = model.score(x, y)

                # Gather out-of-sample-data
x = factor_values.loc[factor_values.index > self.in_sample_end_date] if x.empty: continue y = label.loc[label.index > self.in_sample_end_date] if y.empty: continue # Run model on out of sample data out_of_sample_predictions = pd.Series(model.predict(x), index=y.index) out_of_sample_predictions.loc[out_of_sample_predictions <= 0] = negative_return_label out_of_sample_predictions.loc[out_of_sample_predictions > 0] = positive_return_label if is_regression_model: y.loc[y <= 0] = negative_return_label y.loc[y > 0] = positive_return_label out_of_sample_score = (out_of_sample_predictions == y).mean() else: out_of_sample_score = model.score(x, y) # Align predictions with security price security_price_history = self.price_history[security_symbol].dropna() in_sample_price_index = security_price_history[security_price_history.index <= self.in_sample_end_date].index out_of_sample_price_index = security_price_history[security_price_history.index > self.in_sample_end_date].index in_sample_predictions = in_sample_predictions.reindex(in_sample_price_index) out_of_sample_predictions = out_of_sample_predictions.reindex(out_of_sample_price_index) # Fill forward predictions if self.return_prediction_period > 1: in_sample_predictions = in_sample_predictions.fillna(method='ffill', limit=self.return_prediction_period - 1) out_of_sample_predictions = out_of_sample_predictions.fillna(method='ffill', limit=self.return_prediction_period - 1) in_and_out_sample_predictions = pd.concat([in_sample_predictions, out_of_sample_predictions]) all_predictions_by_model[model] = pd.concat([all_predictions_by_model[model], in_and_out_sample_predictions], axis=1, sort=False) # Calculate prediction returns security_return_history = security_price_history.pct_change(1).shift(-1).dropna() model_returns = security_return_history * in_and_out_sample_predictions.fillna(0) model_returns.name = security_symbol all_returns_by_model[model] = pd.concat([all_returns_by_model[model], model_returns], axis=1, 
sort=False) # Save results scores_by_model[model]['in-sample'] = np.append(scores_by_model[model]['in-sample'], in_sample_score) scores_by_model[model]['out-of-sample'] = np.append(scores_by_model[model]['out-of-sample'], out_of_sample_score) computed_symbols_by_model[model].append(security_symbol) # Plot equity curves fig = go.Figure() # -- Benchmark (Universe) returns universe_returns = (self.price_history[self.security_symbols].pct_change() + 1).mean(axis=1) universe_returns.iloc[0] = 1 universe_returns = universe_returns.cumprod() bottom = min(universe_returns) top = max(universe_returns) # -- Model returns for model in models: model_returns = all_returns_by_model[model].reindex(universe_returns.index) model_returns = (model_returns + 1).mean(axis=1).cumprod().shift(1) model_returns.iloc[0] = 1 fig.add_trace(go.Scatter(x=model_returns.index, y = model_returns, name=f"{type(model).__name__} Model")) bottom = min(bottom, min(model_returns)) top = max(top, max(model_returns)) fig.add_trace(go.Scatter(x=universe_returns.index, y = universe_returns, name='Benchmark')) fig.add_shape(type="line", x0=self.in_sample_end_date, y0=bottom, x1=self.in_sample_end_date, y1=top, line=dict(color="Orange", width=2, dash="dot") ) fig.add_trace(go.Scatter( x=[self.in_sample_end_date], y=[top], text=["In-sample "], mode="text", textposition="top left", showlegend=False )) fig.add_trace(go.Scatter( x=[self.in_sample_end_date], y=[top], text=[" Out-of-sample"], mode="text", textposition="top right", showlegend=False )) fig.update_layout(title_text=f"Daily Equity Curves<br><span style='font-size: 12px'>Equity Curves of Buying the Universe Constituents and Following the Model Predictions</span>", margin=dict(l=0, r=0, b=0), legend=dict( yanchor="top", y=0.99, xanchor="left", x=0.01 )) fig.update_xaxes(title="Date", range=[universe_returns.index[0], universe_returns.index[-1]]) fig.show() # Plot daily prediction accuracy fig = go.Figure() for model_idx, model in enumerate(models): 
all_predictions_by_model[model].columns = list(range(len(computed_symbols_by_model[model]))) all_correct_predictions = pd.DataFrame() for i, column in enumerate(computed_symbols_by_model[model]): predictions = all_predictions_by_model[model][i].dropna()[:-1] returns = self.price_history[column].pct_change(1).shift(-1).reindex(predictions.index) signed_returns = np.sign(returns) signed_returns.loc[signed_returns == 0] = -1 correct_predictions = pd.Series(predictions.values == signed_returns.values, index=returns.index, name=column) all_correct_predictions = pd.concat([all_correct_predictions, correct_predictions], axis=1, sort=False) accuracy_per_day = all_correct_predictions.mean(axis=1) fig.add_trace(go.Scatter(x=accuracy_per_day.index, y = accuracy_per_day.values, mode='markers', marker=dict(size=self.marker_size), name=type(model).__name__)) title_text = f"Daily Prediction Accuracy<br><span style='font-size: 12px'>" title_text += f"The Proportion of Models That Had a Correct Prediction for Each Day" fig.update_layout(legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01, bgcolor='rgba(255,255,255,0.5)'), height=300) title_text += "</span>" fig.update_layout(title_text=title_text, margin=dict(l=0, r=0, b=0), yaxis_range=[-0.02,1.02]) fig.update_xaxes(range=[universe_returns.index[0], universe_returns.index[-1]]) fig.update_layout(legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01, bgcolor='rgba(255,255,255,0.5)')) fig.update_xaxes(title="Date") fig.add_shape(type="line", x0=self.in_sample_end_date, y0=0, x1=self.in_sample_end_date, y1=1, line=dict(color="Orange", width=2, dash="dot") ) fig.add_trace(go.Scatter( x=[self.in_sample_end_date], y=[0.9], text=["In-sample "], mode="text", textposition="top left", showlegend=False )) fig.add_trace(go.Scatter( x=[self.in_sample_end_date], y=[0.9], text=[" Out-of-sample"], mode="text", textposition="top right", showlegend=False )) fig.show() ## Plot Long-Short Exposure fig = go.Figure() for model in models: 
all_predictions = all_predictions_by_model[model].reindex(universe_returns.index).mean(axis=1).fillna(0) fig.add_trace(go.Scatter(x=all_predictions.index, y = all_predictions.values, name=type(model).__name__)) fig.update_layout(title_text=f"Daily Mean Security Exposure<br><span style='font-size: 12px'>Mean Long-Short Exposure Across All of the Universe Constituents When Following the Model Predictions</span>", margin=dict(l=0, r=0, b=0), height=300, yaxis_range=[-1.02,1.02], legend=dict( yanchor="top", y=0.99, xanchor="left", x=0.01, bgcolor='rgba(255,255,255,0.5)' )) fig.add_shape(type="line", x0=self.in_sample_end_date, y0=-1, x1=self.in_sample_end_date, y1=1, line=dict(color="Orange", width=2, dash="dot") ) fig.add_trace(go.Scatter( x=[self.in_sample_end_date], y=[0.8], text=["In-sample "], mode="text", textposition="top left", showlegend=False )) fig.add_trace(go.Scatter( x=[self.in_sample_end_date], y=[0.8], text=[" Out-of-sample"], mode="text", textposition="top right", showlegend=False )) fig.update_xaxes(title="Date", range=[universe_returns.index[0], universe_returns.index[-1]]) fig.show() # Display model accuracy scores for period in ['in-sample', 'out-of-sample']: capitalized_period = 'In-Sample' if period == 'in-sample' else 'Out-of-Sample' # Generate the accuracy (variance explained) score plot fig = go.Figure() output_text = "" for i, model in enumerate(models): scores = scores_by_model[model][period] fig.add_trace(go.Histogram(x=scores.flatten(),name=type(model).__name__, xbins=dict( start=0., end=1., size=0.05), autobinx=False )) fig.update_layout(title=f"Daily Model Accuracy {capitalized_period}<br><span style='font-size: 12px'>Distribution of {capitalized_period} Accuracy Values From Applying Each Model to Each Security</span>", legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01, bgcolor='rgba(255,255,255,0.5)'), margin=dict(l=0, r=0, b=0), xaxis_title="Accuracy", yaxis_title='Count', height = 250, bargap=0.1, ) fig.update_xaxes(range=[0, 
1]) fig.show() # Display model comparison DataFrame result_df = pd.DataFrame(columns=pd.MultiIndex.from_tuples([('In-Sample Accuracy Distribution','Mean'), ('In-Sample Accuracy Distribution','Standard Deviation'), ('Out-of-Sample Accuracy Distribution','Mean'), ('Out-of-Sample Accuracy Distribution','Standard Deviation')])) for model in models: model_name = type(model).__name__ for period in ['in-sample', 'out-of-sample']: capitalized_period = 'In-Sample' if period == 'in-sample' else 'Out-of-Sample' score_mean = scores_by_model[model][period].mean() score_std_dev = scores_by_model[model][period].std() result_df.loc[model_name, (capitalized_period + ' Accuracy Distribution', 'Mean')] = '{:,.4f}'.format(score_mean) result_df.loc[model_name, (capitalized_period + ' Accuracy Distribution', 'Standard Deviation')] = '{:,.4f}'.format(score_std_dev) display(result_df) def _get_factor_rankings(self): """ Gets the factor rankings for each security. If there are more than one factor, for each security, we take the mean rankings across all the factors. """ if len(self.factors) == 1: daily_ranks = self.transformed_dataset_history.copy().rank(axis=1, method='first') daily_ranks.columns = self.security_symbols else: daily_ranks = pd.DataFrame() for equity_symbol_index, security_symbol in enumerate(self.security_symbols): dataset_symbol = self.dataset_symbols[equity_symbol_index if self.linked_dataset else 0] daily_ranks[security_symbol] = self.transformed_dataset_history.iloc[:, self.transformed_dataset_history.columns.get_level_values(1)==dataset_symbol].mean(axis=1) daily_ranks = daily_ranks.rank(axis=1, method='first') return daily_ranks def run_ranking_algorithm(self, quantiles): """ Breaks the universe into quantiles based on the ranking of the sole factor then produces equity curve plots of each quantile. A long-short portfolio is also presented using the first and last quantiles. 
If there are more than one factor, for each security, we take the mean rankings across all the factors. Input: - quantiles Number of quantiles to break the universe into """ # Plot benchmark fig = go.Figure() daily_returns = self.price_history.pct_change().shift(-1) daily_returns.columns = self.security_symbols benchmark_equity_curve = (daily_returns.mean(axis=1) + 1).cumprod() fig.add_trace(go.Scatter(x=benchmark_equity_curve.index, y = benchmark_equity_curve.values, name='Benchmark')) # Plot each quantile for selected_quantile in range(1, quantiles+1): daily_ranks = self._get_factor_rankings() num_ranks_per_day = daily_ranks.max(axis=1) def rank_to_exposure(row): if isinstance(row.name, np.datetime64): # The first row is called 2 times when using `apply` return row num_securities = num_ranks_per_day.loc[row.name] security_per_quintile = int(num_securities / quantiles) long_start = security_per_quintile * (selected_quantile-1) long_end = security_per_quintile * selected_quantile row[(row < long_start) | (row > long_end)] = np.nan row[~row.isna()] = 1 return row exposures = daily_ranks.apply(rank_to_exposure, axis=1) portfolio_returns = exposures * daily_returns portfolio_equity_curve = (portfolio_returns.mean(axis=1) + 1).cumprod() fig.add_trace(go.Scatter(x=portfolio_equity_curve.index, y = portfolio_equity_curve.values, name=f'Q{selected_quantile}')) # Plot Qn-Q1 daily_ranks = self._get_factor_rankings() num_ranks_per_day = daily_ranks.max(axis=1) def rank_to_exposure(row): if isinstance(row.name, np.datetime64): # The first row is called 2 times when using `apply` return row num_securities = num_ranks_per_day.loc[row.name] short_threshold = int(num_securities / quantiles) long_threshold = num_securities - short_threshold + 1 row[(row > short_threshold) & (row < long_threshold)] = np.nan row[row <= short_threshold] = -1 row[row >= long_threshold] = 1 return row exposures = daily_ranks.apply(rank_to_exposure, axis=1) portfolio_returns = exposures * daily_returns 
portfolio_equity_curve = (portfolio_returns.mean(axis=1) + 1).cumprod() fig.add_trace(go.Scatter(x=portfolio_equity_curve.index, y = portfolio_equity_curve.values, name=f'Q{quantiles}-Q1')) # Update figure layout fig.update_layout(title_text=f"Daily Equity Curves<br><span style='font-size: 12px'>Equity Curves of Buying the Universe Constituents and Forming a Portfolio Based on the Factor Rankings</span>", margin=dict(l=0, r=0, b=0), legend=dict( yanchor="top", y=0.99, xanchor="left", x=0.01 )) fig.update_xaxes(title="Date", range=[portfolio_equity_curve.index[0], portfolio_equity_curve.index[-1]]) fig.show() class ETFUniverse: """ A class to create a universe of equities from the constituents of an ETF """ def __init__(self, etf_ticker, universe_date): """ Input: - etf_ticker Ticker of the ETF - universe_date The date to gather the constituents of the ETF """ self.etf_ticker = etf_ticker self.universe_date = universe_date def get_symbols(self, qb): """ Subscribes to the universe constituents and returns a list of symbols and their timezone Input: - qb The QuantBook instance inside the DatasetAnalyzer Returns a list of symbols and their timezone """ etf_symbols = self._get_etf_constituents(qb, self.etf_ticker, self.universe_date) security_timezone = None security_symbols = [] # Subscribe to the universe price data for symbol in etf_symbols: security = qb.AddSecurity(symbol, Resolution.Daily) security_timezone = security.Exchange.TimeZone security_symbols.append(symbol) return security_symbols, security_timezone def _get_etf_constituents(self, qb, etf_ticker, date): """ A helper method to retreive the ETF constituents on a given date Input: - qb The QuantBook instance inside the DatasetAnalyzer - etf_ticker Ticker of the ETF - universe_date The date to gather the constituents of the ETF Returns a list of symbols """ date_str = date.strftime("%Y%m%d") filename = f"/data/equity/usa/universes/etf/{etf_ticker.lower()}/{date_str}.csv" try: df = pd.read_csv(filename) 
except: print(f'Error: The ETF universe file does not exist') return security_ids = df[df.columns[1]].values symbols = [qb.Symbol(security_id) for security_id in security_ids] return symbols def show_video(url_code): display(IFrame(src=f"https://www.youtube.com/embed/{url_code}?rel=0&controls=0&showinfo=0" , width="560", height="315"))
from AlgorithmImports import *
from pytz import timezone


class Factor:
    """
    A class to define factors from the dataset under analysis
    """
    def __init__(self, name, printable_name, data_type, value_function):
        """
        Input:
         - name
            Name of the factor as represented in the dataframe column of a history request
         - printable_name
            The name of the factor to be used when mentioning in plots and tables
         - data_type
            The type of data ('discrete' or 'continuous')
         - value_function
            User-defined value function to translate the raw factor values
        """
        self.name = name
        self.printable_name = printable_name
        self.data_type = data_type
        self.value_function = value_function


class OtherDatasetFactor:
    """
    A class to define factors from other datasets (for the inter-dataset correlation analysis)
    """
    def __init__(self, factor_name, data_type, link, sparse, value_function):
        """
        Input:
         - factor_name
            Name of the factor as represented in the dataframe column of a history request
         - data_type
            The type of data ('discrete' or 'continuous')
         - link
            The linked (SecurityType.Equity if linked; 'REG' for unlinked Regalytics dataset)
         - sparse
            Boolean to represent if the other dataset is sparse
         - value_function
            User-defined value function to translate the raw factor values
        """
        self.factor_name = factor_name
        self.data_type = data_type
        self.link = link
        self.sparse = sparse
        self.value_function = value_function


class DemoCorrelationDatasets:
    """
    A class that holds a demo collection of factors from other datasets, keyed by dataset class
    """
    def __init__(self):
        self.other_dataset_factor_by_class = {
            QuiverWallStreetBets: [
                OtherDatasetFactor('rank', 'discrete', SecurityType.Equity, True, self.value_function),
                OtherDatasetFactor('sentiment', 'continuous', SecurityType.Equity, True, self.value_function)
            ],
            QuiverQuantTwitterFollowers: [
                OtherDatasetFactor('followers', 'discrete', SecurityType.Equity, True, self.value_function)
            ],
            USTreasuryYieldCurveRate: [
                OtherDatasetFactor('onemonth', 'continuous', "USTYCR", False, self.value_function)
            ]
        }

    def value_function(self, other_dataset_history, this_dataset_index, other_dataset_timezone, this_dataset_timezone):
        """
        This function aligns the raw data of the other dataset with the timestamps of
        `this_dataset_index`. The raw values are returned as they are since all the
        factors are processed data.

        Input:
         - other_dataset_history
            DataFrame of the other dataset's history. It is not modified.
         - this_dataset_index
            The index to align the factor values with
         - other_dataset_timezone
            Timezone of the other dataset
         - this_dataset_timezone
            Timezone of the dataset under analysis

        Returns a DataFrame indexed by the timestamps of `this_dataset_index` that received
        a value, without the rows and columns that only contain NaN values.
        """
        # Work on a copy so the index adjustments below don't leak into the caller's DataFrame
        # (otherwise every call would shift the caller's index by another day)
        history = other_dataset_history.copy()

        # Match timezones
        if other_dataset_timezone != this_dataset_timezone:
            source_timezone = timezone(str(other_dataset_timezone))
            target_timezone = timezone(str(this_dataset_timezone))

            def match_timezones(time):
                return time.replace(tzinfo=source_timezone).astimezone(target_timezone)

            history.index = history.index.map(match_timezones)

        result_df = pd.DataFrame(columns=history.columns)

        # Remove the timezone information (so we can compare the indices in the snippet that comes after)
        # NOTE(review): `tz_convert(None)` leaves UTC wall times, not `this_dataset_timezone`
        # wall times -- confirm this is the intended alignment.
        history.index = pd.to_datetime(history.index, utc=True).tz_convert(None)

        # Move dataset index forward by 1 day (since we open our trade on the day after we receive the factor value)
        history.index = history.index + timedelta(1)

        # Move forward index of the history if its index elements don't align with `this_dataset_index`
        for i in history.index:
            adjusted_index_options = this_dataset_index[this_dataset_index >= i]
            if len(adjusted_index_options) == 0:
                # There is no later timestamp to assign this value to
                continue
            adjusted_index = adjusted_index_options[0]
            row = history.loc[i]
            row.name = adjusted_index
            # Rows that map to the same timestamp overwrite each other, so the last one is kept
            result_df.loc[adjusted_index] = row

        # Drop duplicate indices
        result_df = result_df[~result_df.index.duplicated(keep='last')]

        # Align factor values with this_dataset_index
        result_df = result_df.reindex(this_dataset_index)

        # Drop rows and columns that have only NaN values
        result_df = result_df.dropna(axis=0, how='all').dropna(axis=1, how='all')

        return result_df
#region imports
from AlgorithmImports import *
#endregion


class UglyBrownMule(QCAlgorithm):
    """Template algorithm: configures the backtest and places no trades."""

    def Initialize(self):
        """Set the start date and the starting cash of the backtest."""
        self.SetStartDate(2021, 3, 21)  # Set Start Date
        self.SetCash(100_000)  # Set Strategy Cash
        # self.AddEquity("SPY", Resolution.Minute)

    def OnData(self, data):
        '''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
            Arguments:
                data: Slice object keyed by symbol containing the stock data
        '''
        # if not self.Portfolio.Invested:
        #    self.SetHoldings("SPY", 1)