Overall Statistics
Total Orders
1910
Average Win
3.36%
Average Loss
-2.44%
Compounding Annual Return
423.349%
Drawdown
35.100%
Expectancy
0.145
Start Equity
100000
End Equity
1300058.7
Net Profit
1200.059%
Sharpe Ratio
3.95
Sortino Ratio
4.315
Probabilistic Sharpe Ratio
86.849%
Loss Rate
52%
Win Rate
48%
Profit-Loss Ratio
1.38
Alpha
3.059
Beta
3.781
Annual Standard Deviation
0.908
Annual Variance
0.824
Information Ratio
3.964
Tracking Error
0.869
Treynor Ratio
0.948
Total Fees
$80156.30
Estimated Strategy Capacity
$53000000.00
Lowest Capacity Asset
NQ YLZ9Z50BJE2P
Portfolio Turnover
5283.82%
# region imports
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from sklearn.linear_model import LinearRegression
# endregion


class VolumeProfileAlgorithm(QCAlgorithm):
    def initialize(self):
        self.set_start_date(2023, 1, 1)
        self.set_end_date(2024, 7, 21)
        self.set_cash(100000)

        # Set the symbol of the asset we want to trade
        future = self.add_future(
            Futures.Indices.NASDAQ_100_E_MINI, Resolution.MINUTE
        )
        future.set_filter(timedelta(0), timedelta(182))
        self.future_symbol = future.symbol
        self.futures_contract = None
        self.contract_count = 0

        # Volume Profile indicator settings
        self.profile_period = 120  # 2 hours
        self.value_area_percentage = 0.4
        self.volume_profile = VolumeProfile(
            "Volume Profile", self.profile_period, self.value_area_percentage
        )

        # Rolling window to store past prices
        self.past_prices_period = 20
        self.past_prices = RollingWindow[TradeBar](self.past_prices_period)

        # Consolidate data
        self.consolidate(
            self.future_symbol, timedelta(minutes=1), self.on_data_consolidated
        )
        self.register_indicator(
            self.future_symbol, self.volume_profile, timedelta(hours=2)
        )

        # Setting stoploss
        self.stop_loss_len = 100
        self.stop_loss_indicator = self.min(
            self.future_symbol, self.stop_loss_len, Resolution.MINUTE
        )
        self.stop_loss_price = 0

        # Warm up using historical method
        self.set_warm_up(timedelta(hours = 2))
        history = self.history[TradeBar](self.future_symbol,timedelta(days=1),Resolution.MINUTE)
        for trade_bar in history:
            self.volume_profile.update(trade_bar)
        self.log("Finished warming up indicator")
        
        # Free portfolio setting
        self.settings.free_portfolio_value = 0.3

    def on_data_consolidated(self, data: Slice):
        # Store the past prices of the future contract
        self.past_prices.add(data)

    def on_data(self, data: Slice):
        # Check if the strategy warm up period is over and indicators are ready
        if self.is_warming_up or not self.volume_profile.is_ready or not self.past_prices.is_ready or not self.stop_loss_indicator.is_ready:
            # self.log(
            #     f"Warming up: {self.is_warming_up}, Volume Profile Ready: {self.volume_profile.is_ready}, Past Prices Ready: {self.past_prices.is_ready}")
            return

        current_price = self.past_prices[0].close

        # Verify entry criteria to invest
        if not self.portfolio.invested:
            self.log("Not invested! Finding futures contract...")
            # Find the future contract with the max open interest above 1000
            # This for-loop works because we're only checking one futures security

            for chain in data.future_chains:
                popular_contracts = [
                    contract for contract in chain.value if contract.open_interest > 1000
                ]

                if len(popular_contracts) == 0:
                    continue

                self.futures_contract = max(
                    popular_contracts, key=lambda k: k.open_interest)

            # Check if price is moving towards the value area based on the direction of the slope
            # and the volume profile
            past_prices = [x.close for x in self.past_prices if x is not None]
            slope = self.compute_slope(past_prices)

            # Log the indicators and price
            self.log(f"Current Price: {current_price} and Slope: {slope}")
            self.log(f"Value Area High: {self.volume_profile.value_area_high}")
            self.log(f"Value Area Low: {self.volume_profile.value_area_low}")

            if (self.volume_profile.value_area_low <= current_price <= self.volume_profile.value_area_high):
                # Long condition
                if slope < -0.5:
                    self.log(
                        "Price is moving towards the value area! Invest!")
                    self.set_holdings(self.futures_contract.symbol, 1)
                    self.stop_loss_price = self.stop_loss_indicator.current.value
                    self.log(
                        f"Current price: {current_price}, stop order price: {self.stop_loss_price}")
                else:
                    self.log("Price isn't in value area, keep waiting...")

        # Exit or update exit stop loss price
        else:
            # Exit check
            if current_price < self.stop_loss_price:
                self.log(f"Stop loss at {current_price}")
                self.liquidate(self.futures_contract.symbol)
            # Check if you should update stop loss price
            elif self.past_prices[0].close > self.past_prices[1].close:
                self.stop_loss_price = self.stop_loss_price + \
                    (self.past_prices[0].close - self.past_prices[1].close)
                self.log(
                    f"Updating stop loss order of {self.stop_loss_price}!")

        # Plotting the data
        # self.plot("VolumeProfile","vp", self.volume_profile.current.value)
        # self.plot("VolumeProfile","profile_high", self.volume_profile.profile_high)
        # self.plot("VolumeProfile","profile_low", self.volume_profile.profile_low)
        # self.plot("VolumeProfile","poc_price", self.volume_profile.poc_price)
        # self.plot("VolumeProfile","poc_volume", self.volume_profile.poc_volume)
        # self.plot("VolumeProfile","value_area_volume", self.volume_profile.value_area_volume)
        # self.plot("VolumeProfile","value_area_high", self.volume_profile.value_area_high)
        # self.plot("VolumeProfile","value_area_low", self.volume_profile.value_area_low)
        # self.plot("VolumeProfile","current_price", self.past_prices[0].close)

    def compute_slope(self, prices: list) -> float:
        # Convert list to numpy array and reshape to 2D for sklearn
        prices_array = np.array(prices).reshape(-1, 1)

        # Create an array of indices representing time
        times = np.array(range(len(prices))).reshape(-1, 1)

        # Fit a linear regression model
        model = LinearRegression().fit(times, prices_array)

        # Return the slope of the regression line
        return model.coef_[0][0]



