Overall Statistics |
Total Orders 477 Average Win 0.91% Average Loss -0.21% Compounding Annual Return 22.138% Drawdown 13.200% Expectancy 1.949 Start Equity 1000000 End Equity 3175591.13 Net Profit 217.559% Sharpe Ratio 1.184 Sortino Ratio 1.223 Probabilistic Sharpe Ratio 82.037% Loss Rate 44% Win Rate 56% Profit-Loss Ratio 4.25 Alpha 0.082 Beta 0.442 Annual Standard Deviation 0.109 Annual Variance 0.012 Information Ratio 0.179 Tracking Error 0.123 Treynor Ratio 0.292 Total Fees $2025.47 Estimated Strategy Capacity $0 Lowest Capacity Asset NB R735QTJ8XC9X Portfolio Turnover 0.89% |
import numpy as np
from AlgorithmImports import *


class AssetWeightCalculator:
    """Universe filters and Sharpe-ratio ranking for stock selection.

    The class exposes the coarse/fine universe callbacks and ranks the
    surviving symbols by an annualised Sharpe ratio computed from hourly
    closes.  BIL is subscribed as the risk-free proxy.
    """

    # Annualisation factor used throughout this class (252 * 6.5 = 1638);
    # presumably trading days times hourly bars per session.
    HOURS_PER_YEAR = 1638
    # Default look-back in hourly bars: 3 * HOURS_PER_YEAR.
    LOOKBACK_BARS = 4914
    # Annual risk-free rate used when no usable BIL history is available.
    DEFAULT_RISK_FREE_RATE = 0.02
    # Number of symbols kept by rank_stocks.
    MAX_RANKED = 20

    def __init__(self, algorithm: QCAlgorithm):
        """Store the algorithm handle and subscribe to BIL at hourly resolution."""
        self.algorithm = algorithm
        self.risk_free = self.algorithm.add_equity("BIL", Resolution.HOUR)

    def coarse_selection(self, coarse):
        """First universe filter.

        Keeps securities priced above 10 that have fundamental data and
        returns the symbols of the 200 with the highest dollar volume.
        """
        eligible = [x for x in coarse if x.price > 10 and x.has_fundamental_data]
        eligible.sort(key=lambda x: x.dollar_volume, reverse=True)
        return [x.symbol for x in eligible[:200]]

    def fine_selection(self, fine):
        """Second universe filter.

        Keeps symbols whose market cap exceeds 10e9 and returns them ranked
        by Sharpe ratio (see rank_stocks).
        """
        filtered = [x.symbol for x in fine if x.market_cap is not None and x.market_cap > 10e9]
        self.algorithm.debug(f"Fine Selection: {len(filtered)} symbols passed filters")
        # Ranking here means stocks are re-ranked on every universe update;
        # the MACDs can then be rebuilt by the scheduler in main.
        return self.rank_stocks(filtered)

    def _annual_risk_free_rate(self, period):
        """Estimate the annual risk-free rate from BIL's return over `period` hourly bars.

        The previous implementation used BIL's *price* divided by 100 as the
        rate, which is not a yield.  Falls back to DEFAULT_RISK_FREE_RATE
        when history is missing or unusable.
        """
        try:
            rf_history = self.algorithm.history(self.risk_free.symbol, period, Resolution.HOUR)
            if rf_history.empty:
                return self.DEFAULT_RISK_FREE_RATE
            closes = rf_history['close'].dropna()
            if len(closes) < 2 or closes.iloc[0] <= 0:
                return self.DEFAULT_RISK_FREE_RATE
            growth = closes.iloc[-1] / closes.iloc[0]
            # NOTE(review): assumes history prices are dividend-adjusted so that
            # price growth approximates BIL's total return - confirm.
            annual = growth ** (self.HOURS_PER_YEAR / (len(closes) - 1)) - 1
            return float(annual) if np.isfinite(annual) else self.DEFAULT_RISK_FREE_RATE
        except Exception as e:
            self.algorithm.debug(f"Error estimating risk-free rate: {str(e)}")
            return self.DEFAULT_RISK_FREE_RATE

    def calculate_sharpe_ratio(self, symbol, period=LOOKBACK_BARS, risk_free_rate=None):
        """Return the annualised Sharpe ratio of `symbol`, or None.

        Args:
            symbol: A Symbol, or a key/value pair whose `Key` is the Symbol.
            period: Number of hourly bars of history to request.
            risk_free_rate: Annual risk-free rate as a fraction.  When None
                it is estimated from BIL via _annual_risk_free_rate.

        Returns:
            The Sharpe ratio as a float, or None when there is no history,
            the volatility is zero or undefined, or any error occurs.
        """
        # If a KeyValuePair was received, only take the symbol.
        if hasattr(symbol, "Key"):
            symbol = symbol.Key
        label = getattr(symbol, "value", symbol)
        try:
            history = self.algorithm.history([symbol], period, Resolution.HOUR)
            if history.empty:
                self.algorithm.debug(f"No history for {label}")
                return None

            if risk_free_rate is None:
                risk_free_rate = self._annual_risk_free_rate(period)

            returns = history['close'].pct_change().dropna()
            excess_returns = returns - (risk_free_rate / self.HOURS_PER_YEAR)
            mean_excess_return = excess_returns.mean() * self.HOURS_PER_YEAR
            std_dev = excess_returns.std() * np.sqrt(self.HOURS_PER_YEAR)

            # std() is NaN with fewer than two returns; NaN would otherwise
            # slip past the `is not None` filter and corrupt the ranking sort.
            if not np.isfinite(std_dev) or std_dev == 0:
                return None
            sharpe = mean_excess_return / std_dev
            return float(sharpe) if np.isfinite(sharpe) else None
        except Exception as e:
            self.algorithm.debug(f"Error calculating Sharpe for {label}: {str(e)}")
            return None

    def rank_stocks(self, symbols):
        """Rank symbols by Sharpe ratio and return the top MAX_RANKED (20).

        Symbols whose Sharpe ratio cannot be computed are dropped.  Returns
        an empty list when nothing is given or nothing can be scored.
        """
        if not symbols:
            self.algorithm.debug("No symbols to rank")
            return []
        self.algorithm.debug(f"Ranking {len(symbols)} symbols")

        # Convert from key/value pairs if necessary.
        symbols = [s.Key if hasattr(s, 'Key') else s for s in symbols]

        # One risk-free estimate per ranking pass instead of one per symbol.
        risk_free_rate = self._annual_risk_free_rate(self.LOOKBACK_BARS)
        scores = {
            symbol: self.calculate_sharpe_ratio(symbol, risk_free_rate=risk_free_rate)
            for symbol in symbols
        }
        valid_scores = {k: v for k, v in scores.items() if v is not None}
        self.algorithm.debug(f"Valid Sharpe ratios: {len(valid_scores)} out of {len(symbols)}")
        if not valid_scores:
            return []

        sorted_scores = sorted(valid_scores, key=valid_scores.get, reverse=True)[:self.MAX_RANKED]
        self.algorithm.log(f"All symbols before ranking: {[s.value for s in symbols]}")
        self.algorithm.log(f"Symbols after filtering: {[s.value for s in valid_scores.keys()]}")
        return sorted_scores

    def normalize_scores(self, scores):
        """Z-score normalise a {symbol: score} mapping.

        Normalised scores can be combined additively downstream.  Returns an
        empty dict for empty input and all zeros when the scores have no
        variation.
        """
        if not scores:
            return {}
        values = np.array(list(scores.values()))
        mean = np.mean(values)
        std_dev = np.std(values)
        if std_dev == 0:
            # No variation in scores: assign equal normalised scores.
            return {symbol: 0 for symbol in scores}
        normalized_scores = {symbol: (score - mean) / std_dev for symbol, score in scores.items()}
        self.algorithm.debug(f"Normalized scores: {normalized_scores}")
        return normalized_scores
