Created with Highcharts 12.1.2EquityJan 2023Feb 2023Mar 2023Apr 2023May 2023Jun 2023Jul 2023Aug 2023Sep 2023Oct 2023Nov 2023Dec 20230200k400k-100-50001020-25025020M40M020M40M0100
Overall Statistics
Total Orders
2209
Average Win
0.32%
Average Loss
-0.19%
Compounding Annual Return
307.518%
Drawdown
61.900%
Expectancy
0.604
Start Equity
50000
End Equity
161648.72
Net Profit
223.297%
Sharpe Ratio
3.165
Sortino Ratio
3.708
Probabilistic Sharpe Ratio
71.689%
Loss Rate
41%
Win Rate
59%
Profit-Loss Ratio
1.70
Alpha
2.865
Beta
1.499
Annual Standard Deviation
0.93
Annual Variance
0.865
Information Ratio
3.155
Tracking Error
0.916
Treynor Ratio
1.963
Total Fees
$1256.28
Estimated Strategy Capacity
$13000000.00
Lowest Capacity Asset
MNQ YEBKSYL2454X
Portfolio Turnover
153.00%
from AlgorithmImports import *

class DarStkBear:
    def __init__(self, algorithm, symbol, execute_info):
        self.algorithm = algorithm
        self.symbol = symbol
        self.execute_info = execute_info
        
        self.swing_high = None
        self.low_TPIH = None
        self.last_low = None
        self.aom = None
        self.volume_imbalance = None

    def check_for_bearish_setup(self, window_5min, window_1min):
        self.algorithm.Debug("Executing check_for_bearish_setup")
        
        if not isinstance(window_5min, (list, RollingWindow)) or not isinstance(window_1min, (list, RollingWindow)):
            self.algorithm.Debug("window_5min and window_1min should be collections of TradeBar objects.")
            return

        if len(window_5min) < 3:
            self.algorithm.Debug(f"window_5min length: {len(window_5min)} is less than 3")
            return

        if len(window_1min) < 15:
            self.algorithm.Debug(f"window_1min length: {len(window_1min)} is less than 15")
            return

        if not self.checkSwingHigh(window_5min):
            self.algorithm.Debug("No swing high found")
            return

        if not self.checkCloseAbove(window_5min):
            self.algorithm.Debug("No close above the swing high")
            return

        if not self.checkCloseBelow(window_5min):
            self.algorithm.Debug("No close below the low that formed the swing high")
            return

        self.calcAOM()

        if not self.checkVI(window_1min):
            self.algorithm.Debug("No volume imbalance found")
            return

        self.DarStkBearEntry()

    def checkSwingHigh(self, window_5min):
        self.algorithm.Debug("Executing checkSwingHigh")
        
        if len(window_5min) < 3:
            return False

        bar1 = window_5min[-3]
        bar2 = window_5min[-2]
        bar3 = window_5min[-1]

        self.algorithm.Debug(f"Bar1: {bar1}, Bar2: {bar2}, Bar3: {bar3}")

        if bar2.High > bar1.High and bar2.High > bar3.High:
            self.swing_high = bar2.High
            self.low_TPIH = min(bar1.Low, bar3.Low)
            self.storeSwingHigh(bar2)
            self.algorithm.Debug(f"Swing high set at {self.swing_high}, Low TPIH set at {self.low_TPIH}")
            return True

        return False

    def storeSwingHigh(self, bar):
        self.swing_high = bar.High
        self.swing_high_time = bar.Time
        self.algorithm.Debug(f"Stored swing high at {self.swing_high} with time {self.swing_high_time}")

    def checkCloseAbove(self, window_5min):
        self.algorithm.Debug("Executing checkCloseAbove")
        
        for bar in window_5min:
            if bar.Close > self.swing_high:
                self.algorithm.Debug(f"Close above swing high found at {bar.Close}")
                return True

        return False

    def checkCloseBelow(self, window_5min):
        self.algorithm.Debug("Executing checkCloseBelow")
        
        for bar in window_5min:
            if bar.Close < self.low_TPIH:
                self.algorithm.Debug(f"Close below low TPIH found at {bar.Close}")
                return True

        return False

    def calcAOM(self):
        self.algorithm.Debug("Executing calcAOM")
        
        if not self.swing_high or not self.low_TPIH:
            return

        fib_range_high = self.swing_high
        fib_range_low = self.low_TPIH - (self.swing_high - self.low_TPIH) * 0.618
        self.aom = (fib_range_low, fib_range_high)
        self.algorithm.Debug(f"AOM set at {self.aom}")

    def checkVI(self, window_1min):
        self.algorithm.Debug("Executing checkVI")
        
        if not self.aom:
            return False

        for bar in window_1min[-15:]:
            if self.aom[0] <= bar.Low <= self.aom[1] and self.aom[0] <= bar.High <= self.aom[1]:
                self.algorithm.Debug(f"Volume imbalance found at {bar.High}")
                self.volume_imbalance = bar.High
                return True

        return False

    def DarStkBearEntry(self):
        self.algorithm.Debug("Executing DarStkBearEntry")
        
        if self.volume_imbalance is None:
            self.algorithm.Debug("Volume imbalance not set")
            return

        entry_price = self.volume_imbalance
        stop_loss_price = self.swing_high + (self.swing_high - self.low_TPIH) * 0.5
        take_profit_price = self.low_TPIH - (self.swing_high - self.low_TPIH) * 4

        current_contract = self.algorithm.Securities[self.symbol].Symbol
        mapped_contract = self.algorithm.Securities[current_contract].Mapped
        self.execute_info.place_order(mapped_contract, "sell", entry_price, stop_loss_price, take_profit_price)
        self.algorithm.Debug(f"Placed sell order at {entry_price} with stop loss at {stop_loss_price} and take profit at {take_profit_price} for {mapped_contract}")
