Overall Statistics |
Total Trades 2548 Average Win 0.13% Average Loss -0.10% Compounding Annual Return 17.296% Drawdown 8.000% Expectancy 0.128 Net Profit 17.296% Sharpe Ratio 1.193 Probabilistic Sharpe Ratio 54.983% Loss Rate 51% Win Rate 49% Profit-Loss Ratio 1.31 Alpha 0.147 Beta 0.115 Annual Standard Deviation 0.125 Annual Variance 0.016 Information Ratio 0.735 Tracking Error 0.175 Treynor Ratio 1.294 Total Fees $3387.84 Estimated Strategy Capacity $2800000.00 Lowest Capacity Asset LFC SUKR265DCACL |
# Time-ordered K-fold splitter that keeps whole groups (e.g. timestamps) together
import numpy as np
from sklearn.model_selection._split import _BaseKFold, indexable, _num_samples


class TimeSeriesSplitGroups(_BaseKFold):
    """Expanding-window cross-validator that splits on sorted group labels.

    The unique group labels are sorted and cut into ``n_splits + 1`` chunks.
    Each fold trains on every sample whose group sorts before the test chunk
    and tests on the samples of the test chunk, so a group is never divided
    between train and test. Folds are yielded latest test chunk first.
    """

    def __init__(self, n_splits=5):
        super().__init__(n_splits, shuffle=False, random_state=None)

    def split(self, X, y=None, groups=None):
        """Yield ``(train_indices, test_indices)`` pairs of positional indices.

        Args:
            X: Samples; only the number of samples is used.
            y: Ignored, accepted for scikit-learn API compatibility.
            groups: Array-like with one sortable label per sample
                (pandas Index/Series, ndarray or list).

        Raises:
            ValueError: If ``groups`` is None or there are fewer unique
                groups than ``n_splits + 1``.
        """
        if groups is None:
            raise ValueError("The 'groups' parameter should not be None.")
        X, y, groups = indexable(X, y, groups)
        n_samples = _num_samples(X)
        n_folds = self.n_splits + 1
        # Work on a plain array so that lists and ndarrays are accepted, not
        # only pandas objects exposing ``.isin``.
        group_values = np.asarray(groups)
        group_list = np.unique(group_values)  # sorted unique labels
        n_groups = len(group_list)
        if n_folds > n_groups:
            raise ValueError(
                ("Cannot have number of folds ={0} greater"
                 " than the number of groups: {1}.").format(n_folds, n_groups))
        indices = np.arange(n_samples)
        test_size = n_groups // n_folds
        # The first train window absorbs the remainder of the integer division.
        test_starts = range(test_size + n_groups % n_folds, n_groups, test_size)
        for test_start in reversed(test_starts):
            train_groups = group_list[:test_start]
            test_groups = group_list[test_start:test_start + test_size]
            yield (indices[np.isin(group_values, train_groups)],
                   indices[np.isin(group_values, test_groups)])
# Technical ML Alpha Model with persistent object storage
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")

from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import AlphaModel, Insight, InsightDirection

import pickle
# Imported explicitly: the class uses `timedelta` and `pd`, and nothing above
# is guaranteed to provide them.
from datetime import timedelta

import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import RandomizedSearchCV

import timeseriessplitgroups as tss

STEPS = [("pca", PCA()),
         ("mlp", MLPClassifier(n_iter_no_change=1, max_iter=100,
                               solver="adam", early_stopping=True,
                               warm_start=True, validation_fraction=0.2))]

PARAMS = {"pca__n_components": [None, 0.9],
          "mlp__activation": ["logistic", "relu"],
          "mlp__alpha": [0.1, 0.01, 0.001, 0.0001, 0],
          "mlp__hidden_layer_sizes": [[96, ], [48, 48], [32, 32, 32]]}


