Overall Statistics |
Total Orders 557 Average Win 0.95% Average Loss -0.17% Compounding Annual Return 22.952% Drawdown 13.100% Expectancy 2.386 Start Equity 100000 End Equity 330163.65 Net Profit 230.164% Sharpe Ratio 1.212 Sortino Ratio 1.267 Probabilistic Sharpe Ratio 83.675% Loss Rate 49% Win Rate 51% Profit-Loss Ratio 5.68 Alpha 0.089 Beta 0.422 Annual Standard Deviation 0.111 Annual Variance 0.012 Information Ratio 0.216 Tracking Error 0.129 Treynor Ratio 0.319 Total Fees $625.65 Estimated Strategy Capacity $0 Lowest Capacity Asset COST R735QTJ8XC9X Portfolio Turnover 1.03% |
import numpy as np
from AlgorithmImports import *


class AssetWeightCalculator:
    """Universe filters plus a Sharpe-ratio ranking of the surviving symbols.

    The coarse filter keeps the 200 most liquid stocks priced above $10, the
    fine filter keeps those with a market cap above $10B, and the survivors
    are ranked by an annualised Sharpe ratio computed from hourly closes.
    """

    # Hourly bars assumed per trading year when annualising (252 * 6.5).
    HOURS_PER_YEAR = 1638
    # Annual risk-free rate used when no BIL data is available.
    DEFAULT_ANNUAL_RISK_FREE_RATE = 0.02

    def __init__(self, algorithm: QCAlgorithm):
        """Store the algorithm and subscribe to BIL as the risk-free proxy."""
        self.algorithm = algorithm
        self.risk_free = self.algorithm.add_equity("BIL", Resolution.HOUR)

    def coarse_selection(self, coarse):
        """First universe filter.

        Keeps securities priced above $10 that have fundamental data and
        returns the symbols of the 200 with the highest dollar volume.
        """
        eligible = [
            x for x in coarse
            if x.price > 10 and x.has_fundamental_data
        ]
        eligible.sort(key=lambda x: x.dollar_volume, reverse=True)
        return [x.symbol for x in eligible[:200]]

    def fine_selection(self, fine):
        """Second universe filter.

        Keeps symbols with a known market cap above $10B and returns them
        ranked by Sharpe ratio (see ``rank_stocks``). Ranking here means the
        ordering is refreshed on every universe update.
        """
        filtered = [
            x.symbol for x in fine
            if x.market_cap is not None and x.market_cap > 10e9
        ]
        self.algorithm.debug(f"Fine Selection: {len(filtered)} symbols passed filters")
        return self.rank_stocks(filtered)

    def _annual_risk_free_rate(self, period):
        """Estimate the annual risk-free rate from BIL's hourly returns.

        The mean hourly return of BIL over ``period`` bars is scaled by
        ``HOURS_PER_YEAR``. Falls back to ``DEFAULT_ANNUAL_RISK_FREE_RATE``
        when history is missing or the estimate is not finite.

        NOTE(review): assumes BIL history is dividend-adjusted so that price
        returns include distributions -- confirm the normalization mode.
        """
        default = self.DEFAULT_ANNUAL_RISK_FREE_RATE
        try:
            rf_history = self.algorithm.history(
                self.risk_free.symbol, period, Resolution.HOUR
            )
            if rf_history.empty:
                return default
            rf_returns = rf_history['close'].pct_change().dropna()
            if rf_returns.empty:
                return default
            rate = rf_returns.mean() * self.HOURS_PER_YEAR
        except Exception as e:
            self.algorithm.debug(f"Error estimating risk-free rate: {str(e)}")
            return default
        return rate if np.isfinite(rate) else default

    def calculate_sharpe_ratio(self, symbol, period=4914, annual_risk_free_rate=None):
        """Return the annualised Sharpe ratio of ``symbol`` from hourly closes.

        Args:
            symbol: A Symbol, or a key/value pair whose ``Key`` is the Symbol.
            period: Number of hourly bars to request (4914 = 3 * 1638, i.e.
                three years of hourly bars).
            annual_risk_free_rate: Annual risk-free rate as a fraction. When
                omitted it is estimated from BIL's returns over ``period``.

        Returns:
            The Sharpe ratio, or ``None`` when there is no history, the
            result is not a finite number, or an error occurs.
        """
        try:
            # If a key/value pair was received, only take the symbol.
            if hasattr(symbol, "Key"):
                symbol = symbol.Key

            history = self.algorithm.history([symbol], period, Resolution.HOUR)
            if history.empty:
                self.algorithm.debug(f"No history for {symbol.value}")
                return None

            # The previous code used BIL's close price / 100 as the rate,
            # which is a price (~0.9), not a yield.
            if annual_risk_free_rate is None:
                annual_risk_free_rate = self._annual_risk_free_rate(period)

            returns = history['close'].pct_change().dropna()
            excess_returns = returns - (annual_risk_free_rate / self.HOURS_PER_YEAR)
            mean_excess_return = excess_returns.mean() * self.HOURS_PER_YEAR
            std_dev = excess_returns.std() * np.sqrt(self.HOURS_PER_YEAR)

            # std() is NaN with fewer than two returns; NaN scores would
            # make the later sort order meaningless, so reject them here.
            if not np.isfinite(std_dev) or std_dev == 0:
                return None
            sharpe = mean_excess_return / std_dev
            return sharpe if np.isfinite(sharpe) else None
        except Exception as e:
            name = getattr(symbol, "value", symbol)
            self.algorithm.debug(f"Error calculating Sharpe for {name}: {str(e)}")
            return None

    def rank_stocks(self, symbols, top_n=20):
        """Rank symbols by Sharpe ratio and return the best ``top_n``.

        Args:
            symbols: Symbols (or key/value pairs keyed by Symbol) to rank.
            top_n: Maximum number of symbols to return (default 20).

        Returns:
            Symbols with a valid Sharpe ratio, best first, at most ``top_n``
            of them. Empty when nothing could be scored.
        """
        if not symbols:
            self.algorithm.debug("No symbols to rank")
            return []
        self.algorithm.debug(f"Ranking {len(symbols)} symbols")

        # Convert from key/value pairs if necessary.
        symbols = [s.Key if hasattr(s, 'Key') else s for s in symbols]

        # Estimate the risk-free rate once instead of once per symbol.
        risk_free_rate = self._annual_risk_free_rate(4914)
        scores = {
            symbol: self.calculate_sharpe_ratio(
                symbol, annual_risk_free_rate=risk_free_rate
            )
            for symbol in symbols
        }
        valid_scores = {k: v for k, v in scores.items() if v is not None}
        self.algorithm.debug(
            f"Valid Sharpe ratios: {len(valid_scores)} out of {len(symbols)}"
        )
        if not valid_scores:
            return []

        sorted_scores = sorted(valid_scores, key=valid_scores.get, reverse=True)[:top_n]
        self.algorithm.log(f"All symbols before ranking: {[s.value for s in symbols]}")
        self.algorithm.log(
            f"Symbols after filtering: {[s.value for s in valid_scores.keys()]}"
        )
        return sorted_scores

    def normalize_scores(self, scores):
        """Z-score normalise a ``{symbol: score}`` mapping.

        Normalised scores can be combined additively with other weights.
        Returns an empty dict for empty input, and all zeros when the scores
        have no variation.
        """
        if not scores:
            return {}
        values = np.array(list(scores.values()))
        mean = np.mean(values)
        std_dev = np.std(values)
        if std_dev == 0:
            # No variation in scores: assign equal normalised scores.
            return {symbol: 0 for symbol in scores}
        normalized_scores = {
            symbol: (score - mean) / std_dev for symbol, score in scores.items()
        }
        # Route debugging output through the algorithm like the other methods.
        self.algorithm.debug(str(normalized_scores))
        return normalized_scores
from AlgorithmImports import *