from AlgorithmImports import *

class DarStkBull:
    def __init__(self, algorithm, symbol, execute_info):
        self.algorithm = algorithm
        self.symbol = symbol
        self.execute_info = execute_info
        
        self.swing_low = None
        self.high_TPIL = None
        self.last_high = None
        self.aom = None
        self.volume_imbalance = None

    def check_for_bullish_setup(self, window_5min, window_1min):
        self.algorithm.Debug("Executing check_for_bullish_setup")
        
        if not isinstance(window_5min, (list, RollingWindow)) or not isinstance(window_1min, (list, RollingWindow)):
            self.algorithm.Debug("window_5min and window_1min should be collections of TradeBar objects.")
            return

        if len(window_5min) < 3:
            self.algorithm.Debug(f"window_5min length: {len(window_5min)} is less than 3")
            return

        if len(window_1min) < 15:
            self.algorithm.Debug(f"window_1min length: {len(window_1min)} is less than 15")
            return

        if not self.checkSwingLow(window_5min):
            self.algorithm.Debug("No swing low found")
            return

        if not self.checkCloseBelow(window_5min):
            self.algorithm.Debug("No close below the swing low")
            return

        if not self.checkCloseAbove(window_5min):
            self.algorithm.Debug("No close above the high that formed the swing low")
            return

        self.calcAOM()

        if not self.checkVI(window_1min):
            self.algorithm.Debug("No volume imbalance found")
            return

        self.DarStkBullEntry()

    def checkSwingLow(self, window_5min):
        self.algorithm.Debug("Executing checkSwingLow")
        
        if len(window_5min) < 3:
            return False

        bar1 = window_5min[-3]
        bar2 = window_5min[-2]
        bar3 = window_5min[-1]

        self.algorithm.Debug(f"Bar1: {bar1}, Bar2: {bar2}, Bar3: {bar3}")

        if bar2.Low < bar1.Low and bar2.Low < bar3.Low:
            self.swing_low = bar2.Low
            self.high_TPIL = max(bar1.High, bar3.High)
            self.storeSwingLow(bar2)
            self.algorithm.Debug(f"Swing low set at {self.swing_low}, High TPIL set at {self.high_TPIL}")
            return True

        return False

    def storeSwingLow(self, bar):
        self.swing_low = bar.Low
        self.swing_low_time = bar.Time
        self.algorithm.Debug(f"Stored swing low at {self.swing_low} with time {self.swing_low_time}")

    def checkCloseBelow(self, window_5min):
        self.algorithm.Debug("Executing checkCloseBelow")
        
        for bar in window_5min:
            if bar.Close < self.swing_low:
                self.algorithm.Debug(f"Close below swing low found at {bar.Close}")
                return True

        return False

    def checkCloseAbove(self, window_5min):
        self.algorithm.Debug("Executing checkCloseAbove")
        
        for bar in window_5min:
            if bar.Close > self.high_TPIL:
                self.algorithm.Debug(f"Close above high TPIL found at {bar.Close}")
                return True

        return False

    def calcAOM(self):
        self.algorithm.Debug("Executing calcAOM")
        
        if not self.swing_low or not self.high_TPIL:
            return

        fib_range_low = self.swing_low
        fib_range_high = self.high_TPIL + (self.high_TPIL - self.swing_low) * 0.618
        self.aom = (fib_range_low, fib_range_high)
        self.algorithm.Debug(f"AOM set at {self.aom}")

    def checkVI(self, window_1min):
        self.algorithm.Debug("Executing checkVI")
        
        if not self.aom:
            return False

        for bar in window_1min[-15:]:
            if self.aom[0] <= bar.Low <= self.aom[1] and self.aom[0] <= bar.High <= self.aom[1]:
                self.algorithm.Debug(f"Volume imbalance found at {bar.Low}")
                self.volume_imbalance = bar.Low
                return True

        return False

    def DarStkBullEntry(self):
        self.algorithm.Debug("Executing DarStkBullEntry")
        
        if self.volume_imbalance is None:
            self.algorithm.Debug("Volume imbalance not set")
            return

        entry_price = self.volume_imbalance
        stop_loss_price = self.swing_low - (self.high_TPIL - self.swing_low) * 0.5
        take_profit_price = self.high_TPIL + (self.high_TPIL - self.swing_low) * 4

        current_contract = self.algorithm.Securities[self.symbol].Symbol
        mapped_contract = self.algorithm.Securities[current_contract].Mapped
        self.execute_info.place_order(mapped_contract, "buy", entry_price, stop_loss_price, take_profit_price)
        self.algorithm.Debug(f"Placed buy order at {entry_price} with stop loss at {stop_loss_price} and take profit at {take_profit_price} for {mapped_contract}")
