Overall Statistics
Total Orders
906
Average Win
1.11%
Average Loss
-0.94%
Compounding Annual Return
23.400%
Drawdown
22.600%
Expectancy
0.109
Start Equity
1000000
End Equity
1513747.5
Net Profit
51.375%
Sharpe Ratio
0.633
Sortino Ratio
0.483
Probabilistic Sharpe Ratio
38.688%
Loss Rate
49%
Win Rate
51%
Profit-Loss Ratio
1.18
Alpha
0.031
Beta
0.733
Annual Standard Deviation
0.199
Annual Variance
0.04
Information Ratio
-0.019
Tracking Error
0.186
Treynor Ratio
0.172
Total Fees
$26982.50
Estimated Strategy Capacity
$120000000.00
Lowest Capacity Asset
NQ YOGVNNAOI1OH
Portfolio Turnover
403.23%
# region imports
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.feature_selection import f_regression
# endregion


class VolumeProfileAlgorithm(QCAlgorithm):
    def initialize(self):
        self.set_start_date(2023, 1, 1)
        self.set_end_date(2024, 12, 21)
        self.set_cash(1000000)

        # Set the symbol of the asset we want to trade
        future = self.add_future(
            Futures.Indices.NASDAQ_100_E_MINI, Resolution.MINUTE
        )
        future.set_filter(timedelta(0), timedelta(182))
        self.future_symbol = future.symbol
        self.futures_contract = None
        self.contract_count = 0

        # Volume Profile indicator settings
        self.profile_period = 120  # 2 hours
        self.value_area_percentage = 0.4
        self.volume_profile = VolumeProfile(
            "Volume Profile", self.profile_period, self.value_area_percentage
        )

        # Rolling window to store past prices
        self.past_prices_period = 20
        self.past_prices = RollingWindow[TradeBar](self.past_prices_period)

        # Consolidate data
        self.consolidate(
            self.future_symbol, timedelta(minutes=1), self.on_data_consolidated
        )
        self.register_indicator(
            self.future_symbol, self.volume_profile, timedelta(hours=2)
        )

        # Setting stoploss
        self.stop_loss_len = 100
        self.stop_loss_indicator = self.min(
            self.future_symbol, self.stop_loss_len, Resolution.MINUTE
        )
        self.stop_loss_price = 0

        # Warm up using historical method
        history = self.history[TradeBar](self.future_symbol, timedelta(days=1), Resolution.MINUTE)
        for trade_bar in history:
            self.volume_profile.update(trade_bar)
            self.stop_loss_indicator.update(trade_bar.end_time, trade_bar.close)
            self.past_prices.add(trade_bar)
        self.log("Finished warming up indicator")
        
        # Free portfolio setting
        self.settings.free_portfolio_value = 0.3

    def on_data_consolidated(self, data: Slice):
        # Store the past prices of the future contract
        self.past_prices.add(data)

    def on_data(self, data: Slice):
        # Check if the strategy warm up period is over and indicators are ready
        if self.is_warming_up or not self.volume_profile.is_ready or not self.past_prices.is_ready or not self.stop_loss_indicator.is_ready:
            # self.log(
            #     f"Warming up: {self.is_warming_up}, Volume Profile Ready: {self.volume_profile.is_ready}, Past Prices Ready: {self.past_prices.is_ready}")
            return

        current_price = self.past_prices[0].close

        # Verify entry criteria to invest
        if not self.portfolio.invested:
            # self.log("Not invested! Finding futures contract...")
            # Find the future contract with the max open interest above 1000
            # This for-loop works because we're only checking one futures security

            for chain in data.future_chains:
                popular_contracts = [
                    contract for contract in chain.value if contract.open_interest > 1000
                ]

                if len(popular_contracts) == 0:
                    continue

                self.futures_contract = max(
                    popular_contracts, key=lambda k: k.open_interest)

            # Check if price is moving towards the value area based on the direction of the slope
            # and the volume profile
            past_prices = [x.close for x in self.past_prices if x is not None]
            slope = self.compute_slope(past_prices)
            # Find the attack angle should be greater than -45 degrees
            angle = np.arctan(slope) * 180 / np.pi

            # # Log the indicators and price
            # self.log(f"Current Price: {current_price} and Angle: {angle}")
            # self.log(f"Value Area High: {self.volume_profile.value_area_high}")
            # self.log(f"Value Area Low: {self.volume_profile.value_area_low}")

            if (self.volume_profile.value_area_low < current_price < self.volume_profile.value_area_high):
                # Only enter if the attack angle is greater than -90 and less than -60
                if angle > -90 and angle < -60:
                    self.log(
                        "Price is moving towards the value area! Invest!")
                    self.set_holdings(self.futures_contract.symbol, .2)
                    self.stop_loss_price = self.volume_profile.value_area_low
                    self.log(f"Current price: {current_price}, Angle: {angle}")
                    self.log(f"Value Area High: {self.volume_profile.value_area_high}")
                    self.log(f"Value Area Low: {self.volume_profile.value_area_low}")

        # Exit or update exit stop loss price
        else:
            # Exit check
            if current_price <= self.stop_loss_price:
                self.log(f"Stop loss at {current_price}")
                self.liquidate(self.futures_contract.symbol)
            # Check if you should update stop loss price
            elif self.past_prices[0].close > self.past_prices[1].close:
                self.stop_loss_price = self.stop_loss_price + \
                    (self.past_prices[0].close - self.past_prices[1].close)
                self.log(
                    f"Updating stop loss order of {self.stop_loss_price}!")

        # Plotting the data
        # self.plot("VolumeProfile","vp", self.volume_profile.current.value)
        # self.plot("VolumeProfile","profile_high", self.volume_profile.profile_high)
        # self.plot("VolumeProfile","profile_low", self.volume_profile.profile_low)
        # self.plot("VolumeProfile","poc_price", self.volume_profile.poc_price)
        # self.plot("VolumeProfile","poc_volume", self.volume_profile.poc_volume)
        # self.plot("VolumeProfile","value_area_volume", self.volume_profile.value_area_volume)
        # self.plot("VolumeProfile","value_area_high", self.volume_profile.value_area_high)
        # self.plot("VolumeProfile","value_area_low", self.volume_profile.value_area_low)
        # self.plot("VolumeProfile","current_price", self.past_prices[0].close)

    def compute_slope(self, prices: list) -> float:
        # Convert list to numpy array and reshape to 2D for sklearn
        prices_array = np.array(prices).reshape(-1, 1)

        # Create an array of indices representing time
        times = np.array(range(len(prices))).reshape(-1, 1)

        # Fit a linear regression model
        model = LinearRegression().fit(times, prices_array)

        # Find the p-value of the slope
        p_value = f_regression(times, prices_array)[1]

        # Return the slope only if the p value is less than 0.05
        if p_value < 0.05:
            return model.coef_[0][0]
        else:
            return 0