from AlgorithmImports import *


class MACDSignalGenerator:
    """Maintains four MACD variants per symbol and turns them into position sizes."""

    # Hourly bars requested to warm up a new MACD: slowest slow period (26)
    # plus its signal period (9).
    WARMUP_BARS = 35
    # Hard cap on the size returned for a single symbol/variant.
    MAX_POSITION_LIMIT = 0.1
    # Multiplier applied to the normalised fast/slow distance; acts like a
    # leverage setting on the per-variant budget.
    DISTANCE_SCALAR = 70

    def __init__(self, algorithm: QCAlgorithm, symbols: list, cash_buffer: float = 0.05):
        """
        Args:
            algorithm: The owning algorithm, used for orders, history and indicators.
            symbols: Tracked symbols.  The list object is stored as-is and
                extended in place by add_symbols.
            cash_buffer: Fraction of the portfolio kept out of the sizing budget.
        """
        self.algorithm = algorithm
        self.symbols = symbols
        self.cash_buffer = cash_buffer
        self.macd_indicators = {}  # {symbol: {variant: MACD}}

        # MACD parameters for the different variants.
        self.macd_variants = {
            "slow": {"fast": 12, "slow": 26, "signal": 9},
            "slow-med": {"fast": 9, "slow": 19, "signal": 5},
            "med-fast": {"fast": 7, "slow": 15, "signal": 3},
            "fast": {"fast": 5, "slow": 12, "signal": 2},
        }

    def remove_symbols(self, symbols: list):
        """Liquidate each symbol and unregister/delete its MACD indicators.

        NOTE(review): `self.symbols` is not pruned here, so removed symbols
        still count in the denominator used by calculate_position_sizes
        unless the caller maintains that list - confirm against the caller.
        """
        for symbol in symbols:
            # Liquidate the position before dropping its indicators.
            self.algorithm.liquidate(symbol)

            indicators = self.macd_indicators.pop(symbol, None)
            if indicators is None:
                continue
            for macd in indicators.values():
                self.algorithm.unregister_indicator(macd)

    def add_symbols(self, new_symbols):
        """Start tracking `new_symbols` and build warmed-up MACDs for them.

        Symbols are appended to `self.symbols` only if not already present.
        A symbol gets indicators only when its security has tradable,
        positively priced data and history is available; otherwise it is
        skipped with a debug message and can be retried on a later call.
        Symbols that already have indicators are left untouched so no second
        set of auto-updating indicators is registered.
        """
        new_symbols = list(new_symbols)
        self.algorithm.debug(f"Attempting to add symbols: {[s.value for s in new_symbols]}")
        if not new_symbols:
            return

        history = self.algorithm.history(new_symbols, self.WARMUP_BARS, Resolution.HOUR)
        history_symbols = history.index.get_level_values(0)
        self.algorithm.debug(f"History data available for: {history_symbols.unique()}")

        # Avoid duplicate entries: len(self.symbols) is the sizing denominator.
        for symbol in new_symbols:
            if symbol not in self.symbols:
                self.symbols.append(symbol)

        for symbol in new_symbols:
            if self.macd_indicators.get(symbol):
                continue

            security = self.algorithm.securities[symbol]
            if not (security.has_data and security.is_tradable and security.price > 0):
                self.algorithm.debug(f"Waiting for valid price data: {symbol.value}")
                continue

            if symbol not in history_symbols:
                self.algorithm.debug(f"No history data for: {symbol.value}")
                continue

            symbol_history = history.loc[symbol]
            self.algorithm.debug(f"History rows for {symbol.value}: {len(symbol_history)}")

            indicators = {}
            for variant, params in self.macd_variants.items():
                macd = self.algorithm.macd(
                    symbol=symbol,
                    fast_period=params["fast"],
                    slow_period=params["slow"],
                    signal_period=params["signal"],
                    type=MovingAverageType.EXPONENTIAL,
                    resolution=Resolution.HOUR,
                    selector=Field.CLOSE
                )
                # Warm up the MACD with historical closes.
                for time, row in symbol_history.iterrows():
                    macd.update(time, row['close'])
                indicators[variant] = macd
            self.macd_indicators[symbol] = indicators

    def calculate_position_sizes(self):
        """Return {symbol: {variant: size}} for every symbol with indicators.

        Each variant's budget is (1 - cash_buffer) split evenly over
        len(self.symbols) * number of variants.  The size is that budget
        times the fast/slow EMA gap (as a fraction of the slow EMA) times
        DISTANCE_SCALAR, floored at 0 (long only) and capped at
        MAX_POSITION_LIMIT.  Variants that are not ready get 0.  Returns an
        empty dict when there are no symbols or no indicators.
        """
        position_sizes = {}
        if not self.symbols or not self.macd_indicators:
            self.algorithm.debug("No symbols available for position calculation")
            return position_sizes

        # Maximum size a single variant may take.
        max_position = (1 - self.cash_buffer) / (len(self.symbols) * len(self.macd_variants))

        for symbol, variants in self.macd_indicators.items():
            sizes = {}
            for variant, macd in variants.items():
                if not macd.is_ready:
                    sizes[variant] = 0
                    continue

                slow_value = macd.slow.current.value
                if slow_value == 0:
                    # Cannot normalise the distance; treat as no signal.
                    sizes[variant] = 0
                    continue

                distance = macd.fast.current.value - slow_value
                position_size = max_position * (distance / slow_value) * self.DISTANCE_SCALAR
                sizes[variant] = max(0, min(position_size, self.MAX_POSITION_LIMIT))
            position_sizes[symbol] = sizes

        return position_sizes