from AlgorithmImports import *

class DetermineBias:
    def __init__(self, algorithm, multi_time_frame_analysis):
        self.algorithm = algorithm
        self.multi_time_frame_analysis = multi_time_frame_analysis
        self.current_structure = "neutral"
        self.h1_data = None
        self.m15_data = None
        self.swing_highs = []
        self.swing_lows = []

    def update_data(self):
        latest_bars = self.multi_time_frame_analysis.GetLatestBars()
        if latest_bars:
            self.h1_data = latest_bars.get("1hour")
            self.m15_data = latest_bars.get("15min")
            self.algorithm.Debug("Data updated in DetermineBias.")
        else:
            self.algorithm.Debug("Latest bars not available for updating data.")

    def analyze_m15_swings(self):
        if not self.m15_data or len(self.m15_data) < 3:
            return

        for i in range(2, len(self.m15_data)):
            bar1 = self.m15_data[i - 2]
            bar2 = self.m15_data[i - 1]
            bar3 = self.m15_data[i]

            if bar2.High > bar1.High and bar2.High > bar3.High:
                self.swing_highs.append(bar2)
                self.algorithm.Debug(f"Swing high added: {bar2.Time} High: {bar2.High}")

            if bar2.Low < bar1.Low and bar2.Low < bar3.Low:
                self.swing_lows.append(bar2)
                self.algorithm.Debug(f"Swing low added: {bar2.Time} Low: {bar2.Low}")

    def update_current_structure(self):
        self.update_data()
        if not self.m15_data or len(self.m15_data) < 3:
            self.algorithm.Debug("Not enough data to update current structure.")
            return self.current_structure

        self.analyze_m15_swings()

        if len(self.swing_highs) >= 3 and len(self.swing_lows) >= 3:
            current_swing_high = self.swing_highs[-1]
            current_swing_low = self.swing_lows[-1]

            if self.current_structure == "bearish" and self.m15_data[-1].Close > current_swing_high.High:
                self.current_structure = "potential_bullish_reversal"
                self.algorithm.Debug("Potential bullish reversal detected.")
            elif self.current_structure == "bullish" and self.m15_data[-1].Close < current_swing_low.Low:
                self.current_structure = "potential_bearish_reversal"
                self.algorithm.Debug("Potential bearish reversal detected.")

            if current_swing_high.Time > current_swing_low.Time and self.current_structure != "bullish":
                self.current_structure = "bullish"
                self.algorithm.Debug("Market structure changed to bullish.")
            elif current_swing_low.Time > current_swing_high.Time and self.current_structure != "bearish":
                self.current_structure = "bearish"
                self.algorithm.Debug("Market structure changed to bearish.")

        # Ensure the structure is updated at least once to avoid infinite neutral state
        if self.current_structure == "neutral" and len(self.m15_data) >= 15:
            last_swing = self.m15_data[-1]
            if last_swing.Close > self.m15_data[0].High:
                self.current_structure = "bullish"
                self.algorithm.Debug("Initial market structure set to bullish.")
            elif last_swing.Close < self.m15_data[0].Low:
                self.current_structure = "bearish"
                self.algorithm.Debug("Initial market structure set to bearish.")

        return self.current_structure

    def determine_bias(self):
        self.update_data()
        return self.update_current_structure()
