Overall Statistics
Total Trades
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Net Profit
0%
Sharpe Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
0
Tracking Error
0
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
#region imports
from AlgorithmImports import *
from System.Drawing import Color
#endregion

class Benchmark:
    """Chart helper that plots an algorithm's indicators at a throttled rate.

    On construction it registers a 'Squeeze' chart (four squeeze-state scatter
    series plus four momentum bar series) and a 'Trade Plot' chart with the
    algorithm, plus one chart per supplied indicator name.
    """

    def __init__(self, algo, underlying,  shares = 100, indicators = None):
        """Create the charts and initialise the plotting throttle.

        Args:
            algo: Algorithm object. This class uses its ``AddChart``, ``Plot``,
                ``PlotIndicator``, ``Time``, ``StartDate`` and ``EndDate``
                members.
            underlying: Stored on the instance as-is; not otherwise used here.
            shares: Accepted for backward compatibility; unused in this class.
            indicators: Optional mapping of chart name -> indicator. When
                omitted, a fresh empty dict is created for this instance.
        """
        # A literal `{}` default would be a single dict shared by every
        # instance built without an explicit argument; create one per call.
        if indicators is None:
            indicators = {}

        self.algo = algo
        self.underlying = underlying

        self.tradingChart = Chart('Trade Plot')

        self.sqz = Chart('Squeeze')
        # Squeeze state is drawn as coloured dots ...
        for series_name, dot_color in (
            ('No SQZ', Color.Green),
            ('Low SQZ', Color.Black),
            ('Mid SQZ', Color.Red),
            ('High SQZ', Color.Orange),
        ):
            self.sqz.AddSeries(Series(series_name, SeriesType.Scatter, '.', dot_color, ScatterMarkerSymbol.Circle))

        # ... and momentum as coloured bars, one series per colour.
        for series_name, bar_color in (
            ('UP Bull MOM', Color.Aqua),
            ('DOWN Bull MOM', Color.Blue),
            ('DOWN Bear MOM', Color.Red),
            ('UP Bear MOM', Color.Yellow),
        ):
            self.sqz.AddSeries(Series(series_name, SeriesType.Bar, '.', bar_color))
        self.algo.AddChart(self.sqz)

        self.tradingChart.AddSeries(Series('Price', SeriesType.Line, '$', Color.White))
        self.algo.AddChart(self.tradingChart)

        self.AddIndicators(indicators)

        # Throttle: plot at most once per 1/2000th of the backtest span
        # (presumably to stay under chart point limits -- confirm).
        self.resample = datetime.min
        self.resamplePeriod = (self.algo.EndDate - self.algo.StartDate) / 2000

    def AddIndicators(self, indicators):
        """Replace the tracked indicators and register a chart per name.

        Args:
            indicators: Mapping of chart name -> indicator. It replaces (does
                not merge with) any previously tracked mapping.
        """
        self.indicators = indicators
        for name in indicators:
            self.algo.AddChart(Chart(name))

    def PrintBenchmark(self):
        """Plot all tracked indicators unless the throttle window is still open."""
        if self.algo.Time <= self.resample: return

        self.resample = self.algo.Time  + self.resamplePeriod
        self.__PrintIndicators()

    def __PrintIndicators(self):
        ''' Plots every tracked indicator: 'Squeeze' goes to the Squeeze chart,
        any other name is forwarded to ``algo.PlotIndicator``. '''
        for name, indicator in self.indicators.items():
            if name == 'Squeeze':
                self.__PlotSqueeze(indicator)
            else:
                self.algo.PlotIndicator(name, indicator)

    def __PlotSqueeze(self, indicator):
        """Plot the momentum bar and the squeeze-state dot for one indicator.

        Args:
            indicator: Object exposing ``MomentumHistogramColor()``,
                ``Current.Value`` and ``Squeeze``.
        """
        # Ask for the colour once instead of once per branch.
        histogram_color = indicator.MomentumHistogramColor()
        if histogram_color == Color.Aqua:
            self.algo.Plot('Squeeze', 'UP Bull MOM', indicator.Current.Value)
        elif histogram_color == Color.Blue:
            self.algo.Plot('Squeeze', 'DOWN Bull MOM', indicator.Current.Value)
        elif histogram_color == Color.Red:
            self.algo.Plot('Squeeze', 'DOWN Bear MOM', indicator.Current.Value)
        elif histogram_color == Color.Yellow:
            self.algo.Plot('Squeeze', 'UP Bear MOM', indicator.Current.Value)

        # Squeeze levels 0..3 map positionally onto these series; the dot is
        # always drawn on the zero line. Unknown levels plot nothing.
        for level, series_name in enumerate(('No SQZ', 'Low SQZ', 'Mid SQZ', 'High SQZ')):
            if indicator.Squeeze == level:
                self.algo.Plot('Squeeze', series_name, 0)
                break
