Overall Statistics
Total Orders
1718
Average Win
3.36%
Average Loss
-2.43%
Compounding Annual Return
408.329%
Drawdown
33.400%
Expectancy
0.138
Start Equity
100000
End Equity
854778.4
Net Profit
754.778%
Sharpe Ratio
3.861
Sortino Ratio
4.331
Probabilistic Sharpe Ratio
82.758%
Loss Rate
52%
Win Rate
48%
Profit-Loss Ratio
1.38
Alpha
3.178
Beta
3.852
Annual Standard Deviation
0.948
Annual Variance
0.9
Information Ratio
3.893
Tracking Error
0.908
Treynor Ratio
0.951
Total Fees
$60251.60
Estimated Strategy Capacity
$79000000.00
Lowest Capacity Asset
NQ YJHOAMPYKQGX
Portfolio Turnover
5464.47%
# region imports
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from sklearn.linear_model import LinearRegression
# endregion


class VolumeProfileAlgorithm(QCAlgorithm):
    """Long-only NASDAQ-100 E-mini futures strategy built on a volume profile.

    While flat, the algorithm picks the chain contract with the highest open
    interest (above 1000) and goes 100% long when the latest close lies inside
    the volume profile's value area and the regression slope of the recent
    closes is below -0.5.  While invested, it liquidates once the latest close
    drops below a stop price that is ratcheted up by every minute-over-minute
    gain in the close.
    """

    def initialize(self):
        """Configure dates, cash, the futures subscription, and indicators."""
        self.set_start_date(2023, 1, 1)
        self.set_end_date(2024, 7, 21)
        self.set_cash(100000)

        # Set the symbol of the asset we want to trade
        future = self.add_future(
            Futures.Indices.NASDAQ_100_E_MINI, Resolution.MINUTE
        )
        future.set_filter(timedelta(0), timedelta(182))
        self.future_symbol = future.symbol
        # Contract currently selected for trading; None until one qualifies.
        self.futures_contract = None
        self.contract_count = 0

        # Volume Profile indicator settings
        # NOTE(review): the indicator is registered below on 2-hour bars but
        # warmed up with minute bars, so "120 = 2 hours" only holds for the
        # minute-bar updates -- confirm which bar size is intended.
        self.profile_period = 120  # 2 hours
        self.value_area_percentage = 0.4
        self.volume_profile = VolumeProfile(
            "Volume Profile", self.profile_period, self.value_area_percentage
        )

        # Rolling window to store past prices
        self.past_prices_period = 20
        self.past_prices = RollingWindow[TradeBar](self.past_prices_period)

        # Consolidate data
        self.consolidate(
            self.future_symbol, timedelta(minutes=1), self.on_data_consolidated
        )
        self.register_indicator(
            self.future_symbol, self.volume_profile, timedelta(hours=2)
        )

        # Setting stoploss: rolling minimum over the last 100 minute bars
        self.stop_loss_len = 100
        self.stop_loss_indicator = self.min(
            self.future_symbol, self.stop_loss_len, Resolution.MINUTE
        )
        self.stop_loss_price = 0

        # Warm up using historical method
        history = self.history[TradeBar](
            self.future_symbol, timedelta(days=1), Resolution.MINUTE
        )
        for trade_bar in history:
            self.volume_profile.update(trade_bar)
            self.stop_loss_indicator.update(trade_bar.end_time, trade_bar.close)
            self.past_prices.add(trade_bar)
        self.log("Finished warming up indicator")

        # Free portfolio setting
        # NOTE(review): presumably a 30% buffer was intended; LEAN also exposes
        # `free_portfolio_value_percentage` -- confirm which setting is meant.
        self.settings.free_portfolio_value = 0.3

    def on_data_consolidated(self, data: TradeBar):
        """Store each bar produced by the 1-minute consolidator.

        Args:
            data: The consolidated bar; it is pushed into ``past_prices``.
        """
        # Store the past prices of the future contract
        self.past_prices.add(data)

    def on_data(self, data: Slice):
        """Run the entry logic while flat and the stop-loss logic while invested.

        Args:
            data: The current slice; its futures chains are used to choose the
                contract to trade.
        """
        # Check if the strategy warm up period is over and indicators are ready
        if (
            self.is_warming_up
            or not self.volume_profile.is_ready
            or not self.past_prices.is_ready
            or not self.stop_loss_indicator.is_ready
        ):
            return

        current_price = self.past_prices[0].close

        if not self.portfolio.invested:
            self._look_for_entry(data, current_price)
        else:
            self._manage_stop_loss(current_price)

        # Plotting the data
        # self.plot("VolumeProfile","vp", self.volume_profile.current.value)
        # self.plot("VolumeProfile","profile_high", self.volume_profile.profile_high)
        # self.plot("VolumeProfile","profile_low", self.volume_profile.profile_low)
        # self.plot("VolumeProfile","poc_price", self.volume_profile.poc_price)
        # self.plot("VolumeProfile","poc_volume", self.volume_profile.poc_volume)
        # self.plot("VolumeProfile","value_area_volume", self.volume_profile.value_area_volume)
        # self.plot("VolumeProfile","value_area_high", self.volume_profile.value_area_high)
        # self.plot("VolumeProfile","value_area_low", self.volume_profile.value_area_low)
        # self.plot("VolumeProfile","current_price", self.past_prices[0].close)

    def _select_contract(self, data: Slice) -> None:
        """Point ``futures_contract`` at the highest-open-interest contract.

        Only contracts with open interest above 1000 are considered.  When a
        chain has none, the previously selected contract (if any) is kept.
        """
        # This for-loop works because we're only checking one futures security
        for chain in data.future_chains:
            popular_contracts = [
                contract for contract in chain.value
                if contract.open_interest > 1000
            ]
            if not popular_contracts:
                continue

            self.futures_contract = max(
                popular_contracts, key=lambda k: k.open_interest
            )

    def _look_for_entry(self, data: Slice, current_price: float) -> None:
        """Open a long position when the value-area and slope criteria hold."""
        self.log("Not invested! Finding futures contract...")
        self._select_contract(data)

        # Check if price is moving towards the value area based on the
        # direction of the slope and the volume profile.
        # NOTE(review): the closes are taken in the window's iteration order;
        # if RollingWindow enumerates newest-first, a negative slope here means
        # prices were rising over time -- confirm the intended sign.
        past_prices = [x.close for x in self.past_prices if x is not None]
        slope = self.compute_slope(past_prices)

        value_area_high = self.volume_profile.value_area_high
        value_area_low = self.volume_profile.value_area_low

        # Log the indicators and price
        self.log(f"Current Price: {current_price} and Slope: {slope}")
        self.log(f"Value Area High: {value_area_high}")
        self.log(f"Value Area Low: {value_area_low}")

        if not (value_area_low <= current_price <= value_area_high):
            self.log("Price isn't in value area, keep waiting...")
            return

        # Long condition
        if slope < -0.5:
            # No contract has qualified yet, so there is nothing to order.
            if self.futures_contract is None:
                self.log(
                    "No futures contract with open interest above 1000 "
                    "available, skipping entry.")
                return

            self.log("Price is moving towards the value area! Invest!")
            self.set_holdings(self.futures_contract.symbol, 1)
            self.stop_loss_price = self.stop_loss_indicator.current.value
            self.log(
                f"Current price: {current_price}, stop order price: {self.stop_loss_price}")

    def _manage_stop_loss(self, current_price: float) -> None:
        """Liquidate below the stop price, otherwise trail the stop upwards."""
        # Exit check
        if current_price < self.stop_loss_price:
            self.log(f"Stop loss at {current_price}")
            self.liquidate(self.futures_contract.symbol)
            return

        # Raise the stop by the latest gain; it is never lowered.
        latest_close = self.past_prices[0].close
        previous_close = self.past_prices[1].close
        if latest_close > previous_close:
            self.stop_loss_price = self.stop_loss_price + \
                (latest_close - previous_close)
            self.log(
                f"Updating stop loss order of {self.stop_loss_price}!")

    def compute_slope(self, prices: list) -> float:
        """Return the least-squares slope of ``prices`` against their index.

        Args:
            prices: Sequence of prices; position ``i`` is regressed on ``i``.

        Returns:
            The fitted slope in price units per index step.
        """
        # sklearn expects 2D column vectors for both features and targets
        prices_array = np.array(prices).reshape(-1, 1)
        times = np.arange(len(prices)).reshape(-1, 1)

        model = LinearRegression().fit(times, prices_array)

        # coef_ has shape (1, 1) for a single feature and a single target
        return float(model.coef_[0][0])