Created with Highcharts 12.1.2 | Equity | Axis labels: Jan 2019, Jul 2019, Jan 2020, Jul 2020, Jan 2021, Jul 2021, Jan 2022, Jul 2022, Jan 2023, Jul 2023, Jan 2024, Jul 2024, Jan 2025 | Fused tick labels (as extracted): 0100k200k-50-25000.20.4010200M02M02040
Overall Statistics
Total Orders
184
Average Win
5.97%
Average Loss
-2.68%
Compounding Annual Return
42.709%
Drawdown
31.700%
Expectancy
1.033
Start Equity
10000
End Equity
83704.93
Net Profit
737.049%
Sharpe Ratio
1.147
Sortino Ratio
1.057
Probabilistic Sharpe Ratio
60.371%
Loss Rate
37%
Win Rate
63%
Profit-Loss Ratio
2.22
Alpha
0.256
Beta
0.357
Annual Standard Deviation
0.256
Annual Variance
0.065
Information Ratio
0.696
Tracking Error
0.271
Treynor Ratio
0.822
Total Fees
$1820.72
Estimated Strategy Capacity
$480000000.00
Lowest Capacity Asset
MSTR RBGP9S2961YD
Portfolio Turnover
3.42%
from AlgorithmImports import *
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd

# Custom fee model for 0.1% per trade
class PercentageFeeModel(FeeModel):
    """Fee model that charges 0.1% of an order's notional value."""

    def GetOrderFee(self, parameters):
        """Return the fee for the order described by ``parameters``.

        The fee is 0.001 times the security's current price times the
        absolute order quantity, denominated in the security's quote currency.
        """
        asset = parameters.Security
        shares = abs(parameters.Order.Quantity)
        # Keep the multiplication order (rate * price * shares) so the
        # floating-point result is unchanged.
        amount = 0.001 * asset.Price * shares
        return OrderFee(CashAmount(amount, asset.QuoteCurrency.Symbol))