class MLTechnical(AlphaModel):
    """Alpha model driven by a classifier trained on price/volume features.

    The fitted search object is pickled into the algorithm's ObjectStore
    under ``model_key`` after every training run and reloaded from there on
    construction when the key exists.
    """

    def __init__(self, algorithm, model_key="mltechnical06"):
        """Store the algorithm handle and load a persisted model if present.

        Args:
            algorithm: Algorithm instance used for history, logging and storage.
            model_key: ObjectStore key under which the model is persisted.
        """
        self.algorithm = algorithm
        self.resolution = algorithm.UniverseSettings.Resolution
        self.model_key = model_key
        self.model = None
        #algorithm.ObjectStore.Delete(model_key)
        if algorithm.ObjectStore.ContainsKey(model_key):
            model_buffer = algorithm.ObjectStore.ReadBytes(model_key)
            # NOTE: pickle.loads can execute arbitrary code. The blob read
            # here is the one train() writes under the same key; do not point
            # model_key at data from an untrusted source.
            self.model = pickle.loads(bytes(model_buffer))
        self.lookbacks = [1, 5, 10, 21]

    def Update(self, algorithm, data):
        """Return one 1-day price insight per prediction row.

        The signal is the predicted probability of the positive class, so
        the direction is Up above 0.5 and Down otherwise. The magnitude is
        kept as the raw probability.
        """
        insights = []
        if self.model is not None:
            predictions = self.predict()
            for idx, signal in predictions["Signal"].items():
                symbol = self.algorithm.Symbol(idx[0])
                # A probability is never negative, so comparing with 0 would
                # label practically every insight as Up.
                direction = (InsightDirection.Up if signal > 0.5
                             else InsightDirection.Down)
                insights.append(Insight.Price(symbol, timedelta(days=1),
                                              direction, abs(signal), None))
        self.algorithm.Debug(f"{self.algorithm.Time} {len(insights)} insights")
        return insights

    def train(self):
        """Fit (or refit) the randomized search, persist it and log its score."""
        if self.model is None:
            cv = tss.TimeSeriesSplitGroups(n_splits=10)
            self.model = RandomizedSearchCV(Pipeline(steps=STEPS), PARAMS,
                                            scoring="accuracy", cv=cv,
                                            n_iter=10, n_jobs=1)
        x, y = self.get_data(252 * 1, include_y=True)
        # Timestamps are the CV groups so a date is never split across folds.
        groups = x.index.get_level_values("time")
        self.model.fit(x, y, groups=groups)
        self.algorithm.ObjectStore.SaveBytes(self.model_key,
                                             pickle.dumps(self.model))
        self.algorithm.Debug(f"{self.algorithm.Time} Model Validation {self.model.best_score_:.1%}")
        self.algorithm.Plot("Model", "Accuracy", self.model.best_score_)

    def predict(self):
        """Return a one-column ("Signal") frame of positive-class probabilities.

        All feature rows that survive ``dropna`` are returned; no filtering
        to the latest timestamp is applied.
        """
        x = self.get_data(max(self.lookbacks) + 1, include_y=False)
        y = pd.DataFrame(self.model.predict_proba(x)[:, 1],
                         index=x.index, columns=["Signal"])
        return y
        #return y[dates == max(dates)]

    def get_data(self, datapoints=1, include_y=True):
        """Build the feature matrix and, optionally, the binary target.

        Args:
            datapoints: Number of history bars requested per security.
            include_y: When True also return the target.

        Returns:
            ``features`` when ``include_y`` is False, otherwise
            ``(features, target)`` where target is 1.0 if the same symbol's
            next-bar close-to-close return is positive and 0.0 otherwise.
        """
        tickers = list(self.algorithm.ActiveSecurities.Keys)
        data = self.algorithm.History(tickers, datapoints, self.resolution)
        data["volatility"] = data["high"] - data["low"]
        data = data[["close", "volatility", "volume"]]
        groups = data.groupby("symbol")
        features = [groups.pct_change(p) for p in self.lookbacks]  # Momentum
        features += [data/groups.apply(lambda x: x.rolling(p).mean())  # Normalized by average
                     for p in self.lookbacks]
        features = pd.concat(features, join="inner", axis="columns").dropna()
        if not include_y:
            return features
        returns = groups["close"].pct_change(1)
        # Shift inside each symbol. A plain shift(-1) would label the last
        # bar of one symbol with the first return of the next symbol.
        target = returns.groupby(level="symbol").shift(-1)
        target = target.reindex_like(features).dropna()
        return features.loc[target.index], (target > 0).astype("float")
# First stable version
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")

from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import AlphaModel, Insight, InsightType, InsightDirection

import datetime as dt
# Imported explicitly: the class uses `timedelta` and `pd`, and nothing above
# is guaranteed to provide them.
from datetime import timedelta

import pandas as pd
from sklearn.linear_model import Ridge
from sklearn.neural_network import MLPRegressor
from sklearn.model_selection import train_test_split


