Overall Statistics
Total Trades
1462
Average Win
0.58%
Average Loss
-1.11%
Compounding Annual Return
9.882%
Drawdown
38.800%
Expectancy
0.108
Net Profit
86.781%
Sharpe Ratio
0.415
Probabilistic Sharpe Ratio
3.080%
Loss Rate
27%
Win Rate
73%
Profit-Loss Ratio
0.52
Alpha
0.044
Beta
0.536
Annual Standard Deviation
0.233
Annual Variance
0.054
Information Ratio
-0.01
Tracking Error
0.229
Treynor Ratio
0.18
Total Fees
$3442.74
Estimated Strategy Capacity
$4200000.00
Lowest Capacity Asset
ADBE R735QTJ8XC9X
Portfolio Turnover
8.49%
from AlgorithmImports import *
import talib
import pickle
from datetime import timedelta
from QuantConnect.Data.UniverseSelection import *
from QuantConnect.Algorithm.Framework.Alphas import *

# File imports
from portfolio import MyEqualWeightingPortfolioConstructionModel
from ttm_models import TTMAlphaModel, MyMaximumDrawdownPercentPerSecurity
from market_condition import MarketConditionData

###############################################################################
class TTMAlgorithm(QCAlgorithm):
    def Initialize(self):
        """Configure the backtest, the universe, the framework models and state."""
        # Backtest window, starting cash and algorithm time zone
        self.SetStartDate(2017, 1, 1)
        # self.SetEndDate(2017, 3, 31)
        self.SetCash(100000)
        self.SetTimeZone('US/Eastern')

        # Securities are set up by the custom initializer, which is given the
        # brokerage model and a seeder built on GetLastKnownPrices.
        initializer = CustomSecurityInitializer(
            self.BrokerageModel,
            FuncSecuritySeeder(self.GetLastKnownPrices),
        )
        self.SetSecurityInitializer(initializer)

        # Benchmark symbol; it also drives the market condition tracker
        self.bm = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.market_condition = MarketConditionData(self, self.bm)

        # Universe selection
        self.AddUniverse(self.CoarseSelectionFunction)
        # Toggle daily vs hour here
        self.daily = False
        universe_settings = self.UniverseSettings
        if not self.daily:
            universe_settings.Resolution = Resolution.Hour
            # Extended market hours are requested so that data is available
            # at the 09:00 update time (original note: is this needed?).
            universe_settings.ExtendedMarketHours = True
        else:
            universe_settings.Resolution = Resolution.Daily
        # Disabled alternative: drive the update from a scheduled event
        # self.Schedule.On(
        #     self.DateRules.EveryDay(self.bm),
        #     self.TimeRules.AfterMarketOpen(self.bm, 1),
        #     self.UpdateAlpha
        # )

        # TTM alpha and risk models
        self.ttm_alpha = TTMAlphaModel(self)
        self.AddAlpha(self.ttm_alpha)
        self.ttm_risk = MyMaximumDrawdownPercentPerSecurity(self)
        self.AddRiskManagement(self.ttm_risk)
        # Direction switches read by the portfolio construction model
        self.ttm_squeeze_long = True
        self.ttm_squeeze_short = False  # test long only
        # Daily update gate maintained by OnData
        self.update_models = False
        self.last_update_time = None

        # Portfolio construction
        self.ttm_portfolio = MyEqualWeightingPortfolioConstructionModel(self)
        self.SetPortfolioConstruction(self.ttm_portfolio)

        # Execution model
        self.SetExecution(ImmediateExecutionModel())

        # Other settings
        settings = self.Settings
        settings.MinAbsolutePortfolioTargetPercentage = 1e-23
        settings.RebalancePortfolioOnInsightChanges = True
        settings.RebalancePortfolioOnSecurityChanges = False
        settings.FreePortfolioValuePercentage = 0.05

        # Daily series collected in OnEndOfDay and saved in OnEndOfAlgorithm
        self.times = []
        self.navs = []
        self.market_volatilities = []
        self.market_directions = []
        self.last_data = None

#-------------------------------------------------------------------------------
    def CoarseSelectionFunction(self, coarse):
        """Select the universe: the highest dollar-volume stocks.

        Args:
            coarse: Iterable of coarse universe entries; each entry is read
                for HasFundamentalData, DollarVolume and Symbol.

        Returns:
            list: Symbols of at most 50 entries that have fundamental data,
            ordered from highest to lowest dollar volume.
        """
        top = 50
        with_fundamentals = (x for x in coarse if x.HasFundamentalData)
        ranked = sorted(
            with_fundamentals, key=lambda x: x.DollarVolume, reverse=True
        )
        symbols = [x.Symbol for x in ranked[:top]]
        # Print universe details when live mode
        if self.LiveMode:
            # This class defines no MyLog() helper, so the former
            # self.MyLog(...) call raised AttributeError as soon as the
            # algorithm ran live. Use the algorithm's Log() instead.
            self.Log(f"Coarse filter returned {len(symbols)} stocks.")
        return symbols

