Overall Statistics
Total Orders
218
Average Win
9.15%
Average Loss
-6.08%
Compounding Annual Return
87.561%
Drawdown
41.600%
Expectancy
0.241
Start Equity
100000
End Equity
311640.5
Net Profit
211.640%
Sharpe Ratio
1.301
Sortino Ratio
0.882
Probabilistic Sharpe Ratio
49.821%
Loss Rate
50%
Win Rate
50%
Profit-Loss Ratio
1.51
Alpha
0.574
Beta
1.758
Annual Standard Deviation
0.63
Annual Variance
0.397
Information Ratio
1.12
Tracking Error
0.607
Treynor Ratio
0.466
Total Fees
$4579.50
Estimated Strategy Capacity
$350000000.00
Lowest Capacity Asset
NQ YOGVNNAOI1OH
Portfolio Turnover
500.27%
# region imports
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from sklearn.linear_model import LinearRegression
# endregion


class VolumeProfileAlgorithm(QCAlgorithm):
    """Long-only E-mini NASDAQ-100 futures strategy built on a volume profile.

    Entry (only while flat): the latest stored close lies inside the volume
    profile's value area, the regression slope over the stored closes is
    below -0.4, and the close is above both the TEMA and the EMA. The
    position is opened in the chain contract with the highest open interest
    above 1000.

    Exit: a 0.5% trailing stop that ratchets up with new highs.
    """

    def initialize(self):
        """Configure the backtest, the data subscription and the indicators."""
        self.set_start_date(2023, 1, 1)
        self.set_end_date(2024, 10, 21)
        self.set_cash(100000)

        # Set the symbol of the asset we want to trade
        future = self.add_future(
            Futures.Indices.NASDAQ_100_E_MINI, Resolution.MINUTE
        )
        future.set_filter(timedelta(0), timedelta(182))
        self.future_symbol = future.symbol
        # Contract chosen for trading; stays None until a chain with enough
        # open interest has been seen in on_data.
        self.futures_contract = None
        self.contract_count = 0

        # Volume Profile indicator settings
        self.profile_period = 120
        self.value_area_percentage = 0.4
        self.volume_profile = VolumeProfile(
            "Volume Profile", self.profile_period, self.value_area_percentage
        )

        # Rolling window to store past prices
        self.past_prices_period = 21
        self.past_prices = RollingWindow[TradeBar](self.past_prices_period)

        # Consolidate data
        self.consolidate(
            self.future_symbol, timedelta(minutes=1), self.on_data_consolidated
        )
        # NOTE(review): the profile is registered on 2-hour bars here but is
        # warmed up with 1-minute bars below, so the meaning of the
        # 120-sample period differs between the two - confirm the intent.
        self.register_indicator(
            self.future_symbol, self.volume_profile, timedelta(hours=2)
        )

        # The indicators are stored under underscore-prefixed names so the
        # inherited self.tema(...) / self.ema(...) helper methods are not
        # overwritten on the instance.
        self._tema = self.tema(self.future_symbol, 7000, Resolution.MINUTE)
        self._ema = self.ema(self.future_symbol, 30, 0.5, Resolution.MINUTE)

        # Warm up using historical method
        history = self.history[TradeBar](
            self.future_symbol, timedelta(days=1), Resolution.MINUTE
        )
        for trade_bar in history:
            self.volume_profile.update(trade_bar)
            self.past_prices.add(trade_bar)
        self.log("Finished warming up indicator")

        # Free portfolio setting
        # NOTE(review): free_portfolio_value looks like an absolute cash
        # amount; if a 30% buffer was intended, the percentage setting is
        # probably the right one - confirm against the LEAN settings docs.
        self.settings.free_portfolio_value = 0.3

        # Setting trailing stop loss
        self.trailing_stop_loss_percentage = 0.005  # 0.5%
        self.highest_price_since_entry = 0
        self.stop_loss_price = 0

    def on_data_consolidated(self, data: TradeBar):
        """Store each consolidated bar so the newest close is at index 0.

        Args:
            data: The consolidated bar passed in by the consolidator
                registered in ``initialize``.
        """
        self.past_prices.add(data)

    def on_data(self, data: Slice):
        """Run the entry logic while flat, otherwise manage the trailing stop.

        Args:
            data: The current slice; its future chains are used to pick the
                contract to trade.
        """
        # Check if the strategy warm up period is over and indicators are ready
        if (self.is_warming_up or not self.volume_profile.is_ready or
                not self.past_prices.is_ready or not self._tema.is_ready or
                not self._ema.is_ready):
            return

        current_price = self.past_prices[0].close

        if not self.portfolio.invested:
            self._try_enter_long(data, current_price)
        else:
            self._manage_trailing_stop(current_price)

    def _select_contract(self, data: Slice) -> None:
        """Remember the chain contract with the highest open interest above 1000."""
        # This for-loop works because we're only checking one futures security
        for chain in data.future_chains:
            popular_contracts = [
                contract for contract in chain.value
                if contract.open_interest > 1000
            ]
            if not popular_contracts:
                continue

            self.futures_contract = max(
                popular_contracts, key=lambda k: k.open_interest)

    def _try_enter_long(self, data: Slice, current_price) -> None:
        """Open a long position when every entry condition is satisfied."""
        self.log("Not invested! Finding futures contract...")
        self._select_contract(data)

        # NOTE(review): the window is read in its own iteration order and
        # index 0 holds the newest bar, so the regression presumably runs
        # newest-to-oldest and a negative slope corresponds to prices that
        # rose over time - confirm this is the intended direction.
        past_prices = [x.close for x in self.past_prices if x is not None]
        slope = self.compute_slope(past_prices)

        # Log the indicators and price
        self.log(f"Current Price: {current_price} and Slope: {slope}")
        self.log(f"Value Area High: {self.volume_profile.value_area_high}")
        self.log(f"Value Area Low: {self.volume_profile.value_area_low}")
        self.log(f"TEMA: {self._tema.current.value}")
        self.log(f"EMA: {self._ema.current.value}")

        in_value_area = (
            self.volume_profile.value_area_low
            <= current_price
            <= self.volume_profile.value_area_high
        )
        if not in_value_area:
            return

        long_signal = (
            slope < -0.4
            and current_price > self._tema.current.value
            and current_price > self._ema.current.value
        )
        if not long_signal:
            self.log("Price isn't in value area, above TEMA, or above EMA, keep waiting...")
            return

        # Without a selected contract there is nothing to buy; previously this
        # path dereferenced None and raised AttributeError.
        if self.futures_contract is None:
            self.log("Entry signal skipped: no futures contract with open interest above 1000 available yet")
            return

        self.log(
            "Price is moving towards the value area, above TEMA, and above EMA! Invest!")
        self.set_holdings(self.futures_contract.symbol, 1)
        self.highest_price_since_entry = current_price
        self.stop_loss_price = current_price * (1 - self.trailing_stop_loss_percentage)
        self.log(
            f"Current price: {current_price}, initial stop loss price: {self.stop_loss_price}")

    def _manage_trailing_stop(self, current_price) -> None:
        """Raise the trailing stop on new highs and liquidate once it is breached."""
        # Update highest price since entry
        if current_price > self.highest_price_since_entry:
            self.highest_price_since_entry = current_price
            new_stop_loss = current_price * (1 - self.trailing_stop_loss_percentage)
            # The stop only ever moves up.
            if new_stop_loss > self.stop_loss_price:
                self.stop_loss_price = new_stop_loss
                self.log(f"Updating trailing stop loss to: {self.stop_loss_price}")

        # Exit check
        if current_price < self.stop_loss_price:
            self.log(f"Stop loss triggered at {current_price}")
            if self.futures_contract is not None:
                self.liquidate(self.futures_contract.symbol)
            else:
                # Invested without a tracked contract: close everything
                # rather than failing on a None contract.
                self.liquidate()
            self.highest_price_since_entry = 0  # Reset for next trade

    def compute_slope(self, prices: list) -> float:
        """Return the least-squares slope of ``prices`` against their index.

        Args:
            prices: Price values in the order they should be regressed
                (position in the list is used as the x-axis).

        Returns:
            The slope of the fitted line, or 0.0 when fewer than two prices
            are supplied (a line cannot be fitted through less than two
            points).
        """
        if len(prices) < 2:
            return 0.0

        # sklearn expects 2D arrays: one column of targets, one of features.
        prices_array = np.array(prices).reshape(-1, 1)
        times = np.arange(len(prices)).reshape(-1, 1)

        model = LinearRegression().fit(times, prices_array)

        return float(model.coef_[0][0])