from AlgorithmImports import *
import numpy as np
from datetime import timedelta


class MarketCapWeightedSP500Tracker(QCAlgorithm):
    """Cap-weighted large-cap portfolio with a BIL hedge, stop-losses and a small tactical sleeve.

    * Core: the 30 largest stocks by market cap, weighted by cap with
      per-name caps, rebalanced by MonthlyRebalance.
    * Hedge: a BIL allocation that grows as SPY falls below its 30-day SMA.
    * Stop-loss: any non-BIL holding that drops about 5% from its entry
      price is liquidated and free cash is parked in BIL.
    * Tactical: up to `tactical_allocation` of the core value in SPXL/SPXS
      when SPY deviates strongly from its 30-day SMA.
    """

    def Initialize(self):
        """Configure dates, cash, securities, universe, schedules and warm-up state."""
        self.SetStartDate(2019, 1, 1)
        self.SetEndDate(2025, 1, 1)
        self.SetCash(1000000)
        self.UniverseSettings.Resolution = Resolution.Daily

        self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.bil = self.AddEquity("BIL", Resolution.Daily).Symbol

        # Tactical ETFs - a much smaller set.
        self.spxl = self.AddEquity("SPXL", Resolution.Daily).Symbol  # 3x S&P 500 bull
        self.spxs = self.AddEquity("SPXS", Resolution.Daily).Symbol  # 3x S&P 500 bear

        # Tactical sleeve is only 8% of the portfolio.
        self.tactical_allocation = 0.08
        self.tactical_positions = set()
        self.last_tactical_trade = datetime(1900, 1, 1)

        # Last 20 daily SPY returns, used for annualised volatility.
        self.spy_volatility_window = RollingWindow[float](20)
        # Previous SPY close, kept separately so the window holds returns only.
        self._previous_spy_close = None

        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
        self.selected_by_market_cap = []
        self.rebalance_flag = False
        self.spy_30day_window = RollingWindow[float](30)
        self.entry_prices = {}
        self.previous_bil_allocation = 0.0
        self.last_rebalance_date = None

        self.Schedule.On(self.DateRules.MonthStart(self.spy),
                         self.TimeRules.AfterMarketOpen(self.spy, 30),
                         self.SetRebalanceFlag)
        self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Wednesday),
                         self.TimeRules.AfterMarketOpen(self.spy, 30),
                         self.MonthlyRebalance)
        # Weekly (not daily) tactical adjustment to reduce turnover.
        self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Monday),
                         self.TimeRules.AfterMarketOpen(self.spy, 45),  # After main rebalance
                         self.WeeklyTacticalAdjustment)

        # Seed the price and return windows from history.
        history = self.History(self.spy, 30, Resolution.Daily)
        if not history.empty:
            for time, row in history.loc[self.spy].iterrows():
                self._record_spy_close(row["close"])

    def _record_spy_close(self, close):
        """Push a SPY close into the SMA window and its daily return into the volatility window."""
        close = float(close)
        self.spy_30day_window.Add(close)
        previous = self._previous_spy_close
        if previous is not None and previous > 0:
            self.spy_volatility_window.Add((close - previous) / previous)
        self._previous_spy_close = close

    def _core_portfolio_value(self):
        """Total portfolio value minus the value of invested tactical positions."""
        tactical_value = sum(self.Portfolio[s].HoldingsValue
                             for s in self.tactical_positions
                             if self.Portfolio.ContainsKey(s) and self.Portfolio[s].Invested)
        return self.Portfolio.TotalPortfolioValue - tactical_value

    def _open_tactical_position(self, symbol, core_portfolio_value):
        """Buy `tactical_allocation` of the core value in `symbol`.

        Returns True when an order for at least one whole share was placed.
        """
        current_price = self.Securities[symbol].Price
        if current_price <= 0:
            return False
        shares = int(core_portfolio_value * self.tactical_allocation / current_price)
        if shares <= 0:
            return False
        self.MarketOrder(symbol, shares)
        self.tactical_positions.add(symbol)
        return True

    def CoarseSelectionFunction(self, coarse):
        """Keep US securities with fundamental data priced above 5."""
        filtered = [x for x in coarse
                    if x.HasFundamentalData and x.Price > 5 and x.Market == Market.USA]
        return [x.Symbol for x in filtered]

    def FineSelectionFunction(self, fine):
        """Select the 30 largest securities of type "ST00000001" with market cap above 1e10.

        Also caches (symbol, market cap) pairs for weighting at rebalance.
        """
        filtered = [x for x in fine
                    if x.MarketCap > 1e10 and x.SecurityReference.SecurityType == "ST00000001"]
        sorted_by_cap = sorted(filtered, key=lambda x: x.MarketCap, reverse=True)[:30]
        self.selected_by_market_cap = [(x.Symbol, x.MarketCap) for x in sorted_by_cap]
        return [x.Symbol for x in sorted_by_cap]

    def SetRebalanceFlag(self):
        """Arm the rebalance flag when called on a Wednesday.

        NOTE(review): this runs on the month-start schedule, so the flag is
        only armed in months whose first scheduled day is a Wednesday -
        confirm this gating is intentional.
        """
        if self.Time.weekday() == 2:  # Wednesday
            self.rebalance_flag = True

    def OnData(self, data):
        """Update SPY windows and apply stop-losses to all non-BIL holdings."""
        if not data.Bars.ContainsKey(self.spy):
            return
        self._record_spy_close(data.Bars[self.spy].Close)

        stop_loss_triggered = False
        for kvp in self.Portfolio:
            symbol = kvp.Key
            holding = kvp.Value
            if not holding.Invested or symbol == self.bil:
                continue

            current_price = self.Securities[symbol].Price
            # Holdings without a recorded entry are baselined at today's price.
            entry_price = self.entry_prices.setdefault(symbol, current_price)
            if entry_price <= 0:
                continue
            price_drop = (entry_price - current_price) / entry_price

            # Positions above 8% of the portfolio get a slightly tighter stop.
            stop_threshold = 0.05
            portfolio_pct = holding.HoldingsValue / self.Portfolio.TotalPortfolioValue
            if portfolio_pct > 0.08:
                stop_threshold = 0.048

            if price_drop >= stop_threshold:
                stop_loss_triggered = True
                self.Liquidate(symbol)
                # Drop the entry so a later re-entry is baselined afresh.
                self.entry_prices.pop(symbol, None)
                self.Debug(f"Stop-loss triggered for {symbol} at {current_price}, drop: {price_drop*100:.2f}%")

        # After any stop-loss, park all available cash in BIL.
        if stop_loss_triggered:
            available_cash = self.Portfolio.Cash + self.Portfolio.UnsettledCash
            if available_cash > 0:
                bil_price = self.Securities[self.bil].Price
                if bil_price is not None and bil_price > 0:
                    bil_quantity = int(available_cash / bil_price)
                    if bil_quantity > 0:
                        self.MarketOrder(self.bil, bil_quantity)
                        self.Debug(f"Invested ${bil_quantity * bil_price:0.2f} in BIL after stop-loss")
                else:
                    self.Debug("BIL price not available or zero; skipping buy.")

    def WeeklyTacticalAdjustment(self):
        """Weekly tactical adjustment that uses a very small portion (8%) of the portfolio."""
        if self.spy_30day_window.Count < 30:
            return

        # Skip within 3 days of a main rebalance to avoid conflicting trades.
        if self.last_rebalance_date is not None:
            days_since_rebalance = (self.Time.date() - self.last_rebalance_date.date()).days
            if days_since_rebalance < 3:
                return

        spy_price = self.Securities[self.spy].Price
        sma_30 = sum(self.spy_30day_window) / self.spy_30day_window.Count
        # Market state: fractional deviation from the 30-day SMA.
        market_deviation = (spy_price / sma_30) - 1.0

        # Annualised volatility from the last 20 daily returns (0 until available).
        market_volatility = 0
        if self.spy_volatility_window.Count >= 20:
            returns = [self.spy_volatility_window[i]
                       for i in range(self.spy_volatility_window.Count)]
            market_volatility = np.std(returns) * np.sqrt(252)

        # Close tactical positions that no longer match market conditions.
        for symbol in list(self.tactical_positions):  # copy: modified during iteration
            if symbol == self.spxl and market_deviation < 0.02:
                self.Liquidate(symbol)
                self.tactical_positions.remove(symbol)
                self.Debug(f"Tactical: Closed bull position due to changing market conditions")
            elif symbol == self.spxs and market_deviation > -0.02:
                self.Liquidate(symbol)
                self.tactical_positions.remove(symbol)
                self.Debug(f"Tactical: Closed bear position due to changing market conditions")

        core_portfolio_value = self._core_portfolio_value()

        # Strong bull: SPY > 5% above SMA and low previous BIL allocation.
        if market_deviation > 0.05 and self.previous_bil_allocation < 0.3:
            # Avoid when volatility is too high (> 30% annualised).
            if market_volatility < 0.30 and self.spxl not in self.tactical_positions:
                if self._open_tactical_position(self.spxl, core_portfolio_value):
                    self.Debug(f"Tactical: Allocated {self.tactical_allocation*100:.1f}% to SPXL in strong bull market")
        # Strong bear: SPY > 5% below SMA and high previous BIL allocation.
        elif market_deviation < -0.05 and self.previous_bil_allocation > 0.4:
            # Only in high-volatility environments.
            if market_volatility > 0.20 and self.spxs not in self.tactical_positions:
                if self._open_tactical_position(self.spxs, core_portfolio_value):
                    self.Debug(f"Tactical: Allocated {self.tactical_allocation*100:.1f}% to SPXS in strong bear market")

        self.last_tactical_trade = self.Time

    def MonthlyRebalance(self):
        """Rebalance the core portfolio and BIL hedge when the rebalance flag is armed."""
        if not self.rebalance_flag:
            return
        self.rebalance_flag = False
        self.last_rebalance_date = self.Time  # Track when the main rebalance ran
        self.entry_prices.clear()  # Reset entry prices at rebalance

        if self.spy_30day_window.Count < 30:
            self.Debug("Waiting for enough SPY history.")
            return

        spy_price = self.Securities[self.spy].Price
        sma_30 = sum(self.spy_30day_window) / 30

        # BIL weight scales with how far SPY sits below its SMA; beyond 8%
        # below, the allocation is 20% more aggressive.
        bil_weight = 0.0
        if spy_price < sma_30:
            base_weight = (sma_30 - spy_price) / sma_30
            if base_weight > 0.08:
                bil_weight = min(base_weight * 1.2, 1.0)
            else:
                bil_weight = min(base_weight, 1.0)

        # Never drop below 80% of the previous allocation in one step.
        min_bil_allocation = self.previous_bil_allocation * 0.8
        bil_weight = max(bil_weight, min_bil_allocation)
        # Cap at 80% so some equity exposure always remains.
        bil_weight = min(bil_weight, 0.8)

        # In uptrends allow a faster reduction of the BIL allocation.
        if spy_price > sma_30 * 1.03:
            bil_weight = min(bil_weight, self.previous_bil_allocation * 0.75)
            if spy_price > sma_30 * 1.08:
                bil_weight = min(bil_weight, self.previous_bil_allocation * 0.65)

        equity_weight = 1.0 - bil_weight

        if not self.selected_by_market_cap:
            self.Debug("No stocks selected for investment.")
            return

        total_market_cap = sum(cap for _, cap in self.selected_by_market_cap)
        weights = {}
        capped_weights = {}
        for symbol, market_cap in self.selected_by_market_cap:
            weight = (market_cap / total_market_cap) * equity_weight
            weights[symbol] = weight
            # Names above $500B may take up to 12% of the equity sleeve; others 10%.
            cap_fraction = 0.12 if market_cap > 5e11 else 0.10
            capped_weights[symbol] = min(weight, equity_weight * cap_fraction)

        # Re-normalise so the capped weights again sum to equity_weight.
        total_capped_weight = sum(capped_weights.values())
        if total_capped_weight > 0:
            normalized_weights = {symbol: (weight / total_capped_weight) * equity_weight
                                  for symbol, weight in capped_weights.items()}
        else:
            normalized_weights = weights

        invested = set()
        for symbol, weight in normalized_weights.items():
            if weight > 0.005:  # Minimum position threshold (0.5%)
                self.SetHoldings(symbol, weight)
                invested.add(symbol)
                self.entry_prices[symbol] = self.Securities[symbol].Price

        if bil_weight > 0:
            self.SetHoldings(self.bil, bil_weight)
            invested.add(self.bil)
        else:
            self.Liquidate(self.bil)

        # Store the realised BIL allocation as the basis for next time's minimum.
        self.previous_bil_allocation = (self.Portfolio[self.bil].HoldingsValue
                                        / self.Portfolio.TotalPortfolioValue)
        self.Debug(f"New BIL allocation: {bil_weight*100:0.2f}% (Minimum was {min_bil_allocation*100:0.2f}%)")

        if spy_price < sma_30:
            self.Debug(f"Market below 30d SMA by {((sma_30 - spy_price) / sma_30 * 100):0.2f}%")
        else:
            self.Debug(f"Market above 30d SMA by {((spy_price - sma_30) / sma_30 * 100):0.2f}%")

        # Log invested value (previously formatted the boolean Portfolio.Invested).
        self.Debug(f"Total invested: {self.Portfolio.TotalHoldingsValue:0.2f}, Cash: {self.Portfolio.Cash:0.2f}")

        # Liquidate positions not in the current selection.
        for kvp in self.Portfolio:
            symbol = kvp.Key
            if (kvp.Value.Invested and symbol not in invested
                    and symbol != self.spy and symbol not in self.tactical_positions):
                self.Liquidate(symbol)