class MLTechnical(AlphaModel):
    """Alpha model that turns a regressor's next-bar return forecast into
    1-day price insights, using momentum features of the close price."""

    def __init__(self, algorithm, resolution=Resolution.Daily, model=None):
        """Store the algorithm handle, the history resolution and an optional
        pre-built model (a Ridge regressor is created on first train())."""
        self.algorithm = algorithm
        self.resolution = resolution
        self.model = model

    def Update(self, algorithm, data):
        """Return one insight per symbol in the latest prediction frame.

        Direction follows the sign of the predicted return; magnitude is its
        absolute value.
        """
        insights = []
        if self.model is not None:
            predictions = self.predict()
            symbols = predictions.index.get_level_values("symbol")
            # predict() keeps only the latest timestamp, so rows and symbols
            # can be paired positionally.
            for ticker, signal in zip(symbols, predictions["Signal"]):
                symbol = self.algorithm.Symbol(ticker)
                direction = (InsightDirection.Up if signal > 0
                             else InsightDirection.Down)
                insights.append(Insight.Price(symbol, timedelta(days=1),
                                              direction, abs(signal), None))
        self.algorithm.Debug(f"Generated {len(insights)} insights")
        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        """Event fired each time securities are added to or removed from the
        data feed; only logs the change.

        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm"""
        self.algorithm.Debug(f"Security Changed: {changes}")

    def train(self):
        """Fit the model on a shuffled train split and log the test score."""
        if self.model is None:
            self.model = Ridge()
            # Disabled alternative:
            # self.model = MLPRegressor(n_iter_no_change=1, max_iter=1000,
            #                           early_stopping=True, solver="adam",
            #                           validation_fraction=0.5, shuffle=True)
        x, y = self.get_data(252*10)
        y = y.dropna()
        x_train, x_test, y_train, y_test = train_test_split(x.loc[y.index], y,
                                                            shuffle=True)
        self.model.fit(x_train, y_train)
        self.algorithm.Debug(f"Model trained - Score {self.model.score(x_test, y_test):.2f}")

    def predict(self):
        """Return the "Signal" predictions for the most recent timestamp."""
        x, _ = self.get_data(22)
        y = pd.DataFrame(self.model.predict(x), index=x.index,
                         columns=["Signal"])
        dates = y.index.get_level_values("time")
        return y[dates==max(dates)]

    def get_data(self, datapoints=1):
        """Return ``(features, target)`` built from close-price history.

        Features are the 1/5/10/21-bar percentage changes per symbol. The
        target is the same symbol's next-bar return, aligned to the feature
        index (NaN on each symbol's last bar).
        """
        tickers = list(self.algorithm.ActiveSecurities.Keys)
        data = self.algorithm.History(tickers, datapoints, self.resolution)
        groups = data["close"].groupby("symbol")
        features = pd.concat([groups.pct_change(p) for p in [1, 5, 10, 21]],
                             join="inner", axis="columns").dropna()
        # Shift inside each symbol. A plain shift(-1) would pair the last bar
        # of one symbol with the first return of the next symbol.
        target = groups.pct_change(1).groupby(level="symbol").shift(-1)
        target = target.reindex(features.index)
        return features, target
# Flexible execution model with stop and limit price options
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")

from System import *
from QuantConnect import *
from QuantConnect.Orders import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Execution import *
from QuantConnect.Algorithm.Framework.Portfolio import *


class CustomExecution(ExecutionModel):
    """Flexible Execution Model"""

    def __init__(self, limit_price=None, stop_price=None):
        """Initializes a new instance of the CustomExecution class.

        Equivalent to immediate (market order) execution when limit_price and
        stop_price are both None (or zero).

        Args:
            limit_price: Price margin to add to current price (or subtract for short)
                         (Long: 0.01=101%*Current Price, -0.01=99%*Current Price, None=Market Order
                         Short: 0.01=99%*Current Price, -0.01=101%*Current Price, None=Market Order)
            stop_price: Margin applied to the current price to derive the stop price
                        (None=No stop). See Execute for how the margin is signed
                        in each order type."""
        self.limit_price = limit_price
        self.stop_price = stop_price
        self.targetsCollection = PortfolioTargetCollection()

    def Execute(self, algorithm, targets):
        """Cancels all open orders, then submits one order per unfulfilled target.

        The order type depends on which margins are configured:
        limit+stop -> StopLimitOrder, limit only -> LimitOrder,
        stop only -> StopMarketOrder, neither -> MarketOrder.

        Args:
            algorithm: The algorithm instance
            targets: The portfolio targets to be ordered"""
        for open_order in algorithm.Transactions.GetOpenOrders():
            algorithm.Transactions.CancelOrder(open_order.Id)

        self.targetsCollection.AddRange(targets)
        for target in self.targetsCollection.OrderByMarginImpact(algorithm):
            # calculate remaining quantity to be ordered
            quantity = OrderSizing.GetUnorderedQuantity(algorithm, target)
            if quantity == 0:
                # Nothing left to trade for this target; a zero-size order
                # would only produce an invalid order request.
                continue
            price = algorithm.Securities[target.Symbol].Close
            sign = +1 if quantity > 0 else -1
            if self.limit_price:
                limit = price * (1+self.limit_price*sign)
                if self.stop_price:
                    stop = price * (1+self.stop_price*sign)
                    algorithm.StopLimitOrder(target.Symbol, quantity, stop, limit)
                else:
                    algorithm.LimitOrder(target.Symbol, quantity, limit)
            elif self.stop_price:
                # NOTE(review): the stop margin is subtracted here but added
                # in the stop-limit branch above - confirm this is intended.
                stop = price * (1-self.stop_price*sign)
                algorithm.StopMarketOrder(target.Symbol, quantity, stop)
            else:
                algorithm.MarketOrder(target.Symbol, quantity)

        self.targetsCollection.ClearFulfilled(algorithm)