# region imports
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from sklearn.linear_model import LinearRegression
# endregion


class VolumeProfileAlgorithm(QCAlgorithm):
    def initialize(self):
        self.set_start_date(2023, 1, 1)
        self.set_end_date(2024, 8, 1)
        self.set_cash(100000)

        # Set the symbol of the asset we want to trade
        future = self.add_future(
            Futures.Indices.NASDAQ_100_E_MINI, Resolution.MINUTE
        )
        future.set_filter(timedelta(0), timedelta(182))
        self.future_symbol = future.symbol
        self.futures_contract = None
        self.contract_count = 0

        # Volume Profile indicator settings
        self.profile_period = 120  # 2 hours
        self.value_area_percentage = 0.4
        self.volume_profile = VolumeProfile(
            "Volume Profile", self.profile_period, self.value_area_percentage
        )

        # Rolling window to store past prices
        self.past_prices_period = 20
        self.past_prices = RollingWindow[TradeBar](self.past_prices_period)

        # Long or short position
        self.is_long = True

        # Consolidate data
        self.consolidate(
            self.future_symbol, timedelta(minutes=1), self.on_data_consolidated
        )
        self.register_indicator(
            self.future_symbol, self.volume_profile, timedelta(hours=2)
        )

        # Setting stoploss
        self.stop_loss_len = 100
        self.stop_loss_indicator = self.min(
            self.future_symbol, self.stop_loss_len, Resolution.MINUTE
        )
        self.stop_loss_price = 0

        # Warm up period
        self.set_warm_up(timedelta(days=2))

        # Free portfolio setting
        self.settings.free_portfolio_value = 0.3

    def on_data_consolidated(self, data: Slice):
        # Store the past prices of the future contract
        self.past_prices.add(data)

    def on_data(self, data: Slice):
        # Check if the strategy warm up period is over and indicators are ready
        if self.is_warming_up or not self.volume_profile.is_ready or not self.past_prices.is_ready or not self.stop_loss_indicator.is_ready:
            # self.log(
            #     f"Warming up: {self.is_warming_up}, Volume Profile Ready: {self.volume_profile.is_ready}, Past Prices Ready: {self.past_prices.is_ready}")
            return

        current_price = self.past_prices[0].close

        # Verify entry criteria to invest
        if not self.portfolio.invested:
            self.log("Not invested! Finding futures contract...")
            # Find the future contract with the max open interest above 1000
            # This for-loop works because we're only checking one futures security

            for chain in data.future_chains:
                popular_contracts = [
                    contract for contract in chain.value if contract.open_interest > 1000
                ]

                if len(popular_contracts) == 0:
                    continue

                self.futures_contract = max(
                    popular_contracts, key=lambda k: k.open_interest)

            # Check if price is moving towards the value area based on the direction of the slope
            # and the volume profile
            past_prices = [x.close for x in self.past_prices if x is not None]
            slope = self.compute_slope(past_prices)

            # Log the indicators and price
            self.log(f"Current Price: {current_price} and Slope: {slope}")
            self.log(f"Value Area High: {self.volume_profile.value_area_high}")
            self.log(f"Value Area Low: {self.volume_profile.value_area_low}")

            if (self.volume_profile.value_area_low <= current_price <= self.volume_profile.value_area_high):
                # Long condition
                if slope < -0.5:
                    self.log(
                        "Price is moving towards the value area! Invest!")
                    self.set_holdings(self.futures_contract.symbol, 1)
                    self.stop_loss_price = self.stop_loss_indicator.current.value
                    self.log(
                        f"Current price: {current_price}, stop order price: {self.stop_loss_price}")
                else:
                    self.log("Price isn't in value area, keep waiting...")

        # Exit or update exit stop loss price
        else:
            # Exit check
            if current_price < self.stop_loss_price:
                self.log(f"Stop loss at {current_price}")
                self.liquidate(self.futures_contract.symbol)
            # Check if you should update stop loss price
            elif self.past_prices[0].close > self.past_prices[1].close:
                self.stop_loss_price = self.stop_loss_price + \
                    (self.past_prices[0].close - self.past_prices[1].close)
                self.log(
                    f"Updating stop loss order of {self.stop_loss_price}!")

        # Plotting the data
        # self.plot("VolumeProfile","vp", self.volume_profile.current.value)
        # self.plot("VolumeProfile","profile_high", self.volume_profile.profile_high)
        # self.plot("VolumeProfile","profile_low", self.volume_profile.profile_low)
        # self.plot("VolumeProfile","poc_price", self.volume_profile.poc_price)
        # self.plot("VolumeProfile","poc_volume", self.volume_profile.poc_volume)
        # self.plot("VolumeProfile","value_area_volume", self.volume_profile.value_area_volume)
        # self.plot("VolumeProfile","value_area_high", self.volume_profile.value_area_high)
        # self.plot("VolumeProfile","value_area_low", self.volume_profile.value_area_low)
        # self.plot("VolumeProfile","current_price", self.past_prices[0].close)

    def compute_slope(self, prices: list) -> float:
        # Convert list to numpy array and reshape to 2D for sklearn
        prices_array = np.array(prices).reshape(-1, 1)

        # Create an array of indices representing time
        times = np.array(range(len(prices))).reshape(-1, 1)

        # Fit a linear regression model
        model = LinearRegression().fit(times, prices_array)

        # Return the slope of the regression line
        return model.coef_[0][0]
