Overall Statistics |
Total Orders 3654 Average Win 8.87% Average Loss -5.48% Compounding Annual Return 88.137% Drawdown 86.500% Expectancy 0.147 Start Equity 100000 End Equity 2066281.43 Net Profit 1966.281% Sharpe Ratio 0.902 Sortino Ratio 8.999 Probabilistic Sharpe Ratio 0.065% Loss Rate 56% Win Rate 44% Profit-Loss Ratio 1.62 Alpha 2.382 Beta 0.23 Annual Standard Deviation 2.666 Annual Variance 7.106 Information Ratio 0.866 Tracking Error 2.669 Treynor Ratio 10.433 Total Fees $4277.71 Estimated Strategy Capacity $0 Lowest Capacity Asset FB V6OIPNZEM8V9 Portfolio Turnover 4.85% |
import numpy as np from AlgorithmImports import * class AssetWeightCalculator: def __init__(self, algorithm: QCAlgorithm): self.algorithm = algorithm self.risk_free = self.algorithm.add_equity("BIL", Resolution.HOUR) def coarse_selection(self, coarse): """ Selects stonks, first filter """ # Sorts by dollar volume before taking top 200 sorted_by_volume = sorted([x for x in coarse if x.price > 10 and x.has_fundamental_data], key=lambda x: x.dollar_volume, reverse=True) return [x.symbol for x in sorted_by_volume][:200] def fine_selection(self, fine): """ Selects stonks, second filter """ filtered = [x.symbol for x in fine if x.market_cap is not None and x.market_cap > 10e9] self.algorithm.debug(f"Fine Selection: {len(filtered)} symbols passed filters") # Doing it this way makes it so that stocks are ranked on each universe update and then the macds can be redone with the scheduler in main ranked_symbols = self.rank_stocks(filtered) return ranked_symbols def calculate_sharpe_ratio(self, symbol, period=4914): # This is 3 yrs worth of trading days """ Calculates the sharpe """ try: # If a KeyValuePair was recieved only take the symbol if hasattr(symbol, "Key"): symbol = symbol.Key history = self.algorithm.history([symbol], period, Resolution.HOUR) if history.empty: self.algorithm.debug(f"No history for {symbol.value}") return None # Get risk-free rate rf_history = self.algorithm.history(self.risk_free.symbol, 1, Resolution.HOUR) risk_free_rate = rf_history['close'].iloc[-1]/100 if not rf_history.empty else 0.02 # Default to 2% if no data # Sharpe ratio logic returns = history['close'].pct_change().dropna() excess_returns = returns - (risk_free_rate/1638) mean_excess_return = excess_returns.mean() * 1638 std_dev = excess_returns.std() * np.sqrt(1638) return mean_excess_return / std_dev if std_dev != 0 else None except Exception as e: self.algorithm.debug(f"Error calculating Sharpe for {symbol.value}: {str(e)}") return None def rank_stocks(self, symbols): """ Ranks da top 50 stocks based on sharpe """ if not symbols: self.algorithm.debug("No symbols to rank") return [] self.algorithm.debug(f"Ranking {len(symbols)} symbols") # Converting from key pair if neccessary symbols = [s.Key if hasattr(s, 'Key') else s for s in symbols] scores = {symbol: self.calculate_sharpe_ratio(symbol) for symbol in symbols} valid_scores = {k: v for k, v in scores.items() if v is not None} self.algorithm.debug(f"Valid Sharpe ratios: {len(valid_scores)} out of {len(symbols)}") if not valid_scores: return [] sorted_scores = sorted(valid_scores, key=valid_scores.get, reverse=True)[:20] self.algorithm.log(f"All symbols before ranking: {[s.value for s in symbols]}") self.algorithm.log(f"Symbols after filtering: {[s.value for s in valid_scores.keys()]}") return sorted_scores def normalize_scores(self, scores): """ The list of scores from the ranking method are normalized using a z score so that an additive operation may be used in WeightCombiner() """ values = np.array(list(scores.values())) mean = np.mean(values) std_dev = np.std(values) if std_dev == 0: # If no variation in scores, assign equal normalized scores return {symbol: 0 for symbol in scores.keys()} normalized_scores = {symbol: (score - mean) / std_dev for symbol, score in scores.items()} print(normalized_scores) #To see output for debugging return normalized_scores
from AlgorithmImports import * class MACDSignalGenerator: def __init__(self, algorithm: QCAlgorithm, symbols: list, cash_buffer: float = 0.05): self.algorithm = algorithm self.symbols = symbols self.cash_buffer = cash_buffer self.macd_indicators = {} # {symbol: {variant: MACD}} # Define MACD parameters for different variants self.macd_variants = { "slow": {"fast": 12, "slow": 26, "signal": 9}, "slow-med": {"fast": 9, "slow": 19, "signal": 5}, "med-fast": {"fast": 7, "slow": 15, "signal": 3}, "fast": {"fast": 5, "slow": 12, "signal": 2}, } def remove_symbols(self, symbols: list): """ Removes MACD indicators for the specified symbols. """ for symbol in symbols: # Liquidate position before removing indicator self.algorithm.liquidate(symbol) # Unregister and delete indicators tied to each symbol if symbol in self.macd_indicators: for macd in self.macd_indicators[symbol].values(): # Better: gets MACD objects directly self.algorithm.unregister_indicator(macd) del self.macd_indicators[symbol] def add_symbols(self, new_symbols): """ Add in the new symbols that are given by AssetWeightCalculator. """ # Log initial attempt self.algorithm.debug(f"Attempting to add symbols: {[s.value for s in new_symbols]}") # Get historical data for new symbols history = self.algorithm.history([s for s in new_symbols], 35, # Longest MACD period needed Resolution.HOUR) # Log history data availability self.algorithm.debug(f"History data available for: {history.index.get_level_values(0).unique()}") self.symbols.extend(new_symbols) for symbol in new_symbols: security = self.algorithm.securities[symbol] # Detailed security check logging # self.algorithm.debug(f"Security {symbol.value} check:" # f" has_data={security.has_data}," # f" is_tradable={security.is_tradable}," # f" price={security.price}") # Checking if price is 0 if not (security.has_data and security.is_tradable and security.price > 0): self.algorithm.debug(f"Waiting for valid price data: {symbol.value}") continue # Adding the symbol if symbol not in self.macd_indicators: self.macd_indicators[symbol] = {} # Get symbol's historical data if symbol not in history.index.get_level_values(0): self.algorithm.debug(f"No history data for: {symbol.value}") continue symbol_history = history.loc[symbol] self.algorithm.debug(f"History rows for {symbol.value}: {len(symbol_history)}") for variant, params in self.macd_variants.items(): macd = self.algorithm.macd( symbol=symbol, fast_period=params["fast"], slow_period=params["slow"], signal_period=params["signal"], type=MovingAverageType.EXPONENTIAL, resolution=Resolution.HOUR, selector=Field.CLOSE ) self.macd_indicators[symbol][variant] = macd # Warm up MACD with historical data for time, row in symbol_history.iterrows(): macd.update(time, row['close']) self.macd_indicators[symbol][variant] = macd def calculate_position_sizes(self): position_sizes = {} max_position_limit = 0.1 # Check if we have any symbols to process if not self.symbols or not self.macd_indicators: self.algorithm.debug("No symbols available for position calculation") return position_sizes # Calculating the maximum one variant can be in size max_position = (1 - self.cash_buffer) / (len(self.symbols) * len(self.macd_variants)) for symbol in self.macd_indicators: position_sizes[symbol] = {} for variant, macd in self.macd_indicators[symbol].items(): if macd.is_ready: security = self.algorithm.securities[symbol] # Detailed security check logging # self.algorithm.debug(f"Position Check for {symbol.value}:" # f" has_data={security.has_data}," # f" is_tradable={security.is_tradable}," # f" price={security.price}," # f" last_data={security.get_last_data() is not None},") # More comprehensive check # if not (security.has_data and # security.is_tradable and # security.price > 0 and # security.get_last_data() is not None): # self.algorithm.debug(f"Security not ready: {symbol.value}") # continue # Distance between fast and slow distance = macd.fast.current.value - macd.slow.current.value # Normalize the distance as a percentage difference and then as a fraction of max position position_size = max_position * (distance / macd.slow.current.value) * 70 # Scalar value of max_position, the scalar integer can be though of as a form of leverage setting # Only allow positive positions, cap at maximum position_size = max(0, min(position_size, max_position_limit)) position_sizes[symbol][variant] = position_size #self.algorithm.debug(f"Calculated position for {symbol.value} {variant}: {position_size}") else: position_sizes[symbol][variant] = 0 # Running daily cause the logging is too heavy hourly if self.algorithm.time.hour == 10 and self.algorithm.time.minute == 0: rounded_positions = [(s.value, {k: round(v, 5) for k, v in sizes.items()}) for s, sizes in position_sizes.items()] #self.algorithm.debug(f"Daily position sizes proposed: {rounded_positions}") return position_sizes
# region imports from AlgorithmImports import * # endregion from QuantConnect import * from QuantConnect.Algorithm import * from QuantConnect.Data import * from QuantConnect.Indicators import * from datetime import timedelta import numpy as np import pandas as pd import torch import os import torch.nn as nn from strategy import KQTStrategy def calculate_ema(prices, span): # Using adjust=False mimics how many trading platforms calculate EMA return pd.Series(prices).ewm(span=span, adjust=False).mean().values def calculate_rsi(prices, period=14): """Calculate Relative Strength Index""" deltas = np.diff(prices) seed = deltas[:period+1] up = seed[seed >= 0].sum()/period down = -seed[seed < 0].sum()/period if down == 0: return 100 # No downward movement means RSI == 100 rs = up/down rsi = np.zeros_like(prices) rsi[:period] = 100. - 100./(1. + rs) for i in range(period, len(prices)): delta = deltas[i-1] if delta > 0: upval = delta downval = 0. else: upval = 0. downval = -delta up = (up * (period-1) + upval) / period down = (down * (period-1) + downval) / period rs = up/down if down != 0 else float('inf') rsi[i] = 100. - 100./(1. + rs) return rsi def calculate_macd(prices, fast=12, slow=26, signal=9): """Calculate MACD line and histogram""" # Convert to numpy array if not already prices = np.array(prices) # Calculate EMAs ema_fast = pd.Series(prices).ewm(span=fast, adjust=False).mean().values ema_slow = pd.Series(prices).ewm(span=slow, adjust=False).mean().values # Calculate MACD line and signal line macd_line = ema_fast - ema_slow signal_line = pd.Series(macd_line).ewm(span=signal, adjust=False).mean().values # Calculate histogram histogram = macd_line - signal_line return macd_line, signal_line, histogram def calculate_atr(high, low, close, period=14): """Calculate Average True Range""" if len(high) != len(low) or len(high) != len(close): raise ValueError("Input arrays must have the same length") tr = np.zeros(len(high)) tr[0] = high[0] - low[0] # Initial TR = high - low of first bar for i in range(1, len(tr)): tr[i] = max( high[i] - low[i], abs(high[i] - close[i-1]), abs(low[i] - close[i-1]) ) # Calculate ATR atr = np.zeros_like(tr) atr[0] = tr[0] for i in range(1, len(atr)): atr[i] = (atr[i-1] * (period-1) + tr[i]) / period return atr class KQTAlgorithm(QCAlgorithm): def Initialize(self): """Initialize the algorithm""" # Set start date, end date, and cash self.SetStartDate(2019, 1, 1) self.SetEndDate(2024, 12, 31) self.SetCash(1000000) self.previous_portfolio_value = 0 # Set benchmark to SPY self.SetBenchmark("SPY") # Initialize the KQT strategy self.strategy = KQTStrategy() self.lookback = 60 # Need enough data for technical indicators self.tickers = [] self.symbols = {} self.sector_mappings = {} self.strategy.sector_mappings = self.sector_mappings # Share the dictionary self._universe = self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction) # Add SPY for market data self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol # Storage for historical data and predictions self.stock_data = {} self.current_predictions = {} self.previous_positions = {} # Track stopped out positions self.stopped_out = set() # Schedule the trading function to run before market close self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(10, 0), # 10:00 AM Eastern self.TradeExecute) # Initialize model with feature count feature_count = 18 # Adjust based on your actual feature count # Load pre-trained model weights if available #self.TryLoadModelWeights() def CoarseSelectionFunction(self, coarse): sorted_by_dollar_volume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True) return [x.Symbol for x in sorted_by_dollar_volume[:500]] def FineSelectionFunction(self, fine): sorted_by_market_cap = sorted(fine, key=lambda x: x.MarketCap, reverse=True) selected = sorted_by_market_cap[:100] # Debug the first item to understand available properties if len(selected) > 0: f = selected[0] self.Debug(f"Available fundamental properties: {[attr for attr in dir(f) if not attr.startswith('_')]}") if hasattr(f, 'AssetClassification'): self.Debug(f"AssetClassification properties: {[attr for attr in dir(f.AssetClassification) if not attr.startswith('_')]}") for f in selected: ticker = f.Symbol.Value # Try multiple ways to get sector information sector = "Unknown" try: if hasattr(f, 'AssetClassification') and f.AssetClassification is not None: # Try commonly used sector properties if hasattr(f.AssetClassification, 'MorningstarSectorCode'): sector = str(f.AssetClassification.MorningstarSectorCode) elif hasattr(f.AssetClassification, 'MorningstarIndustryCode'): sector = str(f.AssetClassification.MorningstarIndustryCode) elif hasattr(f.AssetClassification, 'GicsCode'): sector = str(f.AssetClassification.GicsCode) # Additional fallbacks elif hasattr(f.AssetClassification, 'Sector'): sector = f.AssetClassification.Sector elif hasattr(f.AssetClassification, 'Industry'): sector = f.AssetClassification.Industry except Exception as e: self.Debug(f"Error getting sector for {ticker}: {str(e)}") self.sector_mappings[ticker] = sector return [f.Symbol for f in selected] def OnSecuritiesChanged(self, changes): self.Debug(f"Universe changed: Added {len(changes.AddedSecurities)}, Removed {len(changes.RemovedSecurities)}") for added in changes.AddedSecurities: self.Debug(f"Added: {added.Symbol.Value}") for removed in changes.RemovedSecurities: self.Debug(f"Removed: {removed.Symbol.Value}") for added in changes.AddedSecurities: ticker = added.Symbol.Value if ticker not in self.tickers: self.tickers.append(ticker) self.symbols[ticker] = added.Symbol for removed in changes.RemovedSecurities: ticker = removed.Symbol.Value if ticker in self.tickers: self.tickers.remove(ticker) if ticker in self.symbols: del self.symbols[ticker] if ticker in self.sector_mappings: del self.sector_mappings[ticker] if ticker in self.stock_data: del self.stock_data[ticker] def TryLoadModelWeights(self): """Try to load model weights from ObjectStore""" try: if self.ObjectStore.ContainsKey("kqt_model_weights"): self.Debug("Found model weights in ObjectStore, loading...") # Get base64 encoded string encoded_bytes = self.ObjectStore.Read("kqt_model_weights") # Decode back to binary import base64 model_bytes = base64.b64decode(encoded_bytes) # Save temporarily to file import tempfile with tempfile.NamedTemporaryFile(delete=False, suffix='.pth') as temp: temp_path = temp.name temp.write(model_bytes) # Extract input size from saved weights and reinitialize model try: state_dict = torch.load(temp_path) input_shape = state_dict['embedding.0.weight'].shape actual_input_size = input_shape[1] self.Debug(f"Detected input size from weights: {actual_input_size}") # Reinitialize model with correct input size self.Debug("Successfully loaded model weights") except Exception as inner_e: self.Debug(f"Error examining weights: {str(inner_e)}") # Clean up import os os.unlink(temp_path) else: self.Debug("No model weights found in ObjectStore") except Exception as e: self.Debug(f"Error loading model weights: {str(e)}") def OnData(self, data): """OnData event is the primary entry point for your algorithm.""" # We're using scheduled events instead of processing each data point pass def TradeExecute(self): """Execute trading logic with extreme safety measures to prevent leverage spikes""" # Check if we're in emergency safety mode in_emergency_mode = hasattr(self, 'EMERGENCY_SAFETY_MODE') and self.EMERGENCY_SAFETY_MODE if in_emergency_mode: days_since_transition = (self.Time - self.transition_date).days if hasattr(self, 'transition_date') else 0 self.Debug(f"EMERGENCY SAFETY MODE: Day {days_since_transition+1} after transition. " + f"Max allowed leverage: {self.MAX_LEVERAGE*100:.0f}%, Position cap: {self.POSITION_CAP*100:.0f}%") # Completely skip trading in the first 2 days for safety if days_since_transition < 2: self.Debug(f"EMERGENCY SAFETY: Skipping trading entirely for the first 2 days after transition") return # *** FIXED Market Open Check *** if not self.Securities.ContainsKey(self.spy) or not self.Securities[self.spy].Exchange.ExchangeOpen: return # Skip if market is closed or SPY data isn't loaded yet # Standard trade execution code continues... self.Debug(f"Current universe size: {len(self.tickers)}") self.Debug(f"Using sectors: {list(self.sector_mappings.keys())[:5]}...") self.Debug(f"TradeExecute running on {self.Time}") # 1. Update historical data for all stocks self.UpdateHistoricalData() # 2. Generate predictions for each stock self.current_predictions = self.GeneratePredictions() # 3. Check for stop losses self.ProcessStopLosses() # 4. Generate new position sizes market_returns = self.GetMarketReturns() target_positions = self.strategy.generate_positions(self.current_predictions, market_returns) self.Debug(f"Target positions before execution: {target_positions}") self.Debug(f"Market returns: {market_returns}") # 5. Execute trades to reach target positions self.ExecuteTrades(target_positions) # 6. Update portfolio return for regime detection daily_return = self.CalculatePortfolioReturn() self.strategy.update_portfolio_returns(daily_return) # 7. Add this line to store today's value for tomorrow's calculation self.previous_portfolio_value = self.Portfolio.TotalPortfolioValue self.Debug(f"Generated {len(target_positions)} positions: {target_positions}") def CalculatePortfolioReturn(self): """Calculate today's portfolio return""" # Get the portfolio value change current_value = self.Portfolio.TotalPortfolioValue # Use our stored previous value instead of the non-existent property if self.previous_portfolio_value > 0: return (current_value / self.previous_portfolio_value - 1) * 100 # as percentage # On first day, just store the value and return 0 self.previous_portfolio_value = current_value return 0 def UpdateHistoricalData(self): """Fetch and update historical data for all symbols""" for ticker in self.tickers: # Use the stored Symbol object instead of accessing Securities by ticker if ticker not in self.symbols: self.Debug(f"Symbol not found for ticker {ticker}, skipping") continue symbol = self.symbols[ticker] # Request sufficient history for features history = self.History(symbol, self.lookback, Resolution.Daily) if history.empty or len(history) < self.lookback: self.Debug(f"Not enough historical data for {ticker}, skipping") continue # Store historical data if isinstance(history.index, pd.MultiIndex): history_reset = history.reset_index() symbol_data = history_reset[history_reset['symbol'] == symbol] self.stock_data[ticker] = symbol_data else: self.stock_data[ticker] = history def GetMarketReturns(self): """Get recent market returns for regime detection""" spy_history = self.History(self.spy, 10, Resolution.Daily) if spy_history.empty: return [] # Handle both MultiIndex and regular index formats if isinstance(spy_history.index, pd.MultiIndex): spy_history_reset = spy_history.reset_index() spy_history_filtered = spy_history_reset[spy_history_reset['symbol'] == self.spy] spy_prices = spy_history_filtered['close'].values else: spy_prices = spy_history['close'].values # Calculate returns spy_returns = [] for i in range(1, len(spy_prices)): daily_return = (spy_prices[i] / spy_prices[i-1] - 1) * 100 spy_returns.append(daily_return) return spy_returns def GeneratePredictions(self): """Generate predictions for all stocks""" predictions = {} self.Debug(f"Generating predictions with {len(self.stock_data)} stocks in data") for ticker, history in self.stock_data.items(): try: if len(history) < self.lookback: self.Debug(f"Skipping {ticker}: Not enough history ({len(history)} < {self.lookback})") continue # Prepare data for this stock X, stock_df = self.strategy.prepare_stock_data(history, ticker) if X is None or len(X) == 0: # FALLBACK: Simple prediction if ML fails if isinstance(history.index, pd.MultiIndex): history_reset = history.reset_index() closes = history_reset[history_reset['symbol'] == self.symbols[ticker]]['close'].values else: closes = history['close'].values if len(closes) > 20: # Simple momentum strategy for fallback short_ma = np.mean(closes[-5:]) long_ma = np.mean(closes[-20:]) momentum = closes[-1] / closes[-10] - 1 if len(closes) > 10 else 0 pred_score = momentum + 0.5 * (short_ma/long_ma - 1) predictions[ticker] = { "pred_return": pred_score * 2, "composite_score": pred_score * 5 } self.Debug(f"Used fallback prediction for {ticker}: {pred_score}") continue # If we reach here, X must be valid data for the ML model # Generate prediction with ML model pred_return = self.strategy.predict_returns(X, ticker) # Now X is guaranteed not to be None # Check if prediction itself returned None (handle potential issues in predict_returns) if pred_return is None: self.Debug(f"ML prediction for {ticker} returned None. Skipping.") continue # Skip this ticker if ML prediction failed internally # Store ML prediction predictions[ticker] = { "pred_return": pred_return, "composite_score": pred_return / self.strategy.adaptive_threshold if self.strategy.adaptive_threshold != 0 else pred_return # Avoid division by zero } self.Debug(f"ML prediction for {ticker}: {pred_return}") except Exception as e: # Catch errors during data prep or ML prediction call itself self.Debug(f"Error processing {ticker} in GeneratePredictions loop: {str(e)}") import traceback self.Debug(traceback.format_exc()) # Log full traceback for debugging continue # Skip to next ticker on any error in the main try block self.Debug(f"Generated {len(predictions)} predictions") return predictions def ProcessStopLosses(self): """Check and process stop loss orders""" stop_loss_level = self.strategy.get_stop_loss_level() for ticker in self.tickers: if ticker not in self.symbols or not self.Portfolio[self.symbols[ticker]].Invested: continue symbol = self.symbols[ticker] position = self.Portfolio[symbol] # Get today's return history = self.History(symbol, 2, Resolution.Daily) if history.empty or len(history) < 2: continue # Handle both MultiIndex and regular index formats if isinstance(history.index, pd.MultiIndex): history_reset = history.reset_index() symbol_data = history_reset[history_reset['symbol'] == symbol] if len(symbol_data) < 2: continue close_prices = symbol_data['close'].values else: close_prices = history['close'].values daily_return = (close_prices[-1] / close_prices[-2] - 1) * 100 position_type = "long" if position.Quantity > 0 else "short" hit_stop = False if position_type == "long" and daily_return < stop_loss_level: hit_stop = True self.Debug(f"Stop loss triggered for {ticker} (long): {daily_return:.2f}%") elif position_type == "short" and daily_return > -stop_loss_level: hit_stop = True self.Debug(f"Stop loss triggered for {ticker} (short): {daily_return:.2f}%") if hit_stop: self.stopped_out.add(ticker) def ExecuteTrades(self, target_positions): """Execute trades with extreme caution to prevent leverage spikes""" if not target_positions: self.Debug("No target positions received") return # FIRST STEP: Check for emergency safety mode in_emergency_mode = hasattr(self, 'EMERGENCY_SAFETY_MODE') and self.EMERGENCY_SAFETY_MODE max_leverage_allowed = 0.8 # Default conservative cap max_position_size = 0.15 # Default conservative position cap if in_emergency_mode: max_leverage_allowed = self.MAX_LEVERAGE max_position_size = self.POSITION_CAP self.Debug(f"EMERGENCY SAFETY: Limiting allocation to {max_leverage_allowed*100:.0f}% with {max_position_size*100:.0f}% position cap") # CRITICAL STEP 1: Calculate and log current leverage BEFORE making changes portfolio_value = self.Portfolio.TotalPortfolioValue current_holdings_value = sum(holding.AbsoluteHoldingsValue for holding in self.Portfolio.Values if holding.Invested) current_leverage = current_holdings_value / portfolio_value if portfolio_value > 0 else 0 self.Debug(f"CURRENT LEVERAGE: {current_leverage:.3f}x - Holdings value: ${current_holdings_value:.2f}, Portfolio value: ${portfolio_value:.2f}") # CRITICAL STEP 2: Calculate total target allocation and compare to limit total_target_allocation = sum(abs(weight) for weight in target_positions.values()) self.Debug(f"ALLOCATION CHECK: Target allocation {total_target_allocation:.3f}x vs limit {max_leverage_allowed:.3f}x") # CRITICAL STEP 3: Enforce more aggressive scaling if needed if total_target_allocation > max_leverage_allowed: scaling_factor = max_leverage_allowed / total_target_allocation original_positions = target_positions.copy() # Save original for logging target_positions = {ticker: weight * scaling_factor for ticker, weight in target_positions.items()} # Log the scaling in detail self.Debug(f"LEVERAGE PROTECTION: Scaling down target allocation from {total_target_allocation:.3f}x to {max_leverage_allowed:.3f}x") # Log sample of position changes sample_tickers = list(original_positions.keys())[:5] # First 5 positions for ticker in sample_tickers: if ticker in original_positions and ticker in target_positions: self.Debug(f" {ticker}: {original_positions[ticker]:.4f} → {target_positions[ticker]:.4f}") # STEP 4: Enforce position size cap for ticker in list(target_positions.keys()): if abs(target_positions[ticker]) > max_position_size: original = target_positions[ticker] target_positions[ticker] = max_position_size * (1 if original > 0 else -1) self.Debug(f"POSITION CAP: Limiting {ticker} from {original:.4f} to {target_positions[ticker]:.4f}") # STEP 5: Additional safety - enforce maximum number of positions in emergency mode if in_emergency_mode: max_positions = 10 # Severely limit number of positions during emergency if len(target_positions) > max_positions: # Keep only the highest conviction positions top_positions = sorted(target_positions.items(), key=lambda x: abs(x[1]), reverse=True)[:max_positions] target_positions = {ticker: weight for ticker, weight in top_positions} self.Debug(f"SAFETY LIMIT: Restricted to top {max_positions} positions during emergency mode") # STEP 6: Calculate expected final leverage after proposed changes # This helps us detect if our changes would still result in too much leverage expected_total_allocation = sum(abs(weight) for weight in target_positions.values()) self.Debug(f"FINAL CHECK: Expected allocation after all safety measures: {expected_total_allocation:.3f}x") # STEP 7: Set a minimum cash buffer in emergency mode if in_emergency_mode: cash_buffer = 0.3 # Keep 30% in cash during emergency if expected_total_allocation > (1.0 - cash_buffer): final_scaling = (1.0 - cash_buffer) / expected_total_allocation target_positions = {ticker: weight * final_scaling for ticker, weight in target_positions.items()} self.Debug(f"CASH BUFFER: Added {cash_buffer*100:.0f}% cash buffer, final scaling: {final_scaling:.3f}x") # Now execute the trades with our ultra-safe target positions # Continue with the existing execution logic # Available buying power tracking - FIXED LINE available_buying_power = self.Portfolio.MarginRemaining # Use MarginRemaining instead of GetBuyingPower() self.Debug(f"Starting available margin: ${available_buying_power:.2f}") # Execute trades to reach target positions for ticker, target_weight in target_positions.items(): symbol = self.symbols[ticker] current_security = self.Securities[symbol] # Calculate target share amount price = current_security.Price target_value = portfolio_value * target_weight target_shares = int(target_value / price) if price > 0 else 0 # Get current holdings holding = self.Portfolio[symbol] current_shares = holding.Quantity # Calculate shares to trade shares_to_trade = target_shares - current_shares # Skip tiny orders if abs(shares_to_trade) > 0: try: # Check if we have enough buying power for this order required_buying_power = abs(shares_to_trade) * price * 1.01 # Add 1% buffer if shares_to_trade < 0 or required_buying_power <= available_buying_power: # Place the order if shares_to_trade > 0: self.MarketOrder(symbol, shares_to_trade) self.Debug(f"BUY {shares_to_trade} shares of {ticker} (${shares_to_trade * price:.2f})") available_buying_power -= required_buying_power else: self.MarketOrder(symbol, shares_to_trade) # Negative for sell self.Debug(f"SELL {abs(shares_to_trade)} shares of {ticker}") # Selling increases buying power available_buying_power += abs(shares_to_trade) * price * 0.98 # Assume 2% impact/fees else: # Not enough buying power, try reduced size max_affordable = int(available_buying_power / (price * 1.01)) if max_affordable > 0: self.MarketOrder(symbol, max_affordable) self.Debug(f"REDUCED BUY {max_affordable} shares of {ticker} due to buying power constraints") available_buying_power -= max_affordable * price * 1.01 else: self.Debug(f"Skipped {ticker} order: Insufficient buying power") except Exception as e: self.Debug(f"Order error for {ticker}: {str(e)}") # Store current positions for next day self.previous_positions = target_positions.copy() # After executing trades, verify total allocation and deleverage if needed current_allocation = sum(self.Portfolio[self.symbols[ticker]].AbsoluteHoldingsValue for ticker in target_positions if ticker in self.symbols and self.Portfolio[self.symbols[ticker]].Invested) / portfolio_value # FIX: Use max_leverage_allowed instead of max_allocation_limit if current_allocation > max_leverage_allowed * 1.05: # 5% buffer self.Debug(f"EMERGENCY: Current allocation {current_allocation*100:.1f}% exceeds limit, executing deleveraging") self._ExecuteEmergencyDeleveraging(max_leverage_allowed) def _LimitSectorExposure(self, target_positions, max_sector_allocation): """Limit exposure to any single sector during post-transition period""" # Group positions by sector sector_allocations = {} for ticker, weight in target_positions.items(): sector = self.sector_mappings.get(ticker, "Unknown") sector_allocations[sector] = sector_allocations.get(sector, 0) + abs(weight) # Identify sectors that exceed the limit for sector, allocation in sector_allocations.items(): if allocation > max_sector_allocation: scaling_factor = max_sector_allocation / allocation self.Debug(f"Limiting {sector} sector from {allocation*100:.1f}% to {max_sector_allocation*100:.1f}%") # Scale down all positions in this sector for ticker in list(target_positions.keys()): if self.sector_mappings.get(ticker, "Unknown") == sector: target_positions[ticker] *= scaling_factor def _ExecuteEmergencyDeleveraging(self, target_allocation): """Emergency deleveraging procedure to quickly reduce position sizes""" self.Debug("EXECUTING EMERGENCY DELEVERAGING") # Get current total allocation portfolio_value = self.Portfolio.TotalPortfolioValue current_allocation = sum(holding.AbsoluteHoldingsValue for holding in self.Portfolio.Values if holding.Invested) / portfolio_value if current_allocation <= target_allocation: self.Debug(f"Current allocation {current_allocation*100:.1f}% already within target {target_allocation*100:.1f}%") return # Calculate how much we need to reduce positions by reduction_factor = target_allocation / current_allocation self.Debug(f"Reducing all positions by {(1-reduction_factor)*100:.1f}%") # Get all positions sorted by size (largest first) positions = sorted( [(kvp.Key, kvp.Value) for kvp in self.Portfolio if kvp.Value.Invested], key=lambda p: abs(p[1].HoldingsValue), reverse=True ) # Reduce each position proportionally for symbol, position in positions: current_shares = position.Quantity # Calculate reduced position new_shares = int(current_shares * reduction_factor) shares_to_sell = current_shares - new_shares # Only execute if the change is significant if abs(shares_to_sell) > 0: self.MarketOrder(symbol, -shares_to_sell) # Negative to reduce position self.Debug(f"DELEVERAGING: Reduced {symbol} by {abs(shares_to_sell)} shares") self.Debug(f"DELEVERAGING COMPLETE: Target allocation {target_allocation*100:.1f}%") def _ExecuteEmergencyCircuitBreaker(self, target_leverage): """Last-resort emergency circuit breaker to prevent extreme leverage""" self.Debug("!!! EXECUTING EMERGENCY CIRCUIT BREAKER !!!") # Calculate current leverage portfolio_value = self.Portfolio.TotalPortfolioValue holdings_value = sum(holding.AbsoluteHoldingsValue for holding in self.Portfolio.Values if holding.Invested) current_leverage = holdings_value / portfolio_value if portfolio_value > 0 else 0 if current_leverage <= target_leverage: self.Debug(f"Circuit breaker check passed: {current_leverage:.2f}x <= {target_leverage:.2f}x") return # Calculate reduction needed reduction_factor = target_leverage / current_leverage self.Debug(f"EMERGENCY: Reducing all positions by {(1-reduction_factor)*100:.1f}%") # Get all positions sorted by size (largest first for biggest impact) positions = sorted( [(kvp.Key, kvp.Value) for kvp in self.Portfolio if kvp.Value.Invested], key=lambda p: abs(p[1].HoldingsValue), reverse=True ) # AGGRESSIVE APPROACH: Liquidate the largest positions first until we're safe total_reduced = 0 target_reduction = holdings_value - (target_leverage * portfolio_value) for symbol, position in positions: if total_reduced >= target_reduction: break # For largest positions, liquidate entirely for immediate effect self.Liquidate(symbol) total_reduced += position.AbsoluteHoldingsValue self.Debug(f"EMERGENCY LIQUIDATION: Completely liquidated {symbol} (${position.AbsoluteHoldingsValue:.2f})") self.Debug(f"CIRCUIT BREAKER COMPLETE: Liquidated ${total_reduced:.2f} in emergency response")
# PENDULUM SYSTEM for quant connect switching algos from AlgorithmImports import * from datetime import datetime, timedelta import calendar from kqtalgo import KQTAlgorithm from riskcontrol import MarketCapWeightedSP500Tracker class PendulumSystem(QCAlgorithm): def Initialize(self): """Initialize the pendulum system algorithm.""" # Set start date, end date, and cash self.SetStartDate(2020, 1, 1) self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.CASH) self.SetEndDate(2025, 1, 1) self.SetCash(1000000) # Add SPY for market data self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol # Add VIX for monitoring only (not for trading) self.vix = self.AddIndex("VIX", Resolution.Hour).Symbol # Pre-add securities needed by RiskControl algorithm # Add BIL security needed by RiskControl self.bil = self.AddEquity("BIL", Resolution.Daily).Symbol # Add defensive ETFs required by RiskControl # Inverse ETFs self.sh = self.AddEquity("SH", Resolution.Daily).Symbol self.psq = self.AddEquity("PSQ", Resolution.Daily).Symbol self.dog = self.AddEquity("DOG", Resolution.Daily).Symbol self.rwm = self.AddEquity("RWM", Resolution.Daily).Symbol self.eum = self.AddEquity("EUM", Resolution.Daily).Symbol self.myd = self.AddEquity("MYY", Resolution.Daily).Symbol # Alternative defensive ETFs self.gld = self.AddEquity("GLD", Resolution.Daily).Symbol self.ief = self.AddEquity("IEF", Resolution.Daily).Symbol self.bnd = self.AddEquity("BND", Resolution.Daily).Symbol # Sector defensive ETFs self.xlp = self.AddEquity("XLP", Resolution.Daily).Symbol self.xlu = self.AddEquity("XLU", Resolution.Daily).Symbol self.xlv = self.AddEquity("XLV", Resolution.Daily).Symbol self.vht = self.AddEquity("VHT", Resolution.Daily).Symbol self.vdc = self.AddEquity("VDC", Resolution.Daily).Symbol # Set benchmark to SPY self.SetBenchmark("SPY") # Set universe settings self.UniverseSettings.Resolution = Resolution.Daily self.UniverseSettings.ExtendedMarketHours = False # Add universe selection for KQT algorithm self._universe = self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction) # Add reference to all ETFs for RiskControl to use self.all_defensive_etfs = [self.sh, self.psq, self.dog, self.rwm, self.eum, self.myd, self.gld, self.ief, self.bnd, self.xlp, self.xlu, self.xlv, self.vht, self.vdc] # Initialize both algorithms as member variables self.kqt_algo = self._InitializeKQTAlgorithm() self.risk_control_algo = self._InitializeRiskControlAlgorithm() # Initialize active algorithm to KQT by default self.active_algo = self.kqt_algo self.current_algo_name = "KQT" # Initialize tracking variables for algorithm switching self.in_risk_control_mode = False self.risk_control_end_date = None self.months_in_risk_control = 0 self.vix_trigger_months = set() # Track months where VIX has triggered # Add a flag to track if we've done the initial setup for each algorithm self.risk_control_initialized = False self.kqt_initialized = False # Add a flag to record when switching occurred self.switch_date = None # Add comprehensive error tracking self.error_count = 0 self.max_errors = 10 # Exit after 10 serious errors # Add a flag to track if we should force a rebalance self.force_rebalance = False self.force_rebalance_date = None # Add flag to directly execute RiskControl's MonthlyRebalance function after switch self.force_immediate_rebalance = False # Add a counter to track days since algorithm switch to ensure rebalancing happens self.days_since_switch = 0 # Add a force allocation flag to ensure RiskControl properly allocates self.force_deep_allocation = False # Add a helper dictionary to store current market metrics for debugging self.market_metrics = {} # Schedule hourly VIX check self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.Every(TimeSpan.FromHours(1)), self.CheckVIX) # Schedule daily notification of which algorithm is active self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(9, 31), # 9:31 AM self.NotifyActiveAlgorithm) # Add daily logic execution for active algorithm self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(10, 0), # Same time as KQTAlgorithm.TradeExecute self.ExecuteActiveAlgorithmLogic) self.Debug(f"PendulumSystem initialized with default algorithm: {self.current_algo_name}") def _InitializeKQTAlgorithm(self): """Initialize the KQT algorithm.""" kqt = KQTAlgorithm() # Set the QCAlgorithm properties from this algorithm self._ShareAlgorithmProperties(kqt) # Call Initialize to set up the algorithm kqt.Initialize() return kqt def _InitializeRiskControlAlgorithm(self): """Initialize the Risk Control algorithm correctly with proper market data""" try: # Create a new instance with the correct classname risk_control = MarketCapWeightedSP500Tracker() # Share common properties self._ShareAlgorithmProperties(risk_control) # Explicitly set required properties before Initialize # Share ETF symbols directly to member variables risk_control.spy = self.spy risk_control.bil = self.bil risk_control.sh = self.sh risk_control.psq = self.psq risk_control.dog = self.dog risk_control.rwm = self.rwm risk_control.eum = self.eum risk_control.myd = self.myd risk_control.gld = self.gld risk_control.ief = self.ief risk_control.bnd = self.bnd risk_control.xlp = self.xlp risk_control.xlu = self.xlu risk_control.xlv = self.xlv risk_control.vht = self.vht risk_control.vdc = self.vdc # Set the securities dictionaries risk_control.Securities = self.Securities risk_control.Portfolio = self.Portfolio # Initialize RiskControl with correct state risk_control.rebalance_flag = True # Force rebalance at start risk_control.diagnostic_mode = True # Enable detailed logging # Initialize the proper ATR tracking for all symbols risk_control.atr = {} for symbol in self.all_defensive_etfs + [self.bil, self.spy]: risk_control.atr[symbol] = self.ATR(symbol, 14, Resolution.Daily) # Initialize tracking collections risk_control.spy_30day_window = RollingWindow[float](30) risk_control.entry_prices = {} risk_control.previous_bil_allocation = 0.0 risk_control.inverse_positions = set() risk_control.defensive_positions = set() risk_control.last_defensive_update = datetime(1900, 1, 1) # Do explicit pre-fill of historical data BEFORE calling Initialize history = self.History(self.spy, 60, Resolution.Daily) # Pre-populate SPY price history data if not history.empty: self.Debug(f"Loading {len(history)} days of historical SPY data into spy_30day_window") # Handle different index formats if isinstance(history.index, pd.MultiIndex): spy_prices = history.loc[self.spy]['close'].values else: spy_prices = history['close'].values # IMPORTANT: Organize prices in correct chronological order # Must add OLDEST prices first, then move to newest prices_to_add = spy_prices[-30:] # Get the 30 most recent prices # Add prices to the window in the correct order (oldest first) for price in prices_to_add: risk_control.spy_30day_window.Add(price) self.Debug(f"Filled spy_30day_window with {risk_control.spy_30day_window.Count} days of data") # Also fill market price dictionary for trend calculation risk_control.spy_prices = {} # FIX: Properly extract dates based on index type if isinstance(history.index, pd.MultiIndex): # For MultiIndex, properly extract dates from the level containing timestamps # This works with format: (symbol, timestamp) dates = [] for idx in history.index: if idx[0] == self.spy: # Ensure we're getting dates for SPY # The timestamp is the second element, regardless of index structure dates.append(idx[1].date()) else: # For DatetimeIndex, convert each timestamp to date directly dates = [idx.date() for idx in history.index] # Make sure we have matching lengths min_len = min(len(dates), len(spy_prices)) # Add recent prices to the spy_prices dictionary for i in range(min_len): risk_control.spy_prices[dates[i]] = spy_prices[i] self.Debug(f"Added {len(risk_control.spy_prices)} prices to spy_prices dictionary") # Initialize algorithm but catch any exceptions try: # Call the original Initialize method risk_control.Initialize() # CRITICAL FIX: Calculate and store current market metrics for debugging if risk_control.spy_30day_window.Count >= 30: spy_price = self.Securities[self.spy].Price sma_30 = sum(risk_control.spy_30day_window) / 30 market_deviation = (spy_price / sma_30) - 1.0 market_trend = risk_control._calculateMarketTrend() self.market_metrics = { "spy_price": spy_price, "sma_30": sma_30, "market_deviation": market_deviation, "market_trend": market_trend } self.Debug(f"Market metrics: SPY={spy_price:.2f}, SMA30={sma_30:.2f}, " + f"Deviation={market_deviation*100:.2f}%, Trend={market_trend*100:.2f}%") # Override rebalance flag to force a rebalance risk_control.rebalance_flag = True risk_control.force_rebalance_override = True # VERY IMPORTANT: Set the date properly risk_control.last_rebalance_date = self.Time - timedelta(days=60) # Force a rebalance self.risk_control_initialized = True except Exception as e: self.Error(f"Error initializing RiskControl algorithm: {str(e)}") import traceback self.Debug(traceback.format_exc()) return risk_control except Exception as e: self.Error(f"Error creating RiskControl algorithm: {str(e)}") import traceback self.Debug(traceback.format_exc()) # Return a placeholder risk control algorithm that won't crash placeholder = QCAlgorithm() placeholder.Debug = self.Debug return placeholder def _ShareAlgorithmProperties(self, algo): """Share essential QCAlgorithm properties with better error handling.""" try: # Share the broker, trader, and other essential properties algo.Securities = self.Securities algo.Portfolio = self.Portfolio algo.Transactions = self.Transactions algo.BrokerageModel = self.BrokerageModel algo.Debug = self.Debug algo.Error = self.Error if hasattr(self, 'Error') else self.Debug # Share needed symbols - check first to avoid errors if hasattr(self, 'spy'): algo.spy = self.spy if hasattr(self, 'bil'): algo.bil = self.bil # Share all defensive ETFs with RiskControl algorithm more carefully if isinstance(algo, MarketCapWeightedSP500Tracker): for etf in ['sh', 'psq', 'dog', 'rwm', 'eum', 'myd', 'gld', 'ief', 'bnd', 'xlp', 'xlu', 'xlv', 'vht', 'vdc']: if hasattr(self, etf): setattr(algo, etf, getattr(self, etf)) # Only share properties that exist on the parent algorithm for prop in ['SubscriptionManager', 'OptionChainProvider', 'FutureChainProvider']: if hasattr(self, prop): setattr(algo, prop, getattr(self, prop)) # This allows child algorithms to place orders through the main algorithm def wrapped_market_order(symbol, quantity, tag=""): try: # Don't allow trading of VIX (index) if symbol == self.vix: self.Debug(f"Blocked attempt to trade VIX index") return None return self.MarketOrder(symbol, quantity, asynchronous=False, tag=tag) except Exception as e: self.Error(f"Error in wrapped_market_order for {symbol}: {str(e)}") return None algo.MarketOrder = wrapped_market_order # Safe SetHoldings wrapper def wrapped_set_holdings(symbol, percentage, liquidateExistingHoldings=False, tag=""): try: if symbol == self.vix: self.Debug(f"Blocked attempt to set holdings for VIX index") return None return self.SetHoldings(symbol, percentage, liquidateExistingHoldings, tag) except Exception as e: self.Error(f"Error in SetHoldings for {symbol}: {str(e)}") return None algo.SetHoldings = wrapped_set_holdings # Safe Liquidate wrapper def wrapped_liquidate(symbol=None, tag=""): try: return self.Liquidate(symbol, tag) except Exception as e: self.Error(f"Error in Liquidate for {symbol}: {str(e)}") return None algo.Liquidate = wrapped_liquidate # Make other relevant methods available algo.History = self.History # Create a custom getter method for Time instead of direct assignment algo.GetCurrentTime = lambda: self.Time algo.GetUtcTime = lambda: self.UtcTime # Access to UniverseSettings algo.UniverseSettings = self.UniverseSettings # Pass ObjectStore access algo.ObjectStore = self.ObjectStore return algo except Exception as e: self.Error(f"Error in _ShareAlgorithmProperties: {str(e)}") import traceback self.Debug(traceback.format_exc()) return algo # Return the algorithm even if property sharing failed def OnSecuritiesChanged(self, changes): """Handle securities universe changes.""" # Forward to active algorithm if hasattr(self.active_algo, "OnSecuritiesChanged"): self.active_algo.OnSecuritiesChanged(changes) def OnData(self, data): """Main event for handling market data.""" try: # Track days since algorithm switch if self.in_risk_control_mode and hasattr(self, 'switch_date'): self.days_since_switch = (self.Time - self.switch_date).days # Update RiskControl algorithm's spy_30day_window if in RiskControl mode if self.in_risk_control_mode and self.spy in data.Bars: spy_price = data.Bars[self.spy].Close # Add to the RiskControl's price window directly self.active_algo.spy_30day_window.Add(spy_price) # Also update the price history dictionary if hasattr(self.active_algo, 'spy_prices'): self.active_algo.spy_prices[self.Time.date()] = spy_price # Clean up old prices (keep only the last 60 days) dates_to_remove = [] for date in self.active_algo.spy_prices.keys(): if (self.Time.date() - date).days > 60: dates_to_remove.append(date) for date in dates_to_remove: self.active_algo.spy_prices.pop(date) # Forward data to the active algorithm self.active_algo.OnData(data) # Force immediate rebalance after switching to RiskControl if self.force_immediate_rebalance and self.in_risk_control_mode: # Check if market is open and we have SPY data if self.Securities.ContainsKey(self.spy) and self.Securities[self.spy].Exchange.ExchangeOpen: self.Debug(f"Executing immediate rebalance {self.days_since_switch} days after switching to RiskControl") # Force flag to be set self.active_algo.rebalance_flag = True # Call the method directly self.ForceRiskControlRebalance() self.force_immediate_rebalance = False self.Debug("Immediate rebalance completed") # Verify portfolio state after rebalance self.LogRiskControlPortfolio() except Exception as e: self.Error(f"Error in OnData: {str(e)}") import traceback self.Debug(traceback.format_exc()) def CheckVIX(self): """Check VIX levels hourly with enhanced error handling.""" try: if not self.Securities.ContainsKey(self.vix): self.Debug("VIX data not available") return current_vix = self.Securities[self.vix].Price current_month_year = (self.Time.year, self.Time.month) # Log VIX level for monitoring if self.Time.hour == 10 and self.Time.minute == 0: # Log once per day at 10:00 AM self.Debug(f"Current VIX: {current_vix:.2f}, Current Algorithm: {self.current_algo_name}") # Add extra diagnostics if in risk control mode if self.in_risk_control_mode: days_since_switch = (self.Time - self.switch_date).days if self.switch_date else "unknown" self.Debug(f"Days since switch to RiskControl: {days_since_switch}, Error count: {self.error_count}") # Check if we need to switch to Risk Control if current_vix > 29 and not self.in_risk_control_mode: self.Debug(f"VIX TRIGGER: VIX at {current_vix:.2f} > 30, switching to Risk Control algorithm") self.SwitchToRiskControl() # Mark this month as having triggered VIX self.vix_trigger_months.add(current_month_year) # Check if VIX exceeded 30 in a new month while already in risk control mode elif current_vix > 29 and self.in_risk_control_mode: if current_month_year not in self.vix_trigger_months: self.vix_trigger_months.add(current_month_year) self.ExtendRiskControlPeriod() # Check if it's time to switch back to KQT if self.in_risk_control_mode and self.Time >= self.risk_control_end_date: self.Debug("Risk Control period ended, switching back to KQT algorithm") self.SwitchToKQT() except Exception as e: self.Error(f"Error in CheckVIX: {str(e)}") import traceback self.Debug(traceback.format_exc()) def SwitchToRiskControl(self): """Switch from KQT to Risk Control algorithm, ensuring proper initialization and execution""" try: self.Debug("Starting switch to RiskControl...") # Liquidate all positions before switching self.Liquidate() self.Debug("Positions liquidated") # Re-initialize the risk control algorithm to ensure a clean state self.Debug("Reinitializing RiskControl algorithm...") self.risk_control_algo = self._InitializeRiskControlAlgorithm() # CRITICAL ADDITION: Manually force portfolio allocation test self.force_deep_allocation = True self.Debug("Set force_deep_allocation to ensure proper portfolio allocation") # Make sure the force_rebalance_override exists and is set if not hasattr(self.risk_control_algo, 'force_rebalance_override'): self.Debug("Adding missing force_rebalance_override flag to RiskControl") self.risk_control_algo.force_rebalance_override = True else: self.risk_control_algo.force_rebalance_override = True # Update active algorithm self.active_algo = self.risk_control_algo self.current_algo_name = "RiskControl" self.in_risk_control_mode = True # Set initial period to 3 months self.months_in_risk_control = 3 # Calculate end date (3 months from now) self.risk_control_end_date = self._AddMonths(self.Time, 3) # Extensive logging of market conditions at switch time self._DeepLogMarketConditions() # Record when we switched for debugging self.switch_date = self.Time self.days_since_switch = 0 # CRITICAL FIX: Set multiple flags to ensure rebalance occurs self.force_immediate_rebalance = True self.force_rebalance = True self.force_rebalance_date = self.Time.date() self.Debug(f"Switched to Risk Control algorithm until {self.risk_control_end_date.strftime('%Y-%m-%d')}") # FORCE IMMEDIATE REBALANCE - This is critical for proper execution if hasattr(self.active_algo, 'MonthlyRebalance'): self.Debug("DIRECTLY executing MonthlyRebalance after switch") # Make sure the flag is set in multiple places self.active_algo.rebalance_flag = True self.active_algo.force_rebalance_override = True # CRITICAL: Set the last_rebalance_date to force a rebalance self.active_algo.last_rebalance_date = self.Time - timedelta(days=60) # Execute the rebalance self.active_algo.MonthlyRebalance() # Execute diagnostic methods self._ForceAllocationTest() # Log immediate results self.LogRiskControlPortfolio() except Exception as e: self.Error(f"Error in SwitchToRiskControl: {str(e)}") import traceback self.Debug(traceback.format_exc()) self.error_count += 1 # If too many errors, revert to KQT if self.error_count >= self.max_errors: self.Debug("Too many errors, reverting to KQT algorithm") self.SwitchToKQT() def _AnalyzeRiskControlPositions(self): """Enhanced debugging for RiskControl decision making""" if not self.in_risk_control_mode: return # Get spy data if not self.Securities.ContainsKey(self.spy): self.Debug("Cannot analyze RiskControl - SPY not available") return spy_price = self.Securities[self.spy].Price # Check if 30-day window is ready if hasattr(self.active_algo, 'spy_30day_window') and self.active_algo.spy_30day_window.Count >= 30: sma_30 = sum(self.active_algo.spy_30day_window) / 30 market_deviation = (spy_price / sma_30) - 1.0 self.Debug(f"ANALYSIS - Current SPY: {spy_price:.2f}, 30-day SMA: {sma_30:.2f}") self.Debug(f"ANALYSIS - Market deviation: {market_deviation*100:.2f}%") # Market condition assessment if market_deviation > 0.05: self.Debug("ANALYSIS - Strong bull market detected, minimal defensive positions expected") elif market_deviation > 0: self.Debug("ANALYSIS - Mild bull market detected, moderate defensive positions expected") elif market_deviation > -0.03: self.Debug("ANALYSIS - Neutral market detected, balanced allocation expected") else: self.Debug("ANALYSIS - Bear market detected, heavy defensive positions expected") # Check defensive scores if method exists if hasattr(self.active_algo, '_runDefensiveETFDiagnostics'): self.Debug("ANALYSIS - Running detailed defensive ETF diagnostics...") trend = self.active_algo._calculateMarketTrend() if hasattr(self.active_algo, '_calculateMarketTrend') else 0 self.active_algo._runDefensiveETFDiagnostics(market_deviation, trend) else: self.Debug("ANALYSIS - Not enough history in spy_30day_window for complete analysis") # Check BIL recommendation specifically self.Debug(f"ANALYSIS - Current BIL allocation: {self.Portfolio[self.bil].HoldingsValue/self.Portfolio.TotalPortfolioValue*100:.2f}%") def SwitchToKQT(self): """Switch from Risk Control to KQT algorithm.""" # Liquidate all positions and log details self.Debug("Liquidating all positions before switch to KQT...") self.Liquidate() # Add a check to verify liquidation status positions_count = sum(1 for p in self.Portfolio.Values if p.Invested) self.Debug(f"After liquidation call: {positions_count} positions still invested") if hasattr(self, 'risk_control_algo') and hasattr(self.risk_control_algo, 'spy_30day_window'): self.Debug("Clearing RiskControl market data state") self.risk_control_algo.spy_30day_window = RollingWindow[float](30) # Re-initialize the KQT algorithm to ensure a clean state self.kqt_algo = self._InitializeKQTAlgorithm() # Update active algorithm self.active_algo = self.kqt_algo self.current_algo_name = "KQT" self.in_risk_control_mode = False self.risk_control_end_date = None self.months_in_risk_control = 0 self.vix_trigger_months.clear() # Reset VIX trigger tracking self.Debug("Switched to KQT algorithm") def ExtendRiskControlPeriod(self): """Extend the Risk Control period by 1 month.""" # Add 1 month to the current end date self.months_in_risk_control += 1 self.risk_control_end_date = self._AddMonths(self.risk_control_end_date, 1) self.Debug(f"Extended Risk Control period by 1 month. New end date: {self.risk_control_end_date.strftime('%Y-%m-%d')}") def _AddMonths(self, date, months): """Add a specified number of months to a date, handling month end correctly.""" month = date.month - 1 + months year = date.year + month // 12 month = month % 12 + 1 # Get the last day of the target month last_day = calendar.monthrange(year, month)[1] # If original date is the last day of the month or beyond the last day of the target month if date.day > last_day or date.day == calendar.monthrange(date.year, date.month)[1]: day = last_day else: day = date.day return datetime(year, month, day, date.hour, date.minute, date.second) def NotifyActiveAlgorithm(self): """Log the currently active algorithm and remaining time if in Risk Control mode.""" if self.in_risk_control_mode: days_remaining = (self.risk_control_end_date - self.Time).days self.Debug(f"Active Algorithm: {self.current_algo_name}, Days remaining: {days_remaining}") else: self.Debug(f"Active Algorithm: {self.current_algo_name}") def OnOrderEvent(self, orderEvent): """Handle order events.""" # Forward to active algorithm if it has the method if hasattr(self.active_algo, "OnOrderEvent"): self.active_algo.OnOrderEvent(orderEvent) def OnEndOfAlgorithm(self): """Called at the end of the algorithm execution.""" # Forward to both algorithms to ensure proper cleanup if hasattr(self.kqt_algo, "OnEndOfAlgorithm"): self.kqt_algo.OnEndOfAlgorithm() if hasattr(self.risk_control_algo, "OnEndOfAlgorithm"): self.risk_control_algo.OnEndOfAlgorithm() def ExecuteActiveAlgorithmLogic(self): """Execute the active algorithm's trading logic with enhanced RiskControl handling""" # Check if market is open if not self.Securities.ContainsKey(self.spy) or not self.Securities[self.spy].Exchange.ExchangeOpen: return try: # Check if we need to force a rebalance after algorithm switch if self.force_rebalance and self.Time.date() >= self.force_rebalance_date: self.Debug("Executing forced rebalance after algorithm switch") self.ForceRiskControlRebalance() self.force_rebalance = False self.force_rebalance_date = None # For KQT algorithm, call TradeExecute directly if self.current_algo_name == "KQT" and hasattr(self.active_algo, "TradeExecute"): self.Debug(f"Executing KQT Algorithm's TradeExecute method") # Make sure target positions don't include VIX original_generate_positions = self.active_algo.strategy.generate_positions def filtered_generate_positions(prediction_data, current_returns=None): positions = original_generate_positions(prediction_data, current_returns) # Remove VIX from target positions if 'VIX' in positions: self.Debug(f"Removing VIX from target positions") del positions['VIX'] return positions # Monkey patch the strategy function to filter out VIX self.active_algo.strategy.generate_positions = filtered_generate_positions try: self.active_algo.TradeExecute() self.Debug("KQT TradeExecute completed successfully") except Exception as e: self.Error(f"Error in KQT TradeExecute: {str(e)}") import traceback self.Debug(traceback.format_exc()) self.error_count += 1 # Restore original function self.active_algo.strategy.generate_positions = original_generate_positions # Enhanced RiskControl execution logic elif self.current_algo_name == "RiskControl": # Log current RiskControl status with more details if hasattr(self.active_algo, 'rebalance_flag'): self.Debug(f"RiskControl status: rebalance_flag={self.active_algo.rebalance_flag}, " + f"days_since_switch={self.days_since_switch}") # Show portfolio breakdown for debugging if self.days_since_switch < 3 or self.Time.day == 1: # First of month or after switch self.LogRiskControlPortfolio() # Check if RiskControl is properly initialized if not self.risk_control_initialized: self.Debug("RiskControl not properly initialized, attempting to fix...") self.risk_control_algo = self._InitializeRiskControlAlgorithm() self.active_algo = self.risk_control_algo self.risk_control_initialized = True return today_is_wednesday = self.Time.weekday() == 2 today_is_monday = self.Time.weekday() == 0 first_week_of_month = self.Time.day <= 7 # CRITICAL: Force monthly rebalance during first few days OR on first Wednesday force_rebalance = (self.days_since_switch < 3) or (first_week_of_month and today_is_wednesday) # Check if it's time for monthly rebalance or forced rebalance if hasattr(self.active_algo, "MonthlyRebalance"): if (first_week_of_month and today_is_wednesday) or force_rebalance or self.active_algo.rebalance_flag: self.Debug(f"Executing RiskControl Algorithm's MonthlyRebalance method (forced={force_rebalance})") # Set the rebalance flag to true before calling self.active_algo.rebalance_flag = True try: # Using SetRebalanceFlag first if available if hasattr(self.active_algo, 'SetRebalanceFlag'): self.active_algo.SetRebalanceFlag() # Execute the rebalance self.active_algo.MonthlyRebalance() self.Debug("MonthlyRebalance completed successfully") # Log portfolio after rebalance self.LogRiskControlPortfolio() except Exception as e: self.Error(f"Error in MonthlyRebalance: {str(e)}") import traceback self.Debug(traceback.format_exc()) self.error_count += 1 # Also ensure WeeklyDefensiveAdjustment gets called on Mondays if today_is_monday and hasattr(self.active_algo, "WeeklyDefensiveAdjustment"): self.Debug(f"Executing RiskControl Algorithm's WeeklyDefensiveAdjustment method") try: self.active_algo.WeeklyDefensiveAdjustment() self.Debug("WeeklyDefensiveAdjustment completed successfully") # Log portfolio after weekly adjustment if self.Time.day <= 14: # First two weeks only to reduce log spam self.LogRiskControlPortfolio() except Exception as e: self.Error(f"Error in WeeklyDefensiveAdjustment: {str(e)}") import traceback self.Debug(traceback.format_exc()) self.error_count += 1 # Add detailed market analysis EVERY day to help diagnose the issue self._DeepLogMarketConditions() # Add portfolio composition check if self.Portfolio.Invested: total_value = self.Portfolio.TotalPortfolioValue equity_value = sum(self.Portfolio[s].HoldingsValue for s in self.Portfolio.Keys if s != self.bil and s not in self.all_defensive_etfs) bil_value = self.Portfolio[self.bil].HoldingsValue if self.Portfolio[self.bil].Invested else 0 self.Debug(f"DAILY PORTFOLIO CHECK: Equity: {equity_value/total_value*100:.1f}%, " + f"BIL: {bil_value/total_value*100:.1f}%, Cash: {self.Portfolio.Cash/total_value*100:.1f}%") # Force rebalance if we still see no equity positions after several days if (self.days_since_switch > 3 and sum(1 for s in self.Portfolio.Keys if self.Portfolio[s].Invested and s != self.bil and s not in self.all_defensive_etfs) == 0): self.Debug("EMERGENCY: No equity positions detected days after switch, forcing rebalance") self.ForceRiskControlRebalance() except Exception as e: self.Error(f"Error in ExecuteActiveAlgorithmLogic: {str(e)}") import traceback self.Debug(traceback.format_exc()) self.error_count += 1 # If too many errors, revert to KQT if self.error_count >= self.max_errors and self.current_algo_name == "RiskControl": self.Debug("Too many errors in RiskControl, reverting to KQT algorithm") self.SwitchToKQT() def ForceRiskControlRebalance(self): """Force a rebalance for the Risk Control algorithm with enhanced execution""" try: if self.current_algo_name == "RiskControl" and hasattr(self.active_algo, 'MonthlyRebalance'): self.Debug("Executing immediate rebalance after switch to RiskControl") # CRITICAL: Force both flags directly without any conditions self.active_algo.rebalance_flag = True self.active_algo.force_rebalance_override = True self.Debug("ForceRiskControlRebalance: Set BOTH rebalance flags to True") # CRITICAL: Make sure we have stocks in selected_by_market_cap if not hasattr(self.active_algo, 'selected_by_market_cap') or len(self.active_algo.selected_by_market_cap) == 0: self.Debug("WARNING: selected_by_market_cap is empty, manually triggering Fine selection") # Get initial universe from coarse selection coarse_universe = self.FineSelectionFunction(self.CoarseSelectionFunction([ x for x in self.Securities.Values if x.HasFundamentalData and x.Price > 5 and x.Market == Market.USA ])) # Call Fine selection function directly to populate selected_by_market_cap self.active_algo.selected_by_market_cap = [] # Try to get market cap data for these symbols for symbol in coarse_universe: if self.Securities.ContainsKey(symbol): security = self.Securities[symbol] if hasattr(security, 'Fundamentals') and security.Fundamentals is not None: market_cap = security.Fundamentals.MarketCap self.active_algo.selected_by_market_cap.append((symbol, market_cap)) else: # Use price as a proxy for market cap if fundamentals not available self.active_algo.selected_by_market_cap.append((symbol, security.Price * 1000000)) # If we still don't have enough stocks, use a fixed list of large caps if len(self.active_algo.selected_by_market_cap) < 10: self.Debug("Still not enough stocks, adding fallback large cap stocks") fallback_tickers = ["AAPL", "MSFT", "AMZN", "GOOGL", "META", "NVDA", "XOM", "JPM", "JNJ", "PG"] for ticker in fallback_tickers: # Try to find the symbol in our Securities collection for s in self.Securities.Keys: if s.Value == ticker and s not in [x[0] for x in self.active_algo.selected_by_market_cap]: price = self.Securities[s].Price # Use price as a proxy for market cap self.active_algo.selected_by_market_cap.append((s, price * 1000000000)) self.Debug(f"Added fallback stock: {ticker}") self.Debug(f"Selected {len(self.active_algo.selected_by_market_cap)} stocks for market cap weighting") self.active_algo.MonthlyRebalance() self.Debug("MonthlyRebalance executed with forced conditions") except Exception as e: self.Error(f"Error in ForceRiskControlRebalance: {str(e)}") import traceback self.Debug(traceback.format_exc()) def LogRiskControlPortfolio(self): """Enhanced portfolio logging with more details about defensive positions""" self.Debug("CURRENT PORTFOLIO HOLDINGS:") total_value = self.Portfolio.TotalPortfolioValue cash_percent = self.Portfolio.Cash / total_value * 100 self.Debug(f"Cash: ${self.Portfolio.Cash:.2f} ({cash_percent:.2f}%)") # Calculate major allocation groups bil_value = self.Portfolio[self.bil].HoldingsValue if self.Portfolio[self.bil].Invested else 0 bil_percent = bil_value / total_value * 100 inverse_value = sum(self.Portfolio[s].HoldingsValue for s in [self.sh, self.psq, self.dog, self.rwm, self.eum, self.myd] if self.Portfolio[s].Invested) inverse_percent = inverse_value / total_value * 100 defensive_value = sum(self.Portfolio[s].HoldingsValue for s in [self.gld, self.ief, self.bnd, self.xlp, self.xlu, self.xlv, self.vht, self.vdc] if self.Portfolio[s].Invested) defensive_percent = defensive_value / total_value * 100 equity_value = total_value - cash_percent - bil_value - inverse_value - defensive_value equity_percent = equity_value / total_value * 100 self.Debug(f"ALLOCATION SUMMARY:") self.Debug(f" BIL: ${bil_value:.2f} ({bil_percent:.2f}%)") self.Debug(f" Inverse: ${inverse_value:.2f} ({inverse_percent:.2f}%)") self.Debug(f" Defensive: ${defensive_value:.2f} ({defensive_percent:.2f}%)") self.Debug(f" Equity: ${equity_value:.2f} ({equity_percent:.2f}%)") # Log invested positions invested_positions = [kvp for kvp in self.Portfolio.Values if kvp.Invested] invested_positions.sort(key=lambda p: p.HoldingsValue, reverse=True) self.Debug(f"TOP 10 HOLDINGS:") for i, position in enumerate(invested_positions[:10]): symbol = position.Symbol shares = position.Quantity value = position.HoldingsValue percent = value / total_value * 100 # Highlight position type position_type = "Equity" if symbol == self.bil: position_type = "BIL" elif symbol in [self.sh, self.psq, self.dog, self.rwm, self.eum, self.myd]: position_type = "Inverse" elif symbol in [self.gld, self.ief, self.bnd, self.xlp, self.xlu, self.xlv, self.vht, self.vdc]: position_type = "Defensive" self.Debug(f" {i+1}. {symbol.Value} ({position_type}): {shares} shares, ${value:.2f} ({percent:.2f}%)") # Log total positions count self.Debug(f"Total positions: {len(invested_positions)}") # Log defensive tracking data from RiskControl algorithm if self.current_algo_name == "RiskControl": if hasattr(self.active_algo, 'defensive_positions'): defensive_symbols = [s.Value for s in self.active_algo.defensive_positions] self.Debug(f"Current tracked defensive positions: {defensive_symbols}") if hasattr(self.active_algo, 'rebalance_flag'): self.Debug(f"RiskControl rebalance_flag: {self.active_algo.rebalance_flag}") def CoarseSelectionFunction(self, coarse): """Pass universe selection to active algorithm if it has the method""" if self.current_algo_name == "KQT" and hasattr(self.active_algo, "CoarseSelectionFunction"): return self.active_algo.CoarseSelectionFunction(coarse) # Default implementation for RiskControl or as fallback filtered = [x for x in coarse if x.HasFundamentalData and x.Price > 5 and x.Market == Market.USA] return [x.Symbol for x in filtered] def FineSelectionFunction(self, fine): """Pass universe selection to active algorithm if it has the method""" if self.current_algo_name == "KQT" and hasattr(self.active_algo, "FineSelectionFunction"): return self.active_algo.FineSelectionFunction(fine) # Default implementation for RiskControl or as fallback filtered = [x for x in fine if x.MarketCap > 1e10 and x.SecurityReference.SecurityType == "ST00000001"] sorted_by_cap = sorted(filtered, key=lambda x: x.MarketCap, reverse=True)[:30] # IMPORTANT: If we're in RiskControl mode, update its selected_by_market_cap directly if self.current_algo_name == "RiskControl": self.active_algo.selected_by_market_cap = [(x.Symbol, x.MarketCap) for x in sorted_by_cap] self.Debug(f"Updated RiskControl selected_by_market_cap with {len(sorted_by_cap)} stocks") return [x.Symbol for x in sorted_by_cap] def _EnsureEnoughHistory(self): """Ensure RiskControl has enough historical data for decisions""" if not self.in_risk_control_mode: return # Check if we need to populate the 30-day window if hasattr(self.active_algo, 'spy_30day_window') and self.active_algo.spy_30day_window.Count < 30: self.Debug("Populating spy_30day_window with historical data") # Get 60 days of history to ensure we have enough history = self.History(self.spy, 60, Resolution.Daily) if not history.empty: if isinstance(history.index, pd.MultiIndex): prices = history.loc[self.spy]['close'].values else: prices = history['close'].values # Clear existing data and fill with historical data self.active_algo.spy_30day_window = RollingWindow[float](30) # Add prices to the window in the correct order (oldest first) for price in prices[-30:]: # Take the most recent 30 days self.active_algo.spy_30day_window.Add(price) self.Debug(f"Filled spy_30day_window with {self.active_algo.spy_30day_window.Count} days of data") def _DeepLogMarketConditions(self): """Log extensive market conditions for debugging RiskControl issues""" self.Debug("=========== MARKET CONDITIONS DEEP LOG ===========") # Get SPY data if not self.Securities.ContainsKey(self.spy): self.Debug("SPY not available") return spy_price = self.Securities[self.spy].Price # Get all the historic data history = self.History(self.spy, 60, Resolution.Daily) if history.empty: self.Debug("No SPY history available") return # Get closing prices if isinstance(history.index, pd.MultiIndex): spy_prices = history.loc[self.spy]['close'].values else: spy_prices = history['close'].values # Calculate key metrics if len(spy_prices) >= 30: sma_30 = np.mean(spy_prices[-30:]) market_deviation = (spy_price / sma_30) - 1.0 # Calculate trend metrics if len(spy_prices) >= 10: trend_10d = (spy_prices[-1] / spy_prices[-10]) - 1.0 else: trend_10d = 0 # Store these metrics for reference self.market_metrics = { "spy_price": spy_price, "sma_30": sma_30, "market_deviation": market_deviation, "trend_10d": trend_10d, "last_30_prices": spy_prices[-30:].tolist(), "spy_window_size": self.active_algo.spy_30day_window.Count if hasattr(self.active_algo, 'spy_30day_window') else 0 } # Log extensive details self.Debug(f"SPY price: {spy_price:.2f}") self.Debug(f"30-day SMA: {sma_30:.2f}") self.Debug(f"Market deviation: {market_deviation*100:.2f}%") self.Debug(f"10-day trend: {trend_10d*100:.2f}%") self.Debug(f"Window count: {self.active_algo.spy_30day_window.Count}") # Log where we are in the market cycle if market_deviation > 0.05: self.Debug("MARKET CONDITION: Strong bull market (deviation > 5%)") elif market_deviation > 0.02: self.Debug("MARKET CONDITION: Moderate bull market (deviation > 2%)") elif market_deviation > 0: self.Debug("MARKET CONDITION: Mild bull market (deviation > 0%)") elif market_deviation > -0.03: self.Debug("MARKET CONDITION: Neutral market (deviation > -3%)") else: self.Debug("MARKET CONDITION: Bear market (deviation <= -3%)") # CRITICAL: Add extensive log of allocation decisions self._LogExpectedAllocations(market_deviation, trend_10d) else: self.Debug(f"Not enough price history: {len(spy_prices)} days available, need 30") self.Debug("===================================================") def _LogExpectedAllocations(self, market_deviation, market_trend): """Log what allocations should be expected based on market conditions""" self.Debug("EXPECTED ALLOCATION BASED ON MARKET CONDITIONS:") # Replicate RiskControl logic to determine expected allocations bil_weight = 0.0 if market_deviation < 0: # Enhanced formula for better downside protection base_weight = -market_deviation # Convert to positive number if base_weight > 0.08: # Significant drop # Lower cap on BIL for significant drops bil_weight = min(base_weight * 1.1, 0.7) # Cap at 70% (was 90%) else: bil_weight = min(base_weight, 0.6) # Cap at 60% (was 80%) # Estimate what allocation should be if market_deviation > 0.08: # Very strong bull expected_bil = min(0.15, bil_weight) expected_equity = 0.85 expected_defensive = 0 elif market_deviation > 0.05: # Strong bull expected_bil = min(0.25, bil_weight) expected_equity = 0.75 expected_defensive = 0 elif market_deviation > 0.0: # Mild bull expected_bil = min(0.4, bil_weight) expected_equity = 0.6 expected_defensive = 0 elif market_deviation > -0.03: # Neutral expected_bil = min(0.5, bil_weight) expected_equity = 0.5 expected_defensive = 0 # Small defensive allocation possible else: # Bear expected_bil = min(0.6, bil_weight) expected_equity = 0.4 expected_defensive = max(0, 0.2 * (-market_deviation - 0.03) * 10) # More defensive in deeper corrections self.Debug(f"Expected BIL: {expected_bil*100:.1f}%") self.Debug(f"Expected equity: {expected_equity*100:.1f}%") self.Debug(f"Expected defensive ETFs: {expected_defensive*100:.1f}%") # Make all of this information available to MonthlyRebalance for diagnostic purposes if hasattr(self.active_algo, 'expected_allocations'): self.active_algo.expected_allocations = { 'bil': expected_bil, 'equity': expected_equity, 'defensive': expected_defensive } def _ForceAllocationTest(self): """Directly test allocation to make sure we can allocate to other assets""" if not self.in_risk_control_mode or not self.force_deep_allocation: return self.Debug("FORCE ALLOCATION TEST: Directly testing allocation capabilities") # Clear the force deep allocation flag so this only runs once self.force_deep_allocation = False try: # CRITICAL: Directly try buying a very small amount of a defensive ETF test_symbol = self.gld # Gold is a good test case test_allocation = 0.05 # Small test allocation (5%) self.Debug(f"Testing direct allocation to {test_symbol}") self.active_algo.SetHoldings(test_symbol, test_allocation) # Also try a small equity position if len(self.active_algo.selected_by_market_cap) > 0: equity_symbol = self.active_algo.selected_by_market_cap[0][0] self.Debug(f"Testing direct allocation to equity {equity_symbol}") self.active_algo.SetHoldings(equity_symbol, 0.05) # Instead of using Tomorrow(), schedule for the next trading day at market open # FIX: Use EveryDay and check if it's the next day in the handler self.next_check_date = self.Time.date() + timedelta(days=1) self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(10, 0), self.CheckAllocationTestWrapper) except Exception as e: self.Error(f"Force allocation test failed: {str(e)}") import traceback self.Debug(traceback.format_exc()) def CheckAllocationTestWrapper(self): """Wrapper to only execute the check once on the next day""" # Only run this once on the next day if hasattr(self, 'next_check_date') and self.Time.date() >= self.next_check_date: # Clear the next check date attribute so it doesn't run again delattr(self, 'next_check_date') # Call the actual check method self.CheckAllocationTest() def CheckAllocationTest(self): """Check if our forced allocation test was successful""" self.Debug("CHECKING ALLOCATION TEST RESULTS:") # Check if we have any non-BIL positions equity_positions = 0 defensive_positions = 0 bil_position = 0 for kvp in self.Portfolio: symbol = kvp.Key position = kvp.Value if not position.Invested: continue if symbol == self.bil: bil_position += 1 elif symbol in self.all_defensive_etfs: defensive_positions += 1 else: equity_positions += 1 self.Debug(f"Current positions: BIL: {bil_position}, Defensive ETFs: {defensive_positions}, Equities: {equity_positions}") if defensive_positions == 0 and equity_positions == 0: self.Debug("CRITICAL ERROR: Only BIL is being held despite force allocation test") # Last-resort attempt: Try to fake the market conditions to force allocation self._FakeMarketConditions() else: self.Debug("Allocation test successful: Portfolio includes non-BIL positions") def _FakeMarketConditions(self): """Last resort: Attempt to fake market conditions to force allocation""" if not self.in_risk_control_mode: return self.Debug("ATTEMPTING EMERGENCY FIX: Faking market conditions to force allocation") try: # Safety check to make sure we have the RiskControl algorithm if not hasattr(self.active_algo, 'MonthlyRebalance'): self.Debug("Cannot fake market conditions: Not a RiskControl algorithm") return # Create a fake bear market by manipulating the 30-day window # This is a hack, but it might fix the issue in a live system spy_price = self.Securities[self.spy].Price # Create a set of fake prices 20% higher than current price fake_high_prices = [spy_price * 1.2] * 30 # Replace the window with our fake high prices self.active_algo.spy_30day_window = RollingWindow[float](30) for price in fake_high_prices: self.active_algo.spy_30day_window.Add(price) # This should create a severe negative market deviation # which should force allocation to defensive ETFs self.Debug("Created fake bear market conditions (-16.7% deviation)") # Force rebalance with these fake conditions self.active_algo.rebalance_flag = True self.active_algo.force_rebalance_override = True self.active_algo.MonthlyRebalance() # Log the results self.LogRiskControlPortfolio() except Exception as e: self.Error(f"Failed to fake market conditions: {str(e)}") import traceback self.Debug(traceback.format_exc())
from AlgorithmImports import * import numpy as np from datetime import timedelta class MarketCapWeightedSP500Tracker(QCAlgorithm): def Initialize(self): self.SetStartDate(2019, 1, 1) self.SetEndDate(2025, 1, 1) self.SetCash(100000) self.UniverseSettings.Resolution = Resolution.Daily self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol self.bil = self.AddEquity("BIL", Resolution.Daily).Symbol self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction) self.selected_by_market_cap = [] self.rebalance_flag = False self.spy_30day_window = RollingWindow[float](30) self.entry_prices = {} self.previous_bil_allocation = 0.0 self.Schedule.On(self.DateRules.MonthStart(self.spy), self.TimeRules.AfterMarketOpen(self.spy, 30), self.SetRebalanceFlag) self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Wednesday), self.TimeRules.AfterMarketOpen(self.spy, 30), self.MonthlyRebalance) # Initialize rolling window with historical data history = self.History(self.spy, 30, Resolution.Daily) if not history.empty: for time, row in history.loc[self.spy].iterrows(): self.spy_30day_window.Add(row["close"]) # Add simple tracking of market trend self.trend_lookback = 10 self.spy_prices = {} self.max_spy_history = 60 # Days of price history to keep # Add dynamic stop-loss enhancement self.stop_loss_base = 0.04 # Reduced base stop-loss threshold self.dynamic_stop_weight = 0.5 # Blend 50% ATR signal with base threshold # Expanded list of inverse and defensive ETFs # Original inverse ETFs self.sh = self.AddEquity("SH", Resolution.Daily).Symbol # Inverse S&P 500 self.psq = self.AddEquity("PSQ", Resolution.Daily).Symbol # Inverse Nasdaq-100 self.dog = self.AddEquity("DOG", Resolution.Daily).Symbol # Inverse Dow Jones self.rwm = self.AddEquity("RWM", Resolution.Daily).Symbol # Inverse Russell 2000 self.eum = self.AddEquity("EUM", Resolution.Daily).Symbol # Inverse Emerging Markets self.myd = self.AddEquity("MYY", Resolution.Daily).Symbol # Inverse Mid-Cap 400 # Alternative defensive ETFs (not inverse but potentially good in downturns) self.gld = self.AddEquity("GLD", Resolution.Daily).Symbol # Gold self.ief = self.AddEquity("IEF", Resolution.Daily).Symbol # 7-10 Year Treasury self.bnd = self.AddEquity("BND", Resolution.Daily).Symbol # Total Bond Market # Sector-based defensive ETFs (often outperform in bear markets) self.xlp = self.AddEquity("XLP", Resolution.Daily).Symbol # Consumer Staples self.xlu = self.AddEquity("XLU", Resolution.Daily).Symbol # Utilities self.xlv = self.AddEquity("XLV", Resolution.Daily).Symbol # Healthcare self.vht = self.AddEquity("VHT", Resolution.Daily).Symbol # Vanguard Healthcare self.vdc = self.AddEquity("VDC", Resolution.Daily).Symbol # Vanguard Consumer Staples # Group all defensive ETFs together self.inverse_etfs = [self.sh, self.psq, self.dog, self.rwm, self.eum, self.myd] self.alternative_defensive = [self.gld, self.ief, self.bnd] self.sector_defensive = [self.xlp, self.xlu, self.xlv, self.vht, self.vdc] self.all_defensive = self.inverse_etfs + self.alternative_defensive + self.sector_defensive # Add diagnostic logging capability self.diagnostic_mode = True # Enable detailed diagnostics # Initialize positions tracking and add weekly tactical adjustment self.defensive_positions = set() self.last_defensive_update = datetime(1900, 1, 1) # Add weekly defensive ETF evaluation schedule self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Monday), self.TimeRules.AfterMarketOpen(self.spy, 60), # After main rebalance self.WeeklyDefensiveAdjustment) # Initialize positions tracking self.inverse_positions = set() # Add inverse ETF lookback windows for better momentum calculation self.inverse_lookback_short = 7 # 1 week momentum window self.inverse_lookback_med = 15 # Medium-term momentum # Add ATR indicators for enhanced volatility-based stop-loss calculation self.atr_period = 14 self.atr = {} # Register ATR for key symbols (defensive ETFs, BIL, and SPY) for symbol in self.all_defensive + [self.bil, self.spy]: self.atr[symbol] = self.ATR(symbol, self.atr_period, Resolution.Daily) # Add flag for forced rebalance override self.force_rebalance_override = False # Add logging for rebalance decisions self.rebalance_log = [] # Add expected allocations tracking for diagnostics self.expected_allocations = {'bil': 0, 'equity': 0, 'defensive': 0} # Add a flag for when debugging allocations self.debug_allocation_details = True def CoarseSelectionFunction(self, coarse): filtered = [x for x in coarse if x.HasFundamentalData and x.Price > 5 and x.Market == Market.USA] return [x.Symbol for x in filtered] def FineSelectionFunction(self, fine): filtered = [x for x in fine if x.MarketCap > 1e10 and x.SecurityReference.SecurityType == "ST00000001"] sorted_by_cap = sorted(filtered, key=lambda x: x.MarketCap, reverse=True)[:30] self.selected_by_market_cap = [(x.Symbol, x.MarketCap) for x in sorted_by_cap] return [x.Symbol for x in sorted_by_cap] def SetRebalanceFlag(self): if self.Time.weekday() == 2: # Wednesday self.rebalance_flag = True def OnData(self, data): # Update price window if not data.Bars.ContainsKey(self.spy): return self.spy_30day_window.Add(data.Bars[self.spy].Close) # Track prices for trend calculation self.spy_prices[self.Time.date()] = data.Bars[self.spy].Close # Remove old prices dates_to_remove = [] for date in self.spy_prices.keys(): if (self.Time.date() - date).days > self.max_spy_history: dates_to_remove.append(date) for date in dates_to_remove: self.spy_prices.pop(date) market_trend = self._calculateMarketTrend() # Track if any stop-loss was triggered stop_loss_triggered = False # Check stop-loss triggers with improved dynamic thresholds for kvp in self.Portfolio: symbol = kvp.Key holding = kvp.Value if holding.Invested and symbol != self.bil: current_price = self.Securities[symbol].Price if symbol not in self.entry_prices: self.entry_prices[symbol] = current_price price_drop = (self.entry_prices[symbol] - current_price) / self.entry_prices[symbol] # Start with the base threshold and adjust based on market trend stop_threshold = self.stop_loss_base if market_trend < -0.03: stop_threshold *= 0.9 # tighten in downtrends elif market_trend > 0.03: stop_threshold *= 1.1 # loosen in uptrends # Incorporate ATR if ready with adjustment to prevent overreaction in high volatility if symbol in self.atr and self.atr[symbol].IsReady: current_atr = self.atr[symbol].Current.Value atr_pct = current_atr / current_price # If ATR is excessively high versus our base, use a lower weight to temper the effect effective_weight = self.dynamic_stop_weight if atr_pct > stop_threshold * 1.2: effective_weight = min(self.dynamic_stop_weight, 0.3) stop_threshold = ((1 - effective_weight) * stop_threshold + effective_weight * atr_pct) if price_drop >= stop_threshold: self.Liquidate(symbol) stop_loss_triggered = True self.Debug(f"Stop-loss triggered for {symbol} at {current_price}, drop: {price_drop*100:.1f}%, threshold: {stop_threshold*100:.1f}%") # If any stop-loss was triggered, invest all available cash in BIL if stop_loss_triggered: available_cash = self.Portfolio.Cash + self.Portfolio.UnsettledCash if available_cash > 0: bil_price = self.Securities[self.bil].Price bil_quantity = available_cash / bil_price self.MarketOrder(self.bil, bil_quantity) self.Debug(f"Invested ${available_cash:0.2f} in BIL after stop-loss") def WeeklyDefensiveAdjustment(self): """Weekly check and adjustment for defensive ETF positions""" # Skip if we've done the monthly rebalance recently days_since_rebalance = (self.Time.date() - self.last_rebalance_date.date()).days if hasattr(self, 'last_rebalance_date') else 999 if days_since_rebalance < 3: return # Skip if we've updated defensive positions recently days_since_update = (self.Time.date() - self.last_defensive_update.date()).days if days_since_update < 5: # At most once a week return # Calculate current market conditions spy_price = self.Securities[self.spy].Price sma_30 = sum(self.spy_30day_window) / self.spy_30day_window.Count if self.spy_30day_window.Count > 0 else spy_price market_deviation = (spy_price / sma_30) - 1.0 market_trend = self._calculateMarketTrend() # Skip in strong bull markets if market_deviation > 0.04 and market_trend > 0.03: return # Calculate total invested amount including all positions total_invested = sum(holding.HoldingsValue for holding in self.Portfolio.Values if holding.Invested) / self.Portfolio.TotalPortfolioValue # If we're already fully invested, can't add more defensive positions if total_invested >= 0.98: # Allow small buffer for rounding errors self.Debug(f"Already fully invested ({total_invested:.2f}), skipping defensive adjustments") return # Calculate available room for defensive positions available_allocation = max(0, 0.99 - total_invested) # Keep tiny buffer # Calculate how much is currently allocated to defensive positions current_defensive_value = sum(self.Portfolio[s].HoldingsValue for s in self.defensive_positions if self.Portfolio.ContainsKey(s) and self.Portfolio[s].Invested) # Calculate current BIL allocation current_bil_value = self.Portfolio[self.bil].HoldingsValue if self.Portfolio[self.bil].Invested else 0 bil_allocation = current_bil_value / self.Portfolio.TotalPortfolioValue # Limit potential allocation to available room max_defensive_pct = min(0.25, available_allocation / bil_allocation if bil_allocation > 0 else 0) potential_allocation = bil_allocation * max_defensive_pct # Make sure we don't exceed available room potential_allocation = min(potential_allocation, available_allocation) # Super detailed diagnostics for current defensive positions if self.diagnostic_mode and self.defensive_positions: self.Debug(f"WEEKLY CHECK - Current defensive positions:") for symbol in self.defensive_positions: if self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested: position = self.Portfolio[symbol] entry = self.entry_prices.get(symbol, position.AveragePrice) current = self.Securities[symbol].Price pnl_pct = (current / entry) - 1 if entry > 0 else 0 self.Debug(f" {symbol}: PnL {pnl_pct*100:.2f}%, Value ${position.HoldingsValue:.2f}") # Evaluate current defensive positions and potential new ones self.Debug(f"WEEKLY CHECK - Market: Dev {market_deviation*100:.2f}%, Trend {market_trend*100:.2f}%") self.Debug(f"BIL allocation: {bil_allocation*100:.2f}%, Potential defensive: {potential_allocation*100:.2f}%") # Run the defensive ETF evaluation new_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, potential_allocation) # Calculate which positions to add, modify, or remove positions_to_add = {} positions_to_remove = set() # Process existing positions for symbol in self.defensive_positions: # If position should be kept but maybe at different allocation if symbol in new_allocations and new_allocations[symbol] > 0: current_pct = self.Portfolio[symbol].HoldingsValue / self.Portfolio.TotalPortfolioValue if self.Portfolio.ContainsKey(symbol) else 0 target_pct = new_allocations[symbol] # If allocation difference is significant, adjust position if abs(target_pct - current_pct) > 0.01: positions_to_add[symbol] = target_pct # Remove from new allocations dict to avoid double-processing new_allocations.pop(symbol) else: # Position should be removed positions_to_remove.add(symbol) # Add any remaining new positions for symbol, allocation in new_allocations.items(): if allocation > 0.01: # Minimum meaningful allocation positions_to_add[symbol] = allocation # Check if we'll exceed our allocation limits with new positions total_new_allocation = sum(positions_to_add.values()) if total_new_allocation > available_allocation: # Scale back allocations to fit available space scale_factor = available_allocation / total_new_allocation for symbol in positions_to_add: positions_to_add[symbol] *= scale_factor self.Debug(f"Scaled defensive allocations to fit available space: {scale_factor:.4f}") # Execute trades if needed if positions_to_add or positions_to_remove: self.Debug(f"WEEKLY ADJUSTMENT - Making defensive position changes") # Remove positions no longer needed for symbol in positions_to_remove: self.Liquidate(symbol) self.defensive_positions.remove(symbol) self.Debug(f"Removed defensive position: {symbol}") # Add or adjust positions for symbol, allocation in positions_to_add.items(): self.SetHoldings(symbol, allocation) self.defensive_positions.add(symbol) self.entry_prices[symbol] = self.Securities[symbol].Price self.Debug(f"Updated defensive position: {symbol} to {allocation*100:.2f}%") self.last_defensive_update = self.Time def MonthlyRebalance(self): """Modified to provide much more detailed logging about decisions""" # Check if we're forcing a rebalance regardless of flag if not self.rebalance_flag and not self.force_rebalance_override: self.Debug("MonthlyRebalance called but both rebalance_flag and force_rebalance_override are False - skipping") return self.Debug(f"EXECUTING MONTHLY REBALANCE - rebalance_flag: {self.rebalance_flag}, override: {self.force_rebalance_override}") # Reset the override flag after using it self.force_rebalance_override = False self.rebalance_flag = False self.entry_prices.clear() # Reset entry prices at rebalance # Add more detailed logging at key decision points if self.spy_30day_window.Count < 30: self.Debug(f"ERROR: Not enough SPY history. Window count: {self.spy_30day_window.Count}") # CRITICAL FIX: Use fake data if needed to ensure algorithm runs # Fill window with last available price spy_price = self.Securities[self.spy].Price while self.spy_30day_window.Count < 30: self.spy_30day_window.Add(spy_price) self.Debug(f"Filled insufficient window with current price. New count: {self.spy_30day_window.Count}") # Comprehensive logging of the 30-day window data window_data = [self.spy_30day_window[i] for i in range(self.spy_30day_window.Count)] self.Debug(f"SPY 30-day window first 5 values: {window_data[:5]}") self.Debug(f"SPY 30-day window last 5 values: {window_data[-5:]}") spy_price = self.Securities[self.spy].Price sma_30 = sum(self.spy_30day_window) / 30 # Log raw data self.Debug(f"RAW DATA - Current SPY price: {spy_price:.2f}, 30-day SMA: {sma_30:.2f}") # Calculate market deviation for better decisions market_deviation = (spy_price / sma_30) - 1.0 market_trend = self._calculateMarketTrend() self.Debug(f"REBALANCE ANALYSIS - Market deviation: {market_deviation*100:.2f}%, Trend: {market_trend*100:.2f}%") # Enhanced BIL allocation logic with lower caps bil_weight = 0.0 if spy_price < sma_30: # Enhanced formula for better downside protection base_weight = (sma_30 - spy_price) / sma_30 if base_weight > 0.08: # Significant drop # Lower cap on BIL for significant drops bil_weight = min(base_weight * 1.1, 0.7) # Cap at 70% (was 90%) else: bil_weight = min(base_weight, 0.6) # Cap at 60% (was 80%) # Enhanced reduction rule for better returns in bull markets if market_deviation > 0.05: # Strong bull market min_bil_allocation = self.previous_bil_allocation * 0.7 # 30% reduction elif market_deviation > 0.02: # Modest bull market min_bil_allocation = self.previous_bil_allocation * 0.75 # 25% reduction else: min_bil_allocation = self.previous_bil_allocation * 0.8 # Standard 20% reduction # Ensure we have a valid min_bil_allocation if min_bil_allocation <= 0 or np.isnan(min_bil_allocation): min_bil_allocation = 0 self.Debug("WARNING: min_bil_allocation was invalid, reset to 0") bil_weight = max(bil_weight, min_bil_allocation) # Debug the BIL allocation decision in detail self.Debug(f"BIL ALLOCATION CALCULATION:") self.Debug(f" Initial bil_weight based on deviation: {bil_weight*100:.2f}%") self.Debug(f" Previous BIL allocation: {self.previous_bil_allocation*100:.2f}%") self.Debug(f" Minimum BIL allocation: {min_bil_allocation*100:.2f}%") # Lower caps on BIL in all market conditions original_bil_weight = bil_weight # Save for debugging if market_deviation > 0.08: # Very strong bull bil_weight = min(bil_weight, 0.15) # Cap at 15% (was 20%) self.Debug(f" Applied very strong bull market cap: {bil_weight*100:.2f}%") elif market_deviation > 0.05: # Strong bull bil_weight = min(bil_weight, 0.25) # Cap at 25% (was 30%) self.Debug(f" Applied strong bull market cap: {bil_weight*100:.2f}%") elif market_deviation > 0.0: # Mild bull bil_weight = min(bil_weight, 0.4) # Cap at 40% (new tier) self.Debug(f" Applied mild bull market cap: {bil_weight*100:.2f}%") elif market_deviation > -0.03: # Neutral bil_weight = min(bil_weight, 0.5) # Cap at 50% (new tier) self.Debug(f" Applied neutral market cap: {bil_weight*100:.2f}%") else: # Bear bil_weight = min(bil_weight, 0.6) # Cap at 60% (new tier) self.Debug(f" Applied bear market cap: {bil_weight*100:.2f}%") if bil_weight != original_bil_weight: self.Debug(f" BIL allocation was capped from {original_bil_weight*100:.2f}% to {bil_weight*100:.2f}%") # Calculate how much of the original BIL allocation to potentially use for inverse ETFs original_bil = bil_weight # Use only a portion of BIL for inverse ETFs, keeping some as BIL inverse_etf_potential = original_bil * 0.4 # Use 40% of BIL allocation for inverse ETFs bil_weight = original_bil - inverse_etf_potential # Run diagnostics on defensive ETFs if self.diagnostic_mode: self._runDefensiveETFDiagnostics(market_deviation, market_trend) # Evaluate inverse ETFs for possible allocation inverse_allocations = self._evaluateInverseETFs(market_deviation, market_trend, inverse_etf_potential) # Include alternative defensive ETFs in evaluation all_defensive_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, inverse_etf_potential) # Calculate total allocation to defensive ETFs total_defensive_allocation = sum(all_defensive_allocations.values()) # Set aside remainder as cash (won't be allocated) cash_reserve = inverse_etf_potential - total_defensive_allocation # Calculate weight for equity portion equity_weight = 1.0 - total_defensive_allocation # Ensure total allocation never exceeds 100% total_allocation = bil_weight + total_defensive_allocation + equity_weight if total_allocation > 1.0: # Scale back components proportionally scale_factor = 1.0 / total_allocation bil_weight *= scale_factor equity_weight *= scale_factor # Scale each defensive allocation for symbol in all_defensive_allocations: all_defensive_allocations[symbol] *= scale_factor total_defensive_allocation = sum(all_defensive_allocations.values()) self.Debug(f"Scaled allocations to prevent leverage: {scale_factor:.4f}") self.Debug(f"Allocation breakdown: Equity {equity_weight*100:.1f}%, BIL {bil_weight*100:.1f}%, " + f"Defensive ETFs {total_defensive_allocation*100:.1f}%, Cash {cash_reserve*100:.1f}%") # Enhance stock selection with simple momentum filter momentum_scores = self._calculateSimpleMomentum() # Filter out worst momentum stocks filtered_stocks = [] for symbol, mcap in self.selected_by_market_cap: score = momentum_scores.get(symbol, 1.0) if score >= 0.9: # Keep only neutral or positive momentum stocks filtered_stocks.append((symbol, mcap)) # If we filtered too many, revert to original list if len(filtered_stocks) < 20: filtered_stocks = self.selected_by_market_cap # Calculate weights using the filtered stocks total_market_cap = sum([x[1] for x in filtered_stocks]) weights = {x[0]: (x[1] / total_market_cap) * equity_weight for x in filtered_stocks} # CRITICAL: Check if we actually have any stocks selected if len(self.selected_by_market_cap) == 0: self.Debug("CRITICAL ERROR: No stocks in selected_by_market_cap!") # Attempt to manually select some large cap stocks as a fallback fallback_tickers = ["AAPL", "MSFT", "AMZN", "GOOGL", "META", "NVDA", "BRK.B", "JNJ", "V", "PG", "UNH", "JPM", "HD", "MA", "XOM"] self.Debug(f"Using fallback list of {len(fallback_tickers)} large cap stocks") fallback_stocks = [] # Add each stock we can find for ticker in fallback_tickers: try: # Try to find the symbol symbol = None for kvp in self.Securities: if kvp.Key.Value == ticker: symbol = kvp.Key break if symbol is not None: # Get approximate market cap (using price as proxy) price = self.Securities[symbol].Price # Rough estimate of market cap using price as proxy (shares outstanding unknown) fake_mcap = price * 1000000000 # Use price in billions as stand-in fallback_stocks.append((symbol, fake_mcap)) self.Debug(f"Added fallback stock: {ticker}") except Exception as e: self.Debug(f"Error adding fallback stock {ticker}: {str(e)}") if len(fallback_stocks) > 0: self.selected_by_market_cap = fallback_stocks self.Debug(f"Added {len(fallback_stocks)} fallback stocks") else: # Direct fallback - add SPY with 100% allocation self.Debug("EMERGENCY FALLBACK: Using SPY as only equity position") self.SetHoldings(self.spy, equity_weight) self.SetHoldings(self.bil, bil_weight) # Set BIL allocation for next time self.previous_bil_allocation = self.Portfolio[self.bil].HoldingsValue / self.Portfolio.TotalPortfolioValue # Exit early since we've handled allocations directly return # Log how many stocks we have self.Debug(f"Using {len(self.selected_by_market_cap)} stocks for market cap weighting") # If we made it here, continue with existing code to filter stocks and calculate weights # CRITICAL: Add diagnostic logging for equity allocations self.Debug(f"EQUITY ALLOCATION DETAILS:") self.Debug(f"Filtered stocks count: {len(filtered_stocks)}") self.Debug(f"Total market cap: ${total_market_cap:,.2f}") self.Debug(f"Total equity weight: {equity_weight*100:.2f}%") # Log top equity positions top_equities = sorted(weights.items(), key=lambda x: x[1], reverse=True)[:10] self.Debug(f"Top 10 equity allocations:") for symbol, weight in top_equities: self.Debug(f" {symbol.Value}: {weight*100:.2f}% (Market cap: ${filtered_stocks[[s[0] for s in filtered_stocks].index(symbol)][1]:,.2f})") # Track successful allocations successful_allocations = [] failed_allocations = [] invested = set() for symbol, weight in weights.items(): if weight > 0: try: # CRITICAL: Add minimum threshold for equity positions if weight >= 0.005: # Minimum 0.5% allocation order_ticket = self.SetHoldings(symbol, weight) # Check if order was placed successfully if order_ticket: invested.add(symbol) successful_allocations.append((symbol.Value, weight)) self.entry_prices[symbol] = self.Securities[symbol].Price else: failed_allocations.append((symbol.Value, weight, "Order ticket null")) else: self.Debug(f"Skipping {symbol.Value} - allocation too small: {weight*100:.2f}%") except Exception as e: failed_allocations.append((symbol.Value, weight, str(e))) self.Debug(f"Error setting holdings for {symbol.Value}: {str(e)}") # Log allocation results self.Debug(f"Successfully allocated to {len(successful_allocations)} equity positions") if failed_allocations: self.Debug(f"Failed to allocate to {len(failed_allocations)} positions:") for symbol, weight, reason in failed_allocations[:5]: # Log first 5 failures self.Debug(f" {symbol}: {weight*100:.2f}% - Reason: {reason}") # Set BIL position if bil_weight > 0: self.SetHoldings(self.bil, bil_weight) invested.add(self.bil) else: self.Liquidate(self.bil) # Set defensive ETF positions for symbol, weight in all_defensive_allocations.items(): if weight > 0: self.SetHoldings(symbol, weight) invested.add(symbol) self.defensive_positions.add(symbol) # Using renamed set self.entry_prices[symbol] = self.Securities[symbol].Price self.Debug(f"Allocated {weight*100:.2f}% to defensive ETF {symbol}") elif symbol in self.defensive_positions: self.Liquidate(symbol) self.defensive_positions.remove(symbol) # Update last rebalance date tracker self.last_rebalance_date = self.Time # Store current BIL allocation for next month's minimum self.previous_bil_allocation = self.Portfolio[self.bil].HoldingsValue / self.Portfolio.TotalPortfolioValue self.Debug(f"New BIL allocation: {bil_weight*100:0.2f}% (Minimum was {min_bil_allocation*100:0.2f}%)") # Liquidate positions not in current selection for kvp in self.Portfolio: symbol = kvp.Key if (kvp.Value.Invested and symbol not in invested and symbol != self.spy and symbol not in self.defensive_positions): self.Liquidate(symbol) # Near the end of the method, add extra debugging to confirm positions were set def log_allocations(weights, defensive_allocations, bil): self.Debug(f"FINAL ALLOCATION SUMMARY:") self.Debug(f" BIL allocation: {bil*100:.2f}%") self.Debug(f" Equity allocations: {len(weights)} positions") if weights: total_equity = sum(weights.values()) self.Debug(f" Total equity weight: {total_equity*100:.2f}%") top_equities = sorted(weights.items(), key=lambda x: x[1], reverse=True)[:5] for sym, wt in top_equities: self.Debug(f" {sym.Value}: {wt*100:.2f}%") self.Debug(f" Defensive allocations: {len(defensive_allocations)} positions") if defensive_allocations: total_defensive = sum(defensive_allocations.values()) self.Debug(f" Total defensive weight: {total_defensive*100:.2f}%") for sym, wt in defensive_allocations.items(): self.Debug(f" {sym.Value}: {wt*100:.2f}%") # Log final allocations before setting them log_allocations(weights, all_defensive_allocations, bil_weight) # Near end of function, add a verification step self.Debug("VERIFICATION OF FINAL PORTFOLIO:") invested_positions = [kvp for kvp in self.Portfolio.Values if kvp.Invested] equity_positions = [p for p in invested_positions if p.Symbol != self.bil and p.Symbol not in self.defensive_positions] self.Debug(f"Final equity positions: {len(equity_positions)}/{len(filtered_stocks)}") self.Debug(f"Final defensive positions: {len([p for p in invested_positions if p.Symbol in self.defensive_positions])}") self.Debug(f"BIL position: {1 if self.Portfolio[self.bil].Invested else 0}") def _calculateMarketTrend(self): """Calculate recent market trend using price history""" if len(self.spy_prices) < self.trend_lookback + 1: return 0 # Not enough data dates = sorted(self.spy_prices.keys()) if len(dates) <= self.trend_lookback: return 0 recent_price = self.spy_prices[dates[-1]] older_price = self.spy_prices[dates[-self.trend_lookback]] return (recent_price / older_price) - 1.0 def _calculateSimpleMomentum(self): """Calculate simple momentum scores for stock filtering""" momentum_scores = {} symbols = [sym for sym, _ in self.selected_by_market_cap] if not symbols: return momentum_scores # Get 30 days of history for all stocks history = self.History(symbols, 30, Resolution.Daily) if history.empty: return momentum_scores # Calculate simple momentum (30-day price change) for symbol in symbols: if symbol in history.index.get_level_values(0): prices = history.loc[symbol]['close'] if len(prices) >= 30: # 30-day momentum mom = prices.iloc[-1] / prices.iloc[0] - 1 # Convert to a score between 0.7 and 1.3 # Center around 1.0, with range based on 15% move momentum_scores[symbol] = min(1.3, max(0.7, 1 + (mom * 2))) return momentum_scores def _evaluateInverseETFs(self, market_deviation, market_trend, max_allocation): """Enhanced evaluation of inverse ETFs with more sensitive criteria""" allocations = {symbol: 0 for symbol in self.inverse_etfs} # More permissive consideration of inverse ETFs if market_deviation > 0.04 and market_trend > 0.02: return allocations # Only skip in very strong bull markets # Get more history for better momentum calculation history = self.History(self.inverse_etfs, 45, Resolution.Daily) if history.empty: return allocations # Enhanced momentum scoring momentum_scores = {} volatility_scores = {} for symbol in self.inverse_etfs: if symbol in history.index.get_level_values(0): prices = history.loc[symbol]['close'] if len(prices) >= 30: # Multiple timeframe momentum - more emphasis on recent performance mom_7d = prices.iloc[-1] / prices.iloc[-7] - 1 if len(prices) >= 7 else 0 mom_15d = prices.iloc[-1] / prices.iloc[-15] - 1 if len(prices) >= 15 else 0 mom_30d = prices.iloc[-1] / prices.iloc[0] - 1 # Weight recent momentum much more heavily momentum = (mom_7d * 0.5) + (mom_15d * 0.3) + (mom_30d * 0.2) # Calculate volatility (lower is better for inverse ETFs) returns = [prices.iloc[i+1]/prices.iloc[i]-1 for i in range(min(20, len(prices)-1))] volatility = np.std(returns) if returns else 0 # Calculate short-term rate of change (acceleration) if len(prices) >= 10: recent_5d_change = prices.iloc[-1] / prices.iloc[-5] - 1 prev_5d_change = prices.iloc[-6] / prices.iloc[-10] - 1 acceleration = recent_5d_change - prev_5d_change else: acceleration = 0 # Momentum score adds weight for accelerating performance momentum_scores[symbol] = momentum + (acceleration * 0.5) volatility_scores[symbol] = volatility # More aggressive filtering - consider even small positive momentum positive_momentum_etfs = {s: score for s, score in momentum_scores.items() if score > -0.005} # No allocation if no ETFs have at least neutral momentum if not positive_momentum_etfs: self.Debug("No inverse ETFs showing acceptable momentum - keeping as cash") return allocations # Enhanced selection: favor momentum but consider volatility too best_candidates = [] for symbol, score in positive_momentum_etfs.items(): volatility = volatility_scores.get(symbol, 1.0) # Adjust score: higher momentum is good, lower volatility is good adjusted_score = score - (volatility * 0.5) best_candidates.append((symbol, score, adjusted_score)) # Sort by adjusted score best_candidates.sort(key=lambda x: x[2], reverse=True) # More aggressive allocation model allocation_pct = 0.0 # Allocate based on market conditions with more sensitivity if market_deviation < -0.05: allocation_pct = 1.0 # Use 100% of available inverse allocation elif market_deviation < -0.03: allocation_pct = 0.8 # Use 80% of available inverse allocation elif market_deviation < -0.01: allocation_pct = 0.6 # Use 60% of available inverse allocation elif market_deviation < 0.01: # Even in slight bull market if momentum is positive allocation_pct = 0.4 # Use 40% of available inverse allocation else: allocation_pct = 0.2 # Use 20% only if momentum is strong enough # No candidates or market conditions don't justify allocation if not best_candidates or allocation_pct < 0.1: return allocations # Take top 1-2 ETFs depending on market conditions num_etfs = 1 if market_deviation < -0.04 and len(best_candidates) > 1: num_etfs = 2 # Use two ETFs in stronger downtrends # Allocate to best ETF(s) remaining_allocation = max_allocation * allocation_pct for i in range(min(num_etfs, len(best_candidates))): symbol, raw_score, _ = best_candidates[i] # Allocate proportionally to momentum strength, with a minimum threshold etf_weight = min(1.0, max(0.3, raw_score * 3)) if raw_score > 0 else 0.3 # Calculate allocation for this ETF etf_allocation = remaining_allocation * etf_weight / num_etfs # Only allocate if it's a meaningful amount if etf_allocation >= 0.01: # At least 1% allocation allocations[symbol] = etf_allocation self.Debug(f"Selected inverse ETF {symbol} with momentum {raw_score:.2%}, allocating {etf_allocation*100:.2f}%") return allocations def _runDefensiveETFDiagnostics(self, market_deviation, market_trend): """Run detailed diagnostics on all defensive ETFs""" # Get extensive history for analysis history = self.History(self.all_defensive + [self.spy], 90, Resolution.Daily) if history.empty: return spy_perf = {} if self.spy in history.index.get_level_values(0): spy_prices = history.loc[self.spy]['close'] if len(spy_prices) >= 30: spy_perf = { "7d": spy_prices.iloc[-1] / spy_prices.iloc[-7] - 1 if len(spy_prices) >= 7 else 0, "15d": spy_prices.iloc[-1] / spy_prices.iloc[-15] - 1 if len(spy_prices) >= 15 else 0, "30d": spy_prices.iloc[-1] / spy_prices.iloc[-30] - 1 } # Log market conditions self.Debug(f"DIAGNOSTIC - Market: Deviation {market_deviation*100:.2f}%, " + f"Trend {market_trend*100:.2f}%, SPY 30d: {spy_perf.get('30d', 0)*100:.2f}%") # Analyze each ETF for symbol in self.all_defensive: if symbol in history.index.get_level_values(0): prices = history.loc[symbol]['close'] if len(prices) >= 30: # Calculate multiple timeframe performance perf_7d = prices.iloc[-1] / prices.iloc[-7] - 1 if len(prices) >= 7 else 0 perf_15d = prices.iloc[-1] / prices.iloc[-15] - 1 if len(prices) >= 15 else 0 perf_30d = prices.iloc[-1] / prices.iloc[-30] - 1 # Calculate recent acceleration recent_5d = prices.iloc[-1] / prices.iloc[-5] - 1 if len(prices) >= 5 else 0 prev_5d = prices.iloc[-6] / prices.iloc[-10] - 1 if len(prices) >= 10 else 0 accel = recent_5d - prev_5d # Calculate relative performance vs SPY rel_perf = {} for period, spy_val in spy_perf.items(): if period == "7d": rel_perf[period] = perf_7d - spy_val elif period == "15d": rel_perf[period] = perf_15d - spy_val elif period == "30d": rel_perf[period] = perf_30d - spy_val # Log detailed ETF statistics self.Debug(f" {symbol}: 7d: {perf_7d*100:.2f}%, 15d: {perf_15d*100:.2f}%, " + f"30d: {perf_30d*100:.2f}%, Accel: {accel*100:.2f}%, " + f"Rel30d: {rel_perf.get('30d', 0)*100:.2f}%") def _evaluateDefensiveETFs(self, market_deviation, market_trend, max_allocation): """Enhanced defensive ETF evaluation with sector rotation""" allocations = {symbol: 0 for symbol in self.all_defensive} # Skip if market is very bullish if market_deviation > 0.04 and market_trend > 0.02: return allocations # Get history for all defensive options and SPY history = self.History(self.all_defensive + [self.spy], 60, Resolution.Daily) if history.empty: return allocations # Detailed diagnostics on all ETFs self.Debug(f"DEFENSIVE ETF PERFORMANCE DETAILS:") # Calculate SPY performance for relative comparisons spy_perf = {} if self.spy in history.index.get_level_values(0): spy_prices = history.loc[self.spy]['close'] if len(spy_prices) >= 30: spy_perf = { "5d": spy_prices.iloc[-1] / spy_prices.iloc[-5] - 1 if len(spy_prices) >= 5 else 0, "10d": spy_prices.iloc[-1] / spy_prices.iloc[-10] - 1 if len(spy_prices) >= 10 else 0, "20d": spy_prices.iloc[-1] / spy_prices.iloc[-20] - 1 if len(spy_prices) >= 20 else 0, "30d": spy_prices.iloc[-1] / spy_prices.iloc[-30] - 1 } self.Debug(f" SPY: 5d: {spy_perf['5d']*100:.1f}%, 10d: {spy_perf['10d']*100:.1f}%, " + f"20d: {spy_perf['20d']*100:.1f}%, 30d: {spy_perf['30d']*100:.1f}%") # Enhanced scoring system with different criteria for different ETF types etf_scores = {} # Process each ETF by type for group_name, group in [("Inverse", self.inverse_etfs), ("Alternative", self.alternative_defensive), ("Sector", self.sector_defensive)]: self.Debug(f" {group_name} ETFs:") for symbol in group: if symbol in history.index.get_level_values(0): prices = history.loc[symbol]['close'] if len(prices) >= 30: # Calculate absolute momentum components perf = {} perf["5d"] = prices.iloc[-1] / prices.iloc[-5] - 1 if len(prices) >= 5 else 0 perf["10d"] = prices.iloc[-1] / prices.iloc[-10] - 1 if len(prices) >= 10 else 0 perf["20d"] = prices.iloc[-1] / prices.iloc[-20] - 1 if len(prices) >= 20 else 0 perf["30d"] = prices.iloc[-1] / prices.iloc[-30] - 1 # Calculate relative outperformance vs SPY rel_perf = {} for period, spy_val in spy_perf.items(): rel_perf[period] = perf[period] - spy_val # Log detailed performance self.Debug(f" {symbol}: 5d: {perf['5d']*100:.1f}% (rel: {rel_perf['5d']*100:+.1f}%), " + f"10d: {perf['10d']*100:.1f}% (rel: {rel_perf['10d']*100:+.1f}%), " + f"30d: {perf['30d']*100:.1f}% (rel: {rel_perf['30d']*100:+.1f}%)") # Inverse ETFs need to show positive momentum in down markets if symbol in self.inverse_etfs: # In downtrends, rising inverse ETFs are good if market_deviation < -0.02: score = (perf["5d"] * 0.4) + (perf["10d"] * 0.4) + (perf["30d"] * 0.2) # Bonus for relative outperformance score += (rel_perf["5d"] + rel_perf["10d"]) * 0.15 else: # Less emphasis on long-term performance in neutral markets score = (perf["5d"] * 0.6) + (perf["10d"] * 0.3) + (perf["30d"] * 0.1) # Alternative defensive (bonds, gold) - focus on absolute return elif symbol in self.alternative_defensive: # Less dramatic movements, need lower thresholds score = (perf["5d"] * 0.3) + (perf["10d"] * 0.4) + (perf["30d"] * 0.3) # In downtrends, emphasize relative performance more if market_deviation < -0.03: score += rel_perf["10d"] * 0.2 # Bonus for outperformance # Sector ETFs - focus on relative outperformance else: # These should have positive absolute returns and outperform SPY abs_score = (perf["5d"] * 0.3) + (perf["10d"] * 0.3) + (perf["30d"] * 0.4) rel_score = (rel_perf["5d"] * 0.3) + (rel_perf["10d"] * 0.3) + (rel_perf["30d"] * 0.4) # Balance absolute and relative performance if market_deviation < -0.02: # In downtrends, relative outperformance is more important score = (abs_score * 0.4) + (rel_score * 0.6) else: # In neutral markets, absolute performance matters more score = (abs_score * 0.6) + (rel_score * 0.4) etf_scores[symbol] = score # Find candidates with appropriate momentum based on market conditions threshold = -0.007 # Default threshold if market_deviation < -0.03: threshold = -0.01 # More permissive in stronger downturns candidates = {s: score for s, score in etf_scores.items() if score > threshold} if not candidates: self.Debug("No defensive ETFs showed sufficient momentum - keeping as cash") return allocations # Sort and log candidate scores sorted_candidates = sorted(candidates.items(), key=lambda x: x[1], reverse=True) self.Debug(f"Top 5 defensive candidates:") for symbol, score in sorted_candidates[:5]: group = "Inverse" if symbol in self.inverse_etfs else "Alternative" if symbol in self.alternative_defensive else "Sector" self.Debug(f" {symbol} ({group}): Score {score*100:.2f}%") # Set allocation percent based on market conditions and trend allocation_pct = 0.0 if market_deviation < -0.05 or market_trend < -0.04: allocation_pct = 0.95 # Almost all available allocation elif market_deviation < -0.03 or market_trend < -0.02: allocation_pct = 0.8 elif market_deviation < -0.01 or market_trend < -0.01: allocation_pct = 0.6 else: allocation_pct = 0.4 # Adjust allocation based on strength of best candidate best_score = sorted_candidates[0][1] if sorted_candidates else 0 allocation_pct *= min(1.0, max(0.5, (best_score + 0.02) * 4)) # Determine number of ETFs to use - more in stronger downtrends num_etfs = 1 if (market_deviation < -0.04 or market_trend < -0.03) and len(sorted_candidates) > 1: num_etfs = min(2, len(sorted_candidates)) # Allocate to best candidates remaining_allocation = max_allocation * allocation_pct total_score = sum(score for _, score in sorted_candidates[:num_etfs]) if total_score > 0: for i in range(num_etfs): symbol, score = sorted_candidates[i] # Weight by relative score weight = score / total_score if total_score > 0 else 1.0/num_etfs # Calculate allocation etf_allocation = remaining_allocation * weight # Only allocate if meaningful if etf_allocation >= 0.02: # 2% minimum allocation allocations[symbol] = etf_allocation etf_type = "Inverse" if symbol in self.inverse_etfs else "Alternative" if symbol in self.alternative_defensive else "Sector" self.Debug(f"Selected {etf_type} ETF {symbol} with score {score*100:.2f}%, allocating {etf_allocation*100:.2f}%") return allocations
# region imports from AlgorithmImports import * # endregion from QuantConnect import * from QuantConnect.Algorithm import * from QuantConnect.Data import * from QuantConnect.Indicators import * from datetime import timedelta import numpy as np import pandas as pd import torch import os import torch.nn as nn from sklearn.preprocessing import RobustScaler class KQTStrategy: def __init__(self): self.model = None self.lookback = 30 self.scalers = {} self.feature_cols = [] self.stock_to_id = {} self.sector_mappings = {} self.adaptive_threshold = 0.2 self.pred_std = 1.0 self.current_regime = "neutral" self.portfolio_returns = [] self.defensive_mode = False self.previous_day_hit_stops = [] def create_sliding_sequences(self, df, feature_cols, lookback, stride=1): X = [] for i in range(0, len(df) - lookback + 1, stride): X.append(df.iloc[i:i+lookback][feature_cols].values.astype(np.float32)) return np.array(X) def clip_outliers(self, df, cols, lower=0.01, upper=0.99): df_copy = df.copy() for col in cols: if col in df_copy.columns: q_low = df_copy[col].quantile(lower) q_high = df_copy[col].quantile(upper) df_copy.loc[df_copy[col] < q_low, col] = q_low df_copy.loc[df_copy[col] > q_high, col] = q_high return df_copy def filter_features_to_match_model(self, df, feature_cols, required_count=5): """Ensure we have exactly the required number of features""" if len(feature_cols) == required_count: return feature_cols # First, prioritize the lag returns (most important) lag_features = [col for col in feature_cols if 'return_lag' in col] # Next, add in the most predictive technical features in a fixed order tech_priority = ['roc_5', 'volatility_10', 'ma_cross', 'dist_ma20', 'momentum_1m', 'oversold', 'overbought', 'roc_diff', 'volatility_regime'] prioritized_features = lag_features.copy() for feat in tech_priority: if feat in feature_cols and len(prioritized_features) < required_count: prioritized_features.append(feat) # If still not enough, add remaining features remaining = [col for col in feature_cols if col not in prioritized_features] while len(prioritized_features) < required_count and remaining: prioritized_features.append(remaining.pop(0)) # If too many, truncate return prioritized_features[:required_count] def add_technical_features(self, df): if 'Close' not in df.columns: return df df['ma5'] = df['Close'].rolling(5).mean() / df['Close'] - 1 # Relative to price df['ma20'] = df['Close'].rolling(20).mean() / df['Close'] - 1 df['ma_cross'] = df['ma5'] - df['ma20'] # Moving average crossover signal df['volatility_10'] = df['Close'].pct_change().rolling(10).std() df['volatility_ratio'] = df['Close'].pct_change().rolling(5).std() / df['Close'].pct_change().rolling(20).std() df['roc_5'] = df['Close'].pct_change(5) df['roc_10'] = df['Close'].pct_change(10) df['roc_diff'] = df['roc_5'] - df['roc_10'] df['dist_ma20'] = (df['Close'] / df['Close'].rolling(20).mean() - 1) return df.fillna(0) def add_enhanced_features(self, df): """Add enhanced technical features""" df['volatility_trend'] = df['volatility_10'].pct_change(5) df['volatility_regime'] = (df['volatility_10'] > df['volatility_10'].rolling(20).mean()).astype(int) if 'volume' in df.columns: df['vol_ma_ratio'] = df['volume'] / df['volume'].rolling(20).mean() df['vol_price_trend'] = df['vol_ma_ratio'] * df['roc_5'] df['momentum_1m'] = df['Close'].pct_change(20) df['momentum_3m'] = df['Close'].pct_change(60) df['momentum_breadth'] = ( (df['roc_5'] > 0).astype(int) + (df['momentum_1m'] > 0).astype(int) + (df['momentum_3m'] > 0).astype(int) ) / 3 df['mean_rev_signal'] = -1 * df['dist_ma20'] * df['volatility_10'] df['oversold'] = (df['dist_ma20'] < -2 * df['volatility_10']).astype(int) df['overbought'] = (df['dist_ma20'] > 2 * df['volatility_10']).astype(int) df['regime_change'] = (np.sign(df['ma_cross']) != np.sign(df['ma_cross'].shift(1))).astype(int) df['risk_adj_momentum'] = df['roc_5'] / (df['volatility_10'] + 0.001) return df def prepare_stock_data(self, stock_data, ticker, is_training=False): """Prepare data for a single stock""" if len(stock_data) < self.lookback + 5: # Need enough data return None, None stock_df = pd.DataFrame({ 'Close': stock_data['close'].values, 'time': stock_data['time'].values }) if 'volume' in stock_data.columns: stock_df['volume'] = stock_data['volume'].values stock_df = stock_df.sort_values('time').reset_index(drop=True) stock_df['pct_return'] = stock_df['Close'].pct_change().shift(-1) * 100 # In prepare_stock_data, replace the feature cols section with: feature_cols = [] # Add basic lag features for i in range(1, 6): col_name = f'return_lag{i}' stock_df[col_name] = stock_df['pct_return'].shift(i) feature_cols.append(col_name) # Add technical features stock_df = self.add_technical_features(stock_df) stock_df = self.add_enhanced_features(stock_df) # Add all potential features additional_features = ['ma_cross', 'volatility_10', 'roc_5', 'roc_diff', 'dist_ma20'] enhanced_features = ['volatility_trend', 'volatility_regime', 'momentum_1m', 'momentum_breadth', 'mean_rev_signal', 'oversold', 'overbought', 'regime_change', 'risk_adj_momentum'] for col in additional_features + enhanced_features: if col in stock_df.columns: feature_cols.append(col) # Filter to the exact number of features expected by the model model_feature_count = 5 # Use the exact count from your model feature_cols = self.filter_features_to_match_model(stock_df, feature_cols, model_feature_count) if not self.feature_cols: self.feature_cols = feature_cols.copy() stock_df = stock_df.dropna().reset_index(drop=True) # Handle outliers stock_df = self.clip_outliers(stock_df, feature_cols) # Replace the scaling code in prepare_stock_data with this: # Scale features if ticker not in self.scalers or is_training: # Check if we have data if len(stock_df) == 0 or len(feature_cols) == 0: return None, stock_df # Return early if no data # Check if any features are empty/nan if stock_df[feature_cols].isna().any().any() or stock_df[feature_cols].empty: # Fill NaNs with zeros stock_df[feature_cols] = stock_df[feature_cols].fillna(0) # Ensure we have data if len(stock_df[feature_cols]) > 0: try: scaler = RobustScaler() stock_df[feature_cols] = scaler.fit_transform(stock_df[feature_cols]) self.scalers[ticker] = scaler except Exception as e: print(f"Scaling error for {ticker}: {str(e)}") # Use a simple standardization as fallback for col in feature_cols: mean = stock_df[col].mean() std = stock_df[col].std() if std > 0: stock_df[col] = (stock_df[col] - mean) / std else: stock_df[col] = 0 else: return None, stock_df # Return early if empty after processing else: # Use existing scaler scaler = self.scalers[ticker] try: stock_df[feature_cols] = scaler.transform(stock_df[feature_cols]) except Exception as e: print(f"Transform error for {ticker}: {str(e)}") # Simple standardization fallback for col in feature_cols: if col in stock_df.columns and len(stock_df[col]) > 0: mean = stock_df[col].mean() std = stock_df[col].std() if std > 0: stock_df[col] = (stock_df[col] - mean) / std else: stock_df[col] = 0 # Create sequences for prediction X = self.create_sliding_sequences(stock_df, feature_cols, self.lookback, stride=1) if len(X) == 0: return None, stock_df return X, stock_df # Add to strategy.py in KQTStrategy class def calculate_portfolio_risk_score(self, market_returns): """Calculate a portfolio risk score (0-100) to scale overall exposure""" risk_score = 50 # Neutral starting point # VIX-like volatility measurement using SPY returns if len(market_returns) >= 5: recent_vol = np.std(market_returns[-5:]) * np.sqrt(252) # Annualized longer_vol = np.std(market_returns[-10:]) * np.sqrt(252) if len(market_returns) >= 10 else recent_vol # Volatility spike detection vol_ratio = recent_vol / longer_vol if longer_vol > 0 else 1 if vol_ratio > 1.5: # Sharp volatility increase risk_score -= 30 elif vol_ratio > 1.2: risk_score -= 15 # Consecutive negative days if len(market_returns) >= 3: neg_days = sum(1 for r in market_returns[-3:] if r < 0) if neg_days == 3: # Three consecutive down days risk_score -= 20 elif neg_days == 2: risk_score -= 10 # Trend direction if len(market_returns) >= 10: avg_recent = np.mean(market_returns[-5:]) avg_older = np.mean(market_returns[-10:-5]) trend_change = avg_recent - avg_older # Declining trend if trend_change < -0.3: risk_score -= 15 # Accelerating uptrend elif trend_change > 0.3 and avg_recent > 0: risk_score += 10 return max(10, min(100, risk_score)) # Constrain between 10-100 def predict_returns(self, X, ticker): """Make predictions for a single stock""" if self.model is None: return 0 if ticker not in self.stock_to_id: self.stock_to_id[ticker] = len(self.stock_to_id) stock_id = self.stock_to_id[ticker] try: X_tensor = torch.tensor(X, dtype=torch.float32) stock_ids = torch.tensor([stock_id] * len(X), dtype=torch.long) with torch.no_grad(): predictions = self.model(X_tensor, stock_ids) # Convert to standard Python float for safety return float(predictions.detach().numpy().flatten()[-1]) except Exception as e: print(f"Prediction error for {ticker}: {e}") return 0 # Return neutral prediction on error def detect_market_regime(self, daily_returns, lookback=10): """Detect current market regime based on portfolio returns""" if len(daily_returns) >= 1: market_return = np.mean(daily_returns) market_vol = np.std(daily_returns) if len(self.portfolio_returns) >= 3: recent_returns = self.portfolio_returns[-min(lookback, len(self.portfolio_returns)):] avg_recent_return = np.mean(recent_returns) if len(self.portfolio_returns) >= 5: very_recent = np.mean(self.portfolio_returns[-3:]) less_recent = np.mean(self.portfolio_returns[-min(8, len(self.portfolio_returns)):-3]) trend_change = very_recent - less_recent if trend_change > 0.5 and avg_recent_return > 0.2: return "breakout_bullish" elif trend_change < -0.5 and avg_recent_return < -0.2: return "breakdown_bearish" if avg_recent_return > 0.15: if market_return > 0: return "bullish_strong" else: return "bullish_pullback" elif avg_recent_return < -0.3: if market_return < -0.2: return "bearish_high_vol" else: return "bearish_low_vol" elif avg_recent_return > 0 and market_return > 0: return "bullish" elif avg_recent_return < 0 and market_return < 0: return "bearish" if market_return > -0.05: return "neutral" else: return "bearish" return "neutral" def detect_bearish_signals(self, recent_returns): """Detect early warning signs of bearish conditions""" bearish_signals = 0 signal_strength = 0 if len(self.portfolio_returns) >= 5: recent_portfolio_returns = self.portfolio_returns[-5:] pos_days = sum(1 for r in recent_portfolio_returns if r > 0) neg_days = sum(1 for r in recent_portfolio_returns if r < 0) if neg_days > pos_days: bearish_signals += 1 signal_strength += 0.2 * (neg_days - pos_days) if len(self.portfolio_returns) >= 10: recent_vol = np.std(self.portfolio_returns[-5:]) older_vol = np.std(self.portfolio_returns[-10:-5]) if recent_vol > older_vol * 1.3: # 30% volatility increase bearish_signals += 1 signal_strength += 0.3 * (recent_vol/older_vol - 1) if len(self.portfolio_returns) >= 5: if self.portfolio_returns[-1] < 0 and self.portfolio_returns[-2] > 0.3: bearish_signals += 1 signal_strength += 0.3 return bearish_signals, signal_strength def generate_positions(self, prediction_data, current_returns=None): """Generate position sizing based on predictions with improved diversification""" if not prediction_data: return {} # Update market regime if current_returns is not None: self.current_regime = self.detect_market_regime(current_returns) bearish_count, bearish_strength = self.detect_bearish_signals(current_returns) self.defensive_mode = bearish_count >= 2 or bearish_strength > 0.5 # Calculate portfolio risk score (0-100) portfolio_risk_score = self.calculate_portfolio_risk_score(current_returns if current_returns else []) # Convert to a scaling factor (0.1 to 1.0) risk_scaling = portfolio_risk_score / 100 base_threshold = 0.25 * self.pred_std if self.current_regime in ["bullish_strong", "breakout_bullish"]: self.adaptive_threshold = base_threshold * 0.4 elif self.current_regime in ["bearish_high_vol", "breakdown_bearish"]: self.adaptive_threshold = base_threshold * 2.5 elif self.current_regime in ["bearish", "bearish_low_vol"]: self.adaptive_threshold = base_threshold * 1.6 elif self.current_regime in ["bullish_pullback"]: self.adaptive_threshold = base_threshold * 0.9 else: # neutral or other regimes self.adaptive_threshold = base_threshold * 0.75 positions = {} # Group stocks by sector sector_data = {} for ticker, data in prediction_data.items(): pred_return = data["pred_return"] sector = self.sector_mappings.get(ticker, "Unknown") if sector not in sector_data: sector_data[sector] = [] sector_data[sector].append({ "ticker": ticker, "pred_return": pred_return, "composite_score": pred_return / self.adaptive_threshold }) # Rank sectors by predicted return sector_avg_scores = {} for sector, stocks in sector_data.items(): sector_avg_scores[sector] = np.mean([s["pred_return"] for s in stocks]) # CHANGE: Include more sectors (3-4 instead of just 2) ranked_sectors = sorted(sector_avg_scores.keys(), key=lambda x: sector_avg_scores[x], reverse=True) top_sector_count = 3 if portfolio_risk_score > 60 else 2 # More diversification in lower risk periods top_sectors = ranked_sectors[:min(top_sector_count, len(ranked_sectors))] # CHANGE: Allow more stocks per sector in bull markets stocks_per_sector = 3 if self.current_regime in ["bullish_strong", "breakout_bullish"] else 2 # Allocate within top sectors - focus on stocks with strongest signals for sector in top_sectors: sector_stocks = sorted(sector_data[sector], key=lambda x: x["pred_return"], reverse=True) # Take top N stocks in each selected sector top_stocks = sector_stocks[:min(stocks_per_sector, len(sector_stocks))] # CHANGE: Make position size proportional to signal strength but limited by volatility for stock in top_stocks: ticker = stock["ticker"] signal_strength = stock["pred_return"] / (0.2 * self.pred_std) # Base size calculation base_size = min(0.3, max(0.05, 0.15 * signal_strength)) # Scale by portfolio risk final_size = base_size * risk_scaling positions[ticker] = final_size # Defensive adjustments if self.defensive_mode or self.current_regime in ["bearish_high_vol", "bearish_low_vol", "breakdown_bearish"]: # 1. Reduce overall position sizes scaling_factor = 0.5 if self.defensive_mode else 0.7 # More aggressive reduction for ticker in positions: positions[ticker] *= scaling_factor # 2. Add inverse positions (shorts) as hedges if we have bearish predictions if len(positions) > 0 and portfolio_risk_score < 40: # Only hedge in higher risk environments negative_preds = {t: data["pred_return"] for t, data in prediction_data.items() if data["pred_return"] < 0 and t not in positions} if negative_preds: worst_stocks = sorted(negative_preds.items(), key=lambda x: x[1])[:2] for ticker, pred in worst_stocks: hedge_size = -0.15 if self.defensive_mode else -0.1 positions[ticker] = hedge_size # CRITICAL FIX: Add overall position cap to prevent excessive leverage # Calculate how many positions we'll have in total total_position_count = sum(min(stocks_per_sector, len(sector_data.get(sector, []))) for sector in top_sectors) # Limit per-position size based on total count to avoid excessive leverage max_position_size = min(0.3, max(0.05, 0.95 / max(total_position_count, 1))) # NEW: Add a post-processing step to ensure total allocation is reasonable total_allocation = sum(abs(size) for size in positions.values()) if total_allocation > 0.95: scaling_factor = 0.95 / total_allocation for ticker in positions: positions[ticker] *= scaling_factor # Also ensure no single position is too large (could cause extreme leverage) for ticker in list(positions.keys()): if abs(positions[ticker]) > max_position_size: positions[ticker] = max_position_size * (1 if positions[ticker] > 0 else -1) return positions def get_stop_loss_level(self): """Get appropriate stop-loss level based on market regime""" if self.current_regime in ["bullish_strong", "breakout_bullish"]: if self.defensive_mode: return -2.0 # Tighter in defensive mode else: return -3.5 # More room for positions to breathe elif self.current_regime in ["bearish_high_vol", "breakdown_bearish"]: return -1.5 # Tighter stop-loss in bearish regimes else: if self.defensive_mode: return -1.8 else: return -2.5 def update_portfolio_returns(self, daily_return): """Update portfolio return history""" self.portfolio_returns.append(daily_return) if len(self.portfolio_returns) > 60: # Keep a rolling window self.portfolio_returns = self.portfolio_returns[-60:] def update_model_calibration(self, all_predictions): """Update prediction standard deviation for threshold calibration""" all_pred_values = [p for p in all_predictions.values()] if len(all_pred_values) > 5: self.pred_std = np.std(all_pred_values)