# Custom Portfolio with flexible Long/Short exposure and number of positions
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")

from QuantConnect import *
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *

from itertools import groupby
from datetime import datetime, timedelta


class CustomPortfolio(PortfolioConstructionModel):
    """Equal Weighted portfolio with flexible portfolio size and long/short exposure"""

    def __init__(self, long_short_ratio = 1.0, portfolio_size=1, rebalance = Resolution.Daily):
        """Set up parameters for portfolio creation.

        Args:
            long_short_ratio: Fraction of the positions held long
                (1.0 = long only); the remainder is held short.
            portfolio_size: Total number of positions; each gets a weight of
                1/portfolio_size.
            rebalance: Rebalancing period as a Resolution/int, a timedelta,
                or a function mapping a time to the next rebalance time.
        """
        self.long_short_ratio = long_short_ratio
        self.portfolio_size = portfolio_size
        self.long_pos = int(portfolio_size*long_short_ratio)
        self.short_pos = portfolio_size - self.long_pos
        self.pos_size = 1.0/portfolio_size

        # If the argument is an instance of Resolution or Timedelta
        # Redefine rebalancingFunc
        # These must be independent `if`s: an int is first converted to a
        # time span, a time span is then wrapped into a function, and the
        # resulting function is finally registered. Chaining them with
        # `elif` skipped the registration for int and timedelta arguments.
        rebalancingFunc = rebalance
        if isinstance(rebalance, int):
            rebalance = Extensions.ToTimeSpan(rebalance)
        if isinstance(rebalance, timedelta):
            rebalancingFunc = lambda dt: dt + rebalance
        if rebalancingFunc:
            self.SetRebalancingFunc(rebalancingFunc)

    def DetermineTargetPercent(self, activeInsights):
        """Rank insights by direction and magnitude to identify long and short trades.

        The top ``long_pos`` insights get +pos_size, the bottom ``short_pos``
        get -pos_size and everything in between gets 0. When there are fewer
        insights than positions, longs are filled first and the short bucket
        takes whatever remains, so an insight is never in both buckets.

        Returns:
            dict mapping each active insight to its target portfolio weight.
        """
        # NOTE(review): within Down insights a larger magnitude ranks earlier,
        # i.e. further from the short bucket - confirm this ordering matches
        # the magnitudes the alpha model emits.
        rank = sorted(activeInsights, key=lambda x: (x.Direction, x.Magnitude), reverse=True)
        n_insights = len(rank)
        long_end = min(self.long_pos, n_insights)
        # Clamp so the slice start is never negative and never overlaps longs.
        short_start = max(n_insights - self.short_pos, long_end)

        targets = {}
        for position, insight in enumerate(rank):
            if position < long_end:
                targets[insight] = self.pos_size
            elif position >= short_start:
                targets[insight] = -self.pos_size
            else:
                targets[insight] = 0
        return targets
# Top X stocks by market capitalization with Volume and Price > 0 and fundamentals
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")

from QuantConnect.Data.UniverseSelection import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel


class TopMktCapStocks(FundamentalUniverseSelectionModel):
    """Select top stocks by market cap which have fundamentals, positive volume and price"""

    def __init__(self, size=50, filterFineData = True, universeSettings = None):
        """Universe init

        Args:
            size: Maximum number of symbols returned by the fine selection.
            filterFineData: Forwarded to the base selection model.
            universeSettings: Forwarded to the base selection model.
        """
        super().__init__(filterFineData, universeSettings)
        self.size = size
        self.lastMonth = -1  # month of the last selection; -1 = none yet

    def SelectCoarse(self, algorithm, coarse):
        """Select stocks with fundamental data, volume>0 and price>0.

        Runs once per calendar month; on later calls within the same month
        ``Universe.Unchanged`` is returned.
        """
        current_month = algorithm.Time.month
        if current_month == self.lastMonth:
            return Universe.Unchanged
        # Remember the month. Without this update the guard above never
        # triggers and the universe is reselected on every call.
        self.lastMonth = current_month
        return [x.Symbol for x in coarse
                if x.HasFundamentalData and x.Volume > 0 and x.Price > 0]

    def SelectFine(self, algorithm, fine):
        """Select top stocks by market cap (positive market cap only)."""
        fine_stocks = sorted([x for x in fine if x.MarketCap > 0],
                             key=lambda x: x.MarketCap, reverse=True)
        return [x.Symbol for x in fine_stocks[:self.size]]