class MACDSignalGenerator:
    """Maintains four hourly MACD variants per symbol and sizes positions.

    Each tracked symbol gets one MACD per entry in ``macd_variants``. Position
    sizes are derived from the relative gap between each MACD's fast and slow
    averages.
    """

    # Bars of history used to warm up a new MACD (slowest variant: 26 + 9).
    WARMUP_BARS = 35
    # Multiplier applied to the normalised fast/slow gap; the original author
    # describes it as a form of leverage setting.
    POSITION_SCALAR = 70
    # Hard cap on the size of any single variant's position.
    MAX_POSITION_LIMIT = 0.1

    def __init__(self, algorithm: QCAlgorithm, symbols: list, cash_buffer: float = 0.05):
        """Store the algorithm, the tracked symbol list and the cash buffer.

        Args:
            algorithm: The owning algorithm instance.
            symbols: List of tracked symbols; it is mutated in place by
                ``add_symbols`` and ``remove_symbols``.
            cash_buffer: Fraction of the portfolio kept out of positions.
        """
        self.algorithm = algorithm
        self.symbols = symbols
        self.cash_buffer = cash_buffer
        self.macd_indicators = {}  # {symbol: {variant: MACD}}

        # MACD parameters for the different variants.
        self.macd_variants = {
            "slow": {"fast": 12, "slow": 26, "signal": 9},
            "slow-med": {"fast": 9, "slow": 19, "signal": 5},
            "med-fast": {"fast": 7, "slow": 15, "signal": 3},
            "fast": {"fast": 5, "slow": 12, "signal": 2},
        }

    def remove_symbols(self, symbols: list):
        """Liquidate the given symbols and stop tracking them.

        Unregisters every MACD tied to each symbol and drops the symbol from
        ``self.symbols`` so it no longer dilutes the per-variant position
        size in ``calculate_position_sizes``.
        """
        # Copy first: the caller may pass ``self.symbols`` itself.
        removed = list(symbols)
        for symbol in removed:
            # Liquidate the position before removing its indicators.
            self.algorithm.liquidate(symbol)
            for macd in self.macd_indicators.pop(symbol, {}).values():
                self.algorithm.unregister_indicator(macd)
        # In-place update keeps any external reference to the list in sync,
        # matching how add_symbols grows the same list.
        self.symbols[:] = [s for s in self.symbols if s not in removed]

    def add_symbols(self, new_symbols):
        """Start tracking ``new_symbols`` and build warmed-up MACDs for them.

        Symbols already tracked are not duplicated, and symbols that already
        have indicators are left untouched. A symbol without valid price data
        or history is tracked but gets no indicators until a later call.
        """
        new_symbols = list(new_symbols)
        self.algorithm.debug(
            f"Attempting to add symbols: {[s.value for s in new_symbols]}"
        )
        if not new_symbols:
            return

        # One history request covers the warm-up of every new symbol.
        history = self.algorithm.history(new_symbols, self.WARMUP_BARS, Resolution.HOUR)
        history_symbols = history.index.get_level_values(0)
        self.algorithm.debug(f"History data available for: {history_symbols.unique()}")

        for symbol in new_symbols:
            if symbol not in self.symbols:
                self.symbols.append(symbol)

            # Creating the indicators again would register a second set of
            # auto-updating MACDs and orphan the first one.
            if self.macd_indicators.get(symbol):
                continue

            security = self.algorithm.securities[symbol]
            if not (security.has_data and security.is_tradable and security.price > 0):
                self.algorithm.debug(f"Waiting for valid price data: {symbol.value}")
                continue

            if symbol not in history_symbols:
                self.algorithm.debug(f"No history data for: {symbol.value}")
                continue

            symbol_history = history.loc[symbol]
            self.algorithm.debug(
                f"History rows for {symbol.value}: {len(symbol_history)}"
            )

            indicators = {}
            for variant, params in self.macd_variants.items():
                macd = self.algorithm.macd(
                    symbol=symbol,
                    fast_period=params["fast"],
                    slow_period=params["slow"],
                    signal_period=params["signal"],
                    type=MovingAverageType.EXPONENTIAL,
                    resolution=Resolution.HOUR,
                    selector=Field.CLOSE
                )
                # Warm up the MACD with historical closes.
                for time, row in symbol_history.iterrows():
                    macd.update(time, row['close'])
                indicators[variant] = macd
            # Only publish the entry once every variant exists, so no empty
            # placeholder is left behind for symbols that were skipped.
            self.macd_indicators[symbol] = indicators

    def calculate_position_sizes(self):
        """Return ``{symbol: {variant: size}}`` target sizes as fractions.

        For each ready MACD the size is
        ``max_position * (fast - slow) / slow * POSITION_SCALAR``, floored at
        0 (long only) and capped at ``MAX_POSITION_LIMIT``. Variants whose
        MACD is not ready, or whose slow average is 0, get size 0. Returns an
        empty dict when nothing is tracked.
        """
        position_sizes = {}

        if not self.symbols or not self.macd_indicators:
            self.algorithm.debug("No symbols available for position calculation")
            return position_sizes

        # Share of the investable portfolio available to a single variant.
        max_position = (1 - self.cash_buffer) / (
            len(self.symbols) * len(self.macd_variants)
        )

        for symbol, variants in self.macd_indicators.items():
            sizes = {}
            for variant, macd in variants.items():
                slow_value = macd.slow.current.value if macd.is_ready else 0
                if slow_value == 0:
                    # Not ready, or no meaningful base to normalise against.
                    sizes[variant] = 0
                    continue
                # Gap between fast and slow as a fraction of the slow value.
                distance = macd.fast.current.value - slow_value
                raw_size = max_position * (distance / slow_value) * self.POSITION_SCALAR
                sizes[variant] = max(0, min(raw_size, self.MAX_POSITION_LIMIT))
            position_sizes[symbol] = sizes

        return position_sizes