# region imports
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from sklearn.linear_model import LinearRegression
# endregion


class VolumeProfileAlgorithm(QCAlgorithm):
    def initialize(self):
        self.set_start_date(2023, 1, 1)
        self.set_end_date(2024, 8, 1)
        self.set_cash(100000)

        # Set the symbol of the asset we want to trade
        future = self.add_future(
            Futures.Indices.NASDAQ_100_E_MINI, Resolution.MINUTE
        )
        future.set_filter(timedelta(0), timedelta(182))
        self.future_symbol = future.symbol
        self.futures_contract = None
        self.contract_count = 0

        # Volume Profile indicator settings
        self.profile_period = 120  # 2 hours
        self.value_area_percentage = 0.4
        self.volume_profile = VolumeProfile(
            "Volume Profile", self.profile_period, self.value_area_percentage
        )

        # Rolling window to store past prices
        self.past_prices_period = 20
        self.past_prices = RollingWindow[TradeBar](self.past_prices_period)

        # Long or short position
        self.is_long = True

        # Consolidate data
        self.consolidate(
            self.future_symbol, timedelta(minutes=1), self.on_data_consolidated
        )
        self.register_indicator(
            self.future_symbol, self.volume_profile, timedelta(hours=2)
        )

        # Setting stoploss
        self.stop_loss_len = 100
        self.stop_loss_indicator = self.min(
            self.future_symbol, self.stop_loss_len, Resolution.MINUTE
        )
        self.stop_loss_price = 0

        # Warm up period
        self.set_warm_up(timedelta(days=2))

        # Free portfolio setting
        self.settings.free_portfolio_value = 0.3

    def on_data_consolidated(self, data: Slice):
        # Store the past prices of the future contract
        self.past_prices.add(data)

    def on_data(self, data: Slice):
        # Check if the strategy warm up period is over and indicators are ready
        if self.is_warming_up or not self.volume_profile.is_ready or not self.past_prices.is_ready or not self.stop_loss_indicator.is_ready:
            # self.log(
            #     f"Warming up: {self.is_warming_up}, Volume Profile Ready: {self.volume_profile.is_ready}, Past Prices Ready: {self.past_prices.is_ready}")
            return

        current_price = self.past_prices[0].close

        # Verify entry criteria to invest
        if not self.portfolio.invested:
            self.log("Not invested! Finding futures contract...")
            # Find the future contract with the max open interest above 1000
            # This for-loop works because we're only checking one futures security

            for chain in data.future_chains:
                popular_contracts = [
                    contract for contract in chain.value if contract.open_interest > 1000
                ]

                if len(popular_contracts) == 0:
                    continue

                self.futures_contract = max(
                    popular_contracts, key=lambda k: k.open_interest)

            # Check if price is moving towards the value area based on the direction of the slope
            # and the volume profile
            past_prices = [x.close for x in self.past_prices if x is not None]
            slope = self.compute_slope(past_prices)

            # Log the indicators and price
            self.log(f"Current Price: {current_price} and Slope: {slope}")
            self.log(f"Value Area High: {self.volume_profile.value_area_high}")
            self.log(f"Value Area Low: {self.volume_profile.value_area_low}")

            if (self.volume_profile.value_area_low <= current_price <= self.volume_profile.value_area_high):
                # Long condition
                if slope < -0.5:
                    self.log(
                        "Price is moving towards the value area! Invest!")
                    self.set_holdings(self.futures_contract.symbol, 1)
                    self.stop_loss_price = self.stop_loss_indicator.current.value
                    self.log(
                        f"Current price: {current_price}, stop order price: {self.stop_loss_price}")
                else:
                    self.log("Price isn't in value area, keep waiting...")

        # Exit or update exit stop loss price
        else:
            # Exit check
            if current_price < self.stop_loss_price:
                self.log(f"Stop loss at {current_price}")
                self.liquidate(self.futures_contract.symbol)
            # Check if you should update stop loss price
            elif self.past_prices[0].close > self.past_prices[1].close:
                self.stop_loss_price = self.stop_loss_price + \
                    (self.past_prices[0].close - self.past_prices[1].close)
                self.log(
                    f"Updating stop loss order of {self.stop_loss_price}!")

        # Plotting the data
        # self.plot("VolumeProfile","vp", self.volume_profile.current.value)
        # self.plot("VolumeProfile","profile_high", self.volume_profile.profile_high)
        # self.plot("VolumeProfile","profile_low", self.volume_profile.profile_low)
        # self.plot("VolumeProfile","poc_price", self.volume_profile.poc_price)
        # self.plot("VolumeProfile","poc_volume", self.volume_profile.poc_volume)
        # self.plot("VolumeProfile","value_area_volume", self.volume_profile.value_area_volume)
        # self.plot("VolumeProfile","value_area_high", self.volume_profile.value_area_high)
        # self.plot("VolumeProfile","value_area_low", self.volume_profile.value_area_low)
        # self.plot("VolumeProfile","current_price", self.past_prices[0].close)

    def compute_slope(self, prices: list) -> float:
        # Convert list to numpy array and reshape to 2D for sklearn
        prices_array = np.array(prices).reshape(-1, 1)

        # Create an array of indices representing time
        times = np.array(range(len(prices))).reshape(-1, 1)

        # Fit a linear regression model
        model = LinearRegression().fit(times, prices_array)

        # Return the slope of the regression line
        return model.coef_[0][0]
