Created with Highcharts 12.1.2EquityJan 2019Jan…Jul 2019Jan 2020Jul 2020Jan 2021Jul 2021Jan 2022Jul 2022Jan 2023Jul 2023Jan 2024Jul 2024Jan 20255k10k15k-50-25000.20.4-1010500M1,000M01M2M01020
Overall Statistics
Total Orders
586
Average Win
0.86%
Average Loss
-1.21%
Compounding Annual Return
1.027%
Drawdown
35.400%
Expectancy
0.028
Start Equity
10000
End Equity
10628.96
Net Profit
6.290%
Sharpe Ratio
-0.152
Sortino Ratio
-0.148
Probabilistic Sharpe Ratio
0.493%
Loss Rate
40%
Win Rate
60%
Profit-Loss Ratio
0.71
Alpha
-0.016
Beta
0.006
Annual Standard Deviation
0.098
Annual Variance
0.01
Information Ratio
-0.621
Tracking Error
0.191
Treynor Ratio
-2.317
Total Fees
$577.00
Estimated Strategy Capacity
$750000000.00
Lowest Capacity Asset
MSTR RBGP9S2961YD
Portfolio Turnover
5.12%
from AlgorithmImports import *
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import numpy as np
import pandas as pd

class MLTradingAlgorithm(QCAlgorithm):
    """Daily MSTR strategy driven by a random-forest classifier.

    A rolling window of daily trade bars is turned into Bollinger-band,
    historical-volatility and RSI (or SMA) features.  The classifier is
    retrained every Monday and predicts whether the next close will be
    higher than the current one.  A "up" prediction opens a long position
    with a fixed portfolio allocation; a "down" prediction closes it.  Open
    long positions are protected by a trailing stop-market order.
    """

    # Number of daily bars kept in the rolling window (also the warm-up length).
    WINDOW_SIZE = 200

    def Initialize(self):
        """Configure the backtest, the data subscription, the model and state."""
        # 1. Setup Algorithm Parameters
        self.SetStartDate(2019, 1, 1)
        self.SetEndDate(2024, 12, 31)
        self.SetCash(10000)

        # 2. Fraction of portfolio value committed to a long position (20%)
        self.allocation = 0.2

        # 3. Add Equity
        self.symbol = self.AddEquity("MSTR", Resolution.Daily).Symbol

        # 4. Rolling window of TradeBar data, pre-filled during warm-up
        self.data = RollingWindow[TradeBar](self.WINDOW_SIZE)
        self.SetWarmUp(self.WINDOW_SIZE)

        # 5. ML Model (Random Forest)
        self.model = RandomForestClassifier(n_estimators=100, random_state=42)
        self.is_model_trained = False
        self.training_count = 0

        # 6. Schedule Weekly Retraining
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday),
                         self.TimeRules.At(10, 0),
                         self.TrainModel)

        # 7. Trailing Stop Logic
        self.highestPrice = 0
        self.trailStopTicket = None
        # The stop price is highestPrice * trailing_stop_pct, so 0.95 keeps
        # the stop 5% below the highest close seen.  (A factor of 1 would put
        # the stop exactly at the high.)
        self.trailing_stop_pct = 0.95

        # 8. Configurable usage of RSI or SMA
        #    Set True to use RSI, False to use short/long SMAs
        self.useRSI = True

    def OnData(self, data):
        """Record the new bar and trade on the model's latest prediction.

        Args:
            data: The data slice delivered by the engine for this time step.
        """
        if not data.ContainsKey(self.symbol):
            return

        trade_bar = data[self.symbol]
        if trade_bar is None:
            return

        # Keep feeding the window even when we are not yet allowed to trade.
        self.data.Add(trade_bar)
        if not self.data.IsReady:
            return

        # No orders during warm-up, and no predictions before the first fit.
        if self.IsWarmingUp or not self.is_model_trained:
            return

        df = self.GetFeatureDataFrame()
        if df is None or df.empty:
            return

        # dropna() may have removed the newest bar (e.g. undefined RSI);
        # never act on features that belong to an older bar.
        if df.index[-1] != self.data.Count - 1:
            return

        # Keep the row as a DataFrame so the column names match those the
        # model was fitted with.
        latest_features = df.iloc[[-1], :-1]
        try:
            prediction = self.model.predict(latest_features)[0]  # 1 = Buy, 0 = Sell
        except Exception as error:
            self.Debug(f"Prediction failed: {error}")
            return

        holdings = self.Portfolio[self.symbol].Quantity

        # -----------------------------
        # Trading Logic
        # The three branches are mutually exclusive so that the trailing stop
        # is never re-armed on the same bar that closes the position.
        # -----------------------------
        if prediction == 1 and holdings <= 0:
            # If we are short, first liquidate
            if holdings < 0:
                self.Liquidate(self.symbol)
                self.ResetTrailingStop()

            # Go long with partial allocation
            self.SetHoldings(self.symbol, self.allocation)

            # Start tracking the high for the new position.  If the entry
            # order has not filled yet this call is a no-op and the stop is
            # placed by the "still long" branch on a later bar.
            self.highestPrice = trade_bar.Close
            self.PlaceOrUpdateTrailingStop()

        elif prediction == 0 and holdings > 0:
            # Liquidate if we hold a long position
            self.Liquidate(self.symbol)
            self.ResetTrailingStop()

        elif holdings > 0:
            # Still long: ratchet the stop up on a new high, and make sure a
            # live stop exists even when no new high was made.
            if trade_bar.Close > self.highestPrice:
                self.highestPrice = trade_bar.Close
                self.PlaceOrUpdateTrailingStop()
            elif not self._HasActiveStop():
                self.PlaceOrUpdateTrailingStop()

    def TrainModel(self):
        """Refit the classifier on the current window and log its accuracy.

        Uses a chronological 80/20 split; the newest bar is excluded because
        its next-day outcome is not known yet.
        """
        df = self.GetFeatureDataFrame(labeled_only=True)
        if df is None or len(df) < 50:
            self.Debug("Insufficient data for training.")
            return

        X = df.iloc[:, :-1]
        y = df.iloc[:, -1]

        # 80/20 split (time-based, no shuffle)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )

        # Fit the model
        self.model.fit(X_train, y_train)
        self.is_model_trained = True

        # Evaluate
        train_accuracy = accuracy_score(y_train, self.model.predict(X_train))
        test_accuracy = accuracy_score(y_test, self.model.predict(X_test))

        self.training_count += 1
        self.Debug(f"Training #{self.training_count}: "
                   f"Train Accuracy: {train_accuracy:.2%}, "
                   f"Test Accuracy: {test_accuracy:.2%}")

    def GetFeatureDataFrame(self, labeled_only=False):
        """Build the feature/target table from the rolling window.

        Features:
          - Close
          - Bollinger Bands (20-day mid, std, upper, lower)
          - Historical Volatility (HV_30, annualised)
          - RSI (14-day) if ``self.useRSI`` else 10/50-day SMAs
        Target (last column):
          - 1 if the next day's close is higher than this day's close, else 0

        Args:
            labeled_only: When True, drop the newest bar, whose Target cannot
                be known yet (it would otherwise carry a placeholder 0).

        Returns:
            A DataFrame in chronological order (oldest first) whose integer
            index is the bar's position in the window, or None if the window
            is not full yet.
        """
        if self.data.Count < self.WINDOW_SIZE:
            return None

        # The rolling window yields the newest bar first; reverse it so that
        # rolling statistics and shift(-1) run forward in time.
        close_prices = [float(bar.Close) for bar in self.data][::-1]
        newest_index = len(close_prices) - 1
        df = pd.DataFrame(close_prices, columns=["Close"])

        # ---------------------
        # 1) Bollinger Bands
        # ---------------------
        period = 20
        df["BB_mid"] = df["Close"].rolling(period).mean()
        df["BB_std"] = df["Close"].rolling(period).std()
        df["BB_upper"] = df["BB_mid"] + 2 * df["BB_std"]
        df["BB_lower"] = df["BB_mid"] - 2 * df["BB_std"]

        # ---------------------
        # 2) Historical Volatility (30-day)
        # ---------------------
        df["daily_returns"] = df["Close"].pct_change()
        df["HV_30"] = df["daily_returns"].rolling(window=30).std() * np.sqrt(252)

        # ---------------------
        # 3) RSI (if self.useRSI == True)
        #    or short/long SMAs (if self.useRSI == False)
        # ---------------------
        if self.useRSI:
            # RSI calculation over 14 days
            delta = df["Close"].diff()
            gain = (delta.where(delta > 0, 0)).rolling(14).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
            rs = gain / loss
            df["RSI"] = 100 - (100 / (1 + rs))
        else:
            # Short/Long SMAs (e.g., 10-day and 50-day)
            df["SMA_10"] = df["Close"].rolling(10).mean()
            df["SMA_50"] = df["Close"].rolling(50).mean()

        # ---------------------
        # 4) Target
        #    The newest row compares against NaN and therefore gets 0; that
        #    value is only a placeholder (see labeled_only).
        # ---------------------
        df["Target"] = (df["Close"].shift(-1) > df["Close"]).astype(int)

        # Cleanup: drop rows whose look-back windows are incomplete
        df.dropna(inplace=True)

        # Remove daily_returns from final features
        df.drop(columns=["daily_returns"], inplace=True)

        if labeled_only:
            df = df.drop(index=newest_index, errors="ignore")

        return df

    # -----------------------------
    # Trailing Stop Methods
    # -----------------------------
    def _HasActiveStop(self):
        """Return True if a stop ticket exists and is not in a terminal state."""
        ticket = self.trailStopTicket
        if ticket is None:
            return False
        terminal = (OrderStatus.Filled, OrderStatus.Canceled, OrderStatus.Invalid)
        return ticket.Status not in terminal

    def PlaceOrUpdateTrailingStop(self):
        """
        Places a stop-market order ticket if we don't have a live one,
        or updates the stop price if we do.  Does nothing unless we
        currently hold a long position.
        """
        quantity = self.Portfolio[self.symbol].Quantity
        if quantity <= 0:
            return

        newStopPrice = self.highestPrice * self.trailing_stop_pct

        if not self._HasActiveStop():
            # Create a new stop-market ticket (to sell our entire long position if triggered)
            self.trailStopTicket = self.StopMarketOrder(self.symbol, -quantity, newStopPrice)
        else:
            # Update existing ticket
            updateFields = UpdateOrderFields()
            updateFields.StopPrice = newStopPrice
            self.trailStopTicket.Update(updateFields)

    def ResetTrailingStop(self):
        """
        Resets trailing stop variables and cancels any active stop order
        when we exit or reverse the position.
        """
        self.highestPrice = 0
        if self._HasActiveStop():
            self.trailStopTicket.Cancel("Exiting or reversing position.")
        self.trailStopTicket = None