#-------------------------------------------------------------------------------
    def OnData(self, data):
        """Built-in event called on new data.

        Stores the latest slice in ``self.last_data`` and maintains the
        ``self.update_models`` gate that the alpha, portfolio construction and
        risk models check when ``self.daily`` is False. The gate is True only
        for the first data event at or after 09:00 of each day and False for
        every other data event.

        Args:
            data: The data slice delivered by the engine.
        """
        self.last_data = data
        now = self.Time
        # First data (at 900 or later) of a new calendar day?
        first_update_today = now.hour >= 9 and (
            self.last_update_time is None
            or self.last_update_time.date() != now.date()
        )
        # The flag used to be set to True here and never cleared anywhere, so
        # after the very first trigger the "skip if not time for update"
        # checks in the models never skipped again. Assign it on every call so
        # it is raised once per day only.
        # NOTE(review): this assumes OnData runs before the framework models
        # within the same time step - confirm against the engine.
        self.update_models = first_update_today
        if first_update_today:
            self.last_update_time = now

#-------------------------------------------------------------------------------
    def OnEndOfDay(self):
        """Built-in event called at the end of the day.

        Records the day's portfolio value together with the current market
        direction/volatility labels, re-applies the long/short trading
        switches and plots them.
        """
        # Current market regime as tracked on the benchmark
        condition = self.market_condition
        direction = condition.direction
        volatility = condition.volatility

        # Append today's snapshot to the daily series
        self.times.append(self.Time)
        self.navs.append(self.Portfolio.TotalPortfolioValue)
        self.market_volatilities.append(volatility)
        self.market_directions.append(direction)

        # Short side: currently always disabled.
        # Disabled regime rule kept for reference:
        # previous_short = self.ttm_squeeze_short
        # if direction == 'bear' and 'volatile' not in volatility:
        #     self.ttm_squeeze_short = True
        # else:
        #     self.ttm_squeeze_short = False
        # if previous_short != self.ttm_squeeze_short:
        #     self.Log(f"TTM Squeeze short now {self.ttm_squeeze_short}")
        self.ttm_squeeze_short = False

        # Long side: currently always enabled.
        # Disabled regime rule kept for reference:
        # previous_long = self.ttm_squeeze_long
        # if direction == 'bear' and volatility == 'volatile':
        #     self.ttm_squeeze_long = True
        # elif direction == 'neutral' and 'volatile' not in volatility:
        #     self.ttm_squeeze_long = True
        # elif direction == 'bull' and volatility == 'normal':
        #     self.ttm_squeeze_long = True
        # elif direction == 'strong bull':
        #     self.ttm_squeeze_long = True
        # else:
        #     self.ttm_squeeze_long = False
        # if previous_long != self.ttm_squeeze_long:
        #     self.Log(f"TTM squeeze long now {self.ttm_squeeze_long}")
        self.ttm_squeeze_long = True

        # Plot each switch as 1 (enabled) or 0 (disabled)
        self.Plot(
            "Trade TTM Squeeze Short", "True",
            1 if self.ttm_squeeze_short else 0
        )
        self.Plot(
            "Trade TTM Squeeze Long", "True",
            1 if self.ttm_squeeze_long else 0
        )

#------------------------------------------------------------------------------
    def OnEndOfAlgorithm(self):
        """Built-in event handler for end of the backtest.

        Pickles the daily series collected in OnEndOfDay and saves the bytes
        in the object store, so they can be evaluated in the Research
        environment.
        """
        daily_series = {
            'time': self.times,
            'value': self.navs,
            'volatility': self.market_volatilities,
            'direction': self.market_directions
        }
        self.ObjectStore.SaveBytes('BB_V112L', pickle.dumps(daily_series))

###############################################################################
class CustomSecurityInitializer(BrokerageModelSecurityInitializer):
    """Security initializer that overrides the buying power model."""

    def __init__(
        self,
        brokerage_model: IBrokerageModel,
        security_seeder: ISecuritySeeder,
    ) -> None:
        """Pass the brokerage model and the seeder on to the base class."""
        super().__init__(brokerage_model, security_seeder)

    def Initialize(self, security: Security) -> None:
        """
        Set the models of a security when it is added to the algorithm's
        universe.
        """
        # Run the base class initialization first
        super().Initialize(security)
        # Then replace the buying power model for the security
        margin_model = SecurityMarginModel(1.0)
        security.SetBuyingPowerModel(margin_model)
# Standard library imports
import datetime as DT
import math
# import numpy as np
# import pandas as pd
import pytz
import statistics
# QuantConnect specific imports
from AlgorithmImports import *

