Overall Statistics
Total Trades
20
Average Win
1.37%
Average Loss
-1.90%
Compounding Annual Return
2.819%
Drawdown
2.800%
Expectancy
0.031
Net Profit
0.450%
Sharpe Ratio
0.237
Probabilistic Sharpe Ratio
35.839%
Loss Rate
40%
Win Rate
60%
Profit-Loss Ratio
0.72
Alpha
0.032
Beta
0.186
Annual Standard Deviation
0.108
Annual Variance
0.012
Information Ratio
0.396
Tracking Error
0.158
Treynor Ratio
0.138
Total Fees
$20.00
Estimated Strategy Capacity
$26000.00
Lowest Capacity Asset
QQQ Y5MGN9M0F5K6|QQQ RIWIV7K5Z9LX
#region imports
from AlgorithmImports import *
#endregion
BACKTEST_START_YEAR = 2022                              # Set start Year of the Backtest
BACKTEST_START_MONTH = 12                             # Set start Month of the Backtest
BACKTEST_START_DAY = 3                                  # Set start Day of the Backtest

BACKTEST_END_YEAR = 2023                                # Set end Year of the Backtest
BACKTEST_END_MONTH = 1                                  # Set end Month of the Backtest       
BACKTEST_END_DAY = 30                                   # Set end Day of the Backtest

BACKTEST_ACCOUNT_CASH = 2000                          # Set Backtest Strategy Cash


STOCKS = ["QQQ"] # , "IWM", "QQQ", "AMZN", "AAPL", "NVDA", "TSLA", "META", "NFLX", "MSFT"]



# Percentage around price at start of day for which we request options data
# 6 here would get strike prices that are within +6% and -6% of price at start of day
OPTION_SCAN_PERCENTAGE = 6


#PUT ATR PROFIT AND STOP LOSS, CHANGE CALCULATION FOR HISTOGRAM TO TRADINGVIEW ONE


START_MINUTES_AFTER_OPEN = 10
END_MINUTES_BEFORE_CLOSE = 10



ATR_PERIOD = 14
ATR_MULTIPLIER = 1
ATR_MULTIPLIER_STOP_LOSS = 1

BOLLINGER_PERIOD = 20
BOLLINGER_FACTOR = 2

KELTNER_PERIOD = 20
KELTNER_FACTOR = 1.5


TTM_SQUEEZE_PERIOD = 20

# Importing QCAlgorithm and other needed classes, functions etc.
from AlgorithmImports import *
from symboldata import SymbolData
import config


