Overall Statistics |
Total Trades 34 Average Win 1.23% Average Loss -0.46% Compounding Annual Return 19.425% Drawdown 0.700% Expectancy 2.008 Net Profit 16.878% Sharpe Ratio 2.307 Probabilistic Sharpe Ratio 99.708% Loss Rate 18% Win Rate 82% Profit-Loss Ratio 2.65 Alpha 0 Beta 0 Annual Standard Deviation 0.057 Annual Variance 0.003 Information Ratio 2.307 Tracking Error 0.057 Treynor Ratio 0 Total Fees $4763.05 Estimated Strategy Capacity $240000.00 Lowest Capacity Asset UVIX XX86EIYKAK4L Portfolio Turnover 10.52% |
from AlgorithmImports import * from datetime import date, time, datetime import pandas as pd import numpy as np from sklearn.neighbors import KNeighborsClassifier from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler, MinMaxScaler from sklearn.model_selection import GridSearchCV import joblib from scipy.spatial.distance import pdist class ExampleAlpha(AlphaModel): def __init__(self, algorithm: QCAlgorithm, lookback: timedelta): ' Initialize a new instance of LorentzianClassification' self.symbols_data = {} self.algorithm = algorithm self.lookback = lookback def Update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]: ' Generate insights as new data is fed through the algorithm.' insights = [] for sym in self.symbols_data.keys(): obj = self.symbols_data[sym] invested = self.algorithm.Portfolio[sym].Invested islong = self.algorithm.Portfolio[sym].IsLong holding_period = (timedelta(minutes=5) * obj.target_shift) - timedelta(minutes=1) if obj.IsReady() and self.EndOfTradingDay(self.algorithm.Time) > holding_period: if obj.prediction and not invested: insights.append(Insight.Price(sym, holding_period, InsightDirection.Up, float(0.001), None, None, 1.0)) return insights def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: ' Make one-time actions on securities as they enter or exit the universe.' for security in changes.AddedSecurities: self.symbols_data[security.Symbol] = SymbolData(algorithm, security.Symbol, self.lookback) for security in changes.RemovedSecurities: if security.Symbol in self.symbols_data: symbol_data = self.symbols_data.pop(security.Symbol, None) if symbol_data: symbol_data.Dispose() def EndOfTradingDay(self, current: datetime) -> timedelta: ' Compute and return timedelta corresponding to the last minute of trading from the moment an insight is created' close = datetime.combine(self.algorithm.Time.date(), time(15, 58, 00)) return (close - current) class SymbolData: def __init__(self, algorithm: QCAlgorithm, symbol: Symbol, lookback: timedelta): ' Initialize a new instance of LorentzianClassification' self.algorithm = algorithm self.symbol = symbol self.lookback = lookback self.consolidator_period = timedelta(minutes=5) self.history = self.algorithm.History(self.symbol, self.lookback, self.algorithm.UniverseSettings.Resolution) self.window_size = timedelta(minutes=self.history.shape[0] * 0.95) // (self.consolidator_period) self.indicators = [] self.combinations = [] self.selected_features = [] self.rolling_windows = {} self.features = pd.DataFrame() self.training_data = pd.DataFrame() self.models = Models(algorithm=algorithm) self.last_trained = 0 self.prediction = 0 self.required_confidence = 0.60 self.target_shift = 4 # Create indicators self.adx = AverageDirectionalIndex(14) self.adx.Name = 'ADX' self.adx.Updated += (lambda sender, updated: self.rolling_windows[self.adx.Name].Add(updated)) self.rsi = RelativeStrengthIndex(14, movingAverageType=2) self.rsi.Name = 'RSI' self.rsi.Updated += (lambda sender, updated: self.rolling_windows[self.rsi.Name].Add(updated)) self.cci = CommodityChannelIndex(14, movingAverageType=2) self.cci.Name = 'CCI' self.cci.Updated += (lambda sender, updated: self.rolling_windows[self.cci.Name].Add(updated)) self.ema = ExponentialMovingAverage(20) self.ema.Name = 'EMA' self.ema.Updated += (lambda sender, updated: self.rolling_windows[self.ema.Name].Add(updated)) self.tema = TripleExponentialMovingAverage(40) self.tema.Name = 'TripleEMA' self.tema.Updated += (lambda sender, updated: self.rolling_windows[self.tema.Name].Add(updated)) self.crossover = IndicatorExtensions.Over(self.tema, self.ema, 'Crossover') self.crossover.Updated += (lambda sender, updated: self.rolling_windows[self.crossover.Name].Add(updated)) # Store indicators for easy indexing self.indicators.append(self.adx) self.indicators.append(self.rsi) self.indicators.append(self.cci) self.indicators.append(self.ema) self.indicators.append(self.tema) self.indicators.append(self.crossover) # Combination indicators are treated differently than regular indicators. They don't inherit from IIndicatorWarmUpPeriodProvider and so cannot benefit from WarmUpIndicator # And shouldn't be Registered to the consolidator. self.combinations.append(self.crossover) # Determine which features will be used in machine learning models self.selected_features = ['RSI', 'ADX', 'CCI', 'Crossover'] + ['volume'] # Create a consolidator to update the indicators and create a rolling window for consolidator bars. self.consolidator = TradeBarConsolidator(self.consolidator_period) self.consolidator.DataConsolidated += self.OnDataConsolidated self.rolling_windows['Price'] = RollingWindow[TradeBar](self.window_size) # Register the consolidator to update the indicators. self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator) # Create rolling windows, Warm up the indicators, and update rolling windows for indicator in self.indicators: self.rolling_windows[indicator.Name] = RollingWindow[IndicatorDataPoint](self.window_size) if indicator not in self.combinations: self.algorithm.RegisterIndicator(self.symbol, indicator, self.consolidator) def OnDataConsolidated(self, sender: object, consolidated_bar: TradeBar) -> None: ' Performs actions when the cosolidator object emits a sender bar' # Update the consolidated price rolling window self.rolling_windows['Price'].Add(consolidated_bar) # Summarize all features into a single DataFrame on each consolidated bar. Use the most recent consolidated bar as prediction inputs. if self.IsReady(): self.features = self.PrepareInputs(consolidated_bar) # If the model is finished training predict the direction of the next n-bars. if self.models.trained_models['KNN'] is not None and not self.models.training: model = self.models.trained_models['KNN'] self.proba = model.predict_proba(self.features).flatten() self.prediction = [1 if model.predict(self.features)[0] and self.proba[1] > self.required_confidence else 0][0] if self.last_trained != self.algorithm.Time.isocalendar()[1]: self.last_trained = self.algorithm.Time.isocalendar()[1] # if self.last_trained != self.algorithm.Time.month: # self.last_trained = self.algorithm.Time.month training_features, training_targets = self.PrepareTrainingData() self.models.TrainModels( training_data = training_features.join(training_targets, how='inner', on='time'), ) else: if not self.algorithm.IsWarmingUp: windows = [ind for ind, win in self.rolling_windows.items() if not win.IsReady] indicators = [ind.Name for ind in self.indicators if not ind.IsReady] self.algorithm.Debug(f'Waiting on windows: {windows}, indicators: {indicators} : {self.algorithm.Time}') def PrepareTrainingData(self) -> pd.DataFrame: ' Summarizes the rolling windows into a single DataFrame. Returns training features and training targets.' # Reset the training data DataFrame self.training_data = self.algorithm.PandasConverter.GetDataFrame[TradeBar](list(self.rolling_windows['Price'])).loc[self.symbol] # Construct the training data DataFrame from all rolling windows for indicator in self.indicators: key = indicator.Name wind = self.rolling_windows[key] indicator_df = self.algorithm.PandasConverter.GetDataFrame[IndicatorDataPoint](list(wind)).reset_index().drop(columns='symbol').set_index('time').rename(columns = {'value': key}) self.training_data = self.training_data.join(indicator_df, how='inner', on='time') # Reverse the order of the DataFrame such that the most recent consolidated bar is the last row of the DataFrame self.training_data = self.training_data[::-1] # Generate the targets. Set the threshold of a positive bar to a factor of the standard deviation self.training_data['target'] = np.log(1 + self.training_data.close.pct_change()) - (np.log(1 + self.training_data.close.pct_change()).std()) self.training_data['target'] = np.sign(self.training_data.target).shift(-self.target_shift) self.training_data = self.training_data.dropna() training_features = self.training_data.loc[:, self.selected_features].copy() training_targets = self.training_data.iloc[:, -1].to_frame() return training_features, training_targets def PrepareInputs(self, consolidated_bar: TradeBar) -> pd.DataFrame: ' Extracts the most recent consolidated data into a single DataFrame' # Construct the feature data DataFrame from all rolling windows self.features = pd.DataFrame(data={x.Name: self.rolling_windows[x.Name][0].Value for x in self.indicators if x.Name in self.selected_features}, index=pd.Series(consolidated_bar.EndTime)) bar = self.rolling_windows['Price'][0] prices = pd.DataFrame(data={'open': bar.Open, 'high': bar.High, 'low': bar.Low, 'close': bar.Close, 'volume': bar.Volume}, index=pd.Series(bar.EndTime)) self.features = prices.join(self.features) return self.features.loc[:, self.selected_features] def IsReady(self) -> bool: ' Returns true if all rolling windows are ready' windows_ready = all([win.IsReady for ind, win in self.rolling_windows.items()]) indicators_ready = all([x.IsReady for x in self.indicators]) ready = all([windows_ready, indicators_ready]) return ready # If you have a dynamic universe, remove consolidators for the securities removed from the universe def Dispose(self) -> None: self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.consolidator) class Models: ' Various models for forward looking insights.' def __init__(self, algorithm: QCAlgorithm): ' Initializes a new instance of Models' self.algorithm = algorithm self.training = False self.trained_models = { 'KNN': None } def Preprocess(self, training_data: pd.DataFrame) -> pd.DataFrame: ' Preprocess the summary DataFrame of all rolling windows.' # Reset the index and drop the timestamp # training_data = training_data.reset_index().drop(columns='time') # Separate the features from the targets train_x = training_data.iloc[:, :-1].copy() train_y = training_data.target.to_frame() return train_x, train_y def TrainModels(self, training_data: pd.DataFrame): ' Trains all models' if not self.training: self.training = True training_features, training_targets = self.Preprocess( training_data = training_data, ) self.TrainSklearnKNN( training_features = training_features, training_targets = training_targets, ) self.TradingViewLorentzianClassification( training_features = training_features, training_targets = training_targets, ) self.ResearchLorentzianClassification( training_features = training_features, training_targets = training_targets, ) if self.training: self.training = False def TrainSklearnKNN(self, training_features: pd.DataFrame, training_targets: pd.DataFrame): ' Trains, optimizes and saves an SKlearn K-Nearest Neighbors classifier' knn = KNeighborsClassifier() self.training = True param_grid = [ { 'n_neighbors': list(np.arange(2, 21, 2)), 'leaf_size': list(np.arange(1, 2, 1)), 'metric': ['cityblock'] }] gcv = GridSearchCV(estimator=knn, param_grid=param_grid, cv=10, verbose=0, n_jobs = -1).fit(training_features, training_targets) # self.algorithm.Debug(f'{gcv.best_params_}') pipe = Pipeline([ # ('scaler', MinMaxScaler()), ('scaler', StandardScaler()), # ('scaler', RobustScaler()), # ('pca', PCA()), ('nearest_neighbors', KNeighborsClassifier(**gcv.best_params_)) ]) model = pipe.fit(training_features, training_targets) # Store the most recent file in the models dictionary model_key = 'KNN' self.trained_models[model_key] = model self.training = False # Save the model to ObjectStore # file_name = self.algorithm.ObjectStore.GetFilePath(model_key) # joblib.dump(model, file_name) # self.algorithm.ObjectStore.Save(model_key) def TradingViewLorentzianClassification(self, training_features: pd.DataFrame, training_targets: pd.DataFrame): ' Replicates the model logic from the Trading View indicator: Lorentzian Classification' pass def ResearchLorentzianClassification(self, training_features: pd.DataFrame, training_targets: pd.DataFrame): ''' Modiifies the Trading View Lorentzian Classification by adhering to the findings from the following research paper: https://www.researchgate.net/publication/282441155_A_new_classification_method_by_using_Lorentzian_distance_metric ''' pass def LoadModel(self, model_key: str) -> None: ' Loads saved models if they exist' if self.algorithm.ObjectStore.ContainsKey(model_key): file_name = self.algorithm.ObjectStore.GetFilePath(model_key) return joblib.load(file_name)
from AlgorithmImports import * from datetime import date, time, datetime class LongVol(AlphaModel): def __init__(self, algorithm: QCAlgorithm, long_vol_symbols: list): ' Initialize' self.symbol_data_by_symbol = {} self.symbols = [ Symbol.Create(symbol, SecurityType.Equity, Market.USA).Value for symbol in long_vol_symbols ] self.algorithm = algorithm def Update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]: ' Generate insights as new data is fed through the algorithm.' insights = [] for symbol in self.symbol_data_by_symbol.keys(): if self.symbol_data_by_symbol[symbol].indicator.Current.Value < -5 and not self.algorithm.Portfolio[symbol].Invested: insights.append(Insight.Price(symbol, timedelta(minutes=5), InsightDirection.Up, float(0.001), None, None, 1.0)) if self.symbol_data_by_symbol[symbol].indicator.Current.Value > 6 and not self.algorithm.Portfolio[symbol].Invested: insights.append(Insight.Price(symbol, self.EndOfTradingDay(self.algorithm.Time), InsightDirection.Up, float(0.001), None, None, 1.0)) return insights def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: ' Make one-time actions on securities as they enter or exit the universe.' for security in changes.AddedSecurities: if security.Symbol in self.symbols: self.symbol_data_by_symbol[security.Symbol] = SymbolData(algorithm, security.Symbol) history = algorithm.History[TradeBar](security.Symbol, 5, algorithm.UniverseSettings.Resolution) self.symbol_data_by_symbol[security.Symbol].warmup_consolidator(history) for security in changes.RemovedSecurities: if security.Symbol in self.symbol_data_by_symbol: symbol_data = self.symbol_data_by_symbol.pop(security.Symbol, None) if symbol_data: symbol_data.dispose() def EndOfTradingDay(self, current: datetime) -> timedelta: ' Compute and return timedelta corresponding to the last minute of trading from the moment an insight is created' close = datetime.combine(self.algorithm.Time.date(), time(15, 58, 00)) return (close - current) class SymbolData: def __init__(self, algorithm, symbol): self.algorithm = algorithm self.symbol = symbol self.indicator = MomentumPercent(2) self.consolidator = TradeBarConsolidator(1) algorithm.SubscriptionManager.AddConsolidator(symbol, self.consolidator) algorithm.RegisterIndicator(symbol, self.indicator, self.consolidator) algorithm.WarmUpIndicator(self.symbol, self.indicator) def dispose(self): self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.consolidator) def warmup_consolidator(self, history): for trade_bar in history: self.consolidator.Update(trade_bar)
from AlgorithmImports import * from datetime import date, time, datetime import pandas as pd import numpy as np import joblib from sklearn.neighbors import KNeighborsClassifier, NearestNeighbors from sklearn.pipeline import Pipeline from sklearn.decomposition import PCA from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler from sklearn.model_selection import GridSearchCV from scipy.spatial.distance import pdist from collections import deque class LorentzianClassification(AlphaModel): def __init__(self, algorithm: QCAlgorithm, lookback: timedelta): ' Initialize a new instance of LorentzianClassification' self.symbols_data = {} self.algorithm = algorithm self.lookback = lookback def Update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]: ' Generate insights as new data is fed through the algorithm.' insights = [] # Don't emit insights during warm up if self.algorithm.IsWarmingUp: return insights # Iterate through each symbol and generate insights for sym in self.symbols_data.keys(): obj = self.symbols_data[sym] invested = self.algorithm.Portfolio[sym].Invested holding_period = (timedelta(minutes=5) * obj.target_shift) - timedelta(minutes=1) ready = obj.IsReady() and self.EndOfTradingDay(self.algorithm.Time) > holding_period and not invested if not ready: return insights else: if obj.long: insights.append(Insight.Price(sym, holding_period, InsightDirection.Up, float(0.001), None, None, 1.0)) # if obj.short: # insights.append(Insight.Price(sym, holding_period, InsightDirection.Down, float(0.001), None, None, 1.0)) return insights def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: ' Make one-time actions on securities as they enter or exit the universe.' for security in changes.AddedSecurities: self.symbols_data[security.Symbol] = SymbolData(algorithm, security.Symbol, self.lookback) for security in changes.RemovedSecurities: if security.Symbol in self.symbols_data: symbol_data = self.symbols_data.pop(security.Symbol, None) if symbol_data: symbol_data.Dispose() def EndOfTradingDay(self, current: datetime) -> timedelta: ' Compute and return timedelta corresponding to the last minute of trading from the moment an insight is created' close = datetime.combine(self.algorithm.Time.date(), time(15, 58, 00)) return (close - current) class SymbolData: def __init__(self, algorithm: QCAlgorithm, symbol: Symbol, lookback: timedelta): ' Initialize a new instance of LorentzianClassification' self.algorithm = algorithm self.symbol = symbol self.lookback = lookback self.consolidator_period = timedelta(minutes=5) self.history = self.algorithm.History(self.symbol, self.lookback, self.algorithm.UniverseSettings.Resolution) self.window_size = 666 self.indicators = [] self.combinations = [] self.selected_features = [] self.rolling_windows = {} self.features = pd.DataFrame() self.training_data = pd.DataFrame() self.models = Models(algorithm=algorithm) self.last_trained = 0 self.required_confidence = 0.65 self.target_shift = 4 self.long = False self.short = False # Create indicators self.adx = AverageDirectionalIndex(14) self.adx.Name = 'ADX' self.adx.Updated += (lambda sender, updated: self.rolling_windows[self.adx.Name].Add(updated)) self.rsi = RelativeStrengthIndex(14, movingAverageType=2) self.rsi.Name = 'RSI' self.rsi.Updated += (lambda sender, updated: self.rolling_windows[self.rsi.Name].Add(updated)) self.cci = CommodityChannelIndex(14, movingAverageType=2) self.cci.Name = 'CCI' self.cci.Updated += (lambda sender, updated: self.rolling_windows[self.cci.Name].Add(updated)) self.sema = ExponentialMovingAverage(15) self.sema.Name = 'Short_EMA' self.sema.Updated += (lambda sender, updated: self.rolling_windows[self.sema.Name].Add(updated)) self.lema = ExponentialMovingAverage(29) self.lema.Name = 'Long_EMA' self.lema.Updated += (lambda sender, updated: self.rolling_windows[self.lema.Name].Add(updated)) self.crossover = IndicatorExtensions.Over(self.sema, self.lema, 'Crossover') self.crossover.Updated += (lambda sender, updated: self.rolling_windows[self.crossover.Name].Add(updated)) # Store indicators for easy indexing self.indicators.append(self.adx) self.indicators.append(self.rsi) self.indicators.append(self.cci) self.indicators.append(self.sema) self.indicators.append(self.lema) self.indicators.append(self.crossover) # Combination indicators are treated differently than regular indicators. They don't inherit from IIndicatorWarmUpPeriodProvider and so cannot benefit from WarmUpIndicator # And shouldn't be Registered to the consolidator. self.combinations.append(self.crossover) # Determine which features will be used in machine learning models self.selected_features = ['RSI', 'ADX', 'CCI', 'Crossover'] + ['volume'] # Create a consolidator to update the indicators and create a rolling window for consolidator bars. self.consolidator = TradeBarConsolidator(self.consolidator_period) self.consolidator.DataConsolidated += self.OnDataConsolidated self.rolling_windows['Price'] = RollingWindow[TradeBar](self.window_size) # Register the consolidator to update the indicators. self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator) # Create rolling windows, Warm up the indicators, and update rolling windows for indicator in self.indicators: self.rolling_windows[indicator.Name] = RollingWindow[IndicatorDataPoint](self.window_size) if indicator not in self.combinations: self.algorithm.RegisterIndicator(self.symbol, indicator, self.consolidator) def OnDataConsolidated(self, sender: object, consolidated_bar: TradeBar) -> None: ' Performs actions when the cosolidator object emits a sender bar' # Update the consolidated price rolling window self.rolling_windows['Price'].Add(consolidated_bar) # Summarize all features into a single DataFrame on each consolidated bar. Use the most recent consolidated bar as prediction inputs. if self.IsReady(): self.features = self.PrepareInputs(consolidated_bar) # If the model is finished training predict the direction of the next n-bars. if self.models.trained_models['KNN'] is not None and not self.models.training: model = self.models.trained_models['KNN'] prediction = model.predict(self.features)[0] self.proba = model.predict_proba(self.features).flatten() long_signal = self.proba[1] > self.required_confidence short_signal = self.proba[0] >= self.required_confidence # Determine position. if prediction and long_signal: self.algorithm.Debug(f'{self.last_trained}') self.long = True self.short = False elif not prediction and short_signal: self.long = False self.short = True else: self.long = False self.short = False if self.last_trained != self.algorithm.Time.isocalendar()[1]: self.last_trained = self.algorithm.Time.isocalendar()[1] # if self.last_trained != self.algorithm.Time.month: # self.last_trained = self.algorithm.Time.month # if self.last_trained != self.algorithm.Time.day: # self.last_trained = self.algorithm.Time.day training_features, training_targets = self.PrepareTrainingData() self.models.TrainModels( training_data = training_features.join(training_targets, how='inner', on='time'), ) else: if not self.algorithm.IsWarmingUp: windows = [ind for ind, win in self.rolling_windows.items() if not win.IsReady] indicators = [ind.Name for ind in self.indicators if not ind.IsReady] self.algorithm.Debug(f'Waiting on windows: {windows}, indicators: {indicators} : {self.algorithm.Time}') def PrepareTrainingData(self) -> pd.DataFrame: ' Summarizes the rolling windows into a single DataFrame. Returns training features and training targets.' # Reset the training data DataFrame self.training_data = self.algorithm.PandasConverter.GetDataFrame[TradeBar](list(self.rolling_windows['Price'])).loc[self.symbol] # Construct the training data DataFrame from all rolling windows for indicator in self.indicators: key = indicator.Name wind = self.rolling_windows[key] indicator_df = self.algorithm.PandasConverter.GetDataFrame[IndicatorDataPoint](list(wind)).reset_index().drop(columns='symbol').set_index('time').rename(columns = {'value': key}) self.training_data = self.training_data.join(indicator_df, how='inner', on='time') # Reverse the order of the DataFrame such that the most recent consolidated bar is the last row of the DataFrame self.training_data = self.training_data[::-1] # Generate the targets. Set the threshold of a positive bar to a factor of the standard deviation self.training_data['target'] = np.log(1 + self.training_data.close.pct_change()) - (np.log(1 + self.training_data.close.pct_change()).std()) self.training_data['target'] = (np.sign(self.training_data.target).shift(-self.target_shift) + 1) / 2 self.training_data = self.training_data.dropna() training_features = self.training_data.loc[:, self.selected_features].copy() training_targets = self.training_data.iloc[:, -1].to_frame() return training_features, training_targets def PrepareInputs(self, consolidated_bar: TradeBar) -> pd.DataFrame: ' Extracts the most recent consolidated data into a single DataFrame' # Construct the feature data DataFrame from all rolling windows self.features = pd.DataFrame(data={x.Name: self.rolling_windows[x.Name][0].Value for x in self.indicators if x.Name in self.selected_features}, index=pd.Series(consolidated_bar.EndTime)) bar = self.rolling_windows['Price'][0] prices = pd.DataFrame(data={'open': bar.Open, 'high': bar.High, 'low': bar.Low, 'close': bar.Close, 'volume': bar.Volume}, index=pd.Series(bar.EndTime)) self.features = prices.join(self.features) return self.features.loc[:, self.selected_features] def IsReady(self) -> bool: ' Returns true if all rolling windows are ready' windows_ready = all([win.IsReady for ind, win in self.rolling_windows.items()]) indicators_ready = all([x.IsReady for x in self.indicators]) ready = all([windows_ready, indicators_ready]) return ready # If you have a dynamic universe, remove consolidators for the securities removed from the universe def Dispose(self) -> None: self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.consolidator) class Models: ' Various models for forward looking insights.' def __init__(self, algorithm: QCAlgorithm): ' Initializes a new instance of Models' self.algorithm = algorithm self.training = False self.trained_models = { 'KNN': None } def Preprocess(self, training_data: pd.DataFrame) -> pd.DataFrame: ' Preprocess the summary DataFrame of all rolling windows.' # Reset the index and drop the timestamp # training_data = training_data.reset_index().drop(columns='time') # Separate the features from the targets train_x = training_data.iloc[:, :-1].copy() train_y = training_data.target.to_frame() return train_x, train_y def TrainModels(self, training_data: pd.DataFrame): ' Trains all models' if not self.training: self.training = True training_features, training_targets = self.Preprocess( training_data = training_data, ) self.TrainSklearnKNN( training_features = training_features, training_targets = training_targets, ) self.TradingViewLorentzianClassification( training_features = training_features, training_targets = training_targets, ) self.ResearchLorentzianClassification( training_features = training_features, training_targets = training_targets, ) if self.training: self.training = False def TrainSklearnKNN(self, training_features: pd.DataFrame, training_targets: pd.DataFrame): ' Trains, optimizes and saves an SKlearn K-Nearest Neighbors classifier' knn = KNeighborsClassifier() self.training = True param_grid = [ { 'n_neighbors': list(np.arange(2, 21, 2)), 'leaf_size': list(np.arange(1, 2, 1)), 'metric': ['cityblock'] }] gcv = GridSearchCV(estimator=knn, param_grid=param_grid, cv=10, verbose=0, n_jobs = -1).fit(training_features, training_targets) # self.algorithm.Debug(f'{gcv.best_params_}') pipe = Pipeline([ # ('scaler', MinMaxScaler()), ('scaler', StandardScaler()), # ('scaler', RobustScaler()), # ('pca', PCA()), ('nearest_neighbors', KNeighborsClassifier(**gcv.best_params_)) ]) model = pipe.fit(training_features, training_targets) # Store the most recent file in the models dictionary model_key = 'KNN' self.trained_models[model_key] = model self.training = False # Save the model to ObjectStore # file_name = self.algorithm.ObjectStore.GetFilePath(model_key) # joblib.dump(model, file_name) # self.algorithm.ObjectStore.Save(model_key) def TradingViewLorentzianClassification(self, training_features: pd.DataFrame, training_targets: pd.DataFrame): ' Replicates the model logic from the Trading View indicator: Lorentzian Classification' pass # neighbors = 8 # distances = deque(maxlen=neighbors) # predictions = deque(maxlen=neighbors) # lastDistance = -1.0 # for i = 0 to sizeLoop //{ # d = get_lorentzian_distance(i, settings.featureCount, featureSeries, featureArrays) # if d >= lastDistance and i%4 //{ # lastDistance := d # array.push(distances, d) # array.push(predictions, math.round(array.get(y_train_array, i))) # if array.size(predictions) > neighbors //{ # lastDistance := array.get(distances, math.round(neighbors * (3/4))) # array.shift(distances) # array.shift(predictions) # //} # //} # //} # prediction := array.sum(predictions) def ResearchLorentzianClassification(self, training_features: pd.DataFrame, training_targets: pd.DataFrame): ''' Modiifies the Trading View Lorentzian Classification by adhering to the findings from the following research paper: https://www.researchgate.net/publication/282441155_A_new_classification_method_by_using_Lorentzian_distance_metric ''' pass def LoadModel(self, model_key: str) -> None: ' Loads saved models if they exist' if self.algorithm.ObjectStore.ContainsKey(model_key): file_name = self.algorithm.ObjectStore.GetFilePath(model_key) return joblib.load(file_name)
from AlgorithmImports import * from datetime import date, time, datetime class ShortVol(AlphaModel): def __init__(self, algorithm: QCAlgorithm, short_vol_symbols: list): ' Initialize' self.symbol_data_by_symbol = {} self.symbols = [ Symbol.Create(symbol, SecurityType.Equity, Market.USA).Value for symbol in short_vol_symbols ] self.algorithm = algorithm def Update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]: ' Generate insights as new data is fed through the algorithm.' insights = [] # for symbol in self.symbol_data_by_symbol.keys(): # if self.symbol_data_by_symbol[symbol].indicator.Current.Value < -5 and not self.algorithm.Portfolio[symbol].Invested: # insights.append(Insight.Price(symbol, timedelta(minutes=5), InsightDirection.Up, float(0.001), None, None, 1.0)) return insights def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: ' Make one-time actions on securities as they enter or exit the universe.' for security in changes.AddedSecurities: if security.Symbol in self.symbols: self.symbol_data_by_symbol[security.Symbol] = SymbolData(algorithm, security.Symbol) history = algorithm.History[TradeBar](security.Symbol, 5, algorithm.UniverseSettings.Resolution) self.symbol_data_by_symbol[security.Symbol].warmup_consolidator(history) for security in changes.RemovedSecurities: if security.Symbol in self.symbol_data_by_symbol: symbol_data = self.symbol_data_by_symbol.pop(security.Symbol, None) if symbol_data: symbol_data.dispose() def EndOfTradingDay(self, current: datetime) -> timedelta: ' Compute and return timedelta corresponding to the last minute of trading from the moment an insight is created' close = datetime.combine(self.algorithm.Time.date(), time(15, 58, 00)) return (close - current) class SymbolData: def __init__(self, algorithm, symbol): self.algorithm = algorithm self.symbol = symbol self.indicator = MomentumPercent(2) self.consolidator = TradeBarConsolidator(1) algorithm.SubscriptionManager.AddConsolidator(symbol, self.consolidator) algorithm.RegisterIndicator(symbol, self.indicator, self.consolidator) algorithm.WarmUpIndicator(self.symbol, self.indicator) def dispose(self): self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.consolidator) def warmup_consolidator(self, history): for trade_bar in history: self.consolidator.Update(trade_bar)
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. # Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from AlgorithmImports import * class SpreadExecutionModel(ExecutionModel): '''Execution model that submits orders while the current spread is tight. Note this execution model will not work using Resolution.Daily since Exchange.ExchangeOpen will be false, suggested resolution is Minute ''' def __init__(self, acceptingSpreadPercent=0.005): '''Initializes a new instance of the SpreadExecutionModel class''' self.targetsCollection = PortfolioTargetCollection() # Gets or sets the maximum spread compare to current price in percentage. self.acceptingSpreadPercent = Math.Abs(acceptingSpreadPercent) def Execute(self, algorithm, targets): '''Executes market orders if the spread percentage to price is in desirable range. Args: algorithm: The algorithm instance targets: The portfolio targets''' # update the complete set of portfolio targets with the new targets self.targetsCollection.AddRange(targets) # for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call if not self.targetsCollection.IsEmpty: for target in self.targetsCollection.OrderByMarginImpact(algorithm): symbol = target.Symbol # calculate remaining quantity to be ordered unorderedQuantity = OrderSizing.GetUnorderedQuantity(algorithm, target) # check order entry conditions if unorderedQuantity != 0: # get security information security = algorithm.Securities[symbol] if self.SpreadIsFavorable(security): algorithm.MarketOrder(symbol, unorderedQuantity) self.targetsCollection.ClearFulfilled(algorithm) def SpreadIsFavorable(self, security): '''Determines if the spread is in desirable range.''' # Price has to be larger than zero to avoid zero division error, or negative price causing the spread percentage < 0 by error # Has to be in opening hours of exchange to avoid extreme spread in OTC period return security.Exchange.ExchangeOpen \ and security.Price > 0 and security.AskPrice > 0 and security.BidPrice > 0 \ and (security.AskPrice - security.BidPrice) / security.Price <= self.acceptingSpreadPercent
from AlgorithmImports import * class CapacityLimitedPortfolioConstructionModel(PortfolioConstructionModel): '''Respects a threshold percentage of asset volume to avoid market distortion of less liquid assets.''' def __init__(self, rebalance = Resolution.Daily, portfolioBias = PortfolioBias.LongShort): '''Initialize a new instance of CapacityLimitedPortfolioConstructionModel Args: rebalance: Rebalancing parameter. If it is a timedelta, date rules or Resolution, it will be converted into a function. If None will be ignored. The function returns the next expected rebalance time for a given algorithm UTC DateTime. The function returns null if unknown, in which case the function will be called again in the next loop. Returning current time will trigger rebalance. portfolioBias: Specifies the bias of the portfolio (Short, Long/Short, Long)''' super().__init__() self.portfolioBias = portfolioBias # If the argument is an instance of Resolution or Timedelta # Redefine rebalancingFunc rebalancingFunc = rebalance if isinstance(rebalance, int): rebalance = Extensions.ToTimeSpan(rebalance) if isinstance(rebalance, timedelta): rebalancingFunc = lambda dt: dt + rebalance if rebalancingFunc: self.SetRebalancingFunc(rebalancingFunc) def DetermineTargetPercent(self, activeInsights): '''Will determine the target percent for each insight Args: activeInsights: The active insights to generate a target for''' result = {} # give equal weighting to each security count = sum(x.Direction != InsightDirection.Flat and self.RespectPortfolioBias(x) for x in activeInsights) percent = 0 if count == 0 else 1.0 / count for insight in activeInsights: result[insight] = (insight.Direction if self.RespectPortfolioBias(insight) else InsightDirection.Flat) * percent return result def RespectPortfolioBias(self, insight): '''Method that will determine if a given insight respects the portfolio bias Args: insight: The insight to create a target for ''' return self.portfolioBias == PortfolioBias.LongShort or insight.Direction == self.portfolioBias
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. # Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from AlgorithmImports import * class EqualWeightingPortfolioConstructionModel(PortfolioConstructionModel): '''Provides an implementation of IPortfolioConstructionModel that gives equal weighting to all securities. The target percent holdings of each security is 1/N where N is the number of securities. For insights of direction InsightDirection.Up, long targets are returned and for insights of direction InsightDirection.Down, short targets are returned.''' def __init__(self, rebalance = Resolution.Daily, portfolioBias = PortfolioBias.LongShort): '''Initialize a new instance of EqualWeightingPortfolioConstructionModel Args: rebalance: Rebalancing parameter. If it is a timedelta, date rules or Resolution, it will be converted into a function. If None will be ignored. The function returns the next expected rebalance time for a given algorithm UTC DateTime. The function returns null if unknown, in which case the function will be called again in the next loop. Returning current time will trigger rebalance. portfolioBias: Specifies the bias of the portfolio (Short, Long/Short, Long)''' super().__init__() self.portfolioBias = portfolioBias # If the argument is an instance of Resolution or Timedelta # Redefine rebalancingFunc rebalancingFunc = rebalance if isinstance(rebalance, int): rebalance = Extensions.ToTimeSpan(rebalance) if isinstance(rebalance, timedelta): rebalancingFunc = lambda dt: dt + rebalance if rebalancingFunc: self.SetRebalancingFunc(rebalancingFunc) def DetermineTargetPercent(self, activeInsights): '''Will determine the target percent for each insight Args: activeInsights: The active insights to generate a target for''' result = {} # give equal weighting to each security count = sum(x.Direction != InsightDirection.Flat and self.RespectPortfolioBias(x) for x in activeInsights) percent = 0 if count == 0 else 1.0 / count for insight in activeInsights: result[insight] = (insight.Direction if self.RespectPortfolioBias(insight) else InsightDirection.Flat) * percent return result def RespectPortfolioBias(self, insight): '''Method that will determine if a given insight respects the portfolio bias Args: insight: The insight to create a target for ''' return self.portfolioBias == PortfolioBias.LongShort or insight.Direction == self.portfolioBias
from AlgorithmImports import * class TrailingStopRiskManagementModel(RiskManagementModel): '''Provides an implementation of IRiskManagementModel that limits the maximum possible loss measured from the highest unrealized profit''' def __init__(self, maximumDrawdownPercent = 0.05): '''Initializes a new instance of the TrailingStopRiskManagementModel class Args: maximumDrawdownPercent: The maximum percentage drawdown allowed for algorithm portfolio compared with the highest unrealized profit, defaults to 5% drawdown''' self.maximumDrawdownPercent = abs(maximumDrawdownPercent) self.trailingAbsoluteHoldingsState = dict() def ManageRisk(self, algorithm, targets): '''Manages the algorithm's risk at each time step Args: algorithm: The algorithm instance targets: The current portfolio targets to be assessed for risk''' riskAdjustedTargets = list() for kvp in algorithm.Securities: symbol = kvp.Key security = kvp.Value # Remove if not invested if not security.Invested: self.trailingAbsoluteHoldingsState.pop(symbol, None) continue position = PositionSide.Long if security.Holdings.IsLong else PositionSide.Short absoluteHoldingsValue = security.Holdings.AbsoluteHoldingsValue trailingAbsoluteHoldingsState = self.trailingAbsoluteHoldingsState.get(symbol) # Add newly invested security (if doesn't exist) or reset holdings state (if position changed) if trailingAbsoluteHoldingsState == None or position != trailingAbsoluteHoldingsState.position: self.trailingAbsoluteHoldingsState[symbol] = trailingAbsoluteHoldingsState = self.HoldingsState(position, security.Holdings.AbsoluteHoldingsCost) trailingAbsoluteHoldingsValue = trailingAbsoluteHoldingsState.absoluteHoldingsValue # Check for new max (for long position) or min (for short position) absolute holdings value if ((position == PositionSide.Long and trailingAbsoluteHoldingsValue < absoluteHoldingsValue) or (position == PositionSide.Short and trailingAbsoluteHoldingsValue > absoluteHoldingsValue)): self.trailingAbsoluteHoldingsState[symbol].absoluteHoldingsValue = absoluteHoldingsValue continue drawdown = abs((trailingAbsoluteHoldingsValue - absoluteHoldingsValue) / trailingAbsoluteHoldingsValue) if self.maximumDrawdownPercent < drawdown: self.trailingAbsoluteHoldingsState.pop(symbol, None) # liquidate riskAdjustedTargets.append(PortfolioTarget(symbol, 0)) return riskAdjustedTargets class HoldingsState: def __init__(self, position, absoluteHoldingsValue): self.position = position self.absoluteHoldingsValue = absoluteHoldingsValue
from AlgorithmImports import * from clr import GetClrType as typeof from Selection.UniverseSelectionModel import UniverseSelectionModel from itertools import groupby class ManualUniverseSelectionModel(UniverseSelectionModel): '''Provides an implementation of IUniverseSelectionModel that simply subscribes to the specified set of symbols''' def __init__(self, symbols = list(), universeSettings = None): self.MarketHours = MarketHoursDatabase.FromDataFolder() self.symbols = [ Symbol.Create(symbol, SecurityType.Equity, Market.USA) for symbol in symbols ] self.universeSettings = universeSettings for symbol in self.symbols: SymbolCache.Set(symbol.Value, symbol) def CreateUniverses(self, algorithm): '''Creates the universes for this algorithm. Called once after IAlgorithm.Initialize Args: algorithm: The algorithm instance to create universes for</param> Returns: The universes to be used by the algorithm''' universeSettings = self.universeSettings \ if self.universeSettings is not None else algorithm.UniverseSettings resolution = universeSettings.Resolution type = typeof(Tick) if resolution == Resolution.Tick else typeof(TradeBar) universes = list() # universe per security type/market self.symbols = sorted(self.symbols, key=lambda s: (s.ID.Market, s.SecurityType)) for key, grp in groupby(self.symbols, lambda s: (s.ID.Market, s.SecurityType)): market = key[0] securityType = key[1] securityTypeString = Extensions.GetEnumString(securityType, SecurityType) universeSymbol = Symbol.Create(f"manual-universe-selection-model-{securityTypeString}-{market}", securityType, market) if securityType == SecurityType.Base: # add an entry for this custom universe symbol -- we don't really know the time zone for sure, # but we set it to TimeZones.NewYork in AddData, also, since this is a manual universe, the time # zone doesn't actually matter since this universe specifically doesn't do anything with data. symbolString = MarketHoursDatabase.GetDatabaseSymbolKey(universeSymbol) alwaysOpen = SecurityExchangeHours.AlwaysOpen(TimeZones.NewYork) entry = self.MarketHours.SetEntry(market, symbolString, securityType, alwaysOpen, TimeZones.NewYork) else: entry = self.MarketHours.GetEntry(market, None, securityType) config = SubscriptionDataConfig(type, universeSymbol, resolution, entry.DataTimeZone, entry.ExchangeHours.TimeZone, False, False, True) universes.append( ManualUniverse(config, universeSettings, list(grp))) return universes
from AlgorithmImports import * from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel from itertools import groupby from math import ceil class QC500UniverseSelectionModel(FundamentalUniverseSelectionModel): '''Defines the QC500 universe as a universe selection model for framework algorithm For details: https://github.com/QuantConnect/Lean/pull/1663''' def __init__(self, filterFineData = True, universeSettings = None): '''Initializes a new default instance of the QC500UniverseSelectionModel''' super().__init__(filterFineData, universeSettings) self.numberOfSymbolsCoarse = 750 self.numberOfSymbolsFine = 500 self.dollarVolumeBySymbol = {} self.lastMonth = -1 def SelectCoarse(self, algorithm, coarse): '''Performs coarse selection for the QC500 constituents. The stocks must have fundamental data The stock must have positive previous-day close price The stock must have positive volume on the previous trading day''' if algorithm.Time.month == self.lastMonth: return Universe.Unchanged sortedByDollarVolume = sorted([x for x in coarse if x.HasFundamentalData and x.Volume > 0 and x.Price > 0], key = lambda x: x.DollarVolume, reverse=True)[:self.numberOfSymbolsCoarse] self.dollarVolumeBySymbol = {x.Symbol:x.DollarVolume for x in sortedByDollarVolume} # If no security has met the QC500 criteria, the universe is unchanged. # A new selection will be attempted on the next trading day as self.lastMonth is not updated if len(self.dollarVolumeBySymbol) == 0: return Universe.Unchanged # return the symbol objects our sorted collection return list(self.dollarVolumeBySymbol.keys()) def SelectFine(self, algorithm, fine): '''Performs fine selection for the QC500 constituents The company's headquarter must in the U.S. The stock must be traded on either the NYSE or NASDAQ At least half a year since its initial public offering The stock's market cap must be greater than 500 million''' sortedBySector = sorted([x for x in fine if x.CompanyReference.CountryId == "USA" and x.CompanyReference.PrimaryExchangeID in ["NYS","NAS"] and (algorithm.Time - x.SecurityReference.IPODate).days > 180 and x.MarketCap > 5e8], key = lambda x: x.CompanyReference.IndustryTemplateCode) count = len(sortedBySector) # If no security has met the QC500 criteria, the universe is unchanged. # A new selection will be attempted on the next trading day as self.lastMonth is not updated if count == 0: return Universe.Unchanged # Update self.lastMonth after all QC500 criteria checks passed self.lastMonth = algorithm.Time.month percent = self.numberOfSymbolsFine / count sortedByDollarVolume = [] # select stocks with top dollar volume in every single sector for code, g in groupby(sortedBySector, lambda x: x.CompanyReference.IndustryTemplateCode): y = sorted(g, key = lambda x: self.dollarVolumeBySymbol[x.Symbol], reverse = True) c = ceil(len(y) * percent) sortedByDollarVolume.extend(y[:c]) sortedByDollarVolume = sorted(sortedByDollarVolume, key = lambda x: self.dollarVolumeBySymbol[x.Symbol], reverse=True) return [x.Symbol for x in sortedByDollarVolume[:self.numberOfSymbolsFine]]
# Generic Imports from AlgorithmImports import * # Import Alphas from project.Alphas.LongVol import LongVol from project.Alphas.ShortVol import ShortVol from project.Alphas.LorentzianKNN import LorentzianClassification # Import Universes from project.Universes.QC500 import QC500UniverseSelectionModel from project.Universes.ManualSelection import ManualUniverseSelectionModel # Import Portfolio Constructors from project.Portfolio_Constructions.EqualWeightPortfolioConstruction import EqualWeightingPortfolioConstructionModel # Import Execution Models from project.Executions.Execution import SpreadExecutionModel # Import Risk Management from project.Risk_Managements.RiskManagement import TrailingStopRiskManagementModel # Main. class VolatilityTrader(QCAlgorithm): def Initialize(self): self.lookback = timedelta(days=50) self.UniverseSettings.Resolution = Resolution.Minute self.SetStartDate(2022, 5, 1) # Set Start Date self.SetCash(1000000) # Set Strategy Cash # Framework self.SetBrokerageModel(InteractiveBrokersBrokerageModel()) self.SetUniverseSelection(ManualUniverseSelectionModel(['UVIX'])) self.AddAlpha(LorentzianClassification(algorithm=self, lookback=self.lookback)) # self.SetUniverseSelection(ManualUniverseSelectionModel(['UVIX', 'SVXY'])) # self.AddAlpha(LongVol(algorithm=self, long_vol_symbols=['UVIX'])) # self.AddAlpha(ShortVol(algorithm=self, short_vol_symbols=['SVXY'])) self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel()) # self.SetExecution(SpreadExecutionModel()) # self.SetRiskManagement(TrailingStopRiskManagementModel(0.03)) self.SetWarmup(self.lookback, self.UniverseSettings.Resolution) def OnData(self, data: Slice): if self.IsWarmingUp: return