from AlgorithmImports import *

class ExecuteInfo:
    def __init__(self, algorithm):
        self.algorithm = algorithm
        
        # Risk parameters
        self.position_size = 2  # Number of micro E-mini NQ contracts
        self.stop_loss_handles = 10  # Stop loss in handles
        self.take_profit_handles = 40  # Take profit in handles
        
        # Derived risk parameters in dollars
        self.tick_value = 0.5  # Tick value for micro E-mini NQ (1 tick = $0.50)
        self.handles_to_ticks = 4  # 1 handle = 4 ticks for NQ
        self.stop_loss_ticks = self.stop_loss_handles * self.handles_to_ticks
        self.take_profit_ticks = self.take_profit_handles * self.handles_to_ticks
        self.risk_per_contract = self.stop_loss_ticks * self.tick_value  # Risk per contract
        self.reward_per_contract = self.take_profit_ticks * self.tick_value  # Reward per contract
        
        # Calculate total risk and reward
        self.total_risk = self.position_size * self.risk_per_contract  # Total risk for position
        self.total_reward = self.position_size * self.reward_per_contract  # Total reward for position

    def place_order(self, contract, direction, entry_price, stop_loss_price=None, take_profit_price=None):
        if direction.lower() == "buy":
            self.algorithm.MarketOrder(contract, 1)
            self.algorithm.Debug(f"Buy order placed for {contract} at {entry_price}")
        elif direction.lower() == "sell":
            self.algorithm.MarketOrder(contract, -1)
            self.algorithm.Debug(f"Sell order placed for {contract} at {entry_price}")

        if stop_loss_price is not None:
            self.algorithm.Debug(f"Stop loss set at {stop_loss_price}")

        if take_profit_price is not None:
            self.algorithm.Debug(f"Take profit set at {take_profit_price}")

    def set_stop_loss(self, symbol, entry_price):
        """
        Set a stop loss order.
        """
        stop_loss_price = entry_price - self.stop_loss_handles if self.algorithm.Portfolio[symbol].Quantity > 0 else entry_price + self.stop_loss_handles
        self.algorithm.StopMarketOrder(symbol, -self.position_size, stop_loss_price)

    def set_take_profit(self, symbol, entry_price):
        """
        Set a take profit order.
        """
        take_profit_price = entry_price + self.take_profit_handles if self.algorithm.Portfolio[symbol].Quantity > 0 else entry_price - self.take_profit_handles
        self.algorithm.LimitOrder(symbol, -self.position_size, take_profit_price)
from AlgorithmImports import *
from multi_time_frame_analysis import MultiTimeFrameAnalysis
from execute_info import ExecuteInfo
from DarStkBull import DarStkBull
from DarStkBear import DarStkBear
from determine_bias import DetermineBias