# Class definition for the main algorithm that inherits from the QCAlgorithm class
# This class contains the Initialize, OnData, and other methods that are required to run the algorithm
# It is intended to be the main container for the logic of the algorithm
# QCAlgorithm is a base class provided by the QuantConnect Lean Algorithm Framework for building and backtesting algorithmic trading strategies. 
# It provides a set of predefined methods and properties that can be overridden or used as is to implement a trading strategy. 
# The class can be used to handle data feeds, manage the portfolio and execution, and perform custom computations. 
# The QCAlgorithm class is the foundation of a trading strategy in QuantConnect, 
# and it is designed to be extended by the user to create custom trading logic. 
# It's a class that provides an easy and efficient way to create, test, and execute trading strategies. 
# The user can use this class as a base class to create a new algorithm, and it will have access to all the features of the Lean engine.
class Core(QCAlgorithm):

    def Initialize(self):
        self.SetTimeZone(TimeZones.Chicago)
        # Setting start and end date for the backtest
        self.SetStartDate(config.BACKTEST_START_YEAR, config.BACKTEST_START_MONTH, config.BACKTEST_START_DAY)  
        self.SetEndDate(config.BACKTEST_END_YEAR, config.BACKTEST_END_MONTH, config.BACKTEST_END_DAY)   

        # Setting backtest account cash              
        self.SetCash(config.BACKTEST_ACCOUNT_CASH) 

        # Initialize stock and symbol data objects     
        self.stocks = config.STOCKS
        self.symbols = []
        self.symbol_instances = {}

        # Add stocks to the algorithm and store their symbols 
        for stock in self.stocks:
            try:
                self.symbols.append(self.AddEquity(stock, Resolution.Minute, extendedMarketHours = True).Symbol)
            except:
                raise Exception(f"Unable to add stock of symbol {stock} to the algorithm")
        
        # Create symbol data objects for each stock symbol 
        for symbol in self.symbols:
            self.symbol_instances[symbol] = SymbolData(self, symbol)

    # This function gets called with each new data point that arrives
    def OnData(self, data: Slice):
        # Loop through the symbol data objects
        for symbol, symbolData in self.symbol_instances.items():
            # get list of invested options for the symbol
            option_invested = [x.Key for x in self.Portfolio if x.Value.Invested and x.Value.Type==SecurityType.Option and x.Key.Underlying == symbol]

            # Check if we are in the allowed Time window for trades
            if symbolData.allow_trading:
                # Check that we are not invested into an option for the symbol
                if len(option_invested) == 0:

                    # Loop through options data
                    for i in data.OptionChains:
                        if i.Key != symbolData.option_symbol: continue


                        optionchain = i.Value
                        if (data.OptionChains.Count == 0):
                            return
                        
                        # Check if we have a long signal
                        if symbolData.check_long():

                            day = 0

                            # Filter options by expiration and call/put
                            for i in range(5):
                                call =  [x for x in optionchain if x.Right == OptionRight.Call and ((x.Expiry - self.Time).days) == day]
                                day += 1
                                if len(call) > 0:
                                    break
                            

                            if len(call) == 0:
                                continue
                            
                            # Find call closest to ATM or first strike ITM
                            call_to_buy = min(filter(lambda x: x.Strike <= self.Securities[symbol].Price, call), key = lambda x: abs(x.Strike - self.Securities[symbol].Price))

                            # Send market order for the option contract
                            self.MarketOrder(call_to_buy.Symbol, 1)
                            self.Debug(symbol)
                            self.Debug(self.Time)
                            self.Debug(f"1 hour {symbolData.ema_5} {symbolData.ema_6} {symbolData.ema_7} {symbolData.ema_8} // 4 hour {symbolData.ema_9} {symbolData.ema_10} {symbolData.ema_11} {symbolData.ema_12}")
                            self.Debug(f"keltner lower {symbolData.k_lower_queue}")
                            self.Debug(f"keltner upper {symbolData.k_upper_queue}")
                            self.Debug(f"bollinger lower {symbolData.b_lower_queue}")
                            self.Debug(f"bollinger upper {symbolData.b_upper_queue}")
                            self.Debug(f"ttm squeeze {symbolData.ttm_squeeze_queue}")
                            self.Debug(f"ema 5 minute {symbolData.ema_4}")
                            self.Debug(f"Price {symbolData.bar_5min}")
                            self.Debug(f"High {max(symbolData.high_queue)} Low {min(symbolData.low_queue)}")
                            symbolData.current_contract = call_to_buy
                            symbolData.entry_price = self.Securities[symbol].Close

                        # Check if we have a short signal
                        elif symbolData.check_short():
                            day = 0

                            # Filter options by expiration and call/put
                            for i in range(5):
                                put =  [x for x in optionchain if x.Right == OptionRight.Put and ((x.Expiry - self.Time).days) == day]
                                day += 1
                                if len(put) > 0:
                                    break
                                

                            if len(put) == 0:
                                continue

                            # Find put closest to ATM or first strike ITM      
                            put_to_buy = min(filter(lambda x: x.Strike >= self.Securities[symbol].Price, put), key = lambda x: abs(x.Strike - self.Securities[symbol].Price))

                            # Send market order for the option contract
                            self.Debug(symbol)
                            self.Debug(self.Time)
                            self.Debug(f"1 hour {symbolData.ema_5} {symbolData.ema_6} {symbolData.ema_7} {symbolData.ema_8} // 4 hour {symbolData.ema_9} {symbolData.ema_10} {symbolData.ema_11} {symbolData.ema_12}")
                            self.Debug(f"keltner lower {symbolData.k_lower_queue}")
                            self.Debug(f"keltner upper {symbolData.k_upper_queue}")
                            self.Debug(f"bollinger lower {symbolData.b_lower_queue}")
                            self.Debug(f"bollinger upper {symbolData.b_upper_queue}")
                            self.Debug(f"5 minute EMA {symbolData.ema_4}")
                            self.Debug(f"ttm squeeze {symbolData.ttm_squeeze_queue}")
                            self.Debug(f"ema 5 minute {symbolData.ema_4}")
                            self.Debug(f"Price {symbolData.bar_5min}")
                            self.MarketOrder(put_to_buy.Symbol, 1)
                            symbolData.current_contract = put_to_buy
                            symbolData.entry_price = self.Securities[symbol].Close
                else:
                    option = option_invested[0]
                    check_call = [x.Key for x in self.Portfolio if x.Value.Invested and x.Value.Type==SecurityType.Option and x.Key.Underlying == symbol and x.Key.ID.OptionRight == OptionRight.Call]
                    check_put = [x.Key for x in self.Portfolio if x.Value.Invested and x.Value.Type==SecurityType.Option and x.Key.Underlying == symbol and x.Key.ID.OptionRight == OptionRight.Put]

                    if len(check_call) > 0:
                        symbolData.check_atr_profit("LONG", option)
                        
                        symbolData.check_atr_loss("LONG", option)
                            

                    if len(check_put) > 0:
                        symbolData.check_atr_profit("SHORT", option)
                       
                        symbolData.check_atr_loss("SHORT", option)
                          


                    #self.Debug(self.Portfolio[option].UnrealizedProfitPercent)
                    # Check for profit and loss on the position and liquidate accordingly
                    # if self.Portfolio[option].UnrealizedProfitPercent >= 0.05:
                    #     self.Liquidate(option)
                    #     symbolData.current_contract = None
                    # elif self.Portfolio[option].UnrealizedProfitPercent <= -0.05:
                    #     self.Liquidate(option)
                    #     symbolData.current_contract = None
            
            # Liquidate at end of trading window
            elif len(option_invested) > 0:
                for option in option_invested:
                    self.Liquidate(option)