from AlgorithmImports import * import numpy as np from datetime import timedelta class MarketCapWeightedSP500Tracker(QCAlgorithm): def Initialize(self): self.SetStartDate(2019, 1, 1) self.SetEndDate(2025, 1, 1) self.SetCash(100000) self.UniverseSettings.Resolution = Resolution.Daily self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol self.bil = self.AddEquity("BIL", Resolution.Daily).Symbol self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction) self.selected_by_market_cap = [] self.rebalance_flag = False self.spy_30day_window = RollingWindow[float](30) self.entry_prices = {} self.previous_bil_allocation = 0.0 self.Schedule.On(self.DateRules.MonthStart(self.spy), self.TimeRules.AfterMarketOpen(self.spy, 30), self.SetRebalanceFlag) self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Wednesday), self.TimeRules.AfterMarketOpen(self.spy, 30), self.MonthlyRebalance) # Initialize rolling window with historical data history = self.History(self.spy, 30, Resolution.Daily) if not history.empty: for time, row in history.loc[self.spy].iterrows(): self.spy_30day_window.Add(row["close"]) # Add simple tracking of market trend self.trend_lookback = 10 self.spy_prices = {} self.max_spy_history = 60 # Days of price history to keep # Add dynamic stop-loss enhancement self.stop_loss_base = 0.04 # Reduced base stop-loss threshold self.dynamic_stop_weight = 0.5 # Blend 50% ATR signal with base threshold # Expanded list of inverse and defensive ETFs # Original inverse ETFs self.sh = self.AddEquity("SH", Resolution.Daily).Symbol # Inverse S&P 500 self.psq = self.AddEquity("PSQ", Resolution.Daily).Symbol # Inverse Nasdaq-100 self.dog = self.AddEquity("DOG", Resolution.Daily).Symbol # Inverse Dow Jones self.rwm = self.AddEquity("RWM", Resolution.Daily).Symbol # Inverse Russell 2000 self.eum = self.AddEquity("EUM", Resolution.Daily).Symbol # Inverse Emerging Markets self.myd = self.AddEquity("MYY", Resolution.Daily).Symbol # Inverse Mid-Cap 400 # 
Alternative defensive ETFs (not inverse but potentially good in downturns) self.gld = self.AddEquity("GLD", Resolution.Daily).Symbol # Gold self.ief = self.AddEquity("IEF", Resolution.Daily).Symbol # 7-10 Year Treasury self.bnd = self.AddEquity("BND", Resolution.Daily).Symbol # Total Bond Market # Sector-based defensive ETFs (often outperform in bear markets) self.xlp = self.AddEquity("XLP", Resolution.Daily).Symbol # Consumer Staples self.xlu = self.AddEquity("XLU", Resolution.Daily).Symbol # Utilities self.xlv = self.AddEquity("XLV", Resolution.Daily).Symbol # Healthcare self.vht = self.AddEquity("VHT", Resolution.Daily).Symbol # Vanguard Healthcare self.vdc = self.AddEquity("VDC", Resolution.Daily).Symbol # Vanguard Consumer Staples # Group all defensive ETFs together self.inverse_etfs = [self.sh, self.psq, self.dog, self.rwm, self.eum, self.myd] self.alternative_defensive = [self.gld, self.ief, self.bnd] self.sector_defensive = [self.xlp, self.xlu, self.xlv, self.vht, self.vdc] self.all_defensive = self.inverse_etfs + self.alternative_defensive + self.sector_defensive # Add diagnostic logging capability self.diagnostic_mode = True # Enable detailed diagnostics # Initialize positions tracking and add weekly tactical adjustment self.defensive_positions = set() self.last_defensive_update = datetime(1900, 1, 1) # Add weekly defensive ETF evaluation schedule self.Schedule.On(self.DateRules.WeekStart(self.spy, DayOfWeek.Monday), self.TimeRules.AfterMarketOpen(self.spy, 60), # After main rebalance self.WeeklyDefensiveAdjustment) # Initialize positions tracking self.inverse_positions = set() # Add inverse ETF lookback windows for better momentum calculation self.inverse_lookback_short = 7 # 1 week momentum window self.inverse_lookback_med = 15 # Medium-term momentum # Add ATR indicators for enhanced volatility-based stop-loss calculation self.atr_period = 14 self.atr = {} # Register ATR for key symbols (defensive ETFs, BIL, and SPY) for symbol in self.all_defensive + 
[self.bil, self.spy]: self.atr[symbol] = self.ATR(symbol, self.atr_period, Resolution.Daily) def CoarseSelectionFunction(self, coarse): filtered = [x for x in coarse if x.HasFundamentalData and x.Price > 5 and x.Market == Market.USA] return [x.Symbol for x in filtered] def FineSelectionFunction(self, fine): filtered = [x for x in fine if x.MarketCap > 1e10 and x.SecurityReference.SecurityType == "ST00000001"] sorted_by_cap = sorted(filtered, key=lambda x: x.MarketCap, reverse=True)[:30] self.selected_by_market_cap = [(x.Symbol, x.MarketCap) for x in sorted_by_cap] return [x.Symbol for x in sorted_by_cap] def SetRebalanceFlag(self): if self.Time.weekday() == 2: # Wednesday self.rebalance_flag = True def OnData(self, data): # Update price window if not data.Bars.ContainsKey(self.spy): return self.spy_30day_window.Add(data.Bars[self.spy].Close) # Track prices for trend calculation self.spy_prices[self.Time.date()] = data.Bars[self.spy].Close # Remove old prices dates_to_remove = [] for date in self.spy_prices.keys(): if (self.Time.date() - date).days > self.max_spy_history: dates_to_remove.append(date) for date in dates_to_remove: self.spy_prices.pop(date) market_trend = self._calculateMarketTrend() # Track if any stop-loss was triggered stop_loss_triggered = False # Check stop-loss triggers with improved dynamic thresholds for kvp in self.Portfolio: symbol = kvp.Key holding = kvp.Value if holding.Invested and symbol != self.bil: current_price = self.Securities[symbol].Price if symbol not in self.entry_prices: self.entry_prices[symbol] = current_price price_drop = (self.entry_prices[symbol] - current_price) / self.entry_prices[symbol] # Start with the base threshold and adjust based on market trend stop_threshold = self.stop_loss_base if market_trend < -0.03: stop_threshold *= 0.9 # tighten in downtrends elif market_trend > 0.03: stop_threshold *= 1.1 # loosen in uptrends # Incorporate ATR if ready with adjustment to prevent overreaction in high volatility if symbol 
in self.atr and self.atr[symbol].IsReady: current_atr = self.atr[symbol].Current.Value atr_pct = current_atr / current_price # If ATR is excessively high versus our base, use a lower weight to temper the effect effective_weight = self.dynamic_stop_weight if atr_pct > stop_threshold * 1.2: effective_weight = min(self.dynamic_stop_weight, 0.3) stop_threshold = ((1 - effective_weight) * stop_threshold + effective_weight * atr_pct) if price_drop >= stop_threshold: self.Liquidate(symbol) stop_loss_triggered = True self.Debug(f"Stop-loss triggered for {symbol} at {current_price}, drop: {price_drop*100:.1f}%, threshold: {stop_threshold*100:.1f}%") # If any stop-loss was triggered, invest all available cash in BIL if stop_loss_triggered: available_cash = self.Portfolio.Cash + self.Portfolio.UnsettledCash if available_cash > 0: bil_price = self.Securities[self.bil].Price bil_quantity = available_cash / bil_price self.MarketOrder(self.bil, bil_quantity) self.Debug(f"Invested ${available_cash:0.2f} in BIL after stop-loss") def WeeklyDefensiveAdjustment(self): """Weekly check and adjustment for defensive ETF positions""" # Skip if we've done the monthly rebalance recently days_since_rebalance = (self.Time.date() - self.last_rebalance_date.date()).days if hasattr(self, 'last_rebalance_date') else 999 if days_since_rebalance < 3: return # Skip if we've updated defensive positions recently days_since_update = (self.Time.date() - self.last_defensive_update.date()).days if days_since_update < 5: # At most once a week return # Calculate current market conditions spy_price = self.Securities[self.spy].Price sma_30 = sum(self.spy_30day_window) / self.spy_30day_window.Count if self.spy_30day_window.Count > 0 else spy_price market_deviation = (spy_price / sma_30) - 1.0 market_trend = self._calculateMarketTrend() # Skip in strong bull markets if market_deviation > 0.04 and market_trend > 0.03: return # Calculate total invested amount including all positions total_invested = 
sum(holding.HoldingsValue for holding in self.Portfolio.Values if holding.Invested) / self.Portfolio.TotalPortfolioValue # If we're already fully invested, can't add more defensive positions if total_invested >= 0.98: # Allow small buffer for rounding errors self.Debug(f"Already fully invested ({total_invested:.2f}), skipping defensive adjustments") return # Calculate available room for defensive positions available_allocation = max(0, 0.99 - total_invested) # Keep tiny buffer # Calculate how much is currently allocated to defensive positions current_defensive_value = sum(self.Portfolio[s].HoldingsValue for s in self.defensive_positions if self.Portfolio.ContainsKey(s) and self.Portfolio[s].Invested) # Calculate current BIL allocation current_bil_value = self.Portfolio[self.bil].HoldingsValue if self.Portfolio[self.bil].Invested else 0 bil_allocation = current_bil_value / self.Portfolio.TotalPortfolioValue # Limit potential allocation to available room max_defensive_pct = min(0.25, available_allocation / bil_allocation if bil_allocation > 0 else 0) potential_allocation = bil_allocation * max_defensive_pct # Make sure we don't exceed available room potential_allocation = min(potential_allocation, available_allocation) # Super detailed diagnostics for current defensive positions if self.diagnostic_mode and self.defensive_positions: self.Debug(f"WEEKLY CHECK - Current defensive positions:") for symbol in self.defensive_positions: if self.Portfolio.ContainsKey(symbol) and self.Portfolio[symbol].Invested: position = self.Portfolio[symbol] entry = self.entry_prices.get(symbol, position.AveragePrice) current = self.Securities[symbol].Price pnl_pct = (current / entry) - 1 if entry > 0 else 0 self.Debug(f" {symbol}: PnL {pnl_pct*100:.2f}%, Value ${position.HoldingsValue:.2f}") # Evaluate current defensive positions and potential new ones self.Debug(f"WEEKLY CHECK - Market: Dev {market_deviation*100:.2f}%, Trend {market_trend*100:.2f}%") self.Debug(f"BIL allocation: 
{bil_allocation*100:.2f}%, Potential defensive: {potential_allocation*100:.2f}%") # Run the defensive ETF evaluation new_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, potential_allocation) # Calculate which positions to add, modify, or remove positions_to_add = {} positions_to_remove = set() # Process existing positions for symbol in self.defensive_positions: # If position should be kept but maybe at different allocation if symbol in new_allocations and new_allocations[symbol] > 0: current_pct = self.Portfolio[symbol].HoldingsValue / self.Portfolio.TotalPortfolioValue if self.Portfolio.ContainsKey(symbol) else 0 target_pct = new_allocations[symbol] # If allocation difference is significant, adjust position if abs(target_pct - current_pct) > 0.01: positions_to_add[symbol] = target_pct # Remove from new allocations dict to avoid double-processing new_allocations.pop(symbol) else: # Position should be removed positions_to_remove.add(symbol) # Add any remaining new positions for symbol, allocation in new_allocations.items(): if allocation > 0.01: # Minimum meaningful allocation positions_to_add[symbol] = allocation # Check if we'll exceed our allocation limits with new positions total_new_allocation = sum(positions_to_add.values()) if total_new_allocation > available_allocation: # Scale back allocations to fit available space scale_factor = available_allocation / total_new_allocation for symbol in positions_to_add: positions_to_add[symbol] *= scale_factor self.Debug(f"Scaled defensive allocations to fit available space: {scale_factor:.4f}") # Execute trades if needed if positions_to_add or positions_to_remove: self.Debug(f"WEEKLY ADJUSTMENT - Making defensive position changes") # Remove positions no longer needed for symbol in positions_to_remove: self.Liquidate(symbol) self.defensive_positions.remove(symbol) self.Debug(f"Removed defensive position: {symbol}") # Add or adjust positions for symbol, allocation in positions_to_add.items(): 
self.SetHoldings(symbol, allocation) self.defensive_positions.add(symbol) self.entry_prices[symbol] = self.Securities[symbol].Price self.Debug(f"Updated defensive position: {symbol} to {allocation*100:.2f}%") self.last_defensive_update = self.Time def MonthlyRebalance(self): if not self.rebalance_flag: return self.rebalance_flag = False self.entry_prices.clear() # Reset entry prices at rebalance if self.spy_30day_window.Count < 30: self.Debug("Waiting for enough SPY history.") return spy_price = self.Securities[self.spy].Price sma_30 = sum(self.spy_30day_window) / 30 # Calculate market deviation for better decisions market_deviation = (spy_price / sma_30) - 1.0 market_trend = self._calculateMarketTrend() # Enhanced BIL allocation logic with lower caps bil_weight = 0.0 if spy_price < sma_30: # Enhanced formula for better downside protection base_weight = (sma_30 - spy_price) / sma_30 if base_weight > 0.08: # Significant drop # Lower cap on BIL for significant drops bil_weight = min(base_weight * 1.1, 0.7) # Cap at 70% (was 90%) else: bil_weight = min(base_weight, 0.6) # Cap at 60% (was 80%) # Enhanced reduction rule for better returns in bull markets if market_deviation > 0.05: # Strong bull market min_bil_allocation = self.previous_bil_allocation * 0.7 # 30% reduction elif market_deviation > 0.02: # Modest bull market min_bil_allocation = self.previous_bil_allocation * 0.75 # 25% reduction else: min_bil_allocation = self.previous_bil_allocation * 0.8 # Standard 20% reduction bil_weight = max(bil_weight, min_bil_allocation) # Lower caps on BIL in all market conditions if market_deviation > 0.08: # Very strong bull bil_weight = min(bil_weight, 0.15) # Cap at 15% (was 20%) elif market_deviation > 0.05: # Strong bull bil_weight = min(bil_weight, 0.25) # Cap at 25% (was 30%) elif market_deviation > 0.0: # Mild bull bil_weight = min(bil_weight, 0.4) # Cap at 40% (new tier) elif market_deviation > -0.03: # Neutral bil_weight = min(bil_weight, 0.5) # Cap at 50% (new tier) 
else: # Bear bil_weight = min(bil_weight, 0.6) # Cap at 60% (new tier) # Calculate how much of the original BIL allocation to potentially use for inverse ETFs original_bil = bil_weight # Use only a portion of BIL for inverse ETFs, keeping some as BIL inverse_etf_potential = original_bil * 0.4 # Use 40% of BIL allocation for inverse ETFs bil_weight = original_bil - inverse_etf_potential # Run diagnostics on defensive ETFs if self.diagnostic_mode: self._runDefensiveETFDiagnostics(market_deviation, market_trend) # Evaluate inverse ETFs for possible allocation inverse_allocations = self._evaluateInverseETFs(market_deviation, market_trend, inverse_etf_potential) # Include alternative defensive ETFs in evaluation all_defensive_allocations = self._evaluateDefensiveETFs(market_deviation, market_trend, inverse_etf_potential) # Calculate total allocation to defensive ETFs total_defensive_allocation = sum(all_defensive_allocations.values()) # Set aside remainder as cash (won't be allocated) cash_reserve = inverse_etf_potential - total_defensive_allocation # Calculate weight for equity portion equity_weight = 1.0 - total_defensive_allocation # Ensure total allocation never exceeds 100% total_allocation = bil_weight + total_defensive_allocation + equity_weight if total_allocation > 1.0: # Scale back components proportionally scale_factor = 1.0 / total_allocation bil_weight *= scale_factor equity_weight *= scale_factor # Scale each defensive allocation for symbol in all_defensive_allocations: all_defensive_allocations[symbol] *= scale_factor total_defensive_allocation = sum(all_defensive_allocations.values()) self.Debug(f"Scaled allocations to prevent leverage: {scale_factor:.4f}") self.Debug(f"Allocation breakdown: Equity {equity_weight*100:.1f}%, BIL {bil_weight*100:.1f}%, " + f"Defensive ETFs {total_defensive_allocation*100:.1f}%, Cash {cash_reserve*100:.1f}%") # Enhance stock selection with simple momentum filter momentum_scores = self._calculateSimpleMomentum() # Filter out 
worst momentum stocks filtered_stocks = [] for symbol, mcap in self.selected_by_market_cap: score = momentum_scores.get(symbol, 1.0) if score >= 0.9: # Keep only neutral or positive momentum stocks filtered_stocks.append((symbol, mcap)) # If we filtered too many, revert to original list if len(filtered_stocks) < 20: filtered_stocks = self.selected_by_market_cap # Calculate weights using the filtered stocks total_market_cap = sum([x[1] for x in filtered_stocks]) weights = {x[0]: (x[1] / total_market_cap) * equity_weight for x in filtered_stocks} invested = set() for symbol, weight in weights.items(): if weight > 0: self.SetHoldings(symbol, weight) invested.add(symbol) self.entry_prices[symbol] = self.Securities[symbol].Price # Set BIL position if bil_weight > 0: self.SetHoldings(self.bil, bil_weight) invested.add(self.bil) else: self.Liquidate(self.bil) # Set defensive ETF positions for symbol, weight in all_defensive_allocations.items(): if weight > 0: self.SetHoldings(symbol, weight) invested.add(symbol) self.defensive_positions.add(symbol) # Using renamed set self.entry_prices[symbol] = self.Securities[symbol].Price self.Debug(f"Allocated {weight*100:.2f}% to defensive ETF {symbol}") elif symbol in self.defensive_positions: self.Liquidate(symbol) self.defensive_positions.remove(symbol) # Update last rebalance date tracker self.last_rebalance_date = self.Time # Store current BIL allocation for next month's minimum self.previous_bil_allocation = self.Portfolio[self.bil].HoldingsValue / self.Portfolio.TotalPortfolioValue self.Debug(f"New BIL allocation: {bil_weight*100:0.2f}% (Minimum was {min_bil_allocation*100:0.2f}%)") # Liquidate positions not in current selection for kvp in self.Portfolio: symbol = kvp.Key if (kvp.Value.Invested and symbol not in invested and symbol != self.spy and symbol not in self.defensive_positions): self.Liquidate(symbol) def _calculateMarketTrend(self): """Calculate recent market trend using price history""" if len(self.spy_prices) < 
self.trend_lookback + 1: return 0 # Not enough data dates = sorted(self.spy_prices.keys()) if len(dates) <= self.trend_lookback: return 0 recent_price = self.spy_prices[dates[-1]] older_price = self.spy_prices[dates[-self.trend_lookback]] return (recent_price / older_price) - 1.0 def _calculateSimpleMomentum(self): """Calculate simple momentum scores for stock filtering""" momentum_scores = {} symbols = [sym for sym, _ in self.selected_by_market_cap] if not symbols: return momentum_scores # Get 30 days of history for all stocks history = self.History(symbols, 30, Resolution.Daily) if history.empty: return momentum_scores # Calculate simple momentum (30-day price change) for symbol in symbols: if symbol in history.index.get_level_values(0): prices = history.loc[symbol]['close'] if len(prices) >= 30: # 30-day momentum mom = prices.iloc[-1] / prices.iloc[0] - 1 # Convert to a score between 0.7 and 1.3 # Center around 1.0, with range based on 15% move momentum_scores[symbol] = min(1.3, max(0.7, 1 + (mom * 2))) return momentum_scores def _evaluateInverseETFs(self, market_deviation, market_trend, max_allocation): """Enhanced evaluation of inverse ETFs with more sensitive criteria""" allocations = {symbol: 0 for symbol in self.inverse_etfs} # More permissive consideration of inverse ETFs if market_deviation > 0.04 and market_trend > 0.02: return allocations # Only skip in very strong bull markets # Get more history for better momentum calculation history = self.History(self.inverse_etfs, 45, Resolution.Daily) if history.empty: return allocations # Enhanced momentum scoring momentum_scores = {} volatility_scores = {} for symbol in self.inverse_etfs: if symbol in history.index.get_level_values(0): prices = history.loc[symbol]['close'] if len(prices) >= 30: # Multiple timeframe momentum - more emphasis on recent performance mom_7d = prices.iloc[-1] / prices.iloc[-7] - 1 if len(prices) >= 7 else 0 mom_15d = prices.iloc[-1] / prices.iloc[-15] - 1 if len(prices) >= 15 else 0 
mom_30d = prices.iloc[-1] / prices.iloc[0] - 1 # Weight recent momentum much more heavily momentum = (mom_7d * 0.5) + (mom_15d * 0.3) + (mom_30d * 0.2) # Calculate volatility (lower is better for inverse ETFs) returns = [prices.iloc[i+1]/prices.iloc[i]-1 for i in range(min(20, len(prices)-1))] volatility = np.std(returns) if returns else 0 # Calculate short-term rate of change (acceleration) if len(prices) >= 10: recent_5d_change = prices.iloc[-1] / prices.iloc[-5] - 1 prev_5d_change = prices.iloc[-6] / prices.iloc[-10] - 1 acceleration = recent_5d_change - prev_5d_change else: acceleration = 0 # Momentum score adds weight for accelerating performance momentum_scores[symbol] = momentum + (acceleration * 0.5) volatility_scores[symbol] = volatility # More aggressive filtering - consider even small positive momentum positive_momentum_etfs = {s: score for s, score in momentum_scores.items() if score > -0.005} # No allocation if no ETFs have at least neutral momentum if not positive_momentum_etfs: self.Debug("No inverse ETFs showing acceptable momentum - keeping as cash") return allocations # Enhanced selection: favor momentum but consider volatility too best_candidates = [] for symbol, score in positive_momentum_etfs.items(): volatility = volatility_scores.get(symbol, 1.0) # Adjust score: higher momentum is good, lower volatility is good adjusted_score = score - (volatility * 0.5) best_candidates.append((symbol, score, adjusted_score)) # Sort by adjusted score best_candidates.sort(key=lambda x: x[2], reverse=True) # More aggressive allocation model allocation_pct = 0.0 # Allocate based on market conditions with more sensitivity if market_deviation < -0.05: allocation_pct = 1.0 # Use 100% of available inverse allocation elif market_deviation < -0.03: allocation_pct = 0.8 # Use 80% of available inverse allocation elif market_deviation < -0.01: allocation_pct = 0.6 # Use 60% of available inverse allocation elif market_deviation < 0.01: # Even in slight bull market if 
momentum is positive allocation_pct = 0.4 # Use 40% of available inverse allocation else: allocation_pct = 0.2 # Use 20% only if momentum is strong enough # No candidates or market conditions don't justify allocation if not best_candidates or allocation_pct < 0.1: return allocations # Take top 1-2 ETFs depending on market conditions num_etfs = 1 if market_deviation < -0.04 and len(best_candidates) > 1: num_etfs = 2 # Use two ETFs in stronger downtrends # Allocate to best ETF(s) remaining_allocation = max_allocation * allocation_pct for i in range(min(num_etfs, len(best_candidates))): symbol, raw_score, _ = best_candidates[i] # Allocate proportionally to momentum strength, with a minimum threshold etf_weight = min(1.0, max(0.3, raw_score * 3)) if raw_score > 0 else 0.3 # Calculate allocation for this ETF etf_allocation = remaining_allocation * etf_weight / num_etfs # Only allocate if it's a meaningful amount if etf_allocation >= 0.01: # At least 1% allocation allocations[symbol] = etf_allocation self.Debug(f"Selected inverse ETF {symbol} with momentum {raw_score:.2%}, allocating {etf_allocation*100:.2f}%") return allocations def _runDefensiveETFDiagnostics(self, market_deviation, market_trend): """Run detailed diagnostics on all defensive ETFs""" # Get extensive history for analysis history = self.History(self.all_defensive + [self.spy], 90, Resolution.Daily) if history.empty: return spy_perf = {} if self.spy in history.index.get_level_values(0): spy_prices = history.loc[self.spy]['close'] if len(spy_prices) >= 30: spy_perf = { "7d": spy_prices.iloc[-1] / spy_prices.iloc[-7] - 1 if len(spy_prices) >= 7 else 0, "15d": spy_prices.iloc[-1] / spy_prices.iloc[-15] - 1 if len(spy_prices) >= 15 else 0, "30d": spy_prices.iloc[-1] / spy_prices.iloc[-30] - 1 } # Log market conditions self.Debug(f"DIAGNOSTIC - Market: Deviation {market_deviation*100:.2f}%, " + f"Trend {market_trend*100:.2f}%, SPY 30d: {spy_perf.get('30d', 0)*100:.2f}%") # Analyze each ETF for symbol in 
self.all_defensive: if symbol in history.index.get_level_values(0): prices = history.loc[symbol]['close'] if len(prices) >= 30: # Calculate multiple timeframe performance perf_7d = prices.iloc[-1] / prices.iloc[-7] - 1 if len(prices) >= 7 else 0 perf_15d = prices.iloc[-1] / prices.iloc[-15] - 1 if len(prices) >= 15 else 0 perf_30d = prices.iloc[-1] / prices.iloc[-30] - 1 # Calculate recent acceleration recent_5d = prices.iloc[-1] / prices.iloc[-5] - 1 if len(prices) >= 5 else 0 prev_5d = prices.iloc[-6] / prices.iloc[-10] - 1 if len(prices) >= 10 else 0 accel = recent_5d - prev_5d # Calculate relative performance vs SPY rel_perf = {} for period, spy_val in spy_perf.items(): if period == "7d": rel_perf[period] = perf_7d - spy_val elif period == "15d": rel_perf[period] = perf_15d - spy_val elif period == "30d": rel_perf[period] = perf_30d - spy_val # Log detailed ETF statistics self.Debug(f" {symbol}: 7d: {perf_7d*100:.2f}%, 15d: {perf_15d*100:.2f}%, " + f"30d: {perf_30d*100:.2f}%, Accel: {accel*100:.2f}%, " + f"Rel30d: {rel_perf.get('30d', 0)*100:.2f}%") def _evaluateDefensiveETFs(self, market_deviation, market_trend, max_allocation): """Enhanced defensive ETF evaluation with sector rotation""" allocations = {symbol: 0 for symbol in self.all_defensive} # Skip if market is very bullish if market_deviation > 0.04 and market_trend > 0.02: return allocations # Get history for all defensive options and SPY history = self.History(self.all_defensive + [self.spy], 60, Resolution.Daily) if history.empty: return allocations # Detailed diagnostics on all ETFs self.Debug(f"DEFENSIVE ETF PERFORMANCE DETAILS:") # Calculate SPY performance for relative comparisons spy_perf = {} if self.spy in history.index.get_level_values(0): spy_prices = history.loc[self.spy]['close'] if len(spy_prices) >= 30: spy_perf = { "5d": spy_prices.iloc[-1] / spy_prices.iloc[-5] - 1 if len(spy_prices) >= 5 else 0, "10d": spy_prices.iloc[-1] / spy_prices.iloc[-10] - 1 if len(spy_prices) >= 10 else 0, 
"20d": spy_prices.iloc[-1] / spy_prices.iloc[-20] - 1 if len(spy_prices) >= 20 else 0, "30d": spy_prices.iloc[-1] / spy_prices.iloc[-30] - 1 } self.Debug(f" SPY: 5d: {spy_perf['5d']*100:.1f}%, 10d: {spy_perf['10d']*100:.1f}%, " + f"20d: {spy_perf['20d']*100:.1f}%, 30d: {spy_perf['30d']*100:.1f}%") # Enhanced scoring system with different criteria for different ETF types etf_scores = {} # Process each ETF by type for group_name, group in [("Inverse", self.inverse_etfs), ("Alternative", self.alternative_defensive), ("Sector", self.sector_defensive)]: self.Debug(f" {group_name} ETFs:") for symbol in group: if symbol in history.index.get_level_values(0): prices = history.loc[symbol]['close'] if len(prices) >= 30: # Calculate absolute momentum components perf = {} perf["5d"] = prices.iloc[-1] / prices.iloc[-5] - 1 if len(prices) >= 5 else 0 perf["10d"] = prices.iloc[-1] / prices.iloc[-10] - 1 if len(prices) >= 10 else 0 perf["20d"] = prices.iloc[-1] / prices.iloc[-20] - 1 if len(prices) >= 20 else 0 perf["30d"] = prices.iloc[-1] / prices.iloc[-30] - 1 # Calculate relative outperformance vs SPY rel_perf = {} for period, spy_val in spy_perf.items(): rel_perf[period] = perf[period] - spy_val # Log detailed performance self.Debug(f" {symbol}: 5d: {perf['5d']*100:.1f}% (rel: {rel_perf['5d']*100:+.1f}%), " + f"10d: {perf['10d']*100:.1f}% (rel: {rel_perf['10d']*100:+.1f}%), " + f"30d: {perf['30d']*100:.1f}% (rel: {rel_perf['30d']*100:+.1f}%)") # Inverse ETFs need to show positive momentum in down markets if symbol in self.inverse_etfs: # In downtrends, rising inverse ETFs are good if market_deviation < -0.02: score = (perf["5d"] * 0.4) + (perf["10d"] * 0.4) + (perf["30d"] * 0.2) # Bonus for relative outperformance score += (rel_perf["5d"] + rel_perf["10d"]) * 0.15 else: # Less emphasis on long-term performance in neutral markets score = (perf["5d"] * 0.6) + (perf["10d"] * 0.3) + (perf["30d"] * 0.1) # Alternative defensive (bonds, gold) - focus on absolute return elif symbol in 
self.alternative_defensive: # Less dramatic movements, need lower thresholds score = (perf["5d"] * 0.3) + (perf["10d"] * 0.4) + (perf["30d"] * 0.3) # In downtrends, emphasize relative performance more if market_deviation < -0.03: score += rel_perf["10d"] * 0.2 # Bonus for outperformance # Sector ETFs - focus on relative outperformance else: # These should have positive absolute returns and outperform SPY abs_score = (perf["5d"] * 0.3) + (perf["10d"] * 0.3) + (perf["30d"] * 0.4) rel_score = (rel_perf["5d"] * 0.3) + (rel_perf["10d"] * 0.3) + (rel_perf["30d"] * 0.4) # Balance absolute and relative performance if market_deviation < -0.02: # In downtrends, relative outperformance is more important score = (abs_score * 0.4) + (rel_score * 0.6) else: # In neutral markets, absolute performance matters more score = (abs_score * 0.6) + (rel_score * 0.4) etf_scores[symbol] = score # Find candidates with appropriate momentum based on market conditions threshold = -0.007 # Default threshold if market_deviation < -0.03: threshold = -0.01 # More permissive in stronger downturns candidates = {s: score for s, score in etf_scores.items() if score > threshold} if not candidates: self.Debug("No defensive ETFs showed sufficient momentum - keeping as cash") return allocations # Sort and log candidate scores sorted_candidates = sorted(candidates.items(), key=lambda x: x[1], reverse=True) self.Debug(f"Top 5 defensive candidates:") for symbol, score in sorted_candidates[:5]: group = "Inverse" if symbol in self.inverse_etfs else "Alternative" if symbol in self.alternative_defensive else "Sector" self.Debug(f" {symbol} ({group}): Score {score*100:.2f}%") # Set allocation percent based on market conditions and trend allocation_pct = 0.0 if market_deviation < -0.05 or market_trend < -0.04: allocation_pct = 0.95 # Almost all available allocation elif market_deviation < -0.03 or market_trend < -0.02: allocation_pct = 0.8 elif market_deviation < -0.01 or market_trend < -0.01: allocation_pct = 0.6 
else: allocation_pct = 0.4 # Adjust allocation based on strength of best candidate best_score = sorted_candidates[0][1] if sorted_candidates else 0 allocation_pct *= min(1.0, max(0.5, (best_score + 0.02) * 4)) # Determine number of ETFs to use - more in stronger downtrends num_etfs = 1 if (market_deviation < -0.04 or market_trend < -0.03) and len(sorted_candidates) > 1: num_etfs = min(2, len(sorted_candidates)) # Allocate to best candidates remaining_allocation = max_allocation * allocation_pct total_score = sum(score for _, score in sorted_candidates[:num_etfs]) if total_score > 0: for i in range(num_etfs): symbol, score = sorted_candidates[i] # Weight by relative score weight = score / total_score if total_score > 0 else 1.0/num_etfs # Calculate allocation etf_allocation = remaining_allocation * weight # Only allocate if meaningful if etf_allocation >= 0.02: # 2% minimum allocation allocations[symbol] = etf_allocation etf_type = "Inverse" if symbol in self.inverse_etfs else "Alternative" if symbol in self.alternative_defensive else "Sector" self.Debug(f"Selected {etf_type} ETF {symbol} with score {score*100:.2f}%, allocating {etf_allocation*100:.2f}%") return allocations
class KQTStrategy:
    """Turn model predictions into position sizes, with regime-aware risk controls.

    The instance keeps the prediction model, per-ticker feature scalers, a
    rolling history of portfolio returns and the current regime/defensive
    flags. ``model`` and ``sector_mappings`` are expected to be assigned by
    the owner after construction.
    """

    def __init__(self):
        self.model = None  # called as model(X_tensor, stock_ids) in predict_returns
        self.lookback = 30  # rows per input sequence
        self.scalers = {}  # ticker -> fitted RobustScaler
        self.feature_cols = []  # feature names chosen on the first prepare_stock_data call
        self.stock_to_id = {}  # ticker -> integer id passed to the model
        self.sector_mappings = {}  # ticker -> sector name
        self.adaptive_threshold = 0.2
        self.pred_std = 1.0  # spread of recent predictions, see update_model_calibration
        self.current_regime = "neutral"
        self.portfolio_returns = []  # most recent daily returns, capped at 60
        self.defensive_mode = False
        self.previous_day_hit_stops = []

    def create_sliding_sequences(self, df, feature_cols, lookback, stride=1):
        """Stack overlapping ``lookback``-row windows of ``df[feature_cols]``.

        Returns:
            float32 array of shape (n_windows, lookback, n_features); an empty
            array when ``df`` has fewer than ``lookback`` rows.
        """
        # Extract and convert once; each window is then a cheap array slice.
        values = df[feature_cols].values.astype(np.float32)
        starts = range(0, len(values) - lookback + 1, stride)
        return np.array([values[start:start + lookback] for start in starts])

    def clip_outliers(self, df, cols, lower=0.01, upper=0.99):
        """Return a copy of ``df`` with ``cols`` clipped to their quantile range.

        Columns named in ``cols`` that are absent from ``df`` are ignored.
        """
        df_copy = df.copy()
        for col in cols:
            if col in df_copy.columns:
                q_low = df_copy[col].quantile(lower)
                q_high = df_copy[col].quantile(upper)
                df_copy[col] = df_copy[col].clip(lower=q_low, upper=q_high)
        return df_copy

    def filter_features_to_match_model(self, df, feature_cols, required_count=5):
        """Pick at most ``required_count`` features in a fixed priority order.

        Priority: every ``return_lag*`` column, then the technical features in
        ``tech_priority`` order, then whatever is left in input order.

        Args:
            df: Unused; kept for interface compatibility.
            feature_cols: Candidate feature names.
            required_count: Number of features the model expects.
        """
        if len(feature_cols) == required_count:
            return feature_cols

        tech_priority = ['roc_5', 'volatility_10', 'ma_cross', 'dist_ma20',
                         'momentum_1m', 'oversold', 'overbought', 'roc_diff',
                         'volatility_regime']

        prioritized_features = [col for col in feature_cols if 'return_lag' in col]
        for feat in tech_priority:
            if feat in feature_cols and len(prioritized_features) < required_count:
                prioritized_features.append(feat)

        # If still not enough, top up with the remaining features
        for col in feature_cols:
            if len(prioritized_features) >= required_count:
                break
            if col not in prioritized_features:
                prioritized_features.append(col)

        # If too many, truncate
        return prioritized_features[:required_count]

    def add_technical_features(self, df):
        """Add moving-average, volatility and rate-of-change columns.

        New columns are written onto ``df`` itself; the return value is a
        copy with NaNs in every column replaced by 0. ``df`` is returned
        untouched when it has no ``Close`` column.
        """
        if 'Close' not in df.columns:
            return df

        close = df['Close']
        daily_ret = close.pct_change()
        ma20 = close.rolling(20).mean()

        df['ma5'] = close.rolling(5).mean() / close - 1  # Relative to price
        df['ma20'] = ma20 / close - 1
        df['ma_cross'] = df['ma5'] - df['ma20']  # Moving average crossover signal
        df['volatility_10'] = daily_ret.rolling(10).std()
        df['volatility_ratio'] = daily_ret.rolling(5).std() / daily_ret.rolling(20).std()
        df['roc_5'] = close.pct_change(5)
        df['roc_10'] = close.pct_change(10)
        df['roc_diff'] = df['roc_5'] - df['roc_10']
        df['dist_ma20'] = close / ma20 - 1

        return df.fillna(0)

    def add_enhanced_features(self, df):
        """Add volatility-regime, momentum and mean-reversion columns in place.

        Reads ``Close`` and the columns written by ``add_technical_features``
        (``volatility_10``, ``roc_5``, ``dist_ma20``, ``ma_cross``). NaNs
        produced by the rolling windows are left in the result.
        """
        df['volatility_trend'] = df['volatility_10'].pct_change(5)
        df['volatility_regime'] = (df['volatility_10'] > df['volatility_10'].rolling(20).mean()).astype(int)

        if 'volume' in df.columns:
            df['vol_ma_ratio'] = df['volume'] / df['volume'].rolling(20).mean()
            df['vol_price_trend'] = df['vol_ma_ratio'] * df['roc_5']

        df['momentum_1m'] = df['Close'].pct_change(20)
        df['momentum_3m'] = df['Close'].pct_change(60)
        df['momentum_breadth'] = (
            (df['roc_5'] > 0).astype(int) +
            (df['momentum_1m'] > 0).astype(int) +
            (df['momentum_3m'] > 0).astype(int)
        ) / 3

        df['mean_rev_signal'] = -1 * df['dist_ma20'] * df['volatility_10']
        df['oversold'] = (df['dist_ma20'] < -2 * df['volatility_10']).astype(int)
        df['overbought'] = (df['dist_ma20'] > 2 * df['volatility_10']).astype(int)

        df['regime_change'] = (np.sign(df['ma_cross']) != np.sign(df['ma_cross'].shift(1))).astype(int)
        df['risk_adj_momentum'] = df['roc_5'] / (df['volatility_10'] + 0.001)

        return df

    @staticmethod
    def _standardize_columns(frame, cols):
        """Z-score ``cols`` of ``frame`` in place; constant columns become 0."""
        for col in cols:
            if col in frame.columns and len(frame[col]) > 0:
                mean = frame[col].mean()
                std = frame[col].std()
                if std > 0:
                    frame[col] = (frame[col] - mean) / std
                else:
                    frame[col] = 0

    def prepare_stock_data(self, stock_data, ticker, is_training=False):
        """Build scaled feature sequences for a single stock.

        Args:
            stock_data: DataFrame with ``close`` and ``time`` columns and an
                optional ``volume`` column.
            ticker: Key under which the fitted scaler is cached.
            is_training: When true, refit the scaler even if one is cached.

        Returns:
            ``(X, stock_df)`` where ``X`` comes from
            ``create_sliding_sequences``; ``(None, None)`` when fewer than
            ``lookback + 5`` rows were supplied; ``(None, stock_df)`` when no
            usable rows or sequences remain after cleaning.
        """
        if len(stock_data) < self.lookback + 5:  # Need enough data
            return None, None

        stock_df = pd.DataFrame({
            'Close': stock_data['close'].values,
            'time': stock_data['time'].values,
        })
        if 'volume' in stock_data.columns:
            stock_df['volume'] = stock_data['volume'].values
        stock_df = stock_df.sort_values('time').reset_index(drop=True)

        # Next-row percentage return; shifting it by i gives the lag features.
        stock_df['pct_return'] = stock_df['Close'].pct_change().shift(-1) * 100

        feature_cols = []
        for i in range(1, 6):
            col_name = f'return_lag{i}'
            stock_df[col_name] = stock_df['pct_return'].shift(i)
            feature_cols.append(col_name)

        stock_df = self.add_technical_features(stock_df)
        stock_df = self.add_enhanced_features(stock_df)

        additional_features = ['ma_cross', 'volatility_10', 'roc_5', 'roc_diff', 'dist_ma20']
        enhanced_features = ['volatility_trend', 'volatility_regime', 'momentum_1m',
                             'momentum_breadth', 'mean_rev_signal', 'oversold',
                             'overbought', 'regime_change', 'risk_adj_momentum']
        for col in additional_features + enhanced_features:
            if col in stock_df.columns:
                feature_cols.append(col)

        # Filter to the exact number of features expected by the model.
        # NOTE: with a count of 5 the five return_lag columns always win.
        model_feature_count = 5
        feature_cols = self.filter_features_to_match_model(stock_df, feature_cols, model_feature_count)

        if not self.feature_cols:
            self.feature_cols = feature_cols.copy()

        stock_df = stock_df.dropna().reset_index(drop=True)
        stock_df = self.clip_outliers(stock_df, feature_cols)

        if ticker not in self.scalers or is_training:
            # dropna() above guarantees NaN-free features, so emptiness is the
            # only thing left to check before fitting.
            if len(stock_df) == 0 or len(feature_cols) == 0:
                return None, stock_df
            try:
                scaler = RobustScaler()
                stock_df[feature_cols] = scaler.fit_transform(stock_df[feature_cols])
                self.scalers[ticker] = scaler
            except Exception as e:
                # Best effort: fall back to plain standardisation.
                print(f"Scaling error for {ticker}: {str(e)}")
                self._standardize_columns(stock_df, feature_cols)
        else:
            scaler = self.scalers[ticker]
            try:
                stock_df[feature_cols] = scaler.transform(stock_df[feature_cols])
            except Exception as e:
                # Best effort: fall back to plain standardisation.
                print(f"Transform error for {ticker}: {str(e)}")
                self._standardize_columns(stock_df, feature_cols)

        X = self.create_sliding_sequences(stock_df, feature_cols, self.lookback, stride=1)
        if len(X) == 0:
            return None, stock_df
        return X, stock_df

    def calculate_portfolio_risk_score(self, market_returns):
        """Calculate a portfolio risk score used to scale overall exposure.

        Starts at 50, subtracts for a volatility spike, for recent negative
        days and for a declining trend, and adds 10 for an accelerating
        uptrend. The result is clamped to [10, 100]; with the adjustments
        below it can never exceed 60.

        Args:
            market_returns: Sequence of recent market returns, oldest first.
        """
        risk_score = 50  # Neutral starting point

        if len(market_returns) >= 5:
            recent_vol = np.std(market_returns[-5:]) * np.sqrt(252)  # Annualized
            longer_vol = np.std(market_returns[-10:]) * np.sqrt(252) if len(market_returns) >= 10 else recent_vol

            # Volatility spike detection
            vol_ratio = recent_vol / longer_vol if longer_vol > 0 else 1
            if vol_ratio > 1.5:  # Sharp volatility increase
                risk_score -= 30
            elif vol_ratio > 1.2:
                risk_score -= 15

        if len(market_returns) >= 3:
            neg_days = sum(1 for r in market_returns[-3:] if r < 0)
            if neg_days == 3:  # Three consecutive down days
                risk_score -= 20
            elif neg_days == 2:
                risk_score -= 10

        if len(market_returns) >= 10:
            avg_recent = np.mean(market_returns[-5:])
            avg_older = np.mean(market_returns[-10:-5])
            trend_change = avg_recent - avg_older

            if trend_change < -0.3:  # Declining trend
                risk_score -= 15
            elif trend_change > 0.3 and avg_recent > 0:  # Accelerating uptrend
                risk_score += 10

        return max(10, min(100, risk_score))

    def predict_returns(self, X, ticker):
        """Run the model on ``X`` and return its last prediction as a float.

        Returns 0 when no model is loaded or when inference fails. Unknown
        tickers are assigned the next free integer id in ``stock_to_id``.
        """
        if self.model is None:
            return 0

        if ticker not in self.stock_to_id:
            self.stock_to_id[ticker] = len(self.stock_to_id)
        stock_id = self.stock_to_id[ticker]

        try:
            X_tensor = torch.tensor(X, dtype=torch.float32)
            stock_ids = torch.tensor([stock_id] * len(X), dtype=torch.long)

            with torch.no_grad():
                predictions = self.model(X_tensor, stock_ids)

            # .cpu() is a no-op for CPU tensors and is required before
            # .numpy() when the model returns a tensor on another device.
            return float(predictions.detach().cpu().numpy().flatten()[-1])
        except Exception as e:
            print(f"Prediction error for {ticker}: {e}")
            return 0  # Return neutral prediction on error

    def detect_market_regime(self, daily_returns, lookback=10):
        """Classify the market regime from today's returns and portfolio history.

        Args:
            daily_returns: Current returns; their mean is the market return.
            lookback: Maximum number of stored portfolio returns to average.

        Returns:
            One of "breakout_bullish", "breakdown_bearish", "bullish_strong",
            "bullish_pullback", "bearish_high_vol", "bearish_low_vol",
            "bullish", "bearish" or "neutral".
        """
        if len(daily_returns) < 1:
            return "neutral"

        market_return = np.mean(daily_returns)
        history = self.portfolio_returns

        if len(history) >= 3:
            avg_recent_return = np.mean(history[-min(lookback, len(history)):])

            if len(history) >= 5:
                very_recent = np.mean(history[-3:])
                less_recent = np.mean(history[-min(8, len(history)):-3])
                trend_change = very_recent - less_recent

                if trend_change > 0.5 and avg_recent_return > 0.2:
                    return "breakout_bullish"
                if trend_change < -0.5 and avg_recent_return < -0.2:
                    return "breakdown_bearish"

            if avg_recent_return > 0.15:
                return "bullish_strong" if market_return > 0 else "bullish_pullback"
            if avg_recent_return < -0.3:
                return "bearish_high_vol" if market_return < -0.2 else "bearish_low_vol"
            if avg_recent_return > 0 and market_return > 0:
                return "bullish"
            if avg_recent_return < 0 and market_return < 0:
                return "bearish"

        return "neutral" if market_return > -0.05 else "bearish"

    def detect_bearish_signals(self, recent_returns):
        """Detect early warning signs of bearish conditions.

        Only ``self.portfolio_returns`` is inspected.

        Args:
            recent_returns: Unused; kept for interface compatibility.

        Returns:
            ``(bearish_signals, signal_strength)``: the number of triggered
            checks and their accumulated strength. The strength is infinite
            when volatility rises from exactly zero.
        """
        history = self.portfolio_returns
        bearish_signals = 0
        signal_strength = 0

        if len(history) >= 5:
            last_five = history[-5:]
            pos_days = sum(1 for r in last_five if r > 0)
            neg_days = sum(1 for r in last_five if r < 0)
            if neg_days > pos_days:
                bearish_signals += 1
                signal_strength += 0.2 * (neg_days - pos_days)

        if len(history) >= 10:
            recent_vol = np.std(history[-5:])
            older_vol = np.std(history[-10:-5])
            if recent_vol > older_vol * 1.3:  # 30% volatility increase
                bearish_signals += 1
                # Same result as dividing by zero under NumPy, made explicit
                # so no RuntimeWarning is emitted.
                growth = recent_vol / older_vol - 1 if older_vol > 0 else float("inf")
                signal_strength += 0.3 * growth

        if len(history) >= 5:
            if history[-1] < 0 and history[-2] > 0.3:
                bearish_signals += 1
                signal_strength += 0.3

        return bearish_signals, signal_strength

    def generate_positions(self, prediction_data, current_returns=None):
        """Generate position sizes from predictions, diversified by sector.

        Updates ``current_regime``, ``defensive_mode`` (both only when
        ``current_returns`` is given) and ``adaptive_threshold``.

        Args:
            prediction_data: Mapping ticker -> dict with a "pred_return" entry.
            current_returns: Optional sequence of current market returns;
                lists, NumPy arrays and pandas Series are all accepted.

        Returns:
            dict ticker -> signed size; positive for selected stocks, negative
            for hedges. Presumably portfolio weights - confirm with the caller.
        """
        if not prediction_data:
            return {}

        if current_returns is not None:
            self.current_regime = self.detect_market_regime(current_returns)
            bearish_count, bearish_strength = self.detect_bearish_signals(current_returns)
            self.defensive_mode = bearish_count >= 2 or bearish_strength > 0.5

        # Compare against None explicitly: the truth value of an array or
        # Series is ambiguous and raises ValueError.
        market_returns = [] if current_returns is None else current_returns
        portfolio_risk_score = self.calculate_portfolio_risk_score(market_returns)
        risk_scaling = portfolio_risk_score / 100  # scaling factor, 0.1 to 1.0

        # A zero or non-finite spread would divide by zero below; fall back to
        # the constructor default in that case.
        pred_scale = self.pred_std
        if not (np.isfinite(pred_scale) and pred_scale > 0):
            pred_scale = 1.0

        base_threshold = 0.25 * pred_scale
        threshold_multiplier = {
            "bullish_strong": 0.4,
            "breakout_bullish": 0.4,
            "bearish_high_vol": 2.5,
            "breakdown_bearish": 2.5,
            "bearish": 1.6,
            "bearish_low_vol": 1.6,
            "bullish_pullback": 0.9,
        }
        # neutral or other regimes use 0.75
        self.adaptive_threshold = base_threshold * threshold_multiplier.get(self.current_regime, 0.75)

        # Group stocks by sector
        sector_data = {}
        for ticker, data in prediction_data.items():
            sector = self.sector_mappings.get(ticker, "Unknown")
            sector_data.setdefault(sector, []).append({
                "ticker": ticker,
                "pred_return": data["pred_return"],
            })

        # Rank sectors by average predicted return
        sector_avg_scores = {
            sector: np.mean([s["pred_return"] for s in stocks])
            for sector, stocks in sector_data.items()
        }
        ranked_sectors = sorted(sector_avg_scores, key=sector_avg_scores.get, reverse=True)

        # More diversification in lower risk periods. The risk score tops out
        # at 60, so the comparison must be inclusive for the three-sector
        # branch to be reachable at all.
        top_sector_count = 3 if portfolio_risk_score >= 60 else 2
        top_sectors = ranked_sectors[:top_sector_count]

        # Allow more stocks per sector in bull markets
        stocks_per_sector = 3 if self.current_regime in ["bullish_strong", "breakout_bullish"] else 2

        positions = {}
        for sector in top_sectors:
            sector_stocks = sorted(sector_data[sector], key=lambda s: s["pred_return"], reverse=True)
            for stock in sector_stocks[:stocks_per_sector]:
                signal_strength = stock["pred_return"] / (0.2 * pred_scale)
                base_size = min(0.3, max(0.05, 0.15 * signal_strength))
                positions[stock["ticker"]] = base_size * risk_scaling  # Scale by portfolio risk

        # Defensive adjustments
        if self.defensive_mode or self.current_regime in ["bearish_high_vol", "bearish_low_vol", "breakdown_bearish"]:
            # 1. Reduce overall position sizes
            scaling_factor = 0.5 if self.defensive_mode else 0.7
            for ticker in positions:
                positions[ticker] *= scaling_factor

            # 2. Add shorts as hedges, only in higher risk environments
            if positions and portfolio_risk_score < 40:
                negative_preds = {t: data["pred_return"] for t, data in prediction_data.items()
                                  if data["pred_return"] < 0 and t not in positions}
                hedge_size = -0.15 if self.defensive_mode else -0.1
                for ticker, _ in sorted(negative_preds.items(), key=lambda item: item[1])[:2]:
                    positions[ticker] = hedge_size

        return positions

    def get_stop_loss_level(self):
        """Return the stop-loss level for the current regime and defensive flag.

        Values are negative; presumably percent of position value - confirm
        with the caller.
        """
        if self.current_regime in ["bullish_strong", "breakout_bullish"]:
            # Tighter in defensive mode, otherwise more room to breathe
            return -2.0 if self.defensive_mode else -3.5
        if self.current_regime in ["bearish_high_vol", "breakdown_bearish"]:
            return -1.5  # Tighter stop-loss in bearish regimes
        return -1.8 if self.defensive_mode else -2.5

    def update_portfolio_returns(self, daily_return):
        """Append ``daily_return`` to the history, keeping the latest 60 entries."""
        self.portfolio_returns.append(daily_return)
        if len(self.portfolio_returns) > 60:  # Keep a rolling window
            self.portfolio_returns = self.portfolio_returns[-60:]

    def update_model_calibration(self, all_predictions):
        """Refresh ``pred_std`` from the spread of the latest predictions.

        Needs more than five predictions. A zero or non-finite spread is
        ignored so that ``pred_std`` stays usable as a divisor.

        Args:
            all_predictions: Mapping whose values are predicted returns.
        """
        all_pred_values = list(all_predictions.values())
        if len(all_pred_values) > 5:
            spread = float(np.std(all_pred_values))
            if np.isfinite(spread) and spread > 0:
                self.pred_std = spread