TIMEZONE = 'US/Eastern'

################################################################################
class MarketConditionData(object):
    """Class to store data for the market symbol."""
    # Global variables

    # # For market volatility (period = 10 days)
    # MARKET_ATR_PERIOD = 10
    # VOLATILITY_QUIET_MAX = 0.010
    # VOLATILITY_NORMAL_MAX = 0.021
    # VOLATILITY_VOLATILE_MAX = 0.043

    # For market volatility (period = 15 days)
    MARKET_ATR_PERIOD = 15
    VOLATILITY_QUIET_MAX = 0.010
    VOLATILITY_NORMAL_MAX = 0.021
    VOLATILITY_VOLATILE_MAX = 0.043

    # # For market volatility (period = 20 days)
    # MARKET_ATR_PERIOD = 20
    # VOLATILITY_QUIET_MAX = 0.011
    # VOLATILITY_NORMAL_MAX = 0.02
    # VOLATILITY_VOLATILE_MAX = 0.04

    # # For market direction (period = 50 days)
    # SQN_PERIOD = 50
    # DIRECTION_STRONG_BULL = 1.37
    # DIRECTION_BULL = 0.90
    # DIRECTION_STRONG_BEAR = -DIRECTION_STRONG_BULL

    # For market direction (period = 100 days)
    SQN_PERIOD = 100
    DIRECTION_STRONG_BULL = 1.43
    DIRECTION_BULL = 0.87
    DIRECTION_STRONG_BEAR = -DIRECTION_STRONG_BULL

    # # For market direction (period = 200 days)
    # SQN_PERIOD = 200
    # DIRECTION_STRONG_BULL = 1.60
    # DIRECTION_BULL = 0.90
    # DIRECTION_STRONG_BEAR = -DIRECTION_STRONG_BULL

    # Require confirmation bars
    CONFIRMATION_BARS = 5

    def __init__(self, algo, symbol_object):
        """Initialize MarketSymbolData object."""
        # Save a reference to the QCAlgorithm class
        self.algo = algo
        # Save the .Symbol object and the symbol's string
        self.symbol_object = symbol_object
        self.symbol = symbol_object.Value
        # Get the symbol's exchange market info
        self.get_exchange_info()
        # Add strategy specific variables
        self.add_strategy_variables()
        # Add the bars and indicators required
        self.add_bars()
        self.add_indicators()

#-------------------------------------------------------------------------------
    def get_exchange_info(self):
        """Get the security's exchange info.

        Sets the following attributes:
            exchange_hours: Exchange hours object of the symbol.
            mkt_open / mkt_close: Market open/close times (datetime.time)
                taken from the first open/close after the reference date.
            mkt_tz: pytz time zone of the exchange.
            offset_hrs: Whole hours (int, truncated toward zero) by which
                the exchange time zone is ahead of TIMEZONE.
            mkt_close_local_tz: Market close time expressed in TIMEZONE.
        """
        # Get the SecurityExchangeHours Class object for the symbol
        self.exchange_hours = \
            self.algo.Securities[self.symbol_object].Exchange.Hours

        # Reference date picked by the original author as a day on which the
        # market was open for the full day
        dt = DT.datetime(2021, 1, 4)
        # Save the typical (regular session) market open and close times
        # NOTE(review): the False argument presumably excludes extended
        # market hours - confirm against the exchange hours API.
        mkt_open_dt = self.exchange_hours.GetNextMarketOpen(dt, False)
        self.mkt_open = mkt_open_dt.time()
        mkt_close_dt = self.exchange_hours.GetNextMarketClose(dt, False)
        self.mkt_close = mkt_close_dt.time()

        # Create pytz timezone objects for the exchange tz and local tz
        self.mkt_tz = pytz.timezone(str(self.exchange_hours.TimeZone))
        local_tz = pytz.timezone(TIMEZONE)
        # Difference between the two UTC offsets as a timedelta.
        # The previous code used utcoffset(dt).seconds, but timedelta.seconds
        # is always in [0, 86400): an offset of -5 h is stored as
        # (days=-1, seconds=68400) and was read as 19 h. Subtracting the
        # timedeltas and using total_seconds() keeps the sign and also keeps
        # fractional-hour offsets intact for the local close time below.
        # REF: http://pytz.sourceforge.net/#tzinfo-api
        offset = self.mkt_tz.utcoffset(dt) - local_tz.utcoffset(dt)
        # 3600 seconds/hour
        self.offset_hrs = int(offset.total_seconds() / 3600)
        # NOTE: offset hours are very helpful if you want to schedule functions
        #  around market open/close times

        # Get the market close time for the local time zone
        self.mkt_close_local_tz = (mkt_close_dt - offset).time()