#region imports
from AlgorithmImports import *
#endregion

class MarketHours:
    """Convenience wrapper around the exchange hours of a single security."""

    def __init__(self, algorithm, symbol):
        """Look up and cache the exchange-hours object for ``symbol``.

        Args:
            algorithm: Algorithm object; its ``Securities`` mapping and
                ``Time`` are used.
            symbol: Key into ``algorithm.Securities``.
        """
        security = algorithm.Securities[symbol]
        self.hours = security.Exchange.Hours
        self.algorithm = algorithm

    def get_CurrentOpen(self):
        """Return the next market open relative to the algorithm's current time."""
        now = self.algorithm.Time
        # NOTE(review): the `False` flag presumably means "ignore extended
        # market hours" -- confirm against the LEAN API.
        return self.hours.GetNextMarketOpen(now, False)

    def get_CurrentClose(self):
        """Return the market close that follows the open from get_CurrentOpen()."""
        upcoming_open = self.get_CurrentOpen()
        return self.hours.GetNextMarketClose(upcoming_open, False)
#region imports
from AlgorithmImports import *
from TTMSqueezePro import TTMSqueezePro
from MarketHours import MarketHours
# endregion

class SqueezeAlphaModel(AlphaModel):
    """Alpha model that maintains a daily TTM Squeeze Pro indicator for one ticker.

    It subscribes the equity, registers and warms up the indicator, hands it
    to the algorithm's benchmark plotter and schedules a plot shortly before
    each market close. ``Update`` currently never emits insights.
    """
    algorithm = None

    def __init__(self, algorithm, ticker, option):
        """Subscribe ``ticker`` and wire up the squeeze indicator and plotting.

        Args:
            algorithm: Algorithm instance; it must already expose a
                ``benchmark`` attribute with ``AddIndicators`` and
                ``PrintBenchmark``.
            ticker: Equity ticker to subscribe at minute resolution.
            option: Stored on the instance; not otherwise used in this class.
        """
        self.ticker = ticker
        self.option = option
        self.algorithm = algorithm
        # Holds whatever AddEquity returns for the subscription.
        self.symbol = algorithm.AddEquity(self.ticker, resolution = Resolution.Minute)

        # Daily squeeze indicator, updated automatically by the engine.
        self.squeeze = TTMSqueezePro("Squeeze", length = 20)
        algorithm.RegisterIndicator(self.ticker, self.squeeze, Resolution.Daily)
        # TODO: think about warming up the indicator manually (History + Warmup, or SetWarmUp):
        #       https://www.quantconnect.com/docs/v2/writing-algorithms/indicators/manual-indicators
        algorithm.WarmUpIndicator(self.ticker, self.squeeze, Resolution.Daily)

        self.indicators = {'Squeeze': self.squeeze}
        algorithm.benchmark.AddIndicators(self.indicators)

        # Plot five minutes before the close on every trading day of the ticker.
        every_trading_day = algorithm.DateRules.EveryDay(self.ticker)
        shortly_before_close = algorithm.TimeRules.BeforeMarketClose(self.ticker, 5)
        algorithm.Schedule.On(every_trading_day, shortly_before_close, self.UpdateSqueeze)

    def UpdateSqueeze(self):
        """Scheduled callback: plot the benchmark when data exists and the indicator is ready."""
        current_slice = self.algorithm.CurrentSlice
        if not current_slice.ContainsKey(self.ticker):
            return
        # TODO: a manual intraday update of the squeeze (ManualUpdate with the
        #       current bar) was tried here and is currently disabled.
        if self.squeeze.IsReady:
            self.algorithm.benchmark.PrintBenchmark()

    def Update(self, algorithm, data):
        """Return the insights for the current slice (always an empty list for now).

        Also resets the squeeze indicator when the slice carries a split or a
        dividend for the ticker.

        Args:
            algorithm: The running algorithm; only ``IsWarmingUp`` is read.
            data: Current slice; ``Keys``, ``Splits`` and ``Dividends`` are read.

        Returns:
            An empty list.
        """
        no_insights = []
        # TODO: all works good, it's just one day delayed. Consider removing the
        #       RegisterIndicator line and updating the indicator manually here
        #       before day end (e.g. with the day's working bar, to be replaced
        #       once the regular update arrives the next day).
        #       One problem might be warm-up, which would then be manual too:
        #       https://www.quantconnect.com/forum/discussion/1810/indicator-updates-after-hours-trading/p1/comment-5524
        has_ticker = self.ticker in data.Keys
        if not has_ticker or algorithm.IsWarmingUp or not self.squeeze.IsReady:
            return no_insights

        # Reset the indicator when splits and dividends occur: the data already
        # inside it does not account for the price adjustment they cause.
        split_today = data.Splits.ContainsKey(self.ticker)
        if split_today or data.Dividends.ContainsKey(self.ticker):
            self.squeeze.Reset()

        return no_insights