# region imports from AlgorithmImports import * # endregion from QuantConnect import * from QuantConnect.Algorithm import * from QuantConnect.Data import * from QuantConnect.Indicators import * from datetime import timedelta import numpy as np import pandas as pd import torch import os import torch.nn as nn from sklearn.preprocessing import RobustScaler class KQTStrategy: def __init__(self): self.model = None self.lookback = 30 self.scalers = {} self.feature_cols = [] self.stock_to_id = {} self.sector_mappings = {} self.adaptive_threshold = 0.2 self.pred_std = 1.0 self.current_regime = "neutral" self.portfolio_returns = [] self.defensive_mode = False self.previous_day_hit_stops = [] self.entry_prices = {} def create_sliding_sequences(self, df, feature_cols, lookback, stride=1): X = [] for i in range(0, len(df) - lookback + 1, stride): X.append(df.iloc[i:i+lookback][feature_cols].values.astype(np.float32)) return np.array(X) def clip_outliers(self, df, cols, lower=0.01, upper=0.99): df_copy = df.copy() for col in cols: if col in df_copy.columns: q_low = df_copy[col].quantile(lower) q_high = df_copy[col].quantile(upper) df_copy.loc[df_copy[col] < q_low, col] = q_low df_copy.loc[df_copy[col] > q_high, col] = q_high return df_copy def filter_features_to_match_model(self, df, feature_cols, required_count=5): """Ensure we have exactly the required number of features""" if len(feature_cols) == required_count: return feature_cols # First, prioritize the lag returns (most important) lag_features = [col for col in feature_cols if 'return_lag' in col] # Next, add in the most predictive technical features in a fixed order tech_priority = ['roc_5', 'volatility_10', 'ma_cross', 'dist_ma20', 'momentum_1m', 'oversold', 'overbought', 'roc_diff', 'volatility_regime'] prioritized_features = lag_features.copy() for feat in tech_priority: if feat in feature_cols and len(prioritized_features) < required_count: prioritized_features.append(feat) # If still not enough, add remaining 
features remaining = [col for col in feature_cols if col not in prioritized_features] while len(prioritized_features) < required_count and remaining: prioritized_features.append(remaining.pop(0)) # If too many, truncate return prioritized_features[:required_count] def add_technical_features(self, df): if 'Close' not in df.columns: return df df['ma5'] = df['Close'].rolling(5).mean() / df['Close'] - 1 # Relative to price df['ma20'] = df['Close'].rolling(20).mean() / df['Close'] - 1 df['ma_cross'] = df['ma5'] - df['ma20'] # Moving average crossover signal df['volatility_10'] = df['Close'].pct_change().rolling(10).std() df['volatility_ratio'] = df['Close'].pct_change().rolling(5).std() / df['Close'].pct_change().rolling(20).std() df['roc_5'] = df['Close'].pct_change(5) df['roc_10'] = df['Close'].pct_change(10) df['roc_diff'] = df['roc_5'] - df['roc_10'] df['dist_ma20'] = (df['Close'] / df['Close'].rolling(20).mean() - 1) return df.fillna(0) def add_enhanced_features(self, df): """Add enhanced technical features""" df['volatility_trend'] = df['volatility_10'].pct_change(5) df['volatility_regime'] = (df['volatility_10'] > df['volatility_10'].rolling(20).mean()).astype(int) if 'volume' in df.columns: df['vol_ma_ratio'] = df['volume'] / df['volume'].rolling(20).mean() df['vol_price_trend'] = df['vol_ma_ratio'] * df['roc_5'] df['momentum_1m'] = df['Close'].pct_change(20) df['momentum_3m'] = df['Close'].pct_change(60) df['momentum_breadth'] = ( (df['roc_5'] > 0).astype(int) + (df['momentum_1m'] > 0).astype(int) + (df['momentum_3m'] > 0).astype(int) ) / 3 df['mean_rev_signal'] = -1 * df['dist_ma20'] * df['volatility_10'] df['oversold'] = (df['dist_ma20'] < -2 * df['volatility_10']).astype(int) df['overbought'] = (df['dist_ma20'] > 2 * df['volatility_10']).astype(int) df['regime_change'] = (np.sign(df['ma_cross']) != np.sign(df['ma_cross'].shift(1))).astype(int) df['risk_adj_momentum'] = df['roc_5'] / (df['volatility_10'] + 0.001) return df def prepare_stock_data(self, 
stock_data, ticker, is_training=False): """Prepare data for a single stock""" if len(stock_data) < self.lookback + 5: # Need enough data return None, None stock_df = pd.DataFrame({ 'Close': stock_data['close'].values, 'time': stock_data['time'].values }) if 'volume' in stock_data.columns: stock_df['volume'] = stock_data['volume'].values stock_df = stock_df.sort_values('time').reset_index(drop=True) stock_df['pct_return'] = stock_df['Close'].pct_change().shift(-1) * 100 # In prepare_stock_data, replace the feature cols section with: feature_cols = [] # Add basic lag features for i in range(1, 6): col_name = f'return_lag{i}' stock_df[col_name] = stock_df['pct_return'].shift(i) feature_cols.append(col_name) # Add technical features stock_df = self.add_technical_features(stock_df) stock_df = self.add_enhanced_features(stock_df) # Add all potential features additional_features = ['ma_cross', 'volatility_10', 'roc_5', 'roc_diff', 'dist_ma20'] enhanced_features = ['volatility_trend', 'volatility_regime', 'momentum_1m', 'momentum_breadth', 'mean_rev_signal', 'oversold', 'overbought', 'regime_change', 'risk_adj_momentum'] for col in additional_features + enhanced_features: if col in stock_df.columns: feature_cols.append(col) # Filter to the exact number of features expected by the model model_feature_count = 5 # Use the exact count from your model feature_cols = self.filter_features_to_match_model(stock_df, feature_cols, model_feature_count) if not self.feature_cols: self.feature_cols = feature_cols.copy() stock_df = stock_df.dropna().reset_index(drop=True) # Handle outliers stock_df = self.clip_outliers(stock_df, feature_cols) # Replace the scaling code in prepare_stock_data with this: # Scale features if ticker not in self.scalers or is_training: # Check if we have data if len(stock_df) == 0 or len(feature_cols) == 0: return None, stock_df # Return early if no data # Check if any features are empty/nan if stock_df[feature_cols].isna().any().any() or 
stock_df[feature_cols].empty: # Fill NaNs with zeros stock_df[feature_cols] = stock_df[feature_cols].fillna(0) # Ensure we have data if len(stock_df[feature_cols]) > 0: try: scaler = RobustScaler() stock_df[feature_cols] = scaler.fit_transform(stock_df[feature_cols]) self.scalers[ticker] = scaler except Exception as e: print(f"Scaling error for {ticker}: {str(e)}") # Use a simple standardization as fallback for col in feature_cols: mean = stock_df[col].mean() std = stock_df[col].std() if std > 0: stock_df[col] = (stock_df[col] - mean) / std else: stock_df[col] = 0 else: return None, stock_df # Return early if empty after processing else: # Use existing scaler scaler = self.scalers[ticker] try: stock_df[feature_cols] = scaler.transform(stock_df[feature_cols]) except Exception as e: print(f"Transform error for {ticker}: {str(e)}") # Simple standardization fallback for col in feature_cols: if col in stock_df.columns and len(stock_df[col]) > 0: mean = stock_df[col].mean() std = stock_df[col].std() if std > 0: stock_df[col] = (stock_df[col] - mean) / std else: stock_df[col] = 0 # Create sequences for prediction X = self.create_sliding_sequences(stock_df, feature_cols, self.lookback, stride=1) if len(X) == 0: return None, stock_df return X, stock_df # Add to strategy.py in KQTStrategy class def calculate_portfolio_risk_score(self, market_returns): """Calculate a portfolio risk score (0-100) to scale overall exposure""" risk_score = 50 # Neutral starting point # VIX-like volatility measurement using SPY returns if len(market_returns) >= 5: recent_vol = np.std(market_returns[-5:]) * np.sqrt(252) # Annualized longer_vol = np.std(market_returns[-10:]) * np.sqrt(252) if len(market_returns) >= 10 else recent_vol # Volatility spike detection vol_ratio = recent_vol / longer_vol if longer_vol > 0 else 1 if vol_ratio > 1.5: # Sharp volatility increase risk_score -= 30 elif vol_ratio > 1.2: risk_score -= 15 # Consecutive negative days if len(market_returns) >= 3: neg_days = sum(1 
for r in market_returns[-3:] if r < 0) if neg_days == 3: # Three consecutive down days risk_score -= 20 elif neg_days == 2: risk_score -= 10 # Trend direction if len(market_returns) >= 10: avg_recent = np.mean(market_returns[-5:]) avg_older = np.mean(market_returns[-10:-5]) trend_change = avg_recent - avg_older # Declining trend if trend_change < -0.3: risk_score -= 15 # Accelerating uptrend elif trend_change > 0.3 and avg_recent > 0: risk_score += 10 return max(10, min(100, risk_score)) # Constrain between 10-100 def predict_returns(self, X, ticker): """Make predictions for a single stock""" if self.model is None: return 0 if ticker not in self.stock_to_id: self.stock_to_id[ticker] = len(self.stock_to_id) stock_id = self.stock_to_id[ticker] try: X_tensor = torch.tensor(X, dtype=torch.float32) stock_ids = torch.tensor([stock_id] * len(X), dtype=torch.long) with torch.no_grad(): predictions = self.model(X_tensor, stock_ids) # Convert to standard Python float for safety return float(predictions.detach().numpy().flatten()[-1]) except Exception as e: print(f"Prediction error for {ticker}: {e}") return 0 # Return neutral prediction on error def adjust_for_market_conditions(self, positions, spy_sma_signal, defensive_mode): """Adjust positions based on market conditions from SPY moving average""" adjusted_positions = positions.copy() # If we're below SPY SMA, reduce position sizing if spy_sma_signal < 0 or defensive_mode: # Calculate how far below SMA we are (0 to 1 scale) reduction_factor = min(0.5, max(0.2, abs(spy_sma_signal))) if spy_sma_signal < 0 else 0.25 # Reduce all position sizes for ticker in adjusted_positions: adjusted_positions[ticker] *= (1 - reduction_factor) return adjusted_positions def update_stopped_out_tickers(self, stopped_out_today): """Update the list of recently stopped out tickers""" # Add new stopped out tickers to the list self.previous_day_hit_stops.extend(stopped_out_today) # Keep only the most recent 10 stopped out tickers if 
len(self.previous_day_hit_stops) > 10: self.previous_day_hit_stops = self.previous_day_hit_stops[-10:] def get_stop_loss_level(self): """Get appropriate stop-loss level based on market regime""" # Use fixed 5% stop-loss from Drawdownminimize.py return -5.0 def check_stop_losses(self, symbol_data): """Check if any positions should be stopped out based on 5% drop from entry""" # This would be called from the algorithm with current price data # Return list of tickers that hit stop-loss stopped_out = [] for symbol, price_data in symbol_data.items(): if symbol in self.entry_prices: entry_price = self.entry_prices[symbol] current_price = price_data['current_price'] # 5% drop from entry if (entry_price - current_price) / entry_price >= 0.05: stopped_out.append(symbol) return stopped_out def detect_market_regime(self, daily_returns, lookback=10, spy_ma_signal=0): """Detect current market regime based on portfolio returns and SPY MA""" # Make regime detection more sensitive to negative SPY MA signals # CRITICAL CHANGE: Immediately shift to bearish on SPY below MA if spy_ma_signal < 0: # How far below MA determines the severity if spy_ma_signal < -0.05: # More than 5% below MA return "bearish_high_vol" elif spy_ma_signal < -0.02: # More than 2% below MA return "bearish" else: # Slight MA crossover shifts neutral regimes bearish and bullish to neutral if self.current_regime == "bullish": return "neutral" elif self.current_regime == "neutral": return "bearish_low_vol" if len(daily_returns) >= 1: market_return = np.mean(daily_returns) market_vol = np.std(daily_returns) # Incorporate SPY moving average signal (negative when below MA) if spy_ma_signal < -0.02: # SPY at least 2% below 30-day MA if self.current_regime == "neutral": return "bearish" elif self.current_regime in ["bullish", "bullish_pullback"]: return "neutral" elif self.current_regime == "bearish": return "bearish_high_vol" # Increase bearish conviction if len(self.portfolio_returns) >= 3: recent_returns = 
self.portfolio_returns[-min(lookback, len(self.portfolio_returns)):] avg_recent_return = np.mean(recent_returns) if len(self.portfolio_returns) >= 5: very_recent = np.mean(self.portfolio_returns[-3:]) less_recent = np.mean(self.portfolio_returns[-min(8, len(self.portfolio_returns)):-3]) trend_change = very_recent - less_recent if trend_change > 0.5 and avg_recent_return > 0.2: return "breakout_bullish" elif trend_change < -0.5 and avg_recent_return < -0.2: return "breakdown_bearish" if avg_recent_return > 0.15: if market_return > 0: return "bullish_strong" else: return "bullish_pullback" elif avg_recent_return < -0.3: if market_return < -0.2: return "bearish_high_vol" else: return "bearish_low_vol" elif avg_recent_return > 0 and market_return > 0: return "bullish" elif avg_recent_return < 0 and market_return < 0: return "bearish" if market_return > -0.05: return "neutral" else: return "bearish" return "neutral" def detect_bearish_signals(self, recent_returns): """Detect early warning signs of bearish conditions""" bearish_signals = 0 signal_strength = 0 if len(self.portfolio_returns) >= 5: recent_portfolio_returns = self.portfolio_returns[-5:] pos_days = sum(1 for r in recent_portfolio_returns if r > 0) neg_days = sum(1 for r in recent_portfolio_returns if r < 0) if neg_days > pos_days: bearish_signals += 1 signal_strength += 0.2 * (neg_days - pos_days) if len(self.portfolio_returns) >= 10: recent_vol = np.std(self.portfolio_returns[-5:]) older_vol = np.std(self.portfolio_returns[-10:-5]) if recent_vol > older_vol * 1.3: # 30% volatility increase bearish_signals += 1 signal_strength += 0.3 * (recent_vol/older_vol - 1) if len(self.portfolio_returns) >= 5: if self.portfolio_returns[-1] < 0 and self.portfolio_returns[-2] > 0.3: bearish_signals += 1 signal_strength += 0.3 return bearish_signals, signal_strength def generate_positions(self, prediction_data, current_returns=None, spy_sma_signal=0): """Generate position sizing based on predictions with improved defensive 
capabilities""" if not prediction_data: return {} # Update market regime - now including SPY MA signal in the detection if current_returns is not None: self.current_regime = self.detect_market_regime(current_returns, 10, spy_sma_signal) bearish_count, bearish_strength = self.detect_bearish_signals(current_returns) # Set defensive mode if bearish signals or below SPY MA self.defensive_mode = (bearish_count >= 1 or # Reduced from 2 to 1 bearish_strength > 0.3 or # Reduced from 0.5 to 0.3 spy_sma_signal < 0) # ANY negative signal from MA # Calculate portfolio risk score (0-100) portfolio_risk_score = self.calculate_portfolio_risk_score(current_returns if current_returns else []) # Convert to a scaling factor (0.1 to 1.0) risk_scaling = portfolio_risk_score / 100 # Adjust threshold based on regime base_threshold = 0.25 * self.pred_std if self.current_regime in ["bullish_strong", "breakout_bullish"]: self.adaptive_threshold = base_threshold * 0.4 elif self.current_regime in ["bearish_high_vol", "breakdown_bearish"]: self.adaptive_threshold = base_threshold * 2.5 elif self.current_regime in ["bearish", "bearish_low_vol"]: self.adaptive_threshold = base_threshold * 1.6 elif self.current_regime in ["bullish_pullback"]: self.adaptive_threshold = base_threshold * 0.9 else: # neutral or other regimes self.adaptive_threshold = base_threshold * 0.75 positions = {} # Group stocks by sector sector_data = {} for ticker, data in prediction_data.items(): pred_return = data["pred_return"] sector = self.sector_mappings.get(ticker, "Unknown") if sector not in sector_data: sector_data[sector] = [] sector_data[sector].append({ "ticker": ticker, "pred_return": pred_return, "composite_score": pred_return / self.adaptive_threshold }) # Rank sectors by predicted return sector_avg_scores = {} for sector, stocks in sector_data.items(): sector_avg_scores[sector] = np.mean([s["pred_return"] for s in stocks]) # CHANGE: Include more sectors (3-4 instead of just 2) ranked_sectors = 
sorted(sector_avg_scores.keys(), key=lambda x: sector_avg_scores[x], reverse=True) top_sector_count = 3 if portfolio_risk_score > 60 else 2 # More diversification in lower risk periods top_sectors = ranked_sectors[:min(top_sector_count, len(ranked_sectors))] # CHANGE: Allow more stocks per sector in bull markets stocks_per_sector = 3 if self.current_regime in ["bullish_strong", "breakout_bullish"] else 2 # Allocate within top sectors - focus on stocks with strongest signals for sector in top_sectors: sector_stocks = sorted(sector_data[sector], key=lambda x: x["pred_return"], reverse=True) # Take top N stocks in each selected sector top_stocks = sector_stocks[:min(stocks_per_sector, len(sector_stocks))] # CHANGE: Make position size proportional to signal strength but limited by volatility for stock in top_stocks: ticker = stock["ticker"] signal_strength = stock["pred_return"] / (0.2 * self.pred_std) # Base size calculation base_size = min(0.3, max(0.05, 0.15 * signal_strength)) # Scale by portfolio risk final_size = base_size * risk_scaling positions[ticker] = final_size # Apply unified defensive logic at the end (remove redundant sections) if self.defensive_mode or self.current_regime in ["bearish_high_vol", "bearish_low_vol", "breakdown_bearish"] or spy_sma_signal < 0: # STRONGER SCALING: Adjust scaling based on severity if spy_sma_signal < -0.05 or self.current_regime == "bearish_high_vol": scaling_factor = 0.3 # Reduced from 0.4 to 0.3 elif spy_sma_signal < -0.02 or self.current_regime == "bearish_low_vol": scaling_factor = 0.5 # Reduced from 0.6 to 0.5 else: scaling_factor = 0.7 for ticker in positions: positions[ticker] *= scaling_factor # Remove tickers that recently hit stop losses for ticker in list(positions.keys()): if ticker in self.previous_day_hit_stops: positions.pop(ticker, None) return positions def update_portfolio_returns(self, daily_return): """Update portfolio return history""" self.portfolio_returns.append(daily_return) if 
len(self.portfolio_returns) > 60: # Keep a rolling window self.portfolio_returns = self.portfolio_returns[-60:] def update_model_calibration(self, all_predictions): """Update prediction standard deviation for threshold calibration""" all_pred_values = [p for p in all_predictions.values()] if len(all_pred_values) > 5: self.pred_std = np.std(all_pred_values) def calculate_bil_allocation(self, spy_price, sma_30): """Calculate BIL allocation based on SPY price and SMA.""" bill_drop_intensity = (sma_30 - spy_price) / sma_30 if spy_price < sma_30: bil_weight = min(bill_drop_intensity * 1.5, 1.0) # Stronger response if bill_drop_intensity > 0.05: # SPY >5% below MA bil_weight = min(0.7, bil_weight * 1.3) # Even stronger response else: bil_weight = 0.0 return bil_weight