#-------------------------------------------------------------------------------
    def add_strategy_variables(self):
        """Add required variables for the market condition."""
        # Initialize
        self.direction = None
        self.volatility = None

#-------------------------------------------------------------------------------
    def add_bars(self):
        """Register a one-day trade bar consolidator for the symbol."""
        # State used by daily_calendar(), which is currently not wired in:
        # the consolidator below uses a fixed one-day period instead of
        # TradeBarConsolidator(self.daily_calendar).
        self.calendar_initialized = False

        one_day = DT.timedelta(days=1)
        consolidator = TradeBarConsolidator(one_day)
        # Each new consolidated bar is passed to on_daily_consolidated
        consolidator.DataConsolidated += self.on_daily_consolidated
        # Link the consolidator with our symbol in the algo's manager
        self.algo.SubscriptionManager.AddConsolidator(
            self.symbol_object, consolidator
        )
        # The consolidator is not stored on self; that would only be needed
        # in order to remove it later.

#-------------------------------------------------------------------------------
    def add_indicators(self):
        """Create the indicators and rolling windows, then warm them up."""
        # ATR is used for the volatility, the SQN for the market direction
        self.atr = AverageTrueRange(self.MARKET_ATR_PERIOD)
        self.sqn = SQN(self.SQN_PERIOD)
        # Two SMAs for the trend flags
        self.fast_ma = SimpleMovingAverage(20)
        self.slow_ma = SimpleMovingAverage(60)

        # (indicator, update_method) pairs: 'bar' indicators are updated with
        # the whole bar, 'close' indicators with the bar's end time and close
        self.indicators = [
            (self.atr, 'bar'),
            (self.sqn, 'bar'),
            (self.fast_ma, 'close'),
            (self.slow_ma, 'close'),
        ]
        self.uptrend = False
        self.downtrend = False

        # Windows holding the most recent labels; used for confirmation
        self.volatility_window = RollingWindow[str](self.CONFIRMATION_BARS)
        self.direction_window = RollingWindow[str](self.CONFIRMATION_BARS)

        # Number of bars required to warm up the indicators
        longest_period = max(self.MARKET_ATR_PERIOD*3, self.SQN_PERIOD)
        self.min_bars = int(longest_period)+1
        self.bar_window = RollingWindow[TradeBar](self.min_bars)

        # Warm up the indicators with historical data
        self.warmup_indicators()

#-------------------------------------------------------------------------------
    def daily_calendar(self, dt):
        """
        Calendar function for a daily consolidator.

        Given a timezone unaware datetime for the exchange's time zone, return
        the CalendarInfo (start datetime, period) of the consolidation window.

        Useful Refs:
        datetime.replace() method:
        https://docs.python.org/3/library/datetime.html#datetime.datetime.replace
        """
        # Very first call after the algo initializes: the window does not
        # matter, so return one day starting at dt.
        if not self.calendar_initialized:
            self.calendar_initialized = True
            return CalendarInfo(dt, DT.timedelta(1))

        # Market open on dt's date, in the **EXCHANGE** timezone
        session_open = dt.replace(
            hour=self.mkt_open.hour,
            minute=self.mkt_open.minute,
            second=0,
            microsecond=0
        )
        # Next market close after that open, from the exchange hours object
        session_close = self.exchange_hours.GetNextMarketClose(
            session_open, False
        )

        # dt is before the open: per the original note QC throws an error
        # when the start is after the passed dt. Return a window from dt up
        # to the open instead, so the next dt is the desired start time.
        if session_open > dt:
            return CalendarInfo(dt, session_open-dt)

        # Otherwise the window runs from the open to the close
        return CalendarInfo(session_open, session_close-session_open)

#-------------------------------------------------------------------------------
    def on_daily_consolidated(self, sender, bar):
        """Event handler for daily bars.

        Args:
            sender: Object that raised the event; not used here (the warm-up
                code in this class passes None).
            bar: The daily bar, forwarded unchanged to
                update_daily_indicators().
        """
        # Manually update all of the daily indicators
        self.update_daily_indicators(bar)