#region imports
from AlgorithmImports import *
from collections import deque
from scipy import stats
import talib
from numpy import mean, array
#endregion

# Good indicator template: https://www.quantconnect.com/forum/discussion/12691/python-indicator-template/p1
# It is not used here, but this indicator should eventually derive from it.

# Use like this:
#
# self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
# NOT SURE WHERE TO PUT THE EXTRA PARAMETERS LIKE SECURITY AND ALGORITHM.
# 1. In this first try I'm sending the algorithm when creating it.
# self.squeeze = TTMSqueezePro("squeeze", 21)
# self.RegisterIndicator(self.spy, self.squeeze, Resolution.Daily)
# history = self.History(self.spy, 21, Resolution.Daily)
# self.squeeze.Warmup(history)

# TODO: there is a problem with the values being one day late. I'm not sure if it's because of how we close it but it might be the daily period.

class TTMSqueezePro(PythonIndicator):
    """TTM Squeeze Pro: a momentum oscillator plus a squeeze-compression level.

    ``Value`` holds the momentum oscillator (a linear regression over the last
    ``length`` de-meaned closes) and ``Squeeze`` holds the compression level as
    an index into ``SQZ_COLORS``. Recent values of both are kept newest-first
    in ``queue`` (momentum) and ``queueSqz`` (squeeze level).

    NOTE(review): ``Reset`` is not overridden here, so the queues and the
    wrapped sub-indicators presumably keep their state across a base-class
    ``Reset()`` -- confirm.
    """
    Squeeze = 0 # like value this is the second part of the indicator.

    # //BOLLINGER BANDS
    BB_mult = 2.0 # input.float(2.0, "Bollinger Band STD Multiplier")
    # //KELTNER CHANNELS
    KC_mult_high = 1.0 # input.float(1.0, "Keltner Channel #1")
    KC_mult_mid = 1.5 # input.float(1.5, "Keltner Channel #2")
    KC_mult_low = 2.0 # input.float(2.0, "Keltner Channel #3")

    # Position in this list is the numeric squeeze level stored in `Squeeze`.
    SQZ_COLORS = ['green', 'black', 'red', 'orange']

    # according to this example we don't need to pass security and algo as that
    # will be passed with RegisterIndicator.
    # https://github.com/QuantConnect/Lean/blob/b9d3d999170f385e2371fc3fef1823f2ddd4c83b/Algorithm.Python/CustomWarmUpPeriodIndicatorAlgorithm.py

    def __init__(self, name, length = 20):
        """Build the indicator and the base indicators it is derived from.

        Args:
            name: Indicator name, stored in ``Name``.
            length: Look-back used for every base indicator and for the size
                of the history queues.
        """
        # default indicator definition
        super().__init__()
        self.Name = name
        self.Value = 0
        self.Time = datetime.min
        self.Squeeze = 0

        # set automatic warmup period
        self.WarmUpPeriod = length * 3

        self.length = length
        # All queues are newest-first: index 0 is the most recent entry.
        self.queue = deque(maxlen=self.length)      # momentum oscillator values
        self.queueMean = deque(maxlen=self.length)  # de-meaned prices fed to the regression
        self.queueSqz = deque(maxlen=self.length)   # squeeze levels

        # define the base indicators to use
        self.BB = BollingerBands(self.length, self.BB_mult)
        self.KCHigh = KeltnerChannels(self.length, self.KC_mult_high)
        self.KCMid = KeltnerChannels(self.length, self.KC_mult_mid)
        self.KCLow = KeltnerChannels(self.length, self.KC_mult_low)

        self.MAX = Maximum(self.length)
        self.MIN = Minimum(self.length)
        self.SMA = SimpleMovingAverage('SMA', self.length)

    @property
    def IsReady(self) -> bool:
        """True once both the momentum and the squeeze parts can be computed."""
        # it's ready when:
        # - we have enough data to calculate the momentum oscilator value
        # - we have enough data to calculate the squeeze data
        return (len(self.queueMean) >= self.length) and self.BB.IsReady and self.KCHigh.IsReady and self.KCMid.IsReady and self.KCLow.IsReady

    def ManualUpdate(self, input) -> bool:
        """Run ``Update`` and, when ready, publish the value through ``Current``.

        Args:
            input: Bar exposing ``Symbol``, ``Time``, ``Close``, ``High`` and ``Low``.

        Returns:
            ``IsReady`` after the update.
        """
        self.Update(input)
        if self.IsReady:
            self.Current = IndicatorDataPoint(input.Symbol, input.Time, self.Value)
        return self.IsReady

    @staticmethod
    def _StoreLatest(queue, data):
        """Put ``data`` at the head of ``queue``, replacing a same-time head entry."""
        if len(queue) > 0 and data.Time == queue[0].Time:
            queue[0] = data
        else:
            queue.appendleft(data)

    def Update(self, input) -> bool:
        """Feed one bar into the base indicators and refresh momentum and squeeze.

        Args:
            input: Bar exposing ``Symbol``, ``Time``, ``Close``, ``High`` and
                ``Low``; it is also passed unchanged to the Keltner channels.

        Returns:
            ``IsReady`` after the update.
        """
        symbol = input.Symbol
        stamp = input.Time

        # update all the indicators with the new data
        dataPoint = IndicatorDataPoint(symbol, stamp, input.Close)
        self.BB.Update(dataPoint)
        self.SMA.Update(dataPoint)
        # Feed into the Max and Min indicators the highest and lowest values only
        self.MAX.Update(IndicatorDataPoint(symbol, stamp, input.High))
        self.MIN.Update(IndicatorDataPoint(symbol, stamp, input.Low))
        # Keltner channels indicators
        self.KCHigh.Update(input)
        self.KCMid.Update(input)
        self.KCLow.Update(input)

        # Record the current bar's de-meaned price BEFORE running the
        # regression. Doing it afterwards made the oscillator use data up to
        # the previous bar only, i.e. lag one bar behind.
        if self.MAX.IsReady and self.MIN.IsReady and self.SMA.IsReady:
            self._StoreLatest(self.queueMean, IndicatorDataPoint(symbol, stamp, self.MeanPrice(input.Close)))

        # Calculate the mom oscillator only after we get the proper amount of values for the array.
        if len(self.queueMean) >= self.length:
            self.Time = stamp
            self.Value = self.MomOscillator()
            self._StoreLatest(self.queue, IndicatorDataPoint(symbol, stamp, self.Value))

        # Add the value and sqz status to a queue so we can check later if
        # we switched status recently.
        if self.BB.IsReady and self.KCHigh.IsReady and self.KCMid.IsReady and self.KCLow.IsReady:
            self.Squeeze = self.SqueezeValue()
            self._StoreLatest(self.queueSqz, IndicatorDataPoint(symbol, stamp, self.Squeeze))

        return self.IsReady

    def KC_basis(self):
        """Current simple moving average of the close."""
        # The attribute is `SMA`; the previous `self.sma` raised AttributeError.
        return self.SMA.Current.Value

    def BB_basis(self):
        """Current Bollinger middle band."""
        return self.BB.MiddleBand.Current.Value

    def BB_upper(self):
        """Current Bollinger upper band."""
        return self.BB.UpperBand.Current.Value

    def BB_lower(self):
        """Current Bollinger lower band."""
        return self.BB.LowerBand.Current.Value

    def KC_upper_high(self):
        """Upper band of the Keltner channel built with ``KC_mult_high``."""
        return self.KCHigh.UpperBand.Current.Value

    def KC_lower_high(self):
        """Lower band of the Keltner channel built with ``KC_mult_high``."""
        return self.KCHigh.LowerBand.Current.Value

    def KC_upper_mid(self):
        """Upper band of the Keltner channel built with ``KC_mult_mid``."""
        return self.KCMid.UpperBand.Current.Value

    def KC_lower_mid(self):
        """Lower band of the Keltner channel built with ``KC_mult_mid``."""
        return self.KCMid.LowerBand.Current.Value

    def KC_upper_low(self):
        """Upper band of the Keltner channel built with ``KC_mult_low``."""
        return self.KCLow.UpperBand.Current.Value

    def KC_lower_low(self):
        """Lower band of the Keltner channel built with ``KC_mult_low``."""
        return self.KCLow.LowerBand.Current.Value

    # //SQUEEZE CONDITIONS
    def NoSqz(self):
        return self.BB_lower() < self.KC_lower_low() or self.BB_upper() > self.KC_upper_low() # NO SQUEEZE: GREEN

    def LowSqz(self):
        return self.BB_lower() >= self.KC_lower_low() or self.BB_upper() <= self.KC_upper_low() # LOW COMPRESSION: BLACK

    def MidSqz(self):
        return self.BB_lower() >= self.KC_lower_mid() or self.BB_upper() <= self.KC_upper_mid() # MID COMPRESSION: RED

    def HighSqz(self):
        return self.BB_lower() >= self.KC_lower_high() or self.BB_upper() <= self.KC_upper_high() # HIGH COMPRESSION: ORANGE

    # //SQUEEZE DOTS COLOR
    # sq_color = HighSqz ? color.new(color.orange, 0) : MidSqz ? color.new(color.red, 0) : LowSqz ? color.new(color.black, 0) : color.new(color.green, 0)
    def SqueezeColor(self):
        """Return the colour name of the tightest squeeze condition that holds."""
        if self.HighSqz():
            return 'orange'
        elif self.MidSqz():
            return 'red'
        elif self.LowSqz():
            return 'black'
        else:
            return 'green'

    def SqueezeValue(self):
        """Return the current squeeze level as an index into ``SQZ_COLORS``."""
        return self.SQZ_COLORS.index(self.SqueezeColor())

    def MomentumHistogramColor(self):
        """Return the histogram colour for the latest momentum value.

        Positive momentum is Aqua when rising and Blue otherwise; non-positive
        momentum is Red when falling and Yellow otherwise. Needs at least two
        entries in ``queue`` (raises IndexError otherwise).
        """
        bullish = Color.Aqua if self.queue[0].Value > self.queue[1].Value else Color.Blue
        bearish = Color.Red if self.queue[0].Value < self.queue[1].Value else Color.Yellow
        return bullish if self.Bullish() else bearish

    def Bullish(self):
        """True when the latest momentum value is above zero."""
        return self.queue[0].Value > 0

    def Bearish(self):
        """True when the latest momentum value is zero or below."""
        return self.queue[0].Value <= 0

    # This calculates the mean price value that we'll add to a series type/array and we'll use to
    # calculate the momentum oscilator (aka linear regression)
    def MeanPrice(self, price):
        """Return ``price`` minus the average of the high/low midpoint and the SMA."""
        return price - mean([mean([self.MAX.Current.Value, self.MIN.Current.Value]), self.SMA.Current.Value])

    def LosingMomentum(self, maxDays = 2):
        """True when the last ``maxDays`` momentum values are strictly decreasing over time.

        Vacuously True when fewer than two values are available.
        """
        # newest-first window of the most recent values
        slicedQueue = list(self.queue)[:maxDays]
        # each value must be lower than the one recorded just before it
        return all(earlier.Value > later.Value for later, earlier in zip(slicedQueue, slicedQueue[1:]))

    def GainingMomentum(self, maxDays = 2):
        """True when the last ``maxDays`` momentum values are strictly increasing over time.

        Vacuously True when fewer than two values are available.
        """
        # newest-first window of the most recent values
        slicedQueue = list(self.queue)[:maxDays]
        # each value must be higher than the one recorded just before it
        return all(earlier.Value < later.Value for later, earlier in zip(slicedQueue, slicedQueue[1:]))

    # It's squeezing if the colors are different than green.
    def Squeezing(self):
        """True when the latest recorded squeeze level is not green."""
        current = self.queueSqz[0]
        return current.Value != self.SQZ_COLORS.index('green')

    def SqueezeChange(self, toColor = 'green'):
        """True when the latest squeeze level is ``toColor`` and differs from the previous one."""
        earlier = self.queueSqz[1]
        last = self.queueSqz[0]
        colorIndex = self.SQZ_COLORS.index(toColor)
        return last.Value == colorIndex and earlier.Value != last.Value

    def SqueezeDuration(self, over = 2):
        """True when none of the ``over`` levels preceding the current one is green."""
        # pick last `over` days but today/current value
        slicedQueue = list(self.queueSqz)[1:over+1]
        colorIndex = self.SQZ_COLORS.index('green')
        return all(val.Value != colorIndex for val in slicedQueue)

    def MomOscillator(self):
        '''
        //MOMENTUM OSCILLATOR
        mom = ta.linreg(close - math.avg(math.avg(
                                                  ta.highest(high, length),
                                                  ta.lowest(low, length)
                                                  ),
                                         ta.sma(close, length)
                                        ),
                        length, 0)

        https://www.quantconnect.com/forum/discussion/10168/least-squares-linear-regression/p1/comment-28627
        https://www.tradingview.com/pine-script-reference/v4/#fun_linreg

        // linreg = intercept + slope * (length - 1 - offset)
        // where length is the y argument,
        // offset is the z argument,
        // intercept and slope are the values calculated with the least squares method on source series (x argument).
        -> linreg(source, length, offset) → series[float]
        '''
        # The queue is newest-first, so reverse it to get a chronological series.
        series = array([m.Value for m in reversed(self.queueMean)])
        size = len(series)
        # The regression period equals the series length, so only the last
        # element of the returned array holds a value; the rest are `nan`.
        linreg = talib.LINEARREG(series, size)[size - 1]

        return linreg

    def Warmup(self, history):
        """Feed every row of ``history`` through ``Update``.

        Args:
            history: Object with ``iterrows()``. Each row is handed to
                ``Update`` and so must expose ``Symbol``, ``Time``, ``Close``,
                ``High`` and ``Low``.
                NOTE(review): plain history DataFrame rows may not satisfy
                this -- confirm before re-enabling the manual warm-up.
        """
        for _, row in history.iterrows():
            self.Update(row)