class DarStk(QCAlgorithm):
    def Initialize(self):
        self.SetStartDate(2023, 1, 1)
        self.SetEndDate(2023, 11, 1)
        self.SetCash(50000)
        
        # Set base resolution to minute
        self.UniverseSettings.Resolution = Resolution.Minute

        # Adding the future security with base resolution
        self.future = self.AddFuture(Futures.Indices.MICRO_NASDAQ_100_E_MINI)
        self.future.SetFilter(0, 90)
        self.future_symbol = self.future.Symbol

        self.Debug("DarStk v1.2.1 initialized successfully")
        self.SetTimeZone(TimeZones.NewYork)
        self.SetWarmUp(timedelta(days=10))

        # Initialize multi-time frame analysis
        self.multi_time_frame_analysis = MultiTimeFrameAnalysis(self, self.future_symbol)
        
        # Initialize execute info
        self.execute_info = ExecuteInfo(self)

        # Initialize DetermineBias
        self.determine_bias = DetermineBias(self, self.multi_time_frame_analysis)

        # Initialize DarStkBull and DarStkBear
        self.dar_stk_bull = DarStkBull(self, self.future_symbol, self.execute_info)
        self.dar_stk_bear = DarStkBear(self, self.future_symbol, self.execute_info)

        # Define trading hours
        self.trading_start_time = time(10, 0)  # 10:00 AM
        self.trading_end_time = time(15, 30)   # 3:30 PM

    def OnData(self, data):
        self.Debug(f"OnData called at {self.Time}")

        # Skip trading logic if still warming up
        if self.IsWarmingUp:
            self.Debug("Skipping OnData because the algorithm is still warming up.")
            return

        current_time = self.Time.time()

        # Execute trading logic only during defined trading hours
        if self.trading_start_time <= current_time <= self.trading_end_time:
            latest_bars = self.multi_time_frame_analysis.GetLatestBars()
            if latest_bars:
                self.Debug("Executing trading logic.")
                self.ExecuteTradingLogic(latest_bars)
            else:
                self.Debug("Skipping ExecuteTradingLogic because latest bars are not ready.")

    def ExecuteTradingLogic(self, latest_bars):
        # Determine market structure
        market_structure = self.determine_bias.update_current_structure()
        self.Debug(f"Determined market structure: {market_structure}")

        # Execute trades based on market structure
        if market_structure == "bullish":
            self.dar_stk_bull.check_for_bullish_setup(latest_bars["5min"], latest_bars["1min"])
        elif market_structure == "bearish":
            self.dar_stk_bear.check_for_bearish_setup(latest_bars["5min"], latest_bars["1min"])
        else:
            self.Debug("Market structure is neutral; no trades will be executed.")

    def OnWarmUpFinished(self):
        self.Debug("Warm-up finished. Indicators and rolling windows are ready.")
#region imports
from AlgorithmImports import *
#endregion

class MultiTimeFrameAnalysis:
    def __init__(self, algorithm, symbol):
        self.algorithm = algorithm  # Store a reference to the algorithm instance
        self.symbol = symbol        # Store the symbol for which consolidators are set up
        
        # Adding consolidators for different time frames
        self.consolidator_1hour = TradeBarConsolidator(timedelta(hours=1))
        self.consolidator_15min = TradeBarConsolidator(timedelta(minutes=15))
        self.consolidator_5min = TradeBarConsolidator(timedelta(minutes=5))
        self.consolidator_1min = TradeBarConsolidator(timedelta(minutes=1))

        # Subscribing to the DataConsolidated events
        self.consolidator_1hour.DataConsolidated += self.OnDataConsolidated_1hour
        self.consolidator_15min.DataConsolidated += self.OnDataConsolidated_15min
        self.consolidator_5min.DataConsolidated += self.OnDataConsolidated_5min
        self.consolidator_1min.DataConsolidated += self.OnDataConsolidated_1min

        # Adding consolidators to the SubscriptionManager
        self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator_1hour)
        self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator_15min)
        self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator_5min)
        self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator_1min)

        # Initializing RollingWindows for each time frame
        self.window_1hour = RollingWindow[TradeBar](1)     # 1 bar (1 hour of 1-hour data)
        self.window_15min = RollingWindow[TradeBar](4)       # 4 bars (1 hour of 15-minute data)
        self.window_5min = RollingWindow[TradeBar](12)   # 12 bars (1 hour of 5-minute data)
        self.window_1min = RollingWindow[TradeBar](60)     # 60 bars (1 hour of 1-minute data)
        self.algorithm.Debug("MultiTimeFrameAnalysis initialized successfully")

    def OnDataConsolidated_1hour(self, sender, bar: TradeBar):
        if isinstance(bar, TradeBar):
            self.window_1hour.Add(bar)

    def OnDataConsolidated_15min(self, sender, bar: TradeBar):
        if isinstance(bar, TradeBar):
            self.window_15min.Add(bar)

    def OnDataConsolidated_5min(self, sender, bar: TradeBar):
        if isinstance(bar, TradeBar):
            self.window_5min.Add(bar)

    def OnDataConsolidated_1min(self, sender, bar: TradeBar):
        if isinstance(bar, TradeBar):
            self.window_1min.Add(bar)

    def IsReady(self):
        """
        Check if all rolling windows have sufficient data.
        """
        return self.window_1hour.IsReady and self.window_15min.IsReady and self.window_5min.IsReady and self.window_1min.IsReady

    def GetLatestBars(self):
        if self.IsReady():
            return {
                "1hour": list(self.window_1hour),
                "15min": list(self.window_15min),
                "5min": list(self.window_5min),
                "1min": list(self.window_1min)
            }
        else:
            self.algorithm.Debug("Rolling windows are not ready.")
            return None  # Ensure to handle the case when data is not ready