#-------------------------------------------------------------------------------
    def update_daily_indicators(self, bar):
        """Manually update all of the symbol's daily bar indicators."""
        # Loop through all indicators
        for indicator, update_method in self.indicators:
            if update_method == 'close':
                indicator.Update(bar.EndTime, bar.Close)
            elif update_method == 'bar':
                indicator.Update(bar)
        # Add the bar to the bar window
        self.bar_window.Add(bar)
        # Check if the indicators are ready
        if self.indicators_ready:
            # Get the current market volatility
            atr_pct = self.atr.Current.Value/bar.Close
            previous_volatility = self.volatility
            volatility = self.get_market_volatility(atr_pct)
            self.volatility_window.Add(volatility)
            if self.volatility_window.IsReady:
                # Check if the volatility values are all the same
                if len(set(list(self.volatility_window))) == 1:
                    self.volatility = volatility
            if self.volatility:
                # Check for change
                if previous_volatility != self.volatility:
                    self.algo.Log(
                        f"Market volatility change from {previous_volatility} to "
                        f"{self.volatility}",
                    )

            # Get the current market direction
            previous_direction = self.direction
            sqn = self.sqn.Value
            direction = self.get_market_direction(sqn)
            self.direction_window.Add(direction)
            if self.direction_window.IsReady:
                # Check if the direction values are all the same
                if len(set(list(self.direction_window))) == 1:
                    self.direction = direction
            if self.direction:
                # Check for change
                if previous_direction != self.direction:
                    self.algo.Log(
                        f"Market direction change from {previous_direction} to "
                        f"{self.direction}",
                    )

        if self.fast_ma.IsReady and self.slow_ma.IsReady:
            fast = self.fast_ma.Current.Value
            slow = self.slow_ma.Current.Value
            self.uptrend = fast > slow
            self.downtrend = fast < slow

#-------------------------------------------------------------------------------
    def reset_indicators(self):
        """Manually reset all of the indicators.""" 
        # Loop through all indicators
        for indicator, update_method in self.indicators:
            indicator.Reset()
        # Reset the bar window
        self.bar_window.Reset()

#-------------------------------------------------------------------------------
    def adjust_indicators(self, adjustment):
        """Adjust all indicators for splits or dividends."""  
        # Get a list of the current bars
        bars = list(self.bar_window)
        # Current order is newest to oldest (default for rolling window)
        # Reverse the list to be oldest to newest
        bars.reverse()
        # Reset all indicators
        self.reset_indicators()
        # Loop through the bars from oldest to newest
        for bar in bars:
            # Adjust the bar by the adjustment factor
            bar.Open *= adjustment
            bar.High *= adjustment
            bar.Low *= adjustment
            bar.Close *= adjustment
            # Use the bar to update the indicators
            # This also adds the bar to the rolling window
            self.update_daily_indicators(bar)

#-------------------------------------------------------------------------------
    def warmup_indicators(self):
        """Warm up indicators using historical data.

        Requests int(1.5 * self.min_bars) daily trade bars for the symbol and
        passes each one to the daily bar event handler. ``self.warming_up``
        is True only while this method runs.
        """
        # Update warmup variable, so we don't try to take any signals
        self.warming_up = True
        try:
            # Get historical daily trade bars
            daily_bars = self.algo.History[TradeBar](
                self.symbol_object,
                int(1.5*self.min_bars),
                Resolution.Daily
            )
            for bar in daily_bars:
                # Daily bars are not pushed through the consolidator; they
                # are passed directly to the event handler instead.
                self.on_daily_consolidated(None, bar)
        finally:
            # The flag used to stay True forever, so the object reported
            # "warming up" for the whole run. Clear it when the replay ends,
            # also if the history request fails.
            self.warming_up = False

#-------------------------------------------------------------------------------
    @ property
    def indicators_ready(self):
        """Check if all of the indicators used are ready (warmed up)."""
        # Loop through all indicators
        for indicator, update_method in self.indicators:
            # Return False if the indicator is not ready
            if not indicator.IsReady:
                return False
        # Check rolling windows
        if not self.bar_window.IsReady:
            return False
        # Otherwise return True
        return True

#-------------------------------------------------------------------------------
    def get_market_volatility(self, atr_pct):
        # Check if nan
        if math.isnan(atr_pct):
            return 'nan'
        # Check if the current level is at or below the quiet level
        elif atr_pct <= self.VOLATILITY_QUIET_MAX:
            return 'quiet'
        # Otherwise check if the current level is at or below the normal max level
        elif atr_pct <= self.VOLATILITY_NORMAL_MAX:
            return 'normal'
        # Otherwise check if the current level is at or below the volatile max level
        elif atr_pct <= self.VOLATILITY_VOLATILE_MAX:
            return 'volatile'
        # Otherwise very volatile
        return 'very volatile'

#-------------------------------------------------------------------------------
    def get_market_direction(self, sqn):
        # Check if nan
        if math.isnan(sqn):
            return 'nan'
        # Check if the current level is above the strong bull level
        elif sqn > self.DIRECTION_STRONG_BULL:
            return 'strong bull'
        # Otherwise check if the current level is at or above the bull level
        elif sqn >= self.DIRECTION_BULL:
            return 'bull'
        # Otherwise check if the current level is at or above 0
        elif sqn >= 0:
            return 'neutral'
        # Otherwise check if the current level is at or above the strong bear level
        elif sqn >= self.DIRECTION_STRONG_BEAR:
            return 'bear'
        # Otherwise strong bear
        return 'strong bear'