# Implementing GridSearch
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")

from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import AlphaModel, Insight, InsightType, InsightDirection

import numpy as np
import datetime as dt
# Imported explicitly: the class uses `timedelta` and `pd`, and nothing above
# is guaranteed to provide them.
from datetime import timedelta

import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.neural_network import MLPRegressor
from sklearn.model_selection import RandomizedSearchCV

import timeseriessplitgroups as tss

STEPS = [("pca", PCA()),
         ("mlp", MLPRegressor(n_iter_no_change=1, max_iter=100,
                              solver="adam", early_stopping=True,
                              warm_start=True, validation_fraction=0.1))]

PARAMS = {"pca__n_components": [None, 0.9],
          "mlp__activation": ["logistic", "relu"],
          "mlp__alpha": [0.1, 0.01, 0.001, 0.0001, 0],
          "mlp__hidden_layer_sizes": [[96, ], [48, 48], [32, 32, 32]]}


class MLTechnical(AlphaModel):
    """Alpha model using a PCA + MLP regressor tuned by randomized search to
    forecast next-bar returns from close-price momentum features."""

    def __init__(self, algorithm, model=None):
        """Store the algorithm handle and an optional pre-built model."""
        self.algorithm = algorithm
        self.resolution = algorithm.UniverseSettings.Resolution
        self.model = model
        self.lookbacks = [1, 5, 10, 21, 63]

    def Update(self, algorithm, data):
        """Return one 1-day insight per prediction row.

        Direction follows the sign of the predicted return; magnitude is its
        absolute value.
        """
        insights = []
        if self.model is not None:
            predictions = self.predict()
            for idx, signal in predictions["Signal"].items():
                symbol = self.algorithm.Symbol(idx[0])
                direction = (InsightDirection.Up if signal > 0
                             else InsightDirection.Down)
                insights.append(Insight.Price(symbol, timedelta(days=1),
                                              direction, abs(signal), None))
        self.algorithm.Debug(f"{self.algorithm.Time} Generated {len(insights)} insights")
        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        """No action is taken on universe changes."""
        pass

    def train(self):
        """Fit (or refit) the randomized search and log its best CV score."""
        if self.model is None:
            cv = tss.TimeSeriesSplitGroups(n_splits=10)
            self.model = RandomizedSearchCV(Pipeline(steps=STEPS), PARAMS,
                                            scoring="neg_mean_absolute_error",
                                            cv=cv, n_iter=10, n_jobs=1)
        x, y = self.get_data(252*1)
        y = y.dropna()
        x = x.loc[y.index]
        # Timestamps are the CV groups so a date is never split across folds.
        groups = x.index.get_level_values("time")
        self.model.fit(x, y, groups=groups)
        self.algorithm.Debug(f"{self.algorithm.Time} Training score {self.model.best_score_}")

    def predict(self):
        """Return the "Signal" predictions for the most recent timestamp."""
        x, _ = self.get_data(max(self.lookbacks)+1)
        y = pd.DataFrame(self.model.predict(x), index=x.index,
                         columns=["Signal"])
        dates = y.index.get_level_values("time")
        return y[dates==max(dates)]  # Get last period prediction

    def get_data(self, datapoints=1):
        """Return ``(features, target)`` built from close-price history.

        Features are the per-symbol percentage changes over each lookback.
        The target is the same symbol's next-bar return aligned to the
        feature index (NaN on each symbol's last bar).
        """
        tickers = list(self.algorithm.ActiveSecurities.Keys)
        data = self.algorithm.History(tickers, datapoints, self.resolution)
        groups = data["close"].groupby("symbol")
        features = pd.concat([groups.pct_change(p) for p in self.lookbacks],
                             join="inner", axis="columns").dropna()
        #features = data[["open","low","high"]].divide(data["close"], axis=0).dropna()
        #features = features.join(np.log10(data["volume"])).dropna()
        # Shift inside each symbol. A plain shift(-1) would pair the last bar
        # of one symbol with the first return of the next symbol.
        target = groups.pct_change(1).groupby(level="symbol").shift(-1)
        target = target.reindex(features.index)
        return features, target
# Implementing GridSearch
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")

from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import AlphaModel, Insight, InsightType, InsightDirection

import datetime as dt
# Imported explicitly: the class uses `timedelta` and `pd`, and nothing above
# is guaranteed to provide them.
from datetime import timedelta

import pandas as pd
from scipy.stats import spearmanr
from sklearn.decomposition import PCA
from sklearn.linear_model import Ridge
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import make_scorer, mean_squared_error
from sklearn.pipeline import Pipeline
from sklearn.model_selection import RandomizedSearchCV

import timeseriessplitgroups as tss

