Overall Statistics |
Total Trades 1778 Average Win 0.42% Average Loss -0.19% Compounding Annual Return 9.734% Drawdown 13.000% Expectancy 0.019 Net Profit 1.986% Sharpe Ratio 0.472 Probabilistic Sharpe Ratio 39.811% Loss Rate 68% Win Rate 32% Profit-Loss Ratio 2.19 Alpha 0.61 Beta -0.483 Annual Standard Deviation 0.423 Annual Variance 0.179 Information Ratio -1.404 Tracking Error 0.463 Treynor Ratio -0.414 Total Fees $14352.15 |
import numpy as np
import pandas as pd
import scipy
from sklearn import preprocessing
from sklearn.metrics import roc_auc_score
from sklearn.dummy import DummyClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.pipeline import FeatureUnion, make_pipeline
from sklearn.impute import SimpleImputer, MissingIndicator


class MagicConch(QCAlgorithm):
    """Intraday leveraged-ETF strategy.

    Collects minute-level indicator features during each session, labels them
    after the close by looking at forward price moves, trains per-asset
    buy/sell logistic-regression classifiers each morning on a rolling window
    of recent days, and rebalances holdings every ``trading_interval`` ticks
    via a softmax over (buy - sell) confidences.
    """

    def Initialize(self):
        """Configure dates, cash, universe, indicators, and daily schedules."""
        self.SetStartDate(2019, 1, 2)   # Set Start Date
        self.SetEndDate(2020, 3, 31)    # Set End Date
        self.SetCash(100000)            # Set Strategy Cash
        self.etf_tickers = [
            "EDC", "EDZ", "TYD", "TYO", "WTID", "WTIU", "SPXL", "SPXS",
            "TQQQ", "SQQQ", "TECL", "TECS", "DUST", "NUGT", "TVIX",
        ]
        self.trading_whitelist = self.etf_tickers
        self.trading_interval = 15          # OnData ticks between rebalances
        self.history_intervals = [2, 5, 10, 20, 30, 45, 60]  # indicator lookbacks (minutes)
        self.classifier_days = 5            # rolling number of training days
        self.classifier_split = 0.2         # train/test fraction (split currently disabled)
        self.classifier_min_confidence = 0.3
        self.label_thresholds = 0.02        # |move| that defines a positive label
        self.label_window_sizes = [15, 30, 45, 60]           # forward-looking label horizons (minutes)
        self.label_window_weights = [0.53, 0.53 * 0.5, 0.53 * 0.25, 0.53 * 0.125]
        self.symbols = {
            ticker: self.AddEquity(ticker, Resolution.Minute).Symbol
            for ticker in self.etf_tickers
        }
        self.field_getters = [
            "Average",
            "Close",
            # "High",
            # "Low",
            # "Median",
            # "Open",
            # "SevenBar",
            # "Typical",
            "Volume",
            # "Weighted"
        ]
        # One SMA and one APO per (lookback, ticker, bar field); keys double
        # as feature-column names in the training frames.
        self.indicators = {}
        for ticker in self.etf_tickers:
            for duration in self.history_intervals:
                for field in self.field_getters:
                    feature_prefix = f"{duration}_{ticker}_{field.lower()}_"
                    self.indicators[feature_prefix + "sma"] = self.SMA(
                        ticker, duration, Resolution.Minute, getattr(Field, field)
                    )
                    self.indicators[feature_prefix + "apo"] = self.APO(
                        ticker, duration, duration * 2, MovingAverageType.Simple,
                        Resolution.Minute, getattr(Field, field)
                    )
        self.SetWarmUp(60, Resolution.Minute)
        self.AddEquity("SPY", Resolution.Minute)
        self.SetBenchmark("SPY")
        # Daily lifecycle: reset/train 30 min before the open, flatten 17 min
        # before the close, snapshot the day's features 30 min after the close.
        self.Schedule.On(
            self.DateRules.EveryDay("SPY"),
            self.TimeRules.AfterMarketOpen("SPY", -30),
            self.PreMarketOpen,
        )
        self.Schedule.On(
            self.DateRules.EveryDay("SPY"),
            self.TimeRules.BeforeMarketClose("SPY", 17),
            self.PreMarketClose,
        )
        self.Schedule.On(
            self.DateRules.EveryDay("SPY"),
            self.TimeRules.BeforeMarketClose("SPY", -30),
            self.PostMarketClose,
        )
        self.tick = 0                 # OnData call counter for the current day
        self.trading_end = False      # True once the kill switch fired today
        self.today_features = []      # list of per-minute feature dicts
        self.daily_features = []      # rolling window of daily DataFrames
        self.models = {}              # models[asset][window_size][side]

    def KillSwitch(self):
        """Stop trading for the day and liquidate all positions."""
        self.trading_end = True
        self.Liquidate()

    def PreMarketOpen(self):
        """Reset per-day state; retrain models once enough days are banked."""
        self.tick = 0
        self.trading_end = False
        self.today_features = []
        if len(self.daily_features) >= self.classifier_days:
            self.models = self.BuildModels()

    def PreMarketClose(self):
        """Flatten before the close unless the kill switch already fired."""
        if not self.trading_end:
            self.KillSwitch()

    def PostMarketClose(self):
        """Snapshot today's features into the rolling training window."""
        # BUGFIX: was `datetime.date(current_time)` but `datetime` is never
        # imported by this file; the instance method is equivalent.
        ds = str(self.Time.date())
        today_df = pd.DataFrame(self.today_features)
        self.daily_features.append(today_df)
        self.Debug(f"On {ds}, training data shape: {today_df.shape}")
        self.Debug(f"On {ds}, last logged feature: {self.today_features[-1]}")
        # Keep at most `classifier_days` days of history.
        if len(self.daily_features) > self.classifier_days:
            self.daily_features.pop(0)

    def ExtractFeatures(self, data):
        """Build one feature dict from the current Slice and indicator state.

        Missing tickers and unwarmed indicators are recorded as NaN; the
        training pipeline imputes them and adds missing-indicator columns.
        """
        current_time = self.UtcTime
        features = {"timestamp": current_time}
        midnight = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
        secs_since_midnight = current_time - midnight
        features["secs_since_midnight"] = secs_since_midnight.seconds
        for ticker in self.etf_tickers:
            for field in self.field_getters:
                if data.ContainsKey(ticker):
                    value = getattr(Field, field)(data[ticker])
                else:
                    value = np.nan
                features[f"{ticker}_{field.lower()}_current"] = value
        for feature_name, indicator in self.indicators.items():
            if indicator.IsReady:
                features[feature_name] = indicator.Current.Value
            else:
                features[feature_name] = np.nan
        self.today_features.append(features)
        return features

    def BuildLabels(self, data):
        """Append forward-looking buy/sell label columns to a day's features.

        For each asset and horizon, the label is True when the forward rolling
        max of the close moves beyond +/- ``label_thresholds`` relative to the
        current price.
        """
        asset_names = self.etf_tickers
        data["timestamp"] = pd.to_datetime(data["timestamp"], utc=True)
        data["timestamp"] = data["timestamp"].dt.floor("Min")
        data = data.set_index("timestamp")
        for asset_name in asset_names:
            for side in ["buy", "sell"]:
                for window_size in self.label_window_sizes:
                    column_prefix = f"{asset_name}_"
                    close_column = column_prefix + "close_current"
                    label_column = column_prefix + f"{side}_{window_size}m_label"
                    if close_column in data:
                        price_value = data[close_column]
                        # Regularize to a 1-minute grid so the rolling window
                        # spans real minutes, not row counts with gaps.
                        # (`.nearest()` replaces deprecated fillna(method="nearest").)
                        resampled_price_value = price_value.resample(
                            pd.Timedelta(minutes=1)
                        ).nearest()
                        max_in_frame = (
                            resampled_price_value.rolling(window_size)
                            .max()
                            .shift(-window_size)
                        )
                        max_in_frame = max_in_frame[price_value.index]
                        change = max_in_frame / price_value - 1
                        change = change.fillna(0.0)
                        if side == "buy":
                            label = change > self.label_thresholds
                        else:
                            # NOTE(review): the sell label also uses the forward
                            # rolling *max*, so it only fires when even the best
                            # future price is 2% below current. A rolling *min*
                            # may have been intended — confirm before changing.
                            label = change < -self.label_thresholds
                    else:
                        label = 0.0
                    data[label_column] = label
        return data

    def BuildModels(self):
        """Train one classifier per (asset, horizon, side) on recent days.

        Returns a nested dict models[asset][window_size][side] where each
        entry carries the fitted predictor, the shared preprocessor, the
        feature-column order, and the training date.
        """
        ds = str(self.Time.date())
        asset_names = self.etf_tickers
        # BUGFIX: DataFrame.append was removed in pandas 2.0; concat instead.
        labeled_frames = [self.BuildLabels(day_df) for day_df in self.daily_features]
        train_df = pd.concat(labeled_frames, ignore_index=True)
        self.Log(f"Model on {ds}, training data shape: {train_df.shape}")
        # train_df = train_df.drop(columns=["timestamp"])
        # train_df, test_df = train_test_split(
        #     train_df, test_size=self.classifier_split
        # )
        label_names = [
            label_name for label_name in train_df.columns if "label" in label_name
        ]
        training_data = train_df.drop(label_names, axis=1)
        # training_data = training_data.replace([-np.inf, np.inf], np.nan)
        X_train_raw = training_data.values
        # Impute NaNs to 0 and append binary missing-indicator columns, then
        # standardize; the same fitted pipeline is reused at prediction time.
        feature_transformer = FeatureUnion(
            transformer_list=[
                ("features", SimpleImputer(strategy="constant", fill_value=0.0)),
                ("indicators", MissingIndicator(features="all")),
            ]
        )
        preprocessor = make_pipeline(feature_transformer, preprocessing.StandardScaler())
        X_train = preprocessor.fit_transform(X_train_raw)
        models = {}
        for asset in asset_names:
            models[asset] = {}
            for window_size in self.label_window_sizes:
                models[asset][window_size] = {}
                for side in ["buy", "sell"]:
                    column_prefix = f"{asset}_{side}_{window_size}m_"
                    if (column_prefix + "label") in train_df.columns:
                        pos_y_train = (
                            train_df[column_prefix + "label"].values == True
                        ).astype(np.float64)
                        neg_y_train = (
                            train_df[column_prefix + "label"].values == False
                        ).astype(np.float64)
                    else:
                        example_nums, _ = X_train.shape
                        pos_y_train = np.zeros([example_nums])
                        neg_y_train = np.zeros([example_nums])
                    if pos_y_train.sum() > 0 and neg_y_train.sum() > 0:
                        clf = LogisticRegression(max_iter=1200000)
                        # clf = RandomForestClassifier()
                        # Reweight positives so both classes contribute equally.
                        sampling_weight = (
                            neg_y_train.sum() / pos_y_train.sum()
                        ) * pos_y_train
                        sampling_weight += neg_y_train
                    else:
                        # Degenerate single-class day: fall back to a constant
                        # classifier; flip one target so both classes exist.
                        clf_output = 1 if pos_y_train.sum() > 0 else 0
                        pos_y_train = np.zeros_like(pos_y_train, dtype=np.float64)
                        pos_y_train[-1] = 1.0
                        # BUGFIX: `strategy` is keyword-only in sklearn >= 1.2.
                        clf = DummyClassifier(strategy="constant", constant=clf_output)
                        sampling_weight = np.ones_like(pos_y_train, dtype=np.float64)
                    clf = clf.fit(X_train, pos_y_train, sample_weight=sampling_weight)
                    model = {
                        "features": list(training_data.columns),
                        "preprocessor": preprocessor,
                        "predictor": clf,
                        "ds": ds,
                    }
                    models[asset][window_size][side] = model
        return models

    def Predict(self, model, features_map):
        """Return P(label=1) for one model given the latest feature dict."""
        feature_names = model["features"]
        preprocessor = model["preprocessor"]
        predictor = model["predictor"]
        # Order features exactly as at training time.
        features = np.array(
            [features_map[feature_name] for feature_name in feature_names]
        )
        features = np.expand_dims(features, axis=0)
        # features = np.nan_to_num(features, copy=True)
        features = preprocessor.transform(features)
        prob = predictor.predict_proba(features)[0, 1]
        return prob

    def CalculateDistribution(self, features_map):
        """Turn model outputs into target portfolio weights.

        Confidence per asset is the horizon-weighted (buy - sell) probability
        gap. Assets at or below ``classifier_min_confidence`` are pushed out of
        the softmax with a -10 penalty; if every asset is below threshold the
        target is all-zero (full liquidation).
        """
        asset_names = self.trading_whitelist
        models = self.models
        results = {asset: {"buy": 0.0, "sell": 0.0} for asset in asset_names}
        for window_size, window_weight in zip(
            self.label_window_sizes,
            self.label_window_weights,
        ):
            for asset in asset_names:
                results[asset]["buy"] += (
                    self.Predict(models[asset][window_size]["buy"], features_map)
                    * window_weight
                )
                results[asset]["sell"] += (
                    self.Predict(models[asset][window_size]["sell"], features_map)
                    * window_weight
                )
        asset_confidence = np.array(
            [results[asset]["buy"] - results[asset]["sell"] for asset in asset_names]
        )
        asset_liquidate = asset_confidence <= self.classifier_min_confidence
        if asset_liquidate.all():
            result = np.zeros_like(asset_liquidate, dtype=np.float64).tolist()
        else:
            asset_distribution = asset_confidence - 10 * asset_liquidate
            exp_range = np.exp(asset_distribution)
            asset_distribution = exp_range / exp_range.sum()
            result = asset_distribution.tolist()
        return result

    def OnData(self, data):
        """Record features every minute; rebalance every `trading_interval` ticks."""
        self.tick += 1
        asset_features = self.ExtractFeatures(data)
        if (
            not self.trading_end
            and self.models
            and (self.tick % self.trading_interval == 0)
        ):
            asset_distribution = self.CalculateDistribution(asset_features)
            for asset_name, percent in zip(self.trading_whitelist, asset_distribution):
                self.Log(f"{asset_name:>20}: {percent:.4f}")
                self.SetHoldings(asset_name, percent)