################################################################################
class SQN:
    """Custom 'System Quality Number' indicator.

    The value is (mean / sample standard deviation) of the last ``period``
    close-to-close percent changes, multiplied by min(10, sqrt(period)).
    """
    def __init__(self, period):
        """Initialize the indicator.

        Args:
            period: Number of percent changes in the calculation window.
        """
        self.Name = "SQN" # may want to customize this
        self.Time = datetime.min
        self.Value = 0
        self.IsReady = False

        # Create required indicators to calculate this indicator
        self.percent_changes = RollingWindow[float](period)
        self.previous_close = None
        self.mean = None
        self.std = None
        self.multiplier = min(10, math.sqrt(period))

#-------------------------------------------------------------------------------
    def __repr__(self):
        """
        Returns the object representation in string format.
        Called via repr() on the object.
        """
        return f"{self.Name} -> IsReady: {self.IsReady}. Time: {self.Time}. " + \
            f"Value: {self.Value}"

#-------------------------------------------------------------------------------
    def Reset(self):
        """Reset the indicator to its freshly constructed state."""
        # Reset the rolling window
        self.percent_changes.Reset()
        # Also drop the derived state. Previously only the window was
        # cleared, so after a reset the indicator still reported IsReady with
        # a stale Value, and the first update after the reset produced a
        # percent change against the stale pre-reset close.
        self.previous_close = None
        self.mean = None
        self.std = None
        self.Value = 0
        self.IsReady = False

#-------------------------------------------------------------------------------
    @staticmethod
    def _sqn_stats(changes, multiplier):
        """Return (mean, std, sqn) for a sequence of percent changes.

        The sqn is NaN when the standard deviation is zero (or there are
        fewer than two changes) instead of raising on the division.
        """
        mean = statistics.mean(changes)
        std = statistics.stdev(changes) if len(changes) > 1 else 0.0
        sqn = (mean/std)*multiplier if std else float('nan')
        return mean, std, sqn

#-------------------------------------------------------------------------------
    def Update(self, input):
        """
        Update the indicator with the input.
        This is a required function for custom indicators!

        Args:
            input: Data point with a ``Close`` attribute; its ``EndTime``,
                when present, becomes the indicator's Time.

        Returns:
            bool: The indicator's IsReady state after the update.
        """
        # Get the price
        price = input.Close
        # Remember when the indicator was last updated (shown by __repr__)
        self.Time = getattr(input, "EndTime", self.Time)
        # A previous close is needed to form a percent change; a missing or
        # zero previous close is skipped.
        if self.previous_close:
            # Calculate the percent change and add to the rolling window
            percent_change = (price-self.previous_close)/self.previous_close
            self.percent_changes.Add(percent_change)
            # Check if we have enough percent changes
            if self.percent_changes.IsReady:
                # Calculate the SQN (order of the changes does not matter)
                self.mean, self.std, self.sqn = self._sqn_stats(
                    list(self.percent_changes), self.multiplier
                )
                self.IsReady = True
                self.Value = self.sqn
        # Save the close
        self.previous_close = price
        return self.IsReady
from AlgorithmImports import *
from datetime import timedelta