STEPS = [("pca", PCA()),
         ("mlp", MLPRegressor(n_iter_no_change=1, max_iter=100,
                              solver="adam", early_stopping=True,
                              warm_start=True, validation_fraction=0.5))]

PARAMS = {"pca__n_components": [None, 0.9],
          "mlp__activation": ["logistic", "relu"],
          "mlp__alpha": [0.1, 0.01, 0.001, 0.0001, 0],
          "mlp__hidden_layer_sizes": [[96, ], [48, 48], [32, 32, 32]]}


class MLTechnical(AlphaModel):
    """Alpha model using a PCA + MLP regressor tuned by randomized search
    (scored by r2) to forecast next-bar returns from momentum features."""

    def __init__(self, algorithm, model=None):
        """Store the algorithm handle and an optional pre-built model."""
        self.algorithm = algorithm
        self.resolution = algorithm.UniverseSettings.Resolution
        self.model = model

    def Update(self, algorithm, data):
        """Return one 1-day insight per prediction row.

        Direction follows the sign of the predicted return; magnitude is its
        absolute value.
        """
        insights = []
        if self.model is not None:
            predictions = self.predict()
            for idx, signal in predictions["Signal"].items():
                symbol = self.algorithm.Symbol(idx[0])
                direction = (InsightDirection.Up if signal > 0
                             else InsightDirection.Down)
                insights.append(Insight.Price(symbol, timedelta(days=1),
                                              direction, abs(signal), None))
        self.algorithm.Debug(f"Generated {len(insights)} insights")
        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        """No action is taken on universe changes."""
        pass

    def train(self):
        """Fit (or refit) the randomized search and log the ranked CV results."""
        if self.model is None:
            cv = tss.TimeSeriesSplitGroups(n_splits=10)
            self.model = RandomizedSearchCV(Pipeline(steps=STEPS), PARAMS,
                                            scoring="r2", cv=cv,
                                            n_iter=10, n_jobs=1)
        x, y = self.get_data(252*1)
        y = y.dropna()
        x = x.loc[y.index]
        # Timestamps are the CV groups so a date is never split across folds.
        groups = x.index.get_level_values("time")
        fitted = self.model.fit(x, y, groups=groups)
        results = pd.DataFrame(fitted.cv_results_).sort_values("rank_test_score")
        results = results[["params", "mean_test_score", "std_test_score", "mean_fit_time"]]
        self.algorithm.Debug(f"Training results\n{results.to_string()}")

    def predict(self):
        """Return the "Signal" predictions for the most recent timestamp."""
        x, _ = self.get_data(64)
        y = pd.DataFrame(self.model.predict(x), index=x.index,
                         columns=["Signal"])
        dates = y.index.get_level_values("time")
        return y[dates==max(dates)]

    def get_data(self, datapoints=1):
        """Return ``(features, target)`` built from close-price history.

        Features are the 1/5/10/21-bar percentage changes per symbol. The
        target is the same symbol's next-bar return aligned to the feature
        index (NaN on each symbol's last bar).
        """
        tickers = list(self.algorithm.ActiveSecurities.Keys)
        data = self.algorithm.History(tickers, datapoints, self.resolution)
        groups = data["close"].groupby("symbol")
        features = pd.concat([groups.pct_change(p) for p in [1, 5, 10, 21]],
                             join="inner", axis="columns").dropna()
        # Shift inside each symbol. A plain shift(-1) would pair the last bar
        # of one symbol with the first return of the next symbol.
        target = groups.pct_change(1).groupby(level="symbol").shift(-1)
        target = target.reindex(features.index)
        return features, target
# Algorithm with an ML alpha based on technical features
# Rolling monthly training, limit price execution
# Flexible Long/Short exposure
from Risk.MaximumDrawdownPercentPerSecurity import MaximumDrawdownPercentPerSecurity
from alpha.mltechnical06 import MLTechnical
from universe.topmktcap import TopMktCapStocks
from portfolio.customportfolio import CustomPortfolio
from execution.customexec import CustomExecution


class DancingFluorescentYellowZebra(QCAlgorithm):
    """Framework algorithm wiring together the custom universe, ML alpha,
    portfolio construction and execution models."""

    def Initialize(self):
        """Configure the backtest window, cash, brokerage and framework models,
        and schedule the alpha model's training at each month start."""
        # Backtest period and starting capital
        self.SetStartDate(2015, 1, 1)
        self.SetEndDate(2016, 1, 1)
        self.SetCash(100000)
        self.SetBrokerageModel(BrokerageName.AlphaStreams)

        # Custom Universe - Top X stocks by market cap with fundamentals
        universe_model = TopMktCapStocks(size=50)
        self.AddUniverseSelection(universe_model)
        self.UniverseSettings.Resolution = Resolution.Daily

        # Custom Portfolio - Flexible Long/Short exposure and portfolio size
        portfolio_model = CustomPortfolio(long_short_ratio=1.0,
                                          portfolio_size=10)
        self.SetPortfolioConstruction(portfolio_model)

        # Custom Execution - Flexible order type with limit and stop options
        execution_model = CustomExecution(limit_price=None, stop_price=None)
        self.SetExecution(execution_model)
        #self.AddRiskManagement(MaximumDrawdownPercentPerSecurity())

        # ML alpha: registered as the alpha model and retrained on schedule
        ml_alpha = MLTechnical(self)
        self.AddAlpha(ml_alpha)
        self.Train(self.DateRules.MonthStart(),
                   self.TimeRules.At(0, 0),
                   ml_alpha.train)