class MLTradingAlgorithm(QCAlgorithm):
    
    def Initialize(self):
        """Configure the backtest window, cash, securities, model state and training schedule."""

        def add_daily_equity(ticker):
            # Subscribe at daily resolution with the custom percentage fee
            # model and zero slippage, and hand back the resulting Symbol.
            equity = self.AddEquity(ticker, Resolution.Daily)
            equity.SetFeeModel(PercentageFeeModel())
            equity.SetSlippageModel(ConstantSlippageModel(0))
            return equity.Symbol

        # Backtest period and starting capital
        self.SetStartDate(2019, 1, 1)
        self.SetEndDate(2024, 12, 31)
        self.SetCash(10000)

        # Tickers and allocation are read from parameters, with defaults
        self.trading_ticker = self.GetParameter("trading_ticker", "MSTR")
        self.benchmark_ticker = self.GetParameter("benchmark_ticker", "SPY")
        self.allocation_percentage = self.GetParameter("allocation_percentage", 0.4)

        # Traded asset first, then the benchmark
        self.symbol = add_daily_equity(self.trading_ticker)
        self.benchmark_symbol = add_daily_equity(self.benchmark_ticker)

        # 200-bar history of the traded asset, pre-filled by a 200-bar warm-up
        self.data = RollingWindow[TradeBar](200)
        self.SetWarmUp(200)

        # Classifier and feature scaler, plus training bookkeeping
        self.model = LogisticRegression(random_state=42)
        self.scaler = StandardScaler()
        self.training_count = 0
        self.is_model_trained = False  # flipped by TrainModel after a fit

        # Retrain every Monday at 10:00
        self.Schedule.On(
            self.DateRules.Every(DayOfWeek.Monday),
            self.TimeRules.At(10, 0),
            self.TrainModel,
        )

        # Per-trade bookkeeping for the strategy-vs-benchmark comparison
        self.beat_benchmark_count = 0
        self.trade_entry_price = None      # traded asset close at entry
        self.benchmark_entry_price = None  # benchmark close at entry
    
    def OnData(self, data):
        """Store the new bar and trade on the model's predicted direction.

        Each bar of the traded symbol is appended to the rolling window. Once
        warm-up is over, the window is full and the model has been trained,
        the last row of the feature frame is scaled and classified:

        * prediction 1 while flat or short -> ``SetHoldings`` at the
          configured allocation and record the entry closes;
        * prediction 0 while long -> compare the trade's close-to-close
          return against the benchmark's, count a win if it is higher,
          then liquidate.

        Args:
            data: The slice delivered by the engine for this time step.
        """
        # Ensure data exists for trading symbol
        if not data.ContainsKey(self.symbol):
            return

        trade_bar = data[self.symbol]
        if trade_bar is None:
            return

        # Keep filling the window during warm-up so it is ready afterwards
        self.data.Add(trade_bar)

        # Orders cannot be placed while the algorithm is warming up. Bail out
        # before any trading logic so the entry-price bookkeeping is never
        # updated for an order that was not actually submitted.
        if self.IsWarmingUp:
            return

        # Check if RollingWindow is ready
        if not self.data.IsReady or self.data.Count < 200:
            return

        # Ensure model is trained before making predictions
        if not self.is_model_trained:
            self.Debug("Model is not trained yet. Skipping prediction.")
            return

        df = self.GetFeatureDataFrame()
        if df is None or len(df) < 1:
            return

        # Last row, target column excluded. Kept as a one-row DataFrame so the
        # scaler (fitted on a DataFrame in TrainModel) sees matching column names.
        # NOTE(review): this treats the frame's last row as the most recent
        # bar - confirm GetFeatureDataFrame returns rows oldest -> newest.
        latest_features = df.iloc[[-1], :-1]

        # Scaling and prediction share one failure domain: any error skips
        # this bar instead of stopping the algorithm.
        try:
            latest_features_scaled = self.scaler.transform(latest_features)
            prob_class = self.model.predict_proba(latest_features_scaled)[0][1]  # P(class 1)
        except Exception as e:
            self.Debug(f"Error: Model prediction failed. {e}")
            return
        prediction = 1 if prob_class > 0.5 else 0

        # Benchmark close for this slice, or None when the benchmark has no bar
        benchmark_close = None
        if data.ContainsKey(self.benchmark_symbol):
            benchmark_bar = data[self.benchmark_symbol]
            if benchmark_bar is not None:
                benchmark_close = benchmark_bar.Close

        holdings = self.Portfolio[self.symbol].Quantity

        # Buy if prediction = 1 and not currently invested
        if prediction == 1 and holdings <= 0:
            self.SetHoldings(self.symbol, self.allocation_percentage)
            # Record the entry prices for the trade and benchmark
            self.trade_entry_price = trade_bar.Close
            self.benchmark_entry_price = benchmark_close

        # Sell if prediction = 0 and currently invested
        elif prediction == 0 and holdings > 0:
            # The comparison needs both entry prices and a benchmark bar now
            if (
                self.trade_entry_price is not None
                and self.benchmark_entry_price is not None
                and benchmark_close is not None
            ):
                trade_return = (
                    (trade_bar.Close - self.trade_entry_price) / self.trade_entry_price
                )
                benchmark_return = (
                    (benchmark_close - self.benchmark_entry_price)
                    / self.benchmark_entry_price
                )
                if trade_return > benchmark_return:
                    self.beat_benchmark_count += 1

            # Reset entry prices after the trade is closed
            self.trade_entry_price = None
            self.benchmark_entry_price = None

            # Execute the sell order
            self.Liquidate(self.symbol)

    def TrainModel(self):
        """Refit the scaler and classifier on the current feature frame.

        The frame is split chronologically 80/20 (no shuffling). The scaler is
        fitted on the training part only, the logistic regression is trained
        on the scaled training data, and train/test accuracy at a 0.5
        probability threshold is reported through ``Debug``.

        Training is skipped, leaving the previous model state untouched, when
        fewer than 50 labelled rows are available or when the training labels
        contain only one class.
        """
        df = self.GetFeatureDataFrame()
        if df is None:
            self.Debug("Insufficient data for training.")
            return

        # The target is built with shift(-1), so the final row has no
        # successor to compare against and its label is only a placeholder 0.
        # Exclude it so the model is never trained on a fabricated label.
        df = df.iloc[:-1]
        if len(df) < 50:  # Require enough data to train
            self.Debug("Insufficient data for training.")
            return

        X = df.iloc[:, :-1]  # Features
        y = df.iloc[:, -1]   # Target (0 or 1)

        # Chronological split; random_state is omitted because it has no
        # effect when shuffle=False.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )

        # LogisticRegression.fit raises when only one class is present;
        # skip this round rather than stopping the algorithm.
        if y_train.nunique() < 2:
            self.Debug("Training skipped: training labels contain a single class.")
            return

        # Fit the scaler on training data only, then apply it to both parts
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        self.model.fit(X_train_scaled, y_train)
        self.is_model_trained = True

        # Accuracy at the same 0.5 threshold used for live predictions
        y_train_pred = (self.model.predict_proba(X_train_scaled)[:, 1] > 0.5).astype(int)
        train_accuracy = accuracy_score(y_train, y_train_pred)

        y_test_pred = (self.model.predict_proba(X_test_scaled)[:, 1] > 0.5).astype(int)
        test_accuracy = accuracy_score(y_test, y_test_pred)

        self.training_count += 1
        self.Debug(f"Training #{self.training_count}: "
                   f"Train Accuracy: {train_accuracy:.2%}, "
                   f"Test Accuracy: {test_accuracy:.2%}")

    def GetFeatureDataFrame(self):
        # Wait until we have 200 data points in the rolling window
        if self.data.Count < 200:
            return None
        
        # Extract data from RollingWindow
        close_prices = [bar.Close for bar in self.data]
        high_prices = [bar.High for bar in self.data]
        low_prices = [bar.Low for bar in self.data]
        volumes = [bar.Volume for bar in self.data]
        
        # Create DataFrame
        df = pd.DataFrame({
            "Close": close_prices,
            "High": high_prices,
            "Low": low_prices,
            "Volume": volumes
        })
        
        # Feature Engineering: Bollinger Bands
        df["SMA_20"] = df["Close"].rolling(window=20).mean()
        df["Std_20"] = df["Close"].rolling(window=20).std()
        df["Upper_Band"] = df["SMA_20"] + 2 * df["Std_20"]
        df["Lower_Band"] = df["SMA_20"] - 2 * df["Std_20"]
        df["Band_Width"] = df["Upper_Band"] - df["Lower_Band"]
        
        # Feature Engineering: Stochastic Oscillator
        df["Lowest_Low_14"] = df["Low"].rolling(window=14).min()
        df["Highest_High_14"] = df["High"].rolling(window=14).max()
        df["%K"] = 100 * (df["Close"] - df["Lowest_Low_14"]) / (df["Highest_High_14"] - df["Lowest_Low_14"])
        df["%D"] = df["%K"].rolling(window=3).mean()
        
        # Feature Engineering: Average True Range (ATR)
        df["TR1"] = df["High"] - df["Low"]
        df["TR2"] = abs(df["High"] - df["Close"].shift(1))
        df["TR3"] = abs(df["Low"] - df["Close"].shift(1))
        df["TR"] = df[["TR1", "TR2", "TR3"]].max(axis=1)
        df["ATR_14"] = df["TR"].rolling(window=14).mean()
        
        # Feature Engineering: On-Balance Volume (OBV)
        df["OBV"] = (np.sign(df["Close"].diff()) * df["Volume"]).cumsum()
        df["OBV"] = df["OBV"].fillna(0)
        
        # Define Target: 1 if next day's Close > today's Close, else 0
        df["Target"] = (df["Close"].shift(-1) > df["Close"]).astype(int)
        
        # Select features for the model
        features = ["SMA_20", "Upper_Band", "Lower_Band", "Band_Width", "%K", "%D", "ATR_14", "OBV"]
        df = df[features + ["Target"]]
        
        # Remove rows with NaN values
        df.dropna(inplace=True)

        return df
    
    def OnEndOfAlgorithm(self):
        # Print the number of times the strategy beat the benchmark
        self.Log(f"Number of times strategy beat {self.benchmark_ticker}: {self.beat_benchmark_count}")