# region imports
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from sklearn.linear_model import LinearRegression
# endregion


class VolumeProfileAlgorithm(QCAlgorithm):
    def initialize(self):
        self.set_start_date(2020, 1, 1)
        self.set_end_date(2024, 6, 1)
        self.set_cash(100000)

        # Set the symbol of the asset we want to trade
        future = self.add_future(
            Futures.Indices.NASDAQ_100_E_MINI, 
            Resolution.MINUTE,
            data_normalization_mode = DataNormalizationMode.RAW
        )
        future.set_filter(timedelta(0), timedelta(182))
        self.future_symbol = future.symbol
        self.futures_contract = None
        self.contract_count = 0

        # Volume Profile indicator settings
        self.profile_period = 234 * 5  # So 5 min would be 234 5 min period
        self.value_area_percentage = 0.4
        self.volume_profile = VolumeProfile(
            "Volume Profile", self.profile_period, self.value_area_percentage
        )

        # Rolling window to store past prices
        self.past_prices_period = 5
        self.past_prices = RollingWindow[TradeBar](self.past_prices_period)

        # Long or short position
        self.is_long = True

        # Consolidate data
        self.consolidate(
            self.future_symbol, timedelta(minutes=5), self.on_data_consolidated
        )
        self.register_indicator(
            self.future_symbol, self.volume_profile, timedelta(hours=2)
        )

        # Warm up period
        self.set_warm_up(timedelta(days=2))

        # Free portfolio setting
        self.settings.free_portfolio_value = 0.3

    def on_data_consolidated(self, data: Slice):
        # Store the past prices of the future contract
        self.past_prices.add(data)

        # Check if the strategy warm up period is over and indicators are ready
        if self.is_warming_up or not self.volume_profile.is_ready or not self.past_prices.is_ready:
            # self.log(
            #     f"Warming up: {self.is_warming_up}, Volume Profile Ready: {self.volume_profile.is_ready}, Past Prices Ready: {self.past_prices.is_ready}")
            return

        current_price = self.past_prices[0].close

        # Verify entry criteria to invest
        if not self.portfolio.invested:
            self.log("Not invested! Finding futures contract...")
            # Find the future contract with the max open interest above 1000
            # This for-loop only iterates ONCE because 
            # it's only checking one futures security...I think
            for chain in self.future_chains:
                popular_contracts = [
                    contract for contract in chain.value if contract.open_interest > 1000
                ]

                if len(popular_contracts) == 0:
                    continue

                self.futures_contract = max(
                    popular_contracts, key=lambda k: k.open_interest)

            # Check if price is moving towards the value area based on the direction of the slope
            # and the volume profile
            past_prices = [x.close for x in self.past_prices if x is not None]
            slope = self.compute_slope(past_prices)

            # # Log the indicators and price
            # self.log(f"Current Price: {current_price} and Slope: {slope}")
            # self.log(f"Value Area High: {self.volume_profile.value_area_high}")
            # self.log(f"Value Area Low: {self.volume_profile.value_area_low}")

            # Find the range for low volume node
            top_range = self.volume_profile.profile_high - self.volume_profile.value_area_high
            bottom_range = self.volume_profile.value_area_low - self.volume_profile.profile_low
            heavy_top = top_range < bottom_range

            # Check if price is inside value_area_high <-> volume_profile_high
            if heavy_top and self.volume_profile.value_area_low >= current_price >= (self.volume_profile.profile_low + bottom_range * .5):
                # If price is inside check if the price is decline rapidly
                if slope < -1.5:
                    # Entry
                    self.set_holdings(self.futures_contract.symbol, -1)
                    # Set the stoploss
                    self.stop_loss = self.volume_profile.poc_price
                    # Set the take profit
                    self.take_profit = (self.volume_profile.profile_low + bottom_range * .5)

        # Exit or update exit stop loss price
        else:
            # Exit check
            if current_price >= self.stop_loss:
                self.liquidate(self.futures_contract.symbol,tag = 'Stop Loss')
            elif current_price <= self.take_profit:
                self.liquidate(self.futures_contract.symbol,tag = 'Take Profit')
            # Trailing
            # elif self.past_prices[0].close > self.past_prices[1].close:
            #     self.stop_loss_price = self.stop_loss_price + \
            #         (self.past_prices[0].close - self.past_prices[1].close)
            #     self.log(
            #         f"Updating stop loss order of {self.stop_loss_price}!")


    def on_data(self, data: Slice):
        self.future_chains = data.future_chains

    def compute_slope(self, prices: list) -> float:
        # Convert list to numpy array and reshape to 2D for sklearn
        prices_array = np.array(prices).reshape(-1, 1)

        # Create an array of indices representing time
        times = np.array(range(len(prices))).reshape(-1, 1)

        # Fit a linear regression model
        model = LinearRegression().fit(times, prices_array)

        # Return the slope of the regression line
        return model.coef_[0][0]