# region imports
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from sklearn.linear_model import LinearRegression
# endregion


class VolumeProfileAlgorithm(QCAlgorithm):
    def initialize(self):
        self.set_start_date(2020, 1, 1)
        self.set_end_date(2024, 6, 1)
        self.set_cash(100000)

        # Set the symbol of the asset we want to trade
        future = self.add_future(
            Futures.Indices.NASDAQ_100_E_MINI, 
            Resolution.MINUTE,
            data_normalization_mode = DataNormalizationMode.RAW
        )
        future.set_filter(timedelta(0), timedelta(182))
        self.future_symbol = future.symbol
        self.futures_contract = None
        self.contract_count = 0

        # Volume Profile indicator settings
        self.profile_period = 234 * 5  # So 5 min would be 234 5 min period
        self.value_area_percentage = 0.4
        self.volume_profile = VolumeProfile(
            "Volume Profile", self.profile_period, self.value_area_percentage
        )

        # Rolling window to store past prices
        self.past_prices_period = 5
        self.past_prices = RollingWindow[TradeBar](self.past_prices_period)

        # Long or short position
        self.is_long = True

        # Consolidate data
        self.consolidate(
            self.future_symbol, timedelta(minutes=5), self.on_data_consolidated
        )
        self.register_indicator(
            self.future_symbol, self.volume_profile, timedelta(hours=2)
        )

        # Warm up period
        self.set_warm_up(timedelta(days=2))

        # Free portfolio setting
        self.settings.free_portfolio_value = 0.3

    def on_data_consolidated(self, data: Slice):
        # Store the past prices of the future contract
        self.past_prices.add(data)

        # Check if the strategy warm up period is over and indicators are ready
        if self.is_warming_up or not self.volume_profile.is_ready or not self.past_prices.is_ready:
            # self.log(
            #     f"Warming up: {self.is_warming_up}, Volume Profile Ready: {self.volume_profile.is_ready}, Past Prices Ready: {self.past_prices.is_ready}")
            return

        current_price = self.past_prices[0].close

        # Verify entry criteria to invest
        if not self.portfolio.invested:
            self.log("Not invested! Finding futures contract...")
            # Find the future contract with the max open interest above 1000
            # This for-loop only iterates ONCE because 
            # it's only checking one futures security...I think
            for chain in self.future_chains:
                popular_contracts = [
                    contract for contract in chain.value if contract.open_interest > 1000
                ]

                if len(popular_contracts) == 0:
                    continue

                self.futures_contract = max(
                    popular_contracts, key=lambda k: k.open_interest)

            # Check if price is moving towards the value area based on the direction of the slope
            # and the volume profile
            past_prices = [x.close for x in self.past_prices if x is not None]
            slope = self.compute_slope(past_prices)

            # # Log the indicators and price
            # self.log(f"Current Price: {current_price} and Slope: {slope}")
            # self.log(f"Value Area High: {self.volume_profile.value_area_high}")
            # self.log(f"Value Area Low: {self.volume_profile.value_area_low}")

            # Find the range for low volume node
            top_range = self.volume_profile.profile_high - self.volume_profile.value_area_high
            bottom_range = self.volume_profile.value_area_low - self.volume_profile.profile_low
            heavy_top = top_range < bottom_range

            # Check if price is inside value_area_high <-> volume_profile_high
            if heavy_top and self.volume_profile.value_area_low >= current_price >= (self.volume_profile.profile_low + bottom_range * .5):
                # If price is inside check if the price is decline rapidly
                if slope < -1.5:
                    # Entry
                    self.set_holdings(self.futures_contract.symbol, -1)
                    # Set the stoploss
                    self.stop_loss = self.volume_profile.poc_price
                    # Set the take profit
                    self.take_profit = (self.volume_profile.profile_low + bottom_range * .5)

        # Exit or update exit stop loss price
        else:
            # Exit check
            if current_price >= self.stop_loss:
                self.liquidate(self.futures_contract.symbol,tag = 'Stop Loss')
            elif current_price <= self.take_profit:
                self.liquidate(self.futures_contract.symbol,tag = 'Take Profit')
            # Trailing
            # elif self.past_prices[0].close > self.past_prices[1].close:
            #     self.stop_loss_price = self.stop_loss_price + \
            #         (self.past_prices[0].close - self.past_prices[1].close)
            #     self.log(
            #         f"Updating stop loss order of {self.stop_loss_price}!")


    def on_data(self, data: Slice):
        self.future_chains = data.future_chains

    def compute_slope(self, prices: list) -> float:
        # Convert list to numpy array and reshape to 2D for sklearn
        prices_array = np.array(prices).reshape(-1, 1)

        # Create an array of indices representing time
        times = np.array(range(len(prices))).reshape(-1, 1)

        # Fit a linear regression model
        model = LinearRegression().fit(times, prices_array)

        # Return the slope of the regression line
        return model.coef_[0][0]