###############################################################################
class MyEqualWeightingPortfolioConstructionModel(PortfolioConstructionModel):
    '''Provides an implementation of IPortfolioConstructionModel that gives equal weighting to all securities.
    The target percent holdings of each security is 1/N where N is the number of insights on the
    sides that the algorithm currently allows (ttm_squeeze_long / ttm_squeeze_short).
    For insights of direction InsightDirection.Up, long targets are returned,
    for insights of direction InsightDirection.Down, short targets are returned and
    for insights of direction InsightDirection.Flat, zero targets are returned.'''

    def __init__(self, algo, rebalance = Resolution.Daily, portfolioBias = PortfolioBias.LongShort):
        '''Initialize a new instance of MyEqualWeightingPortfolioConstructionModel
        Args:
            algo: The algorithm instance; its daily, update_models, ttm_squeeze_long and
                  ttm_squeeze_short attributes are read when targets are determined.
            rebalance: Rebalancing parameter. If it is a timedelta, date rules or Resolution, it will be converted into a function.
                              If None will be ignored.
                              The function returns the next expected rebalance time for a given algorithm UTC DateTime.
                              The function returns null if unknown, in which case the function will be called again in the
                              next loop. Returning current time will trigger rebalance.
            portfolioBias: Specifies the bias of the portfolio (Short, Long/Short, Long)'''
        super().__init__()
        self.algo = algo
        self.portfolioBias = portfolioBias

        # If the argument is an instance of Resolution or Timedelta
        # Redefine rebalancingFunc
        rebalancingFunc = rebalance
        if isinstance(rebalance, int):
            rebalance = Extensions.ToTimeSpan(rebalance)
        if isinstance(rebalance, timedelta):
            rebalancingFunc = lambda dt: dt + rebalance
        if rebalancingFunc:
            self.SetRebalancingFunc(rebalancingFunc)

    def DetermineTargetPercent(self, activeInsights):
        '''Will determine the target percent for each insight
        Args:
            activeInsights: The active insights to generate a target for
        Returns:
            Dictionary of insight -> signed target percent. It is empty when the algorithm
            is not on daily resolution and it is not time for a model update.'''
        result = {}
        algo = self.algo

        # On non-daily resolution, skip unless it is time for the update
        if not algo.daily and not algo.update_models:
            return result

        allow_long = algo.ttm_squeeze_long
        allow_short = algo.ttm_squeeze_short

        # Count the insights per side, then keep only the allowed sides
        count_short = sum(x.Direction == InsightDirection.Down and self.RespectPortfolioBias(x) for x in activeInsights)
        count_long = sum(x.Direction == InsightDirection.Up and self.RespectPortfolioBias(x) for x in activeInsights)
        count = (count_short if allow_short else 0) + (count_long if allow_long else 0)
        # Equal weighting across everything that may be traded
        percent = 0 if count == 0 else 1.0 / count
        long_percent = percent if allow_long else 0
        short_percent = percent if allow_short else 0

        for insight in activeInsights:
            direction = insight.Direction
            if direction == InsightDirection.Flat:
                # Flat insights are the alpha model's exit signal. They were
                # previously left out of the result, so no zero target was
                # produced for them. Map them to an explicit 0 target.
                # NOTE(review): this relies on the framework skipping
                # insights that are missing from the result - confirm.
                result[insight] = 0
                continue
            # An insight against the portfolio bias is treated as Flat
            signed = direction if self.RespectPortfolioBias(insight) else InsightDirection.Flat
            if direction == InsightDirection.Up:
                result[insight] = signed * long_percent
            elif direction == InsightDirection.Down:
                result[insight] = signed * short_percent

        return result

    def RespectPortfolioBias(self, insight):
        '''Method that will determine if a given insight respects the portfolio bias
        Args:
            insight: The insight to create a target for
        Returns:
            True when the bias is LongShort or equals the insight's direction
        '''
        return self.portfolioBias == PortfolioBias.LongShort or insight.Direction == self.portfolioBias
from AlgorithmImports import *
import talib
from datetime import timedelta

