Overall Statistics |
Total Orders 70 Average Win 5.68% Average Loss -2.61% Compounding Annual Return 17.613% Drawdown 22.600% Expectancy 0.634 Start Equity 100000 End Equity 162763.27 Net Profit 62.763% Sharpe Ratio 0.744 Sortino Ratio 0.76 Probabilistic Sharpe Ratio 31.280% Loss Rate 49% Win Rate 51% Profit-Loss Ratio 2.18 Alpha 0.058 Beta 0.351 Annual Standard Deviation 0.163 Annual Variance 0.027 Information Ratio -0.309 Tracking Error 0.191 Treynor Ratio 0.345 Total Fees $451.50 Estimated Strategy Capacity $13000000.00 Lowest Capacity Asset DXCM T7TBFF72DG11 Portfolio Turnover 3.28% |
""" This code is made public in accordance with the terms and conditions outlined by QuantConnect. For more information on these terms, please visit the QuantConnect terms of service page at: https://www.quantconnect.com/terms/ """ """ DISCLAMER: This trading algorithm is provided for research purposes only and does not constitute financial advice. Trading in financial markets involves substantial risk and is not suitable for every investor. Past performance is not indicative of future results. The author assumes no responsibility for any financial losses or damages incurred as a result of using this software. Use at your own risk. """ import numpy as np # Import NumPy for numerical operations from hmmlearn.hmm import GaussianHMM # Import Gaussian Hidden Markov Model from hmmlearn import torch # Import PyTorch import torch.nn as nn # Import PyTorch's neural network module import torch.optim as optim # Import optimization algorithms from PyTorch from AlgorithmImports import * # Import necessary classes and methods from QuantConnect from neural_network import NeuralNetwork # Import the custom neural network class DualModelAlphaGenerator(AlphaModel): """ A dual-model alpha generator combining Gaussian Hidden Markov Model (HMM) and a neural network. - Utilizes HMM to predict market states based on historical returns. - Uses a neural network to generate predictions based on recent price movements. - Generates trading insights by combining predictions from both models. """ def __init__(self, lookback=20, hmm_components=5, nn_input_size=5, nn_hidden_size=10, retrain_interval=30): """ Initializes the dual-model alpha generator with specified parameters. :param lookback: Number of periods to look back for rolling window data :param hmm_components: Number of hidden states in the HMM :param nn_input_size: Input size for the neural network :param nn_hidden_size: Hidden layer size for the neural network :param retrain_interval: Days between retraining the models """ super().__init__() # Initialize the parent AlphaModel class # Model parameters self.lookback = lookback self.hmm_components = hmm_components self.nn_input_size = nn_input_size self.nn_hidden_size = nn_hidden_size self.retrain_interval = retrain_interval # Initialize the models self.hmm = GaussianHMM(n_components=self.hmm_components) self.nn = NeuralNetwork(nn_input_size, nn_hidden_size, 1) # Set up the optimizer and loss function for the neural network self.optimizer = optim.Adam(self.nn.parameters(), lr=0.001) self.loss_function = nn.MSELoss() # Data storage and training state self.data = {} self.last_train_time = None def add_data(self, symbol, trade_bar): """ Add trade bar data to the rolling window for a given symbol. :param symbol: Symbol of the security :param trade_bar: TradeBar object containing the latest market data """ try: if symbol not in self.data: self.data[symbol] = RollingWindow[TradeBar](self.lookback) if self.is_valid_trade_bar(trade_bar): self.data[symbol].Add(trade_bar) else: print(f"Invalid trade bar for symbol {symbol}: {trade_bar}") except Exception as e: print(f"Error adding data for symbol {symbol}: {e}") def is_valid_trade_bar(self, trade_bar): """ Check if the trade bar has a valid close price. :param trade_bar: TradeBar object :return: True if valid, False otherwise """ return trade_bar.Close > 0 def is_valid_data(self, close_prices): """ Validate if close prices are all positive. :param close_prices: Array of close prices :return: True if all prices are positive, False otherwise """ return np.all(close_prices > 0) def update_models(self, algorithm): """ Retrain models if the retrain interval has passed. :param algorithm: The algorithm instance providing the current time """ try: if self.last_train_time is None or (algorithm.Time - self.last_train_time).days >= self.retrain_interval: self.retrain_models() self.last_train_time = algorithm.Time except Exception as e: print(f"Error updating models: {e}") def update(self, algorithm, data): """ Update the model and generate insights based on incoming data. :param algorithm: The algorithm instance :param data: Slice object containing the current market data :return: List of generated insights """ insights = [] self.update_models(algorithm) for symbol in data.Bars.Keys: trade_bar = data.Bars[symbol] self.add_data(symbol, trade_bar) if not self.data[symbol].IsReady: continue close_prices = np.array([bar.Close for bar in self.data[symbol]]) if not self.is_valid_data(close_prices): continue try: insights.extend(self.generate_insights(symbol, close_prices)) except Exception as e: print(f"Error generating insights for {symbol}: {e}") return insights def generate_insights(self, symbol, close_prices): """ Generate insights for a given symbol based on model predictions. :param symbol: Symbol of the security :param close_prices: Array of close prices :return: List of generated insights """ insights = [] # Calculate log returns from close prices returns = self.calculate_returns(close_prices) # Predict market states using HMM hmm_states = self.hmm_predict_states(returns) # Determine the best HMM state with the highest mean return best_state = self.get_best_hmm_state(returns, hmm_states) # Train the neural network and get the output prediction nn_output = self.train_neural_network(close_prices) # Determine the direction of the insight insight_direction = self.determine_insight_direction(hmm_states, best_state, nn_output, close_prices[-1]) # Append the generated insight with a confidence level insights.append(Insight.Price(symbol, timedelta(days=1), insight_direction, confidence=1)) return insights def calculate_returns(self, close_prices): """ Calculate log returns from close prices. :param close_prices: Array of close prices :return: Array of log returns """ return np.diff(np.log(close_prices)) def hmm_predict_states(self, returns): """ Fit HMM and predict states based on returns. :param returns: Array of log returns :return: Array of predicted HMM states """ try: self.hmm.fit(returns.reshape(-1, 1)) return self.hmm.predict(returns.reshape(-1, 1)) except Exception as e: print(f"Error in HMM state prediction: {e}") return np.zeros(len(returns), dtype=int) def get_best_hmm_state(self, returns, hmm_states): """ Identify the best HMM state with the highest mean return. :param returns: Array of log returns :param hmm_states: Array of HMM states :return: The state with the highest mean return """ try: category_means = [(state, np.mean(returns[hmm_states == state])) for state in np.unique(hmm_states)] return max(category_means, key=lambda x: x[1])[0] except Exception as e: print(f"Error determining best HMM state: {e}") return 0 def train_neural_network(self, close_prices): """ Train the neural network and return the output prediction. :param close_prices: Array of close prices :return: Output prediction from the neural network """ # Generate features for the neural network features = self.get_nn_features(close_prices) target = close_prices[-1] feature_tensor = torch.tensor(features, dtype=torch.float32) target_tensor = torch.tensor([target], dtype=torch.float32) try: # Zero the gradients, perform forward pass, compute loss, and backpropagate self.optimizer.zero_grad() nn_output = self.nn(feature_tensor) loss = self.loss_function(nn_output, target_tensor) loss.backward() self.optimizer.step() return nn_output.item() except Exception as e: print(f"Error training neural network: {e}") return target # Default to last known price if training fails def get_nn_features(self, close_prices): """ Generate features for the neural network from close prices. :param close_prices: Array of close prices :return: Array of features for the neural network """ try: return np.diff(close_prices[-(self.nn_input_size + 1):]) except Exception as e: print(f"Error generating NN features: {e}") return np.zeros(self.nn_input_size) # Return zeros if there's an error def determine_insight_direction(self, hmm_states, best_state, nn_output, last_price): """ Determine the direction of the insight based on model outputs. :param hmm_states: Array of HMM states :param best_state: Best HMM state with the highest mean return :param nn_output: Output from the neural network :param last_price: The last known price of the security :return: Insight direction (Up, Down, or Flat) """ try: if hmm_states[-1] == best_state and nn_output > last_price: return InsightDirection.Up elif hmm_states[-1] != best_state and nn_output < last_price: return InsightDirection.Down else: return InsightDirection.Flat except Exception as e: print(f"Error determining insight direction: {e}") return InsightDirection.Flat # Default to flat if an error occurs def retrain_models(self): """ Retrain both the HMM and neural network models with current data. """ try: for symbol, window in self.data.items(): if not window.IsReady: continue # Extract close prices and calculate returns close_prices = np.array([bar.Close for bar in window]) returns = self.calculate_returns(close_prices) # Fit HMM and train the neural network with current data self.hmm.fit(returns.reshape(-1, 1)) self.train_neural_network(close_prices) except Exception as e: print(f"Error retraining models: {e}")
import numpy as np from hmmlearn.hmm import GaussianHMM # For state classification HMM from statsmodels.tsa.regime_switching.markov_regression import MarkovRegression # For regime-switching HMM import torch import torch.nn as nn import torch.optim as optim from AlgorithmImports import * from neural_network import NeuralNetwork class DualModelAlphaGenerator(AlphaModel): def __init__(self, lookback=20, hmm_components=5, nn_input_size=5, nn_hidden_size=10, retrain_interval=30): super().__init__() self.lookback = lookback self.hmm_components = hmm_components self.nn_input_size = nn_input_size self.nn_hidden_size = nn_hidden_size self.retrain_interval = retrain_interval # Initialize HMMs: one for state classification and another for regime switching self.state_hmm = GaussianHMM(n_components=self.hmm_components) self.regime_hmm = None # Will initialize the MarkovRegression model later # Neural network initialization self.nn = NeuralNetwork(nn_input_size, nn_hidden_size, 1) self.optimizer = optim.Adam(self.nn.parameters(), lr=0.001) self.loss_function = nn.MSELoss() self.data = {} self.last_train_time = None def add_data(self, symbol, trade_bar): if symbol not in self.data: self.data[symbol] = RollingWindow[TradeBar](self.lookback) if self.is_valid_trade_bar(trade_bar): self.data[symbol].Add(trade_bar) def is_valid_trade_bar(self, trade_bar): return trade_bar.Close > 0 def is_valid_data(self, close_prices): return np.all(close_prices > 0) def update_models(self, algorithm): if self.last_train_time is None or (algorithm.Time - self.last_train_time).days >= self.retrain_interval: self.retrain_models() self.last_train_time = algorithm.Time def update(self, algorithm, data): insights = [] self.update_models(algorithm) for symbol in data.Bars.Keys: trade_bar = data.Bars[symbol] self.add_data(symbol, trade_bar) if not self.data[symbol].IsReady: continue close_prices = np.array([bar.Close for bar in self.data[symbol]]) if not self.is_valid_data(close_prices): continue try: insights.extend(self.generate_insights(symbol, close_prices)) except Exception as e: print(f"Error generating insights for {symbol}: {e}") return insights def generate_insights(self, symbol, close_prices): insights = [] returns = self.calculate_returns(close_prices) # State classification HMM state_hmm_states = self.hmm_predict_states(returns) # Regime-switching HMM (using MarkovRegression) if self.regime_hmm is None: self.regime_hmm = MarkovRegression(returns, k_regimes=2, switching_variance=True) regime_hmm_states = self.regime_hmm.fit().predict() best_state = self.get_best_hmm_state(returns, state_hmm_states) nn_output = self.train_neural_network(close_prices) insight_direction = self.determine_insight_direction(state_hmm_states, regime_hmm_states, best_state, nn_output, close_prices[-1]) insights.append(Insight.Price(symbol, timedelta(days=1), insight_direction, confidence=1)) return insights def calculate_returns(self, close_prices): return np.diff(np.log(close_prices)) def hmm_predict_states(self, returns): try: self.state_hmm.fit(returns.reshape(-1, 1)) return self.state_hmm.predict(returns.reshape(-1, 1)) except Exception as e: print(f"Error in HMM state prediction: {e}") return np.zeros(len(returns), dtype=int) def get_best_hmm_state(self, returns, hmm_states): try: category_means = [(state, np.mean(returns[hmm_states == state])) for state in np.unique(hmm_states)] return max(category_means, key=lambda x: x[1])[0] except Exception as e: print(f"Error determining best HMM state: {e}") return 0 def train_neural_network(self, close_prices): features = self.get_nn_features(close_prices) target = close_prices[-1] feature_tensor = torch.tensor(features, dtype=torch.float32) target_tensor = torch.tensor([target], dtype=torch.float32) try: self.optimizer.zero_grad() nn_output = self.nn(feature_tensor) loss = self.loss_function(nn_output, target_tensor) loss.backward() self.optimizer.step() return nn_output.item() except Exception as e: print(f"Error training neural network: {e}") return target def get_nn_features(self, close_prices): try: return np.diff(close_prices[-(self.nn_input_size + 1):]) except Exception as e: print(f"Error generating NN features: {e}") return np.zeros(self.nn_input_size) def determine_insight_direction(self, state_hmm_states, regime_hmm_states, best_state, nn_output, last_price): try: if state_hmm_states[-1] == best_state and regime_hmm_states[-1] == 0 and nn_output > last_price: return InsightDirection.Up elif state_hmm_states[-1] != best_state and regime_hmm_states[-1] == 1 and nn_output < last_price: return InsightDirection.Down else: return InsightDirection.Flat except Exception as e: print(f"Error determining insight direction: {e}") return InsightDirection.Flat def retrain_models(self): try: for symbol, window in self.data.items(): if not window.IsReady: continue close_prices = np.array([bar.Close for bar in window]) returns = self.calculate_returns(close_prices) # Retrain state classification HMM and neural network self.state_hmm.fit(returns.reshape(-1, 1)) self.train_neural_network(close_prices) except Exception as e: print(f"Error retraining models: {e}")
""" This code is made public in accordance with the terms and conditions outlined by QuantConnect. For more information on these terms, please visit the QuantConnect terms of service page at: https://www.quantconnect.com/terms/ """ """ DISCLAMER: This trading algorithm is provided for research purposes only and does not constitute financial advice. Trading in financial markets involves substantial risk and is not suitable for every investor. Past performance is not indicative of future results. The author assumes no responsibility for any financial losses or damages incurred as a result of using this software. Use at your own risk. """ from AlgorithmImports import * # Import necessary classes and methods from QuantConnect from alpha import DualModelAlphaGenerator # Import custom alpha generator class WellDressedSkyBlueSardine(QCAlgorithm): """ This algorithm implements a quantitative trading strategy using the QuantConnect platform. - Initializes with a specific start and end date, and an initial cash balance. - Uses a dual-model alpha generator for signal generation. - Constructs a portfolio using the Black-Litterman model optimized for maximum Sharpe ratio. - Applies risk management through maximum drawdown and trailing stop models. - Rebalances the universe of stocks based on a specific fundamental filter. """ def Initialize(self): """Initializes the algorithm with predefined settings and models.""" self.SetStartDate(2019, 1, 1) # Start date of backtest self.SetEndDate(2022, 1, 1) # End date of backtest self.SetCash(100000) # Starting capital for the strategy # Set brokerage model to Interactive Brokers self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage) # Set benchmark to SPY for performance comparison # self.SetBenchmark("SPY") # self.AddEquity("XLE", Resolution.Daily) # self.benchmarkTicker = 'XLE' # self.AddEquity("XLK", Resolution.Daily) # self.benchmarkTicker = 'XLK' # self.AddEquity("XLF", Resolution.Daily) # self.benchmarkTicker = 'XLF' # self.AddEquity("XLV", Resolution.Daily) # self.benchmarkTicker = 'XLV' # self.AddEquity("XLU", Resolution.Daily) # self.benchmarkTicker = 'XLU' self.AddEquity("SPY", Resolution.Daily) self.benchmarkTicker = 'SPY' self.SetBenchmark(self.benchmarkTicker) self.initBenchmarkPrice = 0 self.benchmarkExposure = 1 # Warm up period of 3 years to initialize indicators and models self.SetWarmUp(365 * 3) # Add universe selection model with fundamental filtering self.AddUniverse(self.FundamentalUniverseSelection) self.UniverseSettings.Resolution = Resolution.Daily # Set resolution for universe data # Add custom alpha generator self.AddAlpha(DualModelAlphaGenerator()) # Set up portfolio optimizer and construction model optimizer = MaximumSharpeRatioPortfolioOptimizer() self.SetPortfolioConstruction( BlackLittermanOptimizationPortfolioConstructionModel(optimizer=optimizer) ) # Add risk management models self.AddRiskManagement(MaximumDrawdownPercentPerSecurity()) self.AddRiskManagement(TrailingStopRiskManagementModel()) self.portfolio_targets = [] # List to store portfolio targets self.active_stocks = set() # Set to store active stocks in the universe # Schedule universe rebalancing every Monday at midnight self.Schedule.On( self.DateRules.Every(DayOfWeek.Monday), self.TimeRules.At(0, 0), self.RebalanceUniverse ) def FundamentalUniverseSelection(self, fundamental): """ Selects stocks based on fundamental data. - Filters stocks in the energy sector with a positive market cap. - Sorts filtered stocks by market capitalization in descending order. - Selects the top 20 stocks by market cap. :param fundamental: List of fundamental data objects :return: List of selected stock symbols """ # energy_sector_code = MorningstarSectorCode.ENERGY # Define sector code for energy # energy_sector_code = MorningstarSectorCode.TECHNOLOGY # energy_sector_code = MorningstarSectorCode.FINANCIAL_SERVICES # Define sector code for energy # energy_sector_code = MorningstarSectorCode.HEALTHCARE # energy_sector_code = MorningstarSectorCode.UTILITIES energy_sector_code = [MorningstarSectorCode.ENERGY, MorningstarSectorCode.FINANCIAL_SERVICES, MorningstarSectorCode.HEALTHCARE] # Filter stocks based on sector and market cap # filtered = [ # x for x in fundamental # if x.AssetClassification.MorningstarSectorCode == energy_sector_code and x.MarketCap > 0 # ] filtered = [ x for x in fundamental if x.AssetClassification.MorningstarSectorCode in energy_sector_code and x.MarketCap > 0 ] # Sort filtered stocks by market capitalization sorted_by_market_cap = sorted(filtered, key=lambda x: x.MarketCap, reverse=True) # Return top 20 stocks by market capitalization return [x.Symbol for x in sorted_by_market_cap][:60] def RebalanceUniverse(self): """Rebalances the universe of stocks at the specified schedule.""" self.UniverseSettings.Rebalance = Resolution.Daily self.Debug("Universe rebalanced at: " + str(self.Time)) def OnSecuritiesChanged(self, changes): """ Handles changes in the securities universe. - Updates active stocks set based on added securities. - Liquidates removed securities from the portfolio. - Computes equal weight for each active stock and sets portfolio targets. :param changes: SecurityChanges object containing added and removed securities """ # Update active stocks based on added securities self.active_stocks = {x.Symbol for x in changes.AddedSecurities} # Liquidate removed securities for x in changes.RemovedSecurities: self.Liquidate(x.Symbol) # Compute equal weight for each active stock if self.active_stocks: weight = 1.0 / len(self.active_stocks) self.portfolio_targets = [ PortfolioTarget(symbol, weight) for symbol in self.active_stocks ] def OnData(self, data): """ Handles incoming data and executes trades based on portfolio targets. - Skips processing if warming up or if data for all active stocks is not available. - Calculates the required trade quantity to achieve target portfolio weights. - Executes market orders to adjust holdings based on calculated quantities. :param data: Slice object containing current market data """ # Skip processing if warming up if self.IsWarmingUp: return self.UpdateBenchmarkValue() self.Plot('Strategy Equity', self.benchmarkTicker, self.benchmarkValue) # Skip processing if no targets or incomplete data if not self.portfolio_targets or not all(symbol in data for symbol in self.active_stocks): return # Iterate over portfolio targets and adjust holdings for target in self.portfolio_targets: symbol, target_weight = target.Symbol, target.Quantity # Skip if data for symbol is not available if not data.ContainsKey(symbol): continue # Calculate current and target values current_price = data[symbol].Price current_value = self.Portfolio[symbol].HoldingsValue target_value = self.Portfolio.TotalPortfolioValue * target_weight quantity = (target_value - current_value) / current_price # Execute market orders to adjust holdings if quantity > 0 and self.Portfolio.Cash >= quantity * current_price: self.MarketOrder(symbol, quantity) elif quantity < 0: self.MarketOrder(symbol, quantity) # Clear portfolio targets after orders are placed self.portfolio_targets = [] def UpdateBenchmarkValue(self): ''' Simulate buy and hold the Benchmark ''' # if self.initBenchmarkPrice is None: if self.initBenchmarkPrice == 0: # Use if Plotting Short Position of Benchmark self.initBenchmarkCash = self.Portfolio.Cash self.initBenchmarkPrice = self.Benchmark.Evaluate(self.Time) self.benchmarkValue = self.initBenchmarkCash else: currentBenchmarkPrice = self.Benchmark.Evaluate(self.Time) # self.benchmarkValue = (currentBenchmarkPrice / self.initBenchmarkPrice) * self.initBenchmarkCash # Use if Plotting Short Position of Benchmark lastReturn = ((currentBenchmarkPrice / self.initBenchmarkPrice) - 1) * self.benchmarkExposure self.benchmarkValue = (1 + lastReturn) * self.initBenchmarkCash
""" This code is made public in accordance with the terms and conditions outlined by QuantConnect. For more information on these terms, please visit the QuantConnect terms of service page at: https://www.quantconnect.com/terms/ """ """ DISCLAMER: This trading algorithm is provided for research purposes only and does not constitute financial advice. Trading in financial markets involves substantial risk and is not suitable for every investor. Past performance is not indicative of future results. The author assumes no responsibility for any financial losses or damages incurred as a result of using this software. Use at your own risk. """ import torch.nn as nn # Import PyTorch's neural network module from AlgorithmImports import * # Import necessary classes and methods from QuantConnect class NeuralNetwork(nn.Module): """ A neural network model implemented using PyTorch's nn.Module. - Consists of multiple hidden layers with ReLU activation functions. - Designed to process inputs and produce outputs through a series of linear transformations. """ def __init__(self, input_size, hidden_size, output_size): """ Initializes the neural network layers and activation function. :param input_size: Number of input features :param hidden_size: Number of neurons in each hidden layer :param output_size: Number of output features """ super(NeuralNetwork, self).__init__() # Call the parent class initializer # Define the hidden layers with specified input and output sizes self.hidden_layer1 = nn.Linear(input_size, hidden_size) # First hidden layer self.hidden_layer2 = nn.Linear(hidden_size, hidden_size) # Second hidden layer self.hidden_layer3 = nn.Linear(hidden_size, hidden_size) # Third hidden layer self.hidden_layer4 = nn.Linear(hidden_size, 5) # Fourth hidden layer self.hidden_layer5 = nn.Linear(5, 1) # Fifth hidden layer # Define the output layer self.output_layer = nn.Linear(1, output_size) # Final output layer # Define the activation function self.activation = nn.ReLU() # ReLU activation function for non-linearity def forward(self, x): """ Defines the forward pass of the neural network. - Applies ReLU activation to each hidden layer. - Processes the input through the series of hidden layers to the output layer. :param x: Input tensor :return: Output tensor after passing through the network """ # Pass the input through each layer with ReLU activation x = self.activation(self.hidden_layer1(x)) x = self.activation(self.hidden_layer2(x)) x = self.activation(self.hidden_layer3(x)) x = self.activation(self.hidden_layer4(x)) x = self.activation(self.hidden_layer5(x)) # Pass through the output layer to get the final output x = self.output_layer(x) return x
import numpy as np from hmmlearn.hmm import GaussianHMM # For state classification HMM from statsmodels.tsa.regime_switching.markov_regression import MarkovRegression # For regime-switching HMM import torch import torch.nn as nn import torch.optim as optim from AlgorithmImports import * from neural_network import NeuralNetwork class DualModelAlphaGenerator(AlphaModel): def __init__(self, lookback=20, hmm_components=5, nn_input_size=5, nn_hidden_size=10, retrain_interval=30): super().__init__() self.lookback = lookback self.hmm_components = hmm_components self.nn_input_size = nn_input_size self.nn_hidden_size = nn_hidden_size self.retrain_interval = retrain_interval # Initialize HMMs: one for state classification and another for regime switching self.state_hmm = GaussianHMM(n_components=self.hmm_components) self.regime_hmm = None # Will initialize the MarkovRegression model later # Neural network initialization self.nn = NeuralNetwork(nn_input_size, nn_hidden_size, 1) self.optimizer = optim.Adam(self.nn.parameters(), lr=0.001) self.loss_function = nn.MSELoss() self.data = {} self.last_train_time = None def add_data(self, symbol, trade_bar): if symbol not in self.data: self.data[symbol] = RollingWindow[TradeBar](self.lookback) if self.is_valid_trade_bar(trade_bar): self.data[symbol].Add(trade_bar) def is_valid_trade_bar(self, trade_bar): return trade_bar.Close > 0 def is_valid_data(self, close_prices): """ Validate if close prices are all positive. :param close_prices: Array of close prices :return: True if all prices are positive, False otherwise """ return np.all(close_prices > 0) def update_models(self, algorithm): if self.last_train_time is None or (algorithm.Time - self.last_train_time).days >= self.retrain_interval: self.retrain_models() self.last_train_time = algorithm.Time def update(self, algorithm, data): insights = [] self.update_models(algorithm) for symbol in data.Bars.Keys: trade_bar = data.Bars[symbol] self.add_data(symbol, trade_bar) if not self.data[symbol].IsReady: continue close_prices = np.array([bar.Close for bar in self.data[symbol]]) if not self.is_valid_data(close_prices): continue try: insights.extend(self.generate_insights(symbol, close_prices)) except Exception as e: print(f"Error generating insights for {symbol}: {e}") return insights def generate_insights(self, symbol, close_prices): insights = [] returns = self.calculate_returns(close_prices) # State classification HMM state_hmm_states = self.hmm_predict_states(returns) # Regime-switching HMM (using MarkovRegression) if self.regime_hmm is None: # Initialize MarkovRegression with desired number of regimes self.regime_hmm = MarkovRegression(returns, k_regimes=2, switching_variance=True) try: # Fit the model directly and get the predicted states regime_hmm_results = self.regime_hmm.fit() # Fit the model and assign results to a new variable regime_hmm_states = regime_hmm_results.predict() # Predict using the fitted model except Exception as e: print(f"Error fitting regime-switching HMM for {symbol}: {e}") regime_hmm_states = np.zeros(len(returns), dtype=int) best_state = self.get_best_hmm_state(returns, state_hmm_states) nn_output = self.train_neural_network(close_prices) insight_direction = self.determine_insight_direction(state_hmm_states, regime_hmm_states, best_state, nn_output, close_prices[-1]) insights.append(Insight.Price(symbol, timedelta(days=1), insight_direction, confidence=1)) return insights def calculate_returns(self, close_prices): return np.diff(np.log(close_prices)) def hmm_predict_states(self, returns): try: self.state_hmm.fit(returns.reshape(-1, 1)) return self.state_hmm.predict(returns.reshape(-1, 1)) except Exception as e: print(f"Error in HMM state prediction: {e}") return np.zeros(len(returns), dtype=int) def get_best_hmm_state(self, returns, hmm_states): try: category_means = [(state, np.mean(returns[hmm_states == state])) for state in np.unique(hmm_states)] return max(category_means, key=lambda x: x[1])[0] except Exception as e: print(f"Error determining best HMM state: {e}") return 0 def train_neural_network(self, close_prices): features = self.get_nn_features(close_prices) target = close_prices[-1] feature_tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0) # Add batch dimension target_tensor = torch.tensor([target], dtype=torch.float32).unsqueeze(0) # Add batch dimension try: self.optimizer.zero_grad() nn_output = self.nn(feature_tensor) loss = self.loss_function(nn_output, target_tensor) loss.backward() self.optimizer.step() return nn_output.item() except Exception as e: print(f"Error training neural network: {e}") return target def get_nn_features(self, close_prices): try: return np.diff(close_prices[-(self.nn_input_size + 1):]) except Exception as e: print(f"Error generating NN features: {e}") return np.zeros(self.nn_input_size) def determine_insight_direction(self, state_hmm_states, regime_hmm_states, best_state, nn_output, last_price): try: # Example logic combining both HMMs and neural network output # Adjust this logic based on your specific strategy requirements if state_hmm_states[-1] == best_state and regime_hmm_states[-1] == 0 and nn_output > last_price: return InsightDirection.Up elif state_hmm_states[-1] != best_state and regime_hmm_states[-1] == 1 and nn_output < last_price: return InsightDirection.Down else: return InsightDirection.Flat except Exception as e: print(f"Error determining insight direction: {e}") return InsightDirection.Flat def retrain_models(self): try: for symbol, window in self.data.items(): if not window.IsReady: continue close_prices = np.array([bar.Close for bar in window]) returns = self.calculate_returns(close_prices) # Retrain state classification HMM self.state_hmm.fit(returns.reshape(-1, 1)) # Retrain regime-switching HMM without overwriting self.regime_hmm try: regime_hmm_results = self.regime_hmm.fit() # Fit the model but don't overwrite self.regime_hmm except Exception as e: print(f"Error retraining regime-switching HMM for {symbol}: {e}") # Retrain neural network self.train_neural_network(close_prices) except Exception as e: print(f"Error retraining models: {e}")