Overall Statistics
Total Orders
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Start Equity
100000
End Equity
100000
Net Profit
0%
Sharpe Ratio
0
Sortino Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
2.136
Tracking Error
0.17
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
Portfolio Turnover
0%
from AlgorithmImports import *
import tensorflow as tf
import numpy as np
import pandas as pd


class MLPPredictionAlgorithm(QCAlgorithm):
    """Train a small MLP on daily OHLC bars and log next-day return predictions.

    The algorithm keeps a rolling window of the last ``lookback`` daily bars
    for each symbol, retrains a dense network on the pooled window of all
    symbols at noon, and logs a predicted next-bar return for every symbol
    15 minutes before the market close. It only logs predictions; it never
    places orders.
    """

    # Columns stored per bar; they are also the model's input features.
    FEATURE_COLUMNS = ['open', 'high', 'low', 'close']

    def Initialize(self):
        """Configure the backtest, subscribe to data and schedule the events."""
        self.SetStartDate(2022, 1, 3)
        self.SetEndDate(2022, 3, 1)
        self.SetCash(100000)

        # Hard-coded universe of five US equities at daily resolution.
        self.symbols = [self.AddEquity(ticker, Resolution.Daily).Symbol for ticker in [
            "AAPL", "MSFT", "GOOGL", "AMZN", "META"]]

        # Rolling window of recent bars per symbol. The explicit float dtype
        # keeps the frames numeric even while they are still empty.
        self.data = {symbol: pd.DataFrame(
            columns=self.FEATURE_COLUMNS, dtype=float) for symbol in self.symbols}

        # ``model`` stays None until the first successful training run.
        self.model = None
        self.lookback = 20

        # Scaling statistics of the most recent training set. They are reused
        # at prediction time so the model sees inputs on the scale it was
        # trained on, and so its output can be mapped back to a raw return.
        self._x_mean = 0.0
        self._x_std = 1.0
        self._y_mean = 0.0
        self._y_std = 1.0

        # Tie both schedules to the first symbol's trading calendar so that
        # nothing is retrained or predicted on weekends and holidays.
        trading_days = self.DateRules.EveryDay(self.symbols[0])
        self.Schedule.On(trading_days, self.TimeRules.At(
            12, 0), self.TrainModel)  # Retrain at noon
        self.Schedule.On(trading_days, self.TimeRules.BeforeMarketClose(
            self.symbols[0], 15), self.MakePredictions)  # 15 minutes before market close

    def OnData(self, data):
        """Append the newest bar of each symbol to its rolling window.

        Symbols without a bar in ``data`` are skipped. Each window is trimmed
        to the most recent ``lookback`` rows.
        """
        for symbol in self.symbols:
            if symbol not in data.Bars:
                continue

            bar = data.Bars[symbol]
            new_row = pd.DataFrame({
                'open': [float(bar.Open)],
                'high': [float(bar.High)],
                'low': [float(bar.Low)],
                'close': [float(bar.Close)]
            })

            history = self.data[symbol]
            # Concatenating with an empty frame relies on deprecated pandas
            # dtype inference, so the first bar simply becomes the window.
            if history.empty:
                updated = new_row
            else:
                updated = pd.concat([history, new_row], ignore_index=True)

            self.data[symbol] = updated.tail(
                self.lookback).reset_index(drop=True)

    def ModelConstruction(self):
        """Build and compile a fresh, untrained regression MLP.

        Returns:
            A compiled ``tf.keras`` Sequential model with three hidden ReLU
            layers of 64 units and a single linear output unit.
        """
        model = tf.keras.models.Sequential()
        model.add(tf.keras.layers.Dense(
            64, input_shape=(len(self.FEATURE_COLUMNS),), activation=tf.nn.relu))
        model.add(tf.keras.layers.Dense(64, activation=tf.nn.relu))
        model.add(tf.keras.layers.Dense(64, activation=tf.nn.relu))
        # Linear output: the standardised return target takes negative
        # values, which a ReLU output could never produce.
        model.add(tf.keras.layers.Dense(1))
        optimizer = tf.keras.optimizers.Adam(0.001)
        # Accuracy is a classification metric; report MAE for this regression.
        model.compile(optimizer=optimizer, loss='mse', metrics=['mae'])
        return model

    @staticmethod
    def _safe_std(values):
        """Return ``values.std()``, or 1.0 when it is zero or not finite."""
        std = float(values.std())
        return std if np.isfinite(std) and std > 0 else 1.0

    def TrainModel(self):
        """Retrain the model from scratch on the pooled windows of all symbols.

        Every symbol with a full ``lookback`` window contributes one sample
        per consecutive pair of bars: the OHLC values of bar ``i`` are the
        features and the close-to-close return from bar ``i`` to ``i + 1`` is
        the target. Symbol identity is not part of the features.
        """
        feature_blocks = []
        target_blocks = []

        for symbol in self.symbols:
            frame = self.data[symbol]
            if len(frame) < self.lookback:
                continue

            window = frame.tail(self.lookback)
            if len(window) < 2:
                # At least two bars are needed to form one return.
                continue

            values = window[self.FEATURE_COLUMNS].to_numpy(dtype=float)
            closes = window['close'].to_numpy(dtype=float)

            # The last bar has no following bar, hence no target: drop it
            # from the features.
            feature_blocks.append(values[:-1])
            target_blocks.append(np.diff(closes) / closes[:-1])

        if not feature_blocks:
            self.Debug("Not enough data to train the model.")
            return

        X = np.vstack(feature_blocks)
        y = np.concatenate(target_blocks)

        # Standardise with scalar statistics over the pooled data. The
        # standard deviations are guarded so that a constant input cannot
        # turn the training set into NaNs.
        x_mean, x_std = float(X.mean()), self._safe_std(X)
        y_mean, y_std = float(y.mean()), self._safe_std(y)
        X = (X - x_mean) / x_std
        y = (y - y_mean) / y_std

        self.Debug(f"y: {y}")

        model = self.ModelConstruction()
        model.fit(X, y, epochs=10, batch_size=32, verbose=1)

        # Publish the model together with its scaling statistics only after
        # fitting, so MakePredictions never pairs a model with stale scaling.
        self.model = model
        self._x_mean, self._x_std = x_mean, x_std
        self._y_mean, self._y_std = y_mean, y_std
        self.Debug("Model trained with {} samples.".format(len(X)))

    def MakePredictions(self):
        """Log the predicted next-bar return of every symbol with a full window.

        The latest bar is scaled with the statistics of the last training
        run, and the model output is mapped back from standardised units to
        a raw return before it is logged.
        """
        if self.model is None:
            self.Debug("Model is not trained yet")
            return

        for symbol in self.symbols:
            frame = self.data[symbol]
            if frame.empty or len(frame) < self.lookback:
                self.Debug(f"Not enough data for {symbol}")
                continue

            features = frame[self.FEATURE_COLUMNS].iloc[-1].to_numpy(
                dtype=float).reshape(1, -1)
            self.Debug(f"Features: {features}")

            scaled = (features - self._x_mean) / self._x_std
            prediction = self.model.predict(scaled) * self._y_std + self._y_mean
            self.Debug(f"Prediction for symbol {symbol}: {prediction}")