Overall Statistics |
Total Orders 218 Average Win 9.15% Average Loss -6.08% Compounding Annual Return 87.561% Drawdown 41.600% Expectancy 0.241 Start Equity 100000 End Equity 311640.5 Net Profit 211.640% Sharpe Ratio 1.301 Sortino Ratio 0.882 Probabilistic Sharpe Ratio 49.821% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 1.51 Alpha 0.574 Beta 1.758 Annual Standard Deviation 0.63 Annual Variance 0.397 Information Ratio 1.12 Tracking Error 0.607 Treynor Ratio 0.466 Total Fees $4579.50 Estimated Strategy Capacity $350000000.00 Lowest Capacity Asset NQ YOGVNNAOI1OH Portfolio Turnover 500.27% |
# region imports
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from sklearn.linear_model import LinearRegression
# endregion


class VolumeProfileAlgorithm(QCAlgorithm):
    """Long-only NASDAQ-100 E-mini futures strategy built around a volume profile.

    While flat, the algorithm picks the futures contract with the highest open
    interest and goes long when the latest close sits inside the volume
    profile's value area, the regression slope of recent closes is below
    ``ENTRY_SLOPE_THRESHOLD`` and the close is above both the TEMA and the EMA.
    While invested, a percentage trailing stop is ratcheted up on new highs and
    the position is liquidated once the close falls below it.
    """

    # Minimum open interest a contract needs to be considered for trading.
    MIN_OPEN_INTEREST = 1000
    # The regression slope of recent closes must be below this value to enter.
    ENTRY_SLOPE_THRESHOLD = -0.4

    def initialize(self):
        """Configure the backtest, the futures subscription and all indicators."""
        self.set_start_date(2023, 1, 1)
        self.set_end_date(2024, 10, 21)
        self.set_cash(100000)

        # Set the symbol of the asset we want to trade
        future = self.add_future(
            Futures.Indices.NASDAQ_100_E_MINI, Resolution.MINUTE
        )
        future.set_filter(timedelta(0), timedelta(182))
        self.future_symbol = future.symbol
        self.futures_contract = None
        self.contract_count = 0

        # Volume Profile indicator settings
        self.profile_period = 120  # 2 hours
        self.value_area_percentage = 0.4
        self.volume_profile = VolumeProfile(
            "Volume Profile", self.profile_period, self.value_area_percentage
        )

        # Rolling window to store past prices
        self.past_prices_period = 21
        self.past_prices = RollingWindow[TradeBar](self.past_prices_period)

        # Consolidate data
        self.consolidate(
            self.future_symbol, timedelta(minutes=1), self.on_data_consolidated
        )
        # NOTE(review): the indicator is registered on 2-hour bars here but is
        # warmed up below with 1-minute bars - confirm the intended bar size.
        self.register_indicator(
            self.future_symbol, self.volume_profile, timedelta(hours=2)
        )

        # The indicators are stored under underscore-prefixed names so the
        # attributes do not overwrite the inherited ``tema`` / ``ema`` helper
        # methods on this instance.
        self._tema = self.tema(self.future_symbol, 7000, Resolution.MINUTE)
        self._ema = self.ema(self.future_symbol, 30, 0.5, Resolution.MINUTE)

        # Warm up using historical method
        history = self.history[TradeBar](
            self.future_symbol, timedelta(days=1), Resolution.MINUTE
        )
        for trade_bar in history:
            self.volume_profile.update(trade_bar)
            self.past_prices.add(trade_bar)
        self.log("Finished warming up indicator")

        # Free portfolio setting
        self.settings.free_portfolio_value = 0.3

        # Setting trailing stop loss
        self.trailing_stop_loss_percentage = 0.005  # 0.5%
        self.highest_price_since_entry = 0
        self.stop_loss_price = 0

    def on_data_consolidated(self, data: TradeBar):
        """Store the latest consolidated bar in the rolling price window."""
        self.past_prices.add(data)

    def on_data(self, data: Slice):
        """Look for an entry while flat, otherwise manage the trailing stop.

        Does nothing until warm-up is over and the volume profile, the price
        window and both moving averages report ready.
        """
        if (
            self.is_warming_up
            or not self.volume_profile.is_ready
            or not self.past_prices.is_ready
            or not self._tema.is_ready
            or not self._ema.is_ready
        ):
            return

        current_price = self.past_prices[0].close

        if self.portfolio.invested:
            self._manage_trailing_stop(current_price)
        else:
            self._try_enter_long(data, current_price)

        # Plotting the data (commented out)
        # self.plot("VolumeProfile", "current_price", self.past_prices[0].close)
        # self.plot("VolumeProfile", "TEMA", self._tema.current.value)
        # self.plot("VolumeProfile", "EMA", self._ema.current.value)
        # self.plot("VolumeProfile", "vp", self.volume_profile.current.value)
        # self.plot("VolumeProfile", "profile_high", self.volume_profile.profile_high)
        # self.plot("VolumeProfile", "profile_low", self.volume_profile.profile_low)
        # self.plot("VolumeProfile", "poc_price", self.volume_profile.poc_price)
        # self.plot("VolumeProfile", "poc_volume", self.volume_profile.poc_volume)
        # self.plot("VolumeProfile", "value_area_volume", self.volume_profile.value_area_volume)
        # self.plot("VolumeProfile", "value_area_high", self.volume_profile.value_area_high)
        # self.plot("VolumeProfile", "value_area_low", self.volume_profile.value_area_low)

    def _select_contract(self, data: Slice):
        """Return the highest-open-interest contract in ``data``, or None.

        Only contracts with open interest above ``MIN_OPEN_INTEREST`` qualify.
        If several chains are present the last one with a qualifying contract
        wins; a single futures security is subscribed in ``initialize``.
        """
        selected = None
        for chain in data.future_chains:
            popular_contracts = [
                contract
                for contract in chain.value
                if contract.open_interest > self.MIN_OPEN_INTEREST
            ]
            if not popular_contracts:
                continue
            selected = max(popular_contracts, key=lambda k: k.open_interest)
        return selected

    def _try_enter_long(self, data: Slice, current_price: float) -> None:
        """Open a long position if a contract is available and the signal fires."""
        self.log("Not invested! Finding futures contract...")

        contract = self._select_contract(data)
        if contract is None:
            # Without a contract from the current slice there is nothing safe
            # to trade: the stored one may be missing or left over from an
            # earlier slice.
            self.log("No futures contract with enough open interest, keep waiting...")
            return
        self.futures_contract = contract

        # NOTE(review): index 0 of the rolling window is used as the current
        # bar; if iteration also starts at the newest bar, the regression
        # x-axis runs backwards in time and a negative slope corresponds to
        # rising prices - confirm the intended sign convention.
        past_prices = [x.close for x in self.past_prices if x is not None]
        slope = self.compute_slope(past_prices)

        value_area_low = self.volume_profile.value_area_low
        value_area_high = self.volume_profile.value_area_high
        tema_value = self._tema.current.value
        ema_value = self._ema.current.value

        # Log the indicators and price
        self.log(f"Current Price: {current_price} and Slope: {slope}")
        self.log(f"Value Area High: {value_area_high}")
        self.log(f"Value Area Low: {value_area_low}")
        self.log(f"TEMA: {tema_value}")
        self.log(f"EMA: {ema_value}")

        in_value_area = value_area_low <= current_price <= value_area_high
        long_signal = (
            in_value_area
            and slope < self.ENTRY_SLOPE_THRESHOLD
            and current_price > tema_value
            and current_price > ema_value
        )
        if not long_signal:
            self.log("Price isn't in value area, above TEMA, or above EMA, keep waiting...")
            return

        self.log(
            "Price is moving towards the value area, above TEMA, and above EMA! Invest!"
        )
        self.set_holdings(contract.symbol, 1)
        self.highest_price_since_entry = current_price
        self.stop_loss_price = current_price * (1 - self.trailing_stop_loss_percentage)
        self.log(
            f"Current price: {current_price}, initial stop loss price: {self.stop_loss_price}"
        )

    def _manage_trailing_stop(self, current_price: float) -> None:
        """Raise the trailing stop on new highs and exit once it is breached."""
        # Update highest price since entry
        if current_price > self.highest_price_since_entry:
            self.highest_price_since_entry = current_price
            new_stop_loss = current_price * (1 - self.trailing_stop_loss_percentage)
            # The stop only ever moves up.
            if new_stop_loss > self.stop_loss_price:
                self.stop_loss_price = new_stop_loss
                self.log(f"Updating trailing stop loss to: {self.stop_loss_price}")

        # Exit check
        if current_price < self.stop_loss_price:
            self.log(f"Stop loss triggered at {current_price}")
            if self.futures_contract is not None:
                self.liquidate(self.futures_contract.symbol)
            else:
                # No contract recorded for the open position: flatten everything.
                self.liquidate()
            # Reset for next trade
            self.highest_price_since_entry = 0
            self.stop_loss_price = 0

    def compute_slope(self, prices: list) -> float:
        """Return the least-squares slope of ``prices`` against their index.

        Args:
            prices: Sequence of numeric prices; element ``i`` is paired with
                x-value ``i``.

        Returns:
            The slope of the fitted regression line, or ``0.0`` when fewer than
            two prices are supplied and no trend can be estimated.
        """
        if len(prices) < 2:
            return 0.0

        # sklearn expects 2D arrays: one column of targets, one of features.
        prices_array = np.array(prices).reshape(-1, 1)
        times = np.arange(len(prices)).reshape(-1, 1)

        model = LinearRegression().fit(times, prices_array)
        return float(model.coef_[0][0])