#region imports
from AlgorithmImports import *
from TTMSqueezePro import TTMSqueezePro
from SqueezeAlphaModel import SqueezeAlphaModel
from Benchmark import Benchmark
# endregion

class AddAlphaModelAlgorithm(QCAlgorithm):
    """Backtest entry point: plots the TTM Squeeze Pro indicator for one ticker."""

    def Initialize(self):
        ''' Configure the backtest window, cash, data settings and brokerage model,
        then create the benchmark plotter and attach the squeeze alpha model.'''
        # Backtest window. A longer window was tried earlier:
        #   start (2012, 2, 4), end written as (2020, 31, 7).
        self.SetStartDate(2021, 7, 1)
        self.SetEndDate(2022, 7, 1)

        # Strategy cash.
        self.SetCash(100_000)

        # Data settings and account setup.
        universe_settings = self.UniverseSettings
        universe_settings.Resolution = Resolution.Minute
        universe_settings.FillForward = False
        # Interactive Brokers brokerage model with a margin account.
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)

        # Main variables. The benchmark must exist before the alpha model is
        # built because the model's constructor uses `algorithm.benchmark`.
        ticker = "TSLA"
        self.ticker = ticker
        self.benchmark = Benchmark(self, ticker)
        self.option = Symbol.Create(ticker, SecurityType.Option, Market.USA, f"?{ticker}")

        # Squeeze model
        squeeze_model = SqueezeAlphaModel(self, ticker, self.option)
        self.SetAlpha(squeeze_model)