# New Technical features and chart
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")

from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import AlphaModel, Insight, InsightType, InsightDirection

# Imported explicitly: the class uses `timedelta` and `pd`, and nothing above
# is guaranteed to provide them.
from datetime import timedelta

import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import RandomizedSearchCV

import timeseriessplitgroups as tss

STEPS = [("pca", PCA()),
         ("mlp", MLPClassifier(n_iter_no_change=1, max_iter=100,
                               solver="adam", early_stopping=True,
                               warm_start=True, validation_fraction=0.2))]

PARAMS = {"pca__n_components": [None, 0.9],
          "mlp__activation": ["logistic", "relu"],
          "mlp__alpha": [0.1, 0.01, 0.001, 0.0001, 0],
          "mlp__hidden_layer_sizes": [[96, ], [48, 48], [32, 32, 32]]}


class MLTechnical(AlphaModel):
    """Alpha model driven by a classifier on close-price momentum and
    moving-average-normalized features; plots the CV accuracy after training."""

    def __init__(self, algorithm, model=None):
        """Store the algorithm handle and an optional pre-built model."""
        self.algorithm = algorithm
        self.resolution = algorithm.UniverseSettings.Resolution
        self.model = model
        self.lookbacks = [1, 5, 10, 21, 63]

    def Update(self, algorithm, data):
        """Return one 1-day price insight per prediction row.

        The signal is the predicted probability of the positive class, so
        the direction is Up above 0.5 and Down otherwise. The magnitude is
        kept as the raw probability.
        """
        insights = []
        if self.model is not None:
            predictions = self.predict()
            for idx, signal in predictions["Signal"].items():
                symbol = self.algorithm.Symbol(idx[0])
                # A probability is never negative, so comparing with 0 would
                # label practically every insight as Up.
                direction = (InsightDirection.Up if signal > 0.5
                             else InsightDirection.Down)
                insights.append(Insight.Price(symbol, timedelta(days=1),
                                              direction, abs(signal), None))
        self.algorithm.Debug(f"{self.algorithm.Time} {len(insights)} insights")
        return insights

    def train(self):
        """Fit (or refit) the randomized search, then log and plot its score."""
        if self.model is None:
            cv = tss.TimeSeriesSplitGroups(n_splits=10)
            self.model = RandomizedSearchCV(Pipeline(steps=STEPS), PARAMS,
                                            scoring="accuracy", cv=cv,
                                            n_iter=10, n_jobs=1)
        x, y = self.get_data(252 * 1, include_y=True)
        # Timestamps are the CV groups so a date is never split across folds.
        groups = x.index.get_level_values("time")
        self.model.fit(x, y, groups=groups)
        self.algorithm.Debug(f"{self.algorithm.Time} Model Validation {self.model.best_score_:.1%}")
        self.algorithm.Plot("Model", "Accuracy", self.model.best_score_)

    def predict(self):
        """Return positive-class probabilities ("Signal") for the latest timestamp."""
        x = self.get_data(max(self.lookbacks) + 1, include_y=False)
        y = pd.DataFrame(self.model.predict_proba(x)[:, 1],
                         index=x.index, columns=["Signal"])
        dates = y.index.get_level_values("time")
        return y[dates == max(dates)]  # Get last period prediction

    def get_data(self, datapoints=1, include_y=True):
        """Build the feature matrix and, optionally, the binary target.

        Args:
            datapoints: Number of history bars requested per security.
            include_y: When True also return the target.

        Returns:
            ``features`` when ``include_y`` is False, otherwise
            ``(features, target)`` where target is 1.0 if the same symbol's
            next-bar close-to-close return is positive and 0.0 otherwise.
        """
        tickers = list(self.algorithm.ActiveSecurities.Keys)
        data = self.algorithm.History(tickers, datapoints, self.resolution)
        # Only the close price feeds the features. The volatility (high-low)
        # and volume columns were computed and then discarded, so that dead
        # work is dropped here.
        data = data[["close"]]
        groups = data.groupby("symbol")
        features = [groups.pct_change(p) for p in self.lookbacks]  # Momentum
        features += [data/groups.apply(lambda x: x.rolling(p).mean())  # Normalized average
                     for p in self.lookbacks]
        features = pd.concat(features, join="inner", axis="columns").dropna()
        if not include_y:
            return features
        close_groups = data["close"].groupby("symbol")
        # Shift inside each symbol. A plain shift(-1) would label the last
        # bar of one symbol with the first return of the next symbol.
        target = close_groups.pct_change(1).groupby(level="symbol").shift(-1)
        target = target.reindex_like(features).dropna()
        return features.loc[target.index], (target > 0).astype("float")