###############################################################################
class TTMAlphaModel(AlphaModel):
    """Alpha model that emits insights from a TTM-squeeze style signal.

    A squeeze is a bar where the Bollinger Bands lie inside a channel of
    KAMA +/- 1.5 * ATR. Momentum is the 20-bar MOM smoothed with a 5-bar mean.
    """
    Name = "TTM"

    def __init__(self, algo, period=20):
        """
        Args:
            algo: The algorithm instance; its daily and update_models
                attributes gate the updates.
            period: Time period for the Bollinger Bands and the KAMA.
        """
        self.algo = algo
        self.period = period
        self.prev_squeeze = {}

    def _squeeze_and_momentum(self, history):
        """Compute the squeeze flags and smoothed momentum of one history.

        Args:
            history: Price history with 'close', 'high' and 'low' columns.
                NOTE(review): assumed to be a pandas DataFrame as returned by
                the history request - confirm.

        Returns:
            tuple | None: (squeeze, prev_squeeze, momentum, prev_momentum) for
            the last and the second to last bar, or None when there is no
            history or fewer than 5 momentum values.
        """
        if history.empty:
            return None
        close = history['close']
        bb_upper, _, bb_lower = talib.BBANDS(close, timeperiod=self.period)
        kama = talib.KAMA(close, timeperiod=self.period)
        atr = talib.ATR(history['high'], history['low'], close, timeperiod=20)
        mom = talib.MOM(close, timeperiod=20)
        if len(mom) < 5:
            return None
        smoothed_mom = mom.rolling(5).mean()
        kc_upper = kama + (1.5 * atr)
        kc_lower = kama - (1.5 * atr)

        def in_squeeze(pos):
            # Positional access via .iloc: plain [-1] on a Series with a
            # non-integer index depends on a deprecated positional fallback.
            return bool(
                bb_upper.iloc[pos] < kc_upper.iloc[pos]
                and bb_lower.iloc[pos] > kc_lower.iloc[pos]
            )

        return (
            in_squeeze(-1),
            in_squeeze(-2),
            smoothed_mom.iloc[-1],
            smoothed_mom.iloc[-2],
        )

    def Update(self, algorithm, data):
        """Create the insights for all active securities.

        Args:
            algorithm: The algorithm instance passed by the framework.
            data: The current data slice (not used; history is requested).

        Returns:
            list: Up insights (30 days) for a bullish squeeze, Down insights
            (30 days) for a bearish squeeze and Flat insights (1 day) when an
            open position's momentum turns against it. Empty when the
            algorithm is not on daily resolution and it is not time for a
            model update.
        """
        insights = []
        algo = self.algo

        # On non-daily resolution, skip unless it is time for the update
        if not algo.daily and not algo.update_models:
            return insights

        for security in algorithm.UniverseManager.ActiveSecurities:
            symbol = security.Value.Symbol
            try:
                # 55 bars are used for long signals, 30 bars for short signals
                # (marked as optimal by the original author)
                long_view = self._squeeze_and_momentum(
                    algorithm.History(symbol, 55, Resolution.Daily)
                )
                short_view = self._squeeze_and_momentum(
                    algorithm.History(symbol, 30, Resolution.Daily)
                )
                if long_view is None or short_view is None:
                    continue
                squeeze, prev_squeeze, mom, prev_mom = long_view
                squeeze2, prev_squeeze2, mom2, prev_mom2 = short_view

                # Rising and positive momentum
                mom_bullish = mom > prev_mom and mom > 0 and prev_mom > 0
                mom_bullish_stop = mom < prev_mom
                # Falling and negative momentum
                mom_bearish = mom2 < prev_mom2 and mom2 < 0 and prev_mom2 < 0
                mom_bearish_stop = mom2 > prev_mom2

                # Collect per symbol, so a failure below adds nothing
                symbol_insights = []
                if mom_bullish and squeeze and prev_squeeze:
                    symbol_insights.append(Insight.Price(symbol, timedelta(30), InsightDirection.Up))
                if mom_bearish and squeeze2 and prev_squeeze2:
                    symbol_insights.append(Insight.Price(symbol, timedelta(30), InsightDirection.Down))
                holding = algorithm.Portfolio[symbol]
                if holding.Invested:
                    if holding.IsLong and mom_bullish_stop:
                        symbol_insights.append(Insight.Price(symbol, timedelta(1), InsightDirection.Flat))
                    elif holding.IsShort and mom_bearish_stop:
                        symbol_insights.append(Insight.Price(symbol, timedelta(1), InsightDirection.Flat))
            except Exception as err:
                # Best effort per symbol: skip it, but no longer silently.
                # The former bare "except:" hid every error.
                algorithm.Debug(f"TTMAlphaModel: skipped {symbol}: {err}")
                continue
            insights.extend(symbol_insights)

        return insights

###############################################################################
class MyMaximumDrawdownPercentPerSecurity(RiskManagementModel):
    '''
    Provides an implementation of IRiskManagementModel that limits the drawdown 
    per holding to the specified percentage
    REF: https://github.com/QuantConnect/Lean/blob/master/Algorithm.Framework/
    Risk/MaximumDrawdownPercentPerSecurity.py
    '''
    def __init__(self, algo, max_dd_pct=0.15):
        '''
        Initializes a new instance of the MyMaximumDrawdownPercentPerSecurity class
        Args:
            algo: The algorithm instance used by ManageRisk
            max_dd_pct: The maximum percentage drawdown allowed 
             for any single security holding; stored as a negative number
        '''
        self.algo = algo
        self.max_dd_pct = -abs(max_dd_pct)
        # self.constant_check_multiple = constant_check_multiple

    def ManageRisk(self, algorithm, targets):
        '''
        Manages the algorithm's risk at each time step
        Args:
            algorithm: The algorithm instance passed by the framework (not used)
            targets: The current portfolio targets to be assessed for risk
             (not used; a new list of liquidation targets is returned)
        Returns:
            List with a zero PortfolioTarget for every invested security
            whose unrealized profit percent is below the drawdown limit.
            Empty when the algorithm is not on daily resolution and it is
            not time for a model update.

        DO NOT USE algorithm - QuantConnect.Algorithm.QCAlgorithm object
        INSTEAD USE self.algo - the algorithm instance given to __init__
        '''
        algo = self.algo
        risk_targets = []

        # NOTE(review): the original carries a note to test this frequency.
        # On non-daily resolution, skip unless it is time for the update
        if not algo.daily and not algo.update_models:
            return risk_targets

        # Loop through securities
        for kvp in algo.Securities:
            security = kvp.Value
            if not security.Invested:
                continue
            symbol_object = security.Symbol
            pnl = security.Holdings.UnrealizedProfitPercent
            if pnl < self.max_dd_pct:
                # Cancel insights (through self.algo, as required above), so
                # the position is not rebuilt from them
                algo.Insights.Cancel([symbol_object])
                # Liquidate
                risk_targets.append(PortfolioTarget(symbol_object, 0))
        return risk_targets