Overall Statistics |
Total Orders 586 Average Win 0.86% Average Loss -1.21% Compounding Annual Return 1.027% Drawdown 35.400% Expectancy 0.028 Start Equity 10000 End Equity 10628.96 Net Profit 6.290% Sharpe Ratio -0.152 Sortino Ratio -0.148 Probabilistic Sharpe Ratio 0.493% Loss Rate 40% Win Rate 60% Profit-Loss Ratio 0.71 Alpha -0.016 Beta 0.006 Annual Standard Deviation 0.098 Annual Variance 0.01 Information Ratio -0.621 Tracking Error 0.191 Treynor Ratio -2.317 Total Fees $577.00 Estimated Strategy Capacity $750000000.00 Lowest Capacity Asset MSTR RBGP9S2961YD Portfolio Turnover 5.12% |
from AlgorithmImports import *
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import numpy as np
import pandas as pd


class MLTradingAlgorithm(QCAlgorithm):
    """Long-only daily strategy on MSTR driven by a random-forest classifier.

    The classifier is retrained every Monday on features derived from the
    last 200 daily closes (Bollinger Bands, 30-day historical volatility and
    either RSI or 10/50-day SMAs). A prediction of 1 opens a partial long
    position; a prediction of 0 closes it. Open longs are protected by a
    trailing stop-market order that follows the highest close seen since
    entry.
    """

    def Initialize(self):
        """Configure dates, cash, the traded symbol, the model and scheduling."""
        # 1. Setup Algorithm Parameters
        self.SetStartDate(2019, 1, 1)
        self.SetEndDate(2024, 12, 31)
        self.SetCash(10000)

        # 2. Fraction of portfolio value committed to each long entry
        self.allocation = 0.2

        # 3. Add Equity
        self.symbol = self.AddEquity("MSTR", Resolution.Daily).Symbol

        # 4. Rolling Window for 200 Days of TradeBar data
        self.data = RollingWindow[TradeBar](200)
        self.SetWarmUp(200)

        # 5. ML Model (Random Forest)
        self.model = RandomForestClassifier(n_estimators=100, random_state=42)
        self.is_model_trained = False
        self.training_count = 0

        # 6. Schedule Weekly Retraining
        self.Schedule.On(
            self.DateRules.Every(DayOfWeek.Monday),
            self.TimeRules.At(10, 0),
            self.TrainModel,
        )

        # 7. Trailing Stop Logic
        self.highestPrice = 0
        self.trailStopTicket = None
        # Multiplier applied to the highest close to get the stop price.
        # 0.95 places the stop 5% below the high; the previous value of 1
        # put the stop at the high itself, contradicting the intended 5%.
        self.trailing_stop_pct = 0.95

        # 8. Configurable usage of RSI or SMA
        # Set True to use RSI, False to use short/long SMAs
        self.useRSI = True

    def OnData(self, data):
        """Record the new bar, query the model and manage the position.

        Args:
            data: The data slice delivered by the engine for this time step.
        """
        # Check for valid data
        if not data.ContainsKey(self.symbol):
            return
        trade_bar = data[self.symbol]
        if trade_bar is None:
            return

        # Update rolling window (also during warm-up, so it fills up)
        self.data.Add(trade_bar)
        if self.IsWarmingUp or not self.data.IsReady:
            return

        # Skip if model not trained
        if not self.is_model_trained:
            return

        # Build features for the latest bar
        df = self.GetFeatureDataFrame()
        if df is None or df.empty:
            return

        # Keep a one-row DataFrame (not a bare ndarray) so the column names
        # match the ones the model was fitted with.
        latest_features = df.iloc[[-1], :-1]
        try:
            prediction = self.model.predict(latest_features)[0]  # 1 = Buy, 0 = Sell
        except Exception as err:
            # Skip this bar but leave a trace instead of failing silently.
            self.Debug(f"Prediction failed: {err}")
            return

        holdings = self.Portfolio[self.symbol].Quantity

        # -----------------------------
        # Trading Logic
        # -----------------------------
        if prediction == 1 and holdings <= 0:
            # If we are short, first liquidate
            if holdings < 0:
                self.Liquidate(self.symbol)
                self.ResetTrailingStop()

            # Go long with partial allocation
            self.SetHoldings(self.symbol, self.allocation)

            # Start tracking the high from the entry bar. The entry order may
            # not be filled yet, in which case the call below is a no-op and
            # the stop is armed on a later bar once the position exists.
            self.highestPrice = trade_bar.Close
            self.PlaceOrUpdateTrailingStop()
            return

        if prediction == 0 and holdings > 0:
            # Liquidate if we hold a long position
            self.Liquidate(self.symbol)
            self.ResetTrailingStop()
            # Return here: `holdings` is stale after Liquidate, and falling
            # through would re-arm a stop-sell against a position that is
            # being closed, which could later fill and open a short.
            return

        # Maintain the trailing stop while holding long
        if holdings > 0:
            made_new_high = trade_bar.Close > self.highestPrice
            if made_new_high:
                self.highestPrice = trade_bar.Close
            # Also (re)create the stop when none is working, e.g. right after
            # the entry order filled, even if price has not made a new high.
            if made_new_high or not self._IsStopTicketOpen():
                self.PlaceOrUpdateTrailingStop()

    def TrainModel(self):
        """Refit the classifier on the current window and log its accuracy.

        Uses a chronological 80/20 split: the model is fitted on the older
        80% of rows and evaluated on the newest 20%.
        """
        df = self.GetFeatureDataFrame()
        if df is not None:
            # The newest row has no next-day close yet, so its Target is only
            # a placeholder; exclude it from training and evaluation.
            df = df.iloc[:-1]
        if df is None or len(df) < 50:
            self.Debug("Insufficient data for training.")
            return

        X = df.iloc[:, :-1]
        y = df.iloc[:, -1]

        # 80/20 split (time-based, no shuffle)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )

        # Fit the model
        self.model.fit(X_train, y_train)
        self.is_model_trained = True

        # Evaluate
        train_accuracy = accuracy_score(y_train, self.model.predict(X_train))
        test_accuracy = accuracy_score(y_test, self.model.predict(X_test))

        self.training_count += 1
        self.Debug(f"Training #{self.training_count}: "
                   f"Train Accuracy: {train_accuracy:.2%}, "
                   f"Test Accuracy: {test_accuracy:.2%}")

    def GetFeatureDataFrame(self):
        """Build the feature/target table from the rolling window of bars.

        Rows are in chronological order (oldest first, newest last).
        Columns, in order:
            - Close
            - BB_mid, BB_std, BB_upper, BB_lower (20-day Bollinger Bands)
            - HV_30 (30-day annualised historical volatility)
            - RSI (14-day) if ``self.useRSI`` else SMA_10 and SMA_50
            - Target: 1 if the next day's close is higher than this day's
              close, else 0. Always the last column.

        The final row has no "next day" yet, so its Target is a placeholder 0.
        The row is kept so callers can use its features for prediction; it
        must not be used as a training label.

        Returns:
            pandas.DataFrame, or None if fewer than 200 bars are available.
        """
        if self.data.Count < 200:
            return None

        # RollingWindow enumerates newest-first; reverse it so that rolling
        # statistics and shift(-1) operate forward in time.
        close_prices = [float(bar.Close) for bar in self.data]
        close_prices.reverse()
        df = pd.DataFrame(close_prices, columns=["Close"])

        # ---------------------
        # 1) Bollinger Bands
        # ---------------------
        period = 20
        rolling_close = df["Close"].rolling(period)
        df["BB_mid"] = rolling_close.mean()
        df["BB_std"] = rolling_close.std()
        df["BB_upper"] = df["BB_mid"] + 2 * df["BB_std"]
        df["BB_lower"] = df["BB_mid"] - 2 * df["BB_std"]

        # ---------------------
        # 2) Historical Volatility (30-day)
        # ---------------------
        df["daily_returns"] = df["Close"].pct_change()
        df["HV_30"] = df["daily_returns"].rolling(window=30).std() * np.sqrt(252)

        # ---------------------
        # 3) RSI (if self.useRSI == True)
        #    or short/long SMAs (if self.useRSI == False)
        # ---------------------
        if self.useRSI:
            # RSI calculation over 14 days (simple moving averages of
            # gains and losses)
            delta = df["Close"].diff()
            gain = (delta.where(delta > 0, 0)).rolling(14).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
            rs = gain / loss
            df["RSI"] = 100 - (100 / (1 + rs))
        else:
            # Short/Long SMAs (e.g., 10-day and 50-day)
            df["SMA_10"] = df["Close"].rolling(10).mean()
            df["SMA_50"] = df["Close"].rolling(50).mean()

        # ---------------------
        # 4) Target
        # ---------------------
        df["Target"] = (df["Close"].shift(-1) > df["Close"]).astype(int)

        # Cleanup: drop the leading rows whose indicators are not yet defined
        df.dropna(inplace=True)

        # Remove daily_returns from final features
        df.drop(columns=["daily_returns"], inplace=True)

        return df

    # -----------------------------
    # Trailing Stop Methods
    # -----------------------------
    def _IsStopTicketOpen(self):
        """Return True if a trailing-stop ticket exists and is still working."""
        ticket = self.trailStopTicket
        if ticket is None:
            return False
        closed_statuses = (
            OrderStatus.Filled,
            OrderStatus.Canceled,
            OrderStatus.Invalid,
        )
        return ticket.Status not in closed_statuses

    def PlaceOrUpdateTrailingStop(self):
        """Place a stop-market order if none is working, else move its stop.

        The stop price is ``self.highestPrice * self.trailing_stop_pct``.
        Does nothing when there is no long position to protect.
        """
        quantity = self.Portfolio[self.symbol].Quantity
        if quantity <= 0:
            return

        newStopPrice = self.highestPrice * self.trailing_stop_pct

        if not self._IsStopTicketOpen():
            # Create a new stop-market ticket (sells the entire long position
            # if triggered)
            self.trailStopTicket = self.StopMarketOrder(
                self.symbol, -quantity, newStopPrice
            )
        else:
            # Update existing ticket
            updateFields = UpdateOrderFields()
            updateFields.StopPrice = newStopPrice
            self.trailStopTicket.Update(updateFields)

    def ResetTrailingStop(self):
        """Clear trailing-stop state and cancel any working stop order.

        Called when the position is exited or reversed.
        """
        self.highestPrice = 0
        if self._IsStopTicketOpen():
            self.trailStopTicket.Cancel("Exiting or reversing position.")
        self.trailStopTicket = None