# Improving target and CV
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")

from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import AlphaModel, Insight, InsightType, InsightDirection

# Imported explicitly: the class uses `timedelta` and `pd`, and nothing above
# is guaranteed to provide them.
from datetime import timedelta

import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import RandomizedSearchCV

import timeseriessplitgroups as tss

STEPS = [("pca", PCA()),
         ("mlp", MLPClassifier(n_iter_no_change=1, max_iter=100,
                               solver="adam", early_stopping=True,
                               warm_start=True, validation_fraction=0.1))]

PARAMS = {"pca__n_components": [None, 0.9],
          "mlp__activation": ["logistic", "relu"],
          "mlp__alpha": [0.1, 0.01, 0.001, 0.0001, 0],
          "mlp__hidden_layer_sizes": [[96, ], [48, 48], [32, 32, 32]]}


class MLTechnical(AlphaModel):
    """Alpha model driven by a classifier that predicts whether the next-bar
    return is positive, using close-price momentum features."""

    def __init__(self, algorithm, model=None):
        """Store the algorithm handle and an optional pre-built model."""
        self.algorithm = algorithm
        self.resolution = algorithm.UniverseSettings.Resolution
        self.model = model
        self.lookbacks = [1, 5, 10, 21, 63]

    def Update(self, algorithm, data):
        """Return one 1-day price insight per prediction row.

        The signal is the predicted probability of the positive class, so
        the direction is Up above 0.5 and Down otherwise. The magnitude is
        kept as the raw probability.
        """
        insights = []
        if self.model is not None:
            predictions = self.predict()
            for idx, signal in predictions["Signal"].items():
                symbol = self.algorithm.Symbol(idx[0])
                # A probability is never negative, so comparing with 0 would
                # label practically every insight as Up.
                direction = (InsightDirection.Up if signal > 0.5
                             else InsightDirection.Down)
                insights.append(Insight.Price(symbol, timedelta(days=1),
                                              direction, abs(signal), None))
        self.algorithm.Debug(f"{self.algorithm.Time} {len(insights)} insights")
        return insights

    def train(self):
        """Fit (or refit) the randomized search and log its best CV score."""
        if self.model is None:
            cv = tss.TimeSeriesSplitGroups(n_splits=10)
            self.model = RandomizedSearchCV(Pipeline(steps=STEPS), PARAMS,
                                            cv=cv, n_iter=10, n_jobs=1)
        x, y = self.get_data(252 * 10, include_y=True)
        # Timestamps are the CV groups so a date is never split across folds.
        groups = x.index.get_level_values("time")
        self.model.fit(x, y, groups=groups)
        self.algorithm.Debug(f"{self.algorithm.Time} Model Validation {self.model.best_score_:.1%}")

    def predict(self):
        """Return positive-class probabilities ("Signal") for the latest timestamp."""
        x = self.get_data(max(self.lookbacks) + 1, include_y=False)
        y = pd.DataFrame(self.model.predict_proba(x)[:, 1],
                         index=x.index, columns=["Signal"])
        dates = y.index.get_level_values("time")
        return y[dates == max(dates)]  # Get last period prediction

    def get_data(self, datapoints=1, include_y=True):
        """Build the feature matrix and, optionally, the binary target.

        Args:
            datapoints: Number of history bars requested per security.
            include_y: When True also return the target.

        Returns:
            ``features`` when ``include_y`` is False, otherwise
            ``(features, target)`` where target is 1.0 if the same symbol's
            next-bar close-to-close return is positive and 0.0 otherwise.
        """
        tickers = list(self.algorithm.ActiveSecurities.Keys)
        data = self.algorithm.History(tickers, datapoints, self.resolution)
        groups = data["close"].groupby("symbol")
        features = pd.concat([groups.pct_change(p) for p in self.lookbacks],
                             join="inner", axis="columns").dropna()
        if not include_y:
            return features
        # Shift inside each symbol. A plain shift(-1) would label the last
        # bar of one symbol with the first return of the next symbol.
        target = groups.pct_change(1).groupby(level="symbol").shift(-1)
        target = target.reindex_like(features).dropna()
        return features.loc[target.index], (target > 0).astype("float")