from AlgorithmImports import *
from QuantConnect.Securities.Option import OptionPriceModels
from collections import deque
from scipy import stats
import config

class SymbolData():



    def __init__(self, algorithm, symbol):
        # Store algorithm and symbol
        self.algorithm = algorithm
        self.symbol = symbol


        self.current_contract = None

        # Add options data for symbol
        self.option = self.algorithm.AddOption(self.symbol, Resolution.Minute)
        self.option.SetLeverage(1.0)
        self.option_symbol = self.option.Symbol
        #self.option.PriceModel = OptionPriceModels.CrankNicolsonFD()
        self.option.SetFilter(self.UniverseFunc)

        # Create EMAs for 5min bars
        self.ema_1 = ExponentialMovingAverage(8)
        self.ema_2 = ExponentialMovingAverage(21)
        self.ema_3 = ExponentialMovingAverage(34)
        self.ema_4 = ExponentialMovingAverage(55)

        # Create EMAs for 1 hour bars
        self.ema_5 = ExponentialMovingAverage(8)
        self.ema_6 = ExponentialMovingAverage(21)
        self.ema_7 = ExponentialMovingAverage(34)
        self.ema_8 = ExponentialMovingAverage(55)

        # Create EMAs for 4 hour bars
        self.ema_9 = ExponentialMovingAverage(8)
        self.ema_10 = ExponentialMovingAverage(21)
        self.ema_11 = ExponentialMovingAverage(34)
        self.ema_12 = ExponentialMovingAverage(55)
      
        # Create deques to hold highs and lows
        self.high_queue = deque(maxlen=config.TTM_SQUEEZE_PERIOD)
        self.low_queue = deque(maxlen=config.TTM_SQUEEZE_PERIOD)

        # Create bollinger band, keltner channel and SMA
        self.bollinger_1 = BollingerBands(config.BOLLINGER_PERIOD, config.BOLLINGER_FACTOR)
        self.keltner_1 = KeltnerChannels(config.KELTNER_PERIOD, config.KELTNER_FACTOR)
        self.sma_1 = SimpleMovingAverage(config.TTM_SQUEEZE_PERIOD)
        self.linear_reg = LeastSquaresMovingAverage(20)

        # Create deques to hold x and y values for linear regression needed in TTM squeeze
        self.delta_x_queue = deque(maxlen=config.TTM_SQUEEZE_PERIOD)
        self.delta_y_queue = deque(maxlen=config.TTM_SQUEEZE_PERIOD)
        self.ling_reg_value = None
        self.bar_counter = 0
        # Create deque to store ttm squeeze histogram values
        self.ttm_squeeze_queue = deque(maxlen=2)

        # Create deques to store bollinger and keltner values
        self.b_upper_queue = deque(maxlen=6)
        self.b_lower_queue = deque(maxlen=6)
        self.k_upper_queue = deque(maxlen=6)
        self.k_lower_queue = deque(maxlen=6)
        

        self.atr = AverageTrueRange(config.ATR_PERIOD)

        self.bar_5min = None

        # Schedule function calls at start and end of trading window, to open and close trading window
        self.algorithm.Schedule.On(self.algorithm.DateRules.EveryDay(), self.algorithm.TimeRules.AfterMarketOpen(self.symbol, config.START_MINUTES_AFTER_OPEN), self.open_window)
        self.algorithm.Schedule.On(self.algorithm.DateRules.EveryDay(), self.algorithm.TimeRules.BeforeMarketClose(self.symbol, config.END_MINUTES_BEFORE_CLOSE), self.close_window)

        self.allow_trading = False

        # Create consolidator for 1 hour bars
        self.hourly_consolidator = TradeBarConsolidator(timedelta(hours=1))
        self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.hourly_consolidator)
        self.hourly_consolidator.DataConsolidated += self.receive_hourly_bars


       
        # Create consolidator for 4 hour bars
        self.hourly_consolidator_2 = TradeBarConsolidator(timedelta(hours=4))
        self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.hourly_consolidator_2)
        self.hourly_consolidator_2.DataConsolidated += self.receive_hourly_bars_2

        # Get bars from history to warm up hourly consolidators and indicators
        history = self.algorithm.History(tickers=[self.symbol], 
                                                    start=self.algorithm.Time - timedelta(days=90), 
                                                    end=self.algorithm.Time, 
                                                    resolution=Resolution.Hour,
                                                    fillForward=False, 
                                                    extendedMarket=True)


        for row in history.itertuples():
            bar = TradeBar(row.Index[1], self.symbol, row.open, row.high, row.low, row.close, row.volume)
            self.hourly_consolidator.Update(bar)
            self.hourly_consolidator_2.Update(bar)

        # Create consolidator for 5 minute bars
        self.minute_consolidator = TradeBarConsolidator(timedelta(minutes=5))
        self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.minute_consolidator)
        self.minute_consolidator.DataConsolidated += self.receive_minute_bars

         # Get bars from history to warm up minute consolidators and indicators
        history = self.algorithm.History(tickers=[self.symbol], 
                                                    start=self.algorithm.Time - timedelta(days=15), 
                                                    end=self.algorithm.Time, 
                                                    resolution=Resolution.Minute, 
                                                    fillForward=False,
                                                    extendedMarket=True)

        for row in history.itertuples():
            bar = TradeBar(row.Index[1], self.symbol, row.open, row.high, row.low, row.close, row.volume)
            self.minute_consolidator.Update(bar)




    # Open trading window
    def open_window(self):
        self.allow_trading = True
    

    # Close trading window
    def close_window(self):
        self.allow_trading = False



    # Receive 5 min bars and update indicators
    def receive_minute_bars(self, sender, bar):
        self.atr.Update(bar)
        self.bar_5min = bar.Close
        self.ema_1.Update(IndicatorDataPoint(bar.EndTime, bar.Close))
        self.ema_2.Update(IndicatorDataPoint(bar.EndTime, bar.Close))
        self.ema_3.Update(IndicatorDataPoint(bar.EndTime, bar.Close))
        self.ema_4.Update(IndicatorDataPoint(bar.EndTime, bar.Close))
        self.bollinger_1.Update(IndicatorDataPoint(bar.EndTime, bar.Close))
        self.keltner_1.Update(bar)
        self.sma_1.Update(IndicatorDataPoint(bar.EndTime, bar.Close))
        self.high_queue.appendleft(bar.High)
        self.low_queue.appendleft(bar.Low)
        self.bar_counter += 1
        self.delta_x_queue.appendleft(self.bar_counter)
        if self.bollinger_1.IsReady and self.keltner_1.IsReady and self.sma_1.IsReady:
            self.b_upper_queue.appendleft(self.bollinger_1.UpperBand.Current.Value)
            self.b_lower_queue.appendleft(self.bollinger_1.LowerBand.Current.Value)
            self.k_upper_queue.appendleft(self.keltner_1.UpperBand.Current.Value)
            self.k_lower_queue.appendleft(self.keltner_1.LowerBand.Current.Value)
            self.calculate_ttm_squeeze(bar)


    # Calculate TTM Squeeze histogram
    def calculate_ttm_squeeze(self, bar):
        if len(self.high_queue) == config.TTM_SQUEEZE_PERIOD and len(self.low_queue) == config.TTM_SQUEEZE_PERIOD:
            highest = max(self.high_queue)
            lowest = min(self.low_queue)
            e_1 = ((highest + lowest) / 2) + self.sma_1.Current.Value
            osc_value = bar.Close - e_1 / 2
          
            # if self.algorithm.Time.day == 23 and self.algorithm.Time.hour == 13 and self.algorithm.Time.minute == 45:
            #     self.algorithm.Debug(f"High {max(self.high_queue)} Low {min(self.low_queue)} Close {bar.Close} Midline {donchian_midline} SMA {self.sma_1.Current.Value}")
            
            self.delta_y_queue.appendleft(osc_value)
            self.linear_reg.Update(IndicatorDataPoint(bar.EndTime, osc_value))        


            if len(self.delta_x_queue) == config.TTM_SQUEEZE_PERIOD and len(self.delta_y_queue) == config.TTM_SQUEEZE_PERIOD:
                #slope, intercept, r, p, std_err = stats.linregress(self.delta_x_queue, self.delta_y_queue)
                #self.ling_reg_value = intercept + slope * (config.TTM_SQUEEZE_PERIOD - 1 - 0)
            #     if self.algorithm.Time.hour == 12 and self.algorithm.Time.minute == 0 and self.algorithm.Time.day == 24:
            #         self.algorithm.Debug(self.ling_reg_value)
            #         self.algorithm.Debug(self.algorithm.Securities[self.symbol].Close)
                self.ttm_squeeze_queue.appendleft(self.linear_reg.Current.Value)



    # Check long conditions
    def check_long(self):
        if self.ema_5.Current.Value > self.ema_6.Current.Value > self.ema_7.Current.Value > self.ema_8.Current.Value:
            if self.ema_9.Current.Value > self.ema_10.Current.Value > self.ema_11.Current.Value > self.ema_12.Current.Value:
                if self.bar_5min > self.ema_4.Current.Value:
                    if self.ttm_squeeze_queue[0] > self.ttm_squeeze_queue[1]:
                        if self.check_squeeze():
                            return True
                        else:
                            return False
                    else:
                        return False
                else:
                    return False
            else:
                return False
        else:
            return False

    

    # Check short conditions
    def check_short(self):
        if self.ema_5.Current.Value < self.ema_6.Current.Value < self.ema_7.Current.Value < self.ema_8.Current.Value:
            if self.ema_9.Current.Value < self.ema_10.Current.Value < self.ema_11.Current.Value < self.ema_12.Current.Value:
                if self.bar_5min < self.ema_4.Current.Value:
                    if self.ttm_squeeze_queue[0] < self.ttm_squeeze_queue[1]:
                        if self.check_squeeze():
                            return True
                        else:
                            return False
                    else:
                        return False
                else:
                    return False
            else:
                return False
        else:
            return False


 


    # Check for volatility squeeze
    def check_squeeze(self):
        if self.b_upper_queue[0] > self.k_upper_queue[0] or self.b_lower_queue[0] < self.k_lower_queue[0]:
            if (self.b_upper_queue[1] < self.k_upper_queue[1] and self.b_lower_queue[1] > self.k_lower_queue[1] 
                and self.b_upper_queue[2] < self.k_upper_queue[2] and self.b_lower_queue[2] > self.k_lower_queue[2] 
                and self.b_upper_queue[3] < self.k_upper_queue[3] and self.b_lower_queue[3] > self.k_lower_queue[3]
                and self.b_upper_queue[4] < self.k_upper_queue[4] and self.b_lower_queue[4] > self.k_lower_queue[4]
                and self.b_upper_queue[5] < self.k_upper_queue[5] and self.b_lower_queue[5] > self.k_lower_queue[5]):
                return True
            else:
                return False
        else:
            return False
               


    # Receive 1 hour bars and update indicators
    def receive_hourly_bars(self, sender, bar):    
        self.ema_5.Update(IndicatorDataPoint(bar.EndTime, bar.Close))
        self.ema_6.Update(IndicatorDataPoint(bar.EndTime, bar.Close))
        self.ema_7.Update(IndicatorDataPoint(bar.EndTime, bar.Close))
        self.ema_8.Update(IndicatorDataPoint(bar.EndTime, bar.Close))
      

    # Receive 4 hour bars and update indicators
    def receive_hourly_bars_2(self, sender, bar):    
        self.ema_9.Update(IndicatorDataPoint(bar.EndTime, bar.Close))
        self.ema_10.Update(IndicatorDataPoint(bar.EndTime, bar.Close))
        self.ema_11.Update(IndicatorDataPoint(bar.EndTime, bar.Close))
        self.ema_12.Update(IndicatorDataPoint(bar.EndTime, bar.Close))




    def check_atr_profit(self, direction, option):
        if direction == "LONG":
            if self.algorithm.Securities[self.symbol].Price >= (self.entry_price + (self.atr.Current.Value * config.ATR_MULTIPLIER)):
                self.algorithm.Liquidate(option, tag=f"PROFIT at {(self.entry_price + (self.atr.Current.Value * config.ATR_MULTIPLIER))}")
        elif direction == "SHORT":
            if self.algorithm.Securities[self.symbol].Price <= (self.entry_price - (self.atr.Current.Value * config.ATR_MULTIPLIER)):
                self.algorithm.Liquidate(option, tag=f"PROFIT at {(self.entry_price - (self.atr.Current.Value * config.ATR_MULTIPLIER))}")

    

    def check_atr_loss(self, direction, option):
        if direction == "LONG":
            if self.algorithm.Securities[self.symbol].Price <= (self.entry_price - (self.atr.Current.Value * config.ATR_MULTIPLIER_STOP_LOSS)):
                self.algorithm.Liquidate(option, tag=f"STOP at {(self.entry_price - (self.atr.Current.Value * config.ATR_MULTIPLIER_STOP_LOSS))}")
          
        elif direction == "SHORT":
            if self.algorithm.Securities[self.symbol].Price >= (self.entry_price + (self.atr.Current.Value * config.ATR_MULTIPLIER_STOP_LOSS)):
                self.algorithm.Liquidate(option, tag=f"STOP at {(self.entry_price + (self.atr.Current.Value * config.ATR_MULTIPLIER_STOP_LOSS))}")
           





    # Request optiosn data for symbol
    def UniverseFunc(self, universe):
        
      
        ATM = self.algorithm.Securities[self.symbol].Price * config.OPTION_SCAN_PERCENTAGE/100
        
        # Return all options that are within 6% and -6% of price and expiration between 1 and 5 days
        return universe.IncludeWeeklys().Strikes(-ATM, ATM).Expiration(TimeSpan.FromDays(1),TimeSpan.FromDays(5))