Overall Statistics |
Total Orders 184 Average Win 5.97% Average Loss -2.68% Compounding Annual Return 42.709% Drawdown 31.700% Expectancy 1.033 Start Equity 10000 End Equity 83704.93 Net Profit 737.049% Sharpe Ratio 1.147 Sortino Ratio 1.057 Probabilistic Sharpe Ratio 60.371% Loss Rate 37% Win Rate 63% Profit-Loss Ratio 2.22 Alpha 0.256 Beta 0.357 Annual Standard Deviation 0.256 Annual Variance 0.065 Information Ratio 0.696 Tracking Error 0.271 Treynor Ratio 0.822 Total Fees $1820.72 Estimated Strategy Capacity $480000000.00 Lowest Capacity Asset MSTR RBGP9S2961YD Portfolio Turnover 3.42% |
from AlgorithmImports import *
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd


class PercentageFeeModel(FeeModel):
    """Custom fee model charging 0.1% of the traded notional per order."""

    def GetOrderFee(self, parameters):
        """Return the fee for ``parameters.Order`` in the security's quote currency.

        The fee is ``0.001 * price * |quantity|``.
        """
        security = parameters.Security
        order = parameters.Order
        fee = 0.001 * security.Price * abs(order.Quantity)
        currency = security.QuoteCurrency.Symbol
        return OrderFee(CashAmount(fee, currency))


class MLTradingAlgorithm(QCAlgorithm):
    """Long-only daily strategy driven by a weekly re-trained logistic regression.

    Features (Bollinger Bands, Stochastic Oscillator, ATR, OBV) are computed
    from a rolling window of daily bars; the model predicts whether the next
    close will be above the current close.
    """

    # Number of daily bars kept for feature engineering (also the warm-up length).
    LOOKBACK = 200
    # Minimum number of labelled rows required before a training run is attempted.
    MIN_TRAINING_ROWS = 50

    def Initialize(self):
        """Configure dates, cash, securities, the model and the training schedule."""
        # Algorithm parameters
        self.SetStartDate(2019, 1, 1)
        self.SetEndDate(2024, 12, 31)
        self.SetCash(10000)

        # Configurable ticker symbols and allocation percentage
        self.trading_ticker = self.GetParameter("trading_ticker", "MSTR")
        self.benchmark_ticker = self.GetParameter("benchmark_ticker", "SPY")
        # Coerce to float so SetHoldings receives a number regardless of how the
        # parameter value was supplied.
        self.allocation_percentage = float(
            self.GetParameter("allocation_percentage", 0.4)
        )

        # Trading equity with custom fee and zero-slippage models
        trading_security = self.AddEquity(self.trading_ticker, Resolution.Daily)
        trading_security.SetFeeModel(PercentageFeeModel())
        trading_security.SetSlippageModel(ConstantSlippageModel(0))
        self.symbol = trading_security.Symbol

        # Benchmark equity with the same fee and slippage models
        benchmark_security = self.AddEquity(self.benchmark_ticker, Resolution.Daily)
        benchmark_security.SetFeeModel(PercentageFeeModel())
        benchmark_security.SetSlippageModel(ConstantSlippageModel(0))
        self.benchmark_symbol = benchmark_security.Symbol

        # Rolling window of the most recent daily bars for the trading asset
        self.data = RollingWindow[TradeBar](self.LOOKBACK)
        self.SetWarmUp(self.LOOKBACK)

        # Logistic regression model and the scaler fitted alongside it
        self.model = LogisticRegression(random_state=42)
        self.scaler = StandardScaler()
        self.training_count = 0
        self.is_model_trained = False  # set once TrainModel has fitted the model

        # Re-train every Monday at 10:00 AM
        self.Schedule.On(
            self.DateRules.Every(DayOfWeek.Monday),
            self.TimeRules.At(10, 0),
            self.TrainModel,
        )

        # Benchmark comparison bookkeeping
        self.beat_benchmark_count = 0
        self.trade_entry_price = None      # trading asset close at entry
        self.benchmark_entry_price = None  # benchmark close at entry

    @staticmethod
    def _get_bar(data, symbol):
        """Return the TradeBar for ``symbol`` in this slice, or None if absent."""
        bars = data.Bars
        if not bars.ContainsKey(symbol):
            return None
        return bars[symbol]

    def OnData(self, data):
        """Record the new bar, then enter or exit based on the model prediction."""
        # Read from data.Bars so only TradeBar objects reach the typed window.
        trade_bar = self._get_bar(data, self.symbol)
        if trade_bar is None:
            return

        self.data.Add(trade_bar)
        if not self.data.IsReady:
            return

        # Orders are not allowed during warm-up; keep filling the window only.
        if self.IsWarmingUp:
            return

        if not self.is_model_trained:
            self.Debug("Model is not trained yet. Skipping prediction.")
            return

        df = self.GetFeatureDataFrame()
        if df is None or df.empty:
            return

        # The last row is the most recent bar. Keep it as a one-row DataFrame so
        # the column names match those the scaler was fitted with.
        latest_features = df.iloc[[-1], :-1]
        latest_features_scaled = self.scaler.transform(latest_features)

        try:
            # Probability of class 1 (next close above current close)
            prob_class = self.model.predict_proba(latest_features_scaled)[0][1]
        except Exception as e:
            self.Debug(f"Error: Model prediction failed. {e}")
            return
        prediction = 1 if prob_class > 0.5 else 0

        holdings = self.Portfolio[self.symbol].Quantity
        benchmark_bar = self._get_bar(data, self.benchmark_symbol)

        if prediction == 1 and holdings <= 0:
            # Enter: allocate and remember entry prices for the later comparison.
            self.SetHoldings(self.symbol, self.allocation_percentage)
            self.trade_entry_price = trade_bar.Close
            self.benchmark_entry_price = (
                benchmark_bar.Close if benchmark_bar is not None else None
            )
        elif prediction == 0 and holdings > 0:
            # Exit: compare the trade's return with the benchmark's when all
            # four prices are available.
            if (
                self.trade_entry_price is not None
                and self.benchmark_entry_price is not None
                and benchmark_bar is not None
            ):
                trade_return = (
                    trade_bar.Close - self.trade_entry_price
                ) / self.trade_entry_price
                benchmark_return = (
                    benchmark_bar.Close - self.benchmark_entry_price
                ) / self.benchmark_entry_price
                if trade_return > benchmark_return:
                    self.beat_benchmark_count += 1

            # Always clear entry prices once the position is closed so a stale
            # entry can never be paired with a later exit.
            self.trade_entry_price = None
            self.benchmark_entry_price = None
            self.Liquidate(self.symbol)

    def TrainModel(self):
        """Fit the scaler and model on the rolling window and log accuracies.

        Uses a chronological 80/20 train/test split. Returns without training
        when there is too little data or only one target class is present.
        """
        df = self.GetFeatureDataFrame()
        if df is not None:
            # The most recent bar has no next-day close yet, so its Target is a
            # placeholder rather than a real label; exclude it from training.
            df = df.iloc[:-1]
        if df is None or len(df) < self.MIN_TRAINING_ROWS:
            self.Debug("Insufficient data for training.")
            return

        X = df.iloc[:, :-1]  # features
        y = df.iloc[:, -1]   # target (0 or 1)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )

        # LogisticRegression.fit raises when only one class is present; skip
        # this run instead of failing inside the scheduled event.
        if y_train.nunique() < 2:
            self.Debug("Training skipped: only one target class in training data.")
            return

        # Fit the scaler on the training portion only, then apply it to both.
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        self.model.fit(X_train_scaled, y_train)
        self.is_model_trained = True

        # Evaluate with the same 0.5 probability threshold used for trading.
        y_train_pred = (self.model.predict_proba(X_train_scaled)[:, 1] > 0.5).astype(int)
        y_test_pred = (self.model.predict_proba(X_test_scaled)[:, 1] > 0.5).astype(int)
        train_accuracy = accuracy_score(y_train, y_train_pred)
        test_accuracy = accuracy_score(y_test, y_test_pred)

        self.training_count += 1
        self.Debug(f"Training #{self.training_count}: "
                   f"Train Accuracy: {train_accuracy:.2%}, "
                   f"Test Accuracy: {test_accuracy:.2%}")

    def GetFeatureDataFrame(self):
        """Build the feature/target frame from the rolling window.

        Returns:
            A DataFrame in chronological order (oldest first, most recent last)
            with the feature columns followed by ``Target``, or None when the
            window holds fewer than ``LOOKBACK`` bars. The final row's
            ``Target`` is a placeholder 0 because the next close is unknown.
        """
        if self.data.Count < self.LOOKBACK:
            return None

        # RollingWindow yields the newest bar first; reverse it so rolling
        # statistics, diff() and shift() operate forward in time.
        bars = list(self.data)
        bars.reverse()

        df = pd.DataFrame({
            "Close": [bar.Close for bar in bars],
            "High": [bar.High for bar in bars],
            "Low": [bar.Low for bar in bars],
            "Volume": [bar.Volume for bar in bars],
        })

        # Bollinger Bands
        df["SMA_20"] = df["Close"].rolling(window=20).mean()
        df["Std_20"] = df["Close"].rolling(window=20).std()
        df["Upper_Band"] = df["SMA_20"] + 2 * df["Std_20"]
        df["Lower_Band"] = df["SMA_20"] - 2 * df["Std_20"]
        df["Band_Width"] = df["Upper_Band"] - df["Lower_Band"]

        # Stochastic Oscillator
        df["Lowest_Low_14"] = df["Low"].rolling(window=14).min()
        df["Highest_High_14"] = df["High"].rolling(window=14).max()
        # A flat 14-bar range would divide by zero and yield inf, which dropna
        # does not remove; map it to NaN so such rows are dropped instead.
        price_range = (df["Highest_High_14"] - df["Lowest_Low_14"]).replace(0, np.nan)
        df["%K"] = 100 * (df["Close"] - df["Lowest_Low_14"]) / price_range
        df["%D"] = df["%K"].rolling(window=3).mean()

        # Average True Range (ATR)
        prev_close = df["Close"].shift(1)
        df["TR1"] = df["High"] - df["Low"]
        df["TR2"] = (df["High"] - prev_close).abs()
        df["TR3"] = (df["Low"] - prev_close).abs()
        df["TR"] = df[["TR1", "TR2", "TR3"]].max(axis=1)
        df["ATR_14"] = df["TR"].rolling(window=14).mean()

        # On-Balance Volume (OBV)
        df["OBV"] = (np.sign(df["Close"].diff()) * df["Volume"]).cumsum()
        df["OBV"] = df["OBV"].fillna(0)

        # Target: 1 if the next day's close is above today's close, else 0
        df["Target"] = (df["Close"].shift(-1) > df["Close"]).astype(int)

        features = ["SMA_20", "Upper_Band", "Lower_Band", "Band_Width",
                    "%K", "%D", "ATR_14", "OBV"]
        # dropna() returns a new frame, avoiding in-place mutation of a slice.
        return df[features + ["Target"]].dropna()

    def OnEndOfAlgorithm(self):
        """Log how many closed trades out-performed the benchmark."""
        self.Log(f"Number of times strategy beat {self.benchmark_ticker}: {self.beat_benchmark_count}")