Overall Statistics |
Total Orders 1235 Average Win 0.48% Average Loss -0.23% Compounding Annual Return 15.346% Drawdown 21.300% Expectancy 0.099 Start Equity 100000 End Equity 108560.73 Net Profit 8.561% Sharpe Ratio 0.332 Sortino Ratio 0.338 Probabilistic Sharpe Ratio 32.003% Loss Rate 65% Win Rate 35% Profit-Loss Ratio 2.13 Alpha 0.184 Beta -0.503 Annual Standard Deviation 0.291 Annual Variance 0.085 Information Ratio -0.24 Tracking Error 0.321 Treynor Ratio -0.192 Total Fees $2083.15 Estimated Strategy Capacity $13000000.00 Lowest Capacity Asset FDX R735QTJ8XC9X Portfolio Turnover 63.42% |
# Imports
import pickle
from scipy.stats import norm, zscore
import traceback

# QuantConnect specific imports
from AlgorithmImports import *
import QuantConnect as qc

# Import from files
from market_condition import MarketConditionData
from portfolio import MyPortfolioConstructionModel
from risk import MultipleStrategyRiskManagement
from ttm_models import TTMAlphaModel
from zscore_models import ZScoreAlphaModel

"""
AE Revision Notes

(09/01/2023)
- Added logs to help with live trading
- Updated ttm_models.py to not emit flat insights for ZScore positions.

(09/05/2023)
- Calling EndOfDayFunction via scheduled events.
- Added log for liquidating stop dropped out of universe.

(09/08/2023)
- Updated to save the target portfolio allocation to the Object Store.

(09/15/2023)
- Updated portfolio.py
    Updated target allocation to skip symbols if weight is 0. This is the
    case when the strategy/direction is not allowed.
    Also including insights with weights for not new TTM signals.

(10/03/2023)
- Changed portfolio.py str(symbol) to str(symbol.ID).

Waiting to confirm:
    check TTM model when one day go flat signal - how does that affect
    target_allocation
"""

# (direction, volatility) regimes in which the TTM squeeze strategy may
# trade short / long. Any other combination (including None values before
# the market condition has been confirmed) disables that side.
_TTM_SHORT_REGIMES = frozenset({
    ('bear', 'normal'),
    ('bull', 'quiet'),
})
_TTM_LONG_REGIMES = frozenset({
    ('neutral', 'quiet'),
    ('bear', 'volatile'),
    ('bull', 'normal'),
    ('neutral', 'normal'),
})

###############################################################################
class CustomAlgorithm(QCAlgorithm):
    """Framework algorithm combining a ZScore alpha and a TTM squeeze alpha.

    The capital split between the two strategies is controlled by
    ``zscore_weight`` / ``ttm_weight``. Market regime information (direction
    and volatility of the benchmark) decides once per day whether the TTM
    strategy may trade long and/or short.
    """

    def Initialize(self):
        """Configure the backtest, universe, framework models and state."""
        # Set starting date, cash and ending date of the backtest
        self.SetStartDate(2023, 9, 26)
        # self.SetEndDate(2017, 3, 31)
        self.SetCash(100000)
        self.SetTimeZone('US/Eastern')
        self.SetSecurityInitializer(
            CustomSecurityInitializer(
                self.BrokerageModel,
                FuncSecuritySeeder(self.GetLastKnownPrices)
            )
        )

        # Add market symbol and create market condition instance
        self.bm = self.AddEquity("SPY", Resolution.Hour).Symbol
        self.bm_hours = self.Securities[self.bm].Exchange.Hours
        self.market_condition = MarketConditionData(self, self.bm)

        # Universe selection
        self.AddUniverse(self.CoarseSelectionFunction)
        self.UniverseSettings.Resolution = Resolution.Hour
        self.UniverseSettings.ExtendedMarketHours = True

        # Add zscore models
        # zscore weight!
        self.zscore_weight = 0.70
        if self.zscore_weight > 0:
            self.AddAlpha(ZScoreAlphaModel(self))
        # The zscore state below is read unconditionally by
        # OnSecuritiesChanged, OnOrderEvent, EndOfDayFunction and the
        # portfolio/risk/TTM models, so it must exist even when the zscore
        # strategy is given no weight.
        self.zscore_symbols = []
        self.previous_zscore_symbols = []
        self.trade_zscore = True

        # Add TTM models
        self.ttm_weight = 1.0-self.zscore_weight
        if self.ttm_weight > 0:
            self.AddAlpha(TTMAlphaModel(self))
        # Same reasoning as above: EndOfDayFunction and the portfolio/risk
        # models read these attributes regardless of the TTM weight.
        self.ttm_squeeze_long = False   # initialize as False
        self.ttm_squeeze_short = False  # initialize as False
        self.update_models = False
        self.last_update_time = None

        # Set the portfolio construction model
        self.SetPortfolioConstruction(MyPortfolioConstructionModel(self))
        # Set the risk management model
        self.AddRiskManagement(MultipleStrategyRiskManagement(self))
        # Set the execution model
        self.SetExecution(ImmediateExecutionModel())

        # Schedule function
        self.Schedule.On(
            self.DateRules.EveryDay(self.bm),
            self.TimeRules.At(23,50),
            self.EndOfDayFunction
        )

        # Other settings
        self.Settings.MinAbsolutePortfolioTargetPercentage = \
            0.00000000000000000000001
        self.Settings.RebalancePortfolioOnInsightChanges = True
        self.Settings.RebalancePortfolioOnSecurityChanges = False
        self.Settings.FreePortfolioValuePercentage = 0.05

        # Save daily portfolio values
        self.times = []
        self.navs = []
        self.market_volatilities = []
        self.market_directions = []

        # Always run EndOfDayFunction immediately when trading live
        if self.LiveMode:
            self.EndOfDayFunction()

#-------------------------------------------------------------------------------
    def CoarseSelectionFunction(self, coarse):
        """Return the 50 highest dollar-volume symbols with fundamental data."""
        stocks = [x for x in coarse if x.HasFundamentalData]
        sortedByDollarVolume = sorted(
            stocks, key=lambda x: x.DollarVolume, reverse=True
        )
        symbols = [x.Symbol for x in sortedByDollarVolume[:50]]
        # Print universe details when live mode
        if self.LiveMode:
            self.MyLog(f"Coarse filter returned {len(symbols)} stocks.")
        return symbols

#-------------------------------------------------------------------------------
    def OnSecuritiesChanged(self, changes):
        """Liquidate invested zscore symbols that dropped out of the universe."""
        for security in changes.RemovedSecurities:
            symbol = security.Symbol
            # Only liquidate for the Zscore strategy
            is_zscore = symbol in self.zscore_symbols \
                or symbol in self.previous_zscore_symbols
            if not is_zscore or not security.Invested:
                continue
            if self.LiveMode:
                self.MyLog(
                    f"Liquidating {symbol.Value} since it's a zscore "
                    "symbol that's dropped out of the universe."
                )
            self.Liquidate(security.Symbol)

#-------------------------------------------------------------------------------
    def OnData(self, data):
        """Built-in event called on new data.

        Sets ``update_models`` to True for the first data event of each day
        that arrives at hour 9 or later, and to False for later events of
        that day. Data arriving before hour 9 leaves the flag untouched.
        """
        # Only necessary for ttm
        if self.ttm_weight <= 0:
            return
        # Catch first data (at 900 or later) to update model
        if self.Time.hour < 9:
            return
        first_data_today = (
            self.last_update_time is None
            or self.last_update_time.date() != self.Time.date()
        )
        if first_data_today:
            self.update_models = True
            self.last_update_time = self.Time
        else:
            self.update_models = False

#-------------------------------------------------------------------------------
    def MyLog(self, message):
        """Add algo time to log if live trading. Otherwise just log message."""
        # Log all messages in live trading mode with local time added
        if self.LiveMode:
            self.Log(f'{self.Time}: {message}')
        else:
            self.Log(message)

#-------------------------------------------------------------------------------
    def _reduced_share_count(self, msg, order_qty):
        """Return the (non-negative) share count to retry a rejected order with.

        Args:
            msg: Rejection message of the invalid order event.
            order_qty: Signed, non-zero quantity of the rejected order.

        The margin figures are parsed out of ``msg``; the two recognised
        message layouts are presumably those emitted by LEAN's buying power
        check and by the live brokerage - verify against real messages.
        Unrecognised messages fall back to half of the original size.
        """
        half_size = abs(int(order_qty*0.5))
        # Check for insufficient buying power
        if 'Insufficient buying power' in msg:
            initial_margin = float(
                msg.split("Initial Margin: ")[1].split(",")[0]
            )
            available = float(
                msg.split("Free Margin: ")[1].split(",")[0].strip('.')
            )
        # Check for 'desired position your previous day...' error
        elif 'DESIRED POSITION YOUR PREVIOUS DAY EQUITY WITH LOAN VALUE' in msg:
            initial_margin = float(
                msg.split("INITIAL MARGIN [")[1].split("USD")[0].replace(' ','')
            )
            available = float(
                msg.split("LOAN VALUE [")[1].split("USD")[0].replace(' ','')
            )
        else:
            self.MyLog(f"Unrecognized error message: {msg}")
            self.MyLog(f"Will try to reduce order qty by 50%")
            # Try to cut order size in half
            return half_size

        # Get the max allowed position size
        margin_per_share = abs(initial_margin/order_qty)
        if margin_per_share == 0:
            # A zero margin requirement gives nothing to scale by
            return half_size
        # A negative free margin / loan value means nothing can be bought;
        # taking its absolute value would size an order we cannot afford.
        return int(0.95*max(0.0, available)/margin_per_share)

#-------------------------------------------------------------------------------
    def ResubmitOrder(self, order, msg):
        """Resubmit an invalid market/limit order with a reduced quantity.

        Args:
            order: The order that was rejected (status Invalid).
            msg: The rejection message attached to the order event.

        Orders that are neither market, market-on-open nor limit orders are
        only logged. Nothing is resubmitted when the reduced size is zero.
        """
        if isinstance(
            order, (qc.Orders.MarketOrder, qc.Orders.MarketOnOpenOrder)
        ):
            order_type = 'Market'
            limit_price = None
        elif isinstance(order, qc.Orders.LimitOrder):
            order_type = 'Limit'
            # Get the limit price
            limit_price = order.LimitPrice
        else:
            self.MyLog(
                f"Invalid Order, but not a market or limit order! Order type="
                f"{type(order)}"
            )
            return

        # Get the order message, symbol, and qty
        self.MyLog(
            f"Invalid {order_type} Order! error: {msg}"
        )
        symbol = order.Symbol
        order_qty = int(order.Quantity)
        if order_qty == 0:
            self.MyLog("Order quantity is 0 - nothing to resubmit")
            return

        max_shares = self._reduced_share_count(msg, order_qty)
        if max_shares == 0:
            # Submitting a 0 share order would only be rejected again
            self.MyLog("Reduced order qty is 0 - not resubmitting the order")
            return

        # Get new qty (keep the sign of the original order)
        order_qty = -max_shares if order_qty < 0 else max_shares
        self.MyLog(
            f"Initial number of shares exceeds margin requirements! Reducing "
            f"order qty to {order_qty}"
        )

        # Resubmit an order with a reduced qty
        if order_type == 'Market':
            self.MarketOrder(symbol, order_qty, asynchronous=True)
        elif order_type == 'Limit':
            self.MyLog(f"Limit price={limit_price}")
            self.LimitOrder(symbol, order_qty, limit_price)
            # Add order to the list of limit orders
            # self.limit_orders.append(order)

#-------------------------------------------------------------------------------
    def OnOrderEvent(self, orderEvent):
        """Built-in event handler for orders.

        Invalid orders for zscore symbols are resubmitted with a reduced
        quantity; failures while doing so are logged, never raised.
        """
        # Log message
        if self.LiveMode:
            self.MyLog(
                f"New order event: {orderEvent}, Status={orderEvent.Status}"
            )
        # Catch invalid order
        if orderEvent.Status != OrderStatus.Invalid:
            return
        try:
            # Resubmit a new order
            order = self.Transactions.GetOrderById(orderEvent.OrderId)
            # ticket = self.Transactions.GetOrderTicket(orderEvent.OrderId)
            # response = ticket.GetMostRecentOrderResponse()
            msg = orderEvent.get_Message()
            # Only resubmit order if for zscore
            symbol = order.Symbol
            if symbol in self.zscore_symbols \
            or symbol in self.previous_zscore_symbols:
                self.ResubmitOrder(order, msg)
        except Exception:
            # Best effort: a failed resubmission must not stop the algorithm,
            # but it should be visible in backtests as well as live.
            self.MyLog(
                f'OnOrderEvent() exception: {traceback.format_exc()}'
            )

#-------------------------------------------------------------------------------
    def EndOfDayFunction(self):
        """Event called at the end of the day.

        Records the daily portfolio value and market regime, then decides
        from the regime whether the TTM strategy may trade short and long.
        """
        if self.LiveMode:
            self.MyLog("Running EndOfDayFunction")

        # Get the current market direction and volatility level
        direction = self.market_condition.direction
        volatility = self.market_condition.volatility

        # Save the daily ending portfolio value
        self.times.append(self.Time)
        self.navs.append(self.Portfolio.TotalPortfolioValue)
        self.market_volatilities.append(volatility)
        self.market_directions.append(direction)

        # Check if we can trade the zscore algo
        # We always trade zscore right now - filtering based on regimes
        # significantly lowers returns

        # Check if we can trade the TTM algo
        regime = (direction, volatility)

        # Can we trade short?
        previous_short = self.ttm_squeeze_short
        self.ttm_squeeze_short = regime in _TTM_SHORT_REGIMES
        if self.LiveMode or (previous_short != self.ttm_squeeze_short):
            self.Log(f"TTM Squeeze short {self.ttm_squeeze_short}")

        # Can we trade long?
        previous_long = self.ttm_squeeze_long
        self.ttm_squeeze_long = regime in _TTM_LONG_REGIMES
        if self.LiveMode or (previous_long != self.ttm_squeeze_long):
            self.Log(f"TTM squeeze long {self.ttm_squeeze_long}")

        # Create plot
        self.Plot("Trade ZScore", "True", 1 if self.trade_zscore else 0)
        self.Plot(
            "Trade TTM Squeeze Short", "True",
            1 if self.ttm_squeeze_short else 0
        )
        self.Plot(
            "Trade TTM Squeeze Long", "True",
            1 if self.ttm_squeeze_long else 0
        )

#-------------------------------------------------------------------------------
    def OnEndOfAlgorithm(self):
        """Built-in event handler for end of the backtest."""
        # Save the portfolio values to the object store so we can evaluate
        # them in the Research environment
        key = 'ZScore'
        d = {
            'time': self.times,
            'value': self.navs,
            'volatility': self.market_volatilities,
            'direction': self.market_directions
        }
        serialized = pickle.dumps(d)
        self.ObjectStore.SaveBytes(key, serialized)

###############################################################################
class CustomSecurityInitializer(BrokerageModelSecurityInitializer):
    """Security initializer that applies a 1.0 ``SecurityMarginModel``."""

    def __init__(
        self, brokerage_model: IBrokerageModel,
        security_seeder: ISecuritySeeder
    ) -> None:
        super().__init__(brokerage_model, security_seeder)

    def Initialize(self, security: Security) -> None:
        """
        Define models to be used for securities as they are added to the
        algorithm's universe.
        """
        # First, call the superclass definition
        # Sets the reality models of each security using the default models
        # of the brokerage model
        super().Initialize(security)
        # Define the buying power model to use for the security
        security.SetBuyingPowerModel(SecurityMarginModel(1.0))
# Standard library imports
import datetime as DT
import math
# import numpy as np
# import pandas as pd
import pytz
import statistics

# QuantConnect specific imports
from AlgorithmImports import *

# Previously from notes_and_inputs.py
TIMEZONE = 'US/Eastern'

################################################################################
class MarketConditionData(object):
    """Class to store data for the market symbol.

    Classifies the market symbol's volatility (ATR as a fraction of the
    close) and direction (SQN of daily returns) from daily bars. A new
    classification is only adopted once ``CONFIRMATION_BARS`` consecutive
    daily bars agree on it.
    """
    # Global variables
    # # For market volatility (period = 10 days)
    # MARKET_ATR_PERIOD = 10
    # VOLATILITY_QUIET_MAX = 0.010
    # VOLATILITY_NORMAL_MAX = 0.021
    # VOLATILITY_VOLATILE_MAX = 0.043
    # For market volatility (period = 15 days)
    MARKET_ATR_PERIOD = 15
    VOLATILITY_QUIET_MAX = 0.010
    VOLATILITY_NORMAL_MAX = 0.021
    VOLATILITY_VOLATILE_MAX = 0.043
    # # For market volatility (period = 20 days)
    # MARKET_ATR_PERIOD = 20
    # VOLATILITY_QUIET_MAX = 0.011
    # VOLATILITY_NORMAL_MAX = 0.02
    # VOLATILITY_VOLATILE_MAX = 0.04

    # # For market direction (period = 50 days)
    # SQN_PERIOD = 50
    # DIRECTION_STRONG_BULL = 1.37
    # DIRECTION_BULL = 0.90
    # DIRECTION_STRONG_BEAR = -DIRECTION_STRONG_BULL
    # For market direction (period = 100 days)
    SQN_PERIOD = 100
    DIRECTION_STRONG_BULL = 1.43
    DIRECTION_BULL = 0.87
    DIRECTION_STRONG_BEAR = -DIRECTION_STRONG_BULL
    # # For market direction (period = 200 days)
    # SQN_PERIOD = 200
    # DIRECTION_STRONG_BULL = 1.60
    # DIRECTION_BULL = 0.90
    # DIRECTION_STRONG_BEAR = -DIRECTION_STRONG_BULL

    # Require confirmation bars
    CONFIRMATION_BARS = 5

    def __init__(self, algo, symbol_object):
        """Initialize MarketConditionData object.

        Args:
            algo: The QCAlgorithm instance (used for securities, history,
                consolidator registration and logging).
            symbol_object: The Symbol of the market (benchmark) security.
        """
        # Save a reference to the QCAlgorithm class
        self.algo = algo
        # Save the .Symbol object and the symbol's string
        self.symbol_object = symbol_object
        self.symbol = symbol_object.Value
        # Get the symbol's exchange market info
        self.get_exchange_info()
        # Add strategy specific variables
        self.add_strategy_variables()
        # Add the bars and indicators required
        self.add_bars()
        self.add_indicators()

#-------------------------------------------------------------------------------
    def get_exchange_info(self):
        """Get the security's exchange info."""
        # Get the SecurityExchangeHours Class object for the symbol
        self.exchange_hours = \
            self.algo.Securities[self.symbol_object].Exchange.Hours

        # Create a datetime I know the market was open for the full day
        dt = DT.datetime(2021, 1, 4)
        # Get the next open datetime from the SecurityExchangeHours Class
        mkt_open_dt = self.exchange_hours.GetNextMarketOpen(dt, False)
        # Save the typical (regular session) market open and close times
        self.mkt_open = mkt_open_dt.time()
        mkt_close_dt = self.exchange_hours.GetNextMarketClose(dt, False)
        self.mkt_close = mkt_close_dt.time()

        # Get the exchange timezone
        self.mkt_tz = pytz.timezone(str(self.exchange_hours.TimeZone))
        # Create pytz timezone objects for the exchange tz and local tz
        exchange_tz = self.mkt_tz
        local_tz = pytz.timezone(TIMEZONE)
        # Get the difference in the timezones
        # REF: http://pytz.sourceforge.net/#tzinfo-api
        #  for pytz timezone.utcoffset() method
        # 3600 seconds/hour
        # NOTE: total_seconds() is required here. A negative offset such as
        # UTC-5 is stored as timedelta(days=-1, seconds=68400), so the
        # .seconds attribute alone would report 19 hours instead of -5.
        exchange_utc_offset_hrs = int(
            exchange_tz.utcoffset(dt).total_seconds()/3600
        )
        local_utc_offset_hrs = int(
            local_tz.utcoffset(dt).total_seconds()/3600
        )
        self.offset_hrs = exchange_utc_offset_hrs-local_utc_offset_hrs
        # NOTE: offset hours are very helpful if you want to schedule functions
        #  around market open/close times
        # Get the market close time for the local time zone
        self.mkt_close_local_tz = \
            (mkt_close_dt-DT.timedelta(hours=self.offset_hrs)).time()

#-------------------------------------------------------------------------------
    def add_strategy_variables(self):
        """Add required variables for the market condition."""
        # Initialize - both stay None until a classification is confirmed
        self.direction = None
        self.volatility = None

#-------------------------------------------------------------------------------
    def add_bars(self):
        """Add bars required."""
        # Create the daily consolidator for the symbol
        self.calendar_initialized = False
        daily_consolidator = TradeBarConsolidator(self.daily_calendar)
        # daily_consolidator = TradeBarConsolidator(DT.timedelta(days=1))
        # Create an event handler to be called on each new consolidated bar
        daily_consolidator.DataConsolidated += self.on_daily_consolidated
        # Link daily_consolidator with our symbol and add it to the algo manager
        self.algo.SubscriptionManager.AddConsolidator(
            self.symbol_object, daily_consolidator
        )
        # Save daily_consolidator link so we can remove it when necessary
        # This is not necessary
        # self.daily_consolidator = daily_consolidator

#-------------------------------------------------------------------------------
    def add_indicators(self):
        """Add indicators and other required variables."""
        # Keep a list of indicators
        # Will add (indicator, update_method) tuples
        self.indicators = []

        # For volatility, we use the ATR
        self.atr = AverageTrueRange(self.MARKET_ATR_PERIOD)
        self.indicators.append((self.atr, 'bar'))

        # For the market direction we use the SQN
        self.sqn = SQN(self.SQN_PERIOD)
        self.indicators.append((self.sqn, 'bar'))

        # Require confirmation rolling windows
        self.volatility_window = RollingWindow[str](self.CONFIRMATION_BARS)
        self.direction_window = RollingWindow[str](self.CONFIRMATION_BARS)

        # Keep track of the number of bars required to warmup indicators
        self.min_bars = int(max([self.MARKET_ATR_PERIOD*3, self.SQN_PERIOD]))+1
        self.bar_window = RollingWindow[TradeBar](self.min_bars)

        # Warm up the indicators with historical data
        self.warmup_indicators()

#-------------------------------------------------------------------------------
    def daily_calendar(self, dt):
        """
        Set up daily consolidator calendar info.
        This should return a start datetime object that is timezone unaware
        with a valid date/time for the desired securities' exchange's time zone.

        Useful Refs:
        datetime.replace() method:
        https://docs.python.org/3/library/datetime.html#datetime.datetime.replace
        """
        # Need to handle case where algo initializes and this function is called
        #  for the first time.
        if not self.calendar_initialized:
            # Since this doesn't matter, we'll pass dt as start and one day
            #  as the timedelta until end_dt
            start_dt = dt
            end_dt = start_dt + DT.timedelta(1)
            self.calendar_initialized = True
            return CalendarInfo(start_dt, end_dt-start_dt)

        # Create a datetime.datetime object to represent the market open for the
        # **EXCHANGE** timezone
        start = dt.replace(
            hour=self.mkt_open.hour, minute=self.mkt_open.minute,
            second=0, microsecond=0
        )
        # Get today's end time from the SecurityExchangeHours Class object
        end = self.exchange_hours.GetNextMarketClose(start, False)

        # Catch when start is after the passed dt
        # QC now throws an error in this case
        if start > dt:
            # To handle the QC error, pass period for no data
            # Set the end to be the next desired start
            end = start
            # And set start to dt to avoid QC throwing error
            start = dt
            # This will result in the next dt being the desired start time

        # start = dt.replace(hour=9, minute=30, second=0, microsecond=0)
        # end = dt.replace(hour=16, minute=0, second=0, microsecond=0)
        # Return the start datetime and the consolidation period
        return CalendarInfo(start, end-start)

#-------------------------------------------------------------------------------
    def on_daily_consolidated(self, sender, bar):
        """Event handler for daily bars."""
        # Skip if not during regular trading hours
        # (bars fed in during warm up bypass this check)
        if not self.warming_up:
            if bar.Time.time() < self.mkt_open:
                return
            elif bar.EndTime.time() > self.mkt_close:
                return
        # Manually update all of the daily indicators
        self.update_daily_indicators(bar)

#-------------------------------------------------------------------------------
    def update_daily_indicators(self, bar):
        """Manually update all of the symbol's daily bar indicators."""
        # Loop through all indicators
        for indicator, update_method in self.indicators:
            if update_method == 'close':
                indicator.Update(bar.EndTime, bar.Close)
            elif update_method == 'bar':
                indicator.Update(bar)
        # Add the bar to the bar window
        self.bar_window.Add(bar)

        # Check if the indicators are ready
        if not self.indicators_ready:
            return

        # Get the current market volatility
        atr_pct = self.atr.Current.Value/bar.Close
        previous_volatility = self.volatility
        volatility = self.get_market_volatility(atr_pct)
        self.volatility_window.Add(volatility)
        if self.volatility_window.IsReady:
            # Check if the volatility values are all the same
            if len(set(list(self.volatility_window))) == 1:
                self.volatility = volatility
        # Check for change
        if self.volatility and previous_volatility != self.volatility:
            self.algo.Log(
                f"Market volatility change from {previous_volatility} to "
                f"{self.volatility}",
            )

        # Get the current market direction
        previous_direction = self.direction
        sqn = self.sqn.Value
        direction = self.get_market_direction(sqn)
        self.direction_window.Add(direction)
        if self.direction_window.IsReady:
            # Check if the direction values are all the same
            if len(set(list(self.direction_window))) == 1:
                self.direction = direction
        # Check for change
        if self.direction and previous_direction != self.direction:
            self.algo.Log(
                f"Market direction change from {previous_direction} to "
                f"{self.direction}",
            )

#-------------------------------------------------------------------------------
    def reset_indicators(self):
        """Manually reset all of the indicators."""
        # Loop through all indicators
        for indicator, update_method in self.indicators:
            indicator.Reset()
        # Reset the bar window
        self.bar_window.Reset()

#-------------------------------------------------------------------------------
    def adjust_indicators(self, adjustment):
        """Adjust all indicators for splits or dividends.

        Args:
            adjustment: Factor every stored bar's OHLC is multiplied by.
        """
        # Get a list of the current bars
        bars = list(self.bar_window)
        # Current order is newest to oldest (default for rolling window)
        # Reverse the list to be oldest to newest
        bars.reverse()
        # Reset all indicators
        self.reset_indicators()
        # Loop through the bars from oldest to newest
        for bar in bars:
            # Adjust the bar by the adjustment factor
            bar.Open *= adjustment
            bar.High *= adjustment
            bar.Low *= adjustment
            bar.Close *= adjustment
            # Use the bar to update the indicators
            # This also adds the bar to the rolling window
            self.update_daily_indicators(bar)

#-------------------------------------------------------------------------------
    def warmup_indicators(self):
        """Warm up indicators using historical data."""
        # Update warmup variable, so we don't try to take any signals
        self.warming_up = True
        try:
            # Get historical daily trade bars
            daily_bars = self.algo.History[TradeBar](
                self.symbol_object,
                int(1.5*self.min_bars),
                Resolution.Daily
            )
            # Loop through the bars and update the consolidator
            for bar in daily_bars:
                # self.consolidator1.Update(bar) # don't use method with daily bars
                # Instead pass the bar directly to the event handler
                self.on_daily_consolidated(None, bar)
        finally:
            # Never leave the object stuck in warm-up mode, otherwise the
            # regular-trading-hours filter would stay disabled
            self.warming_up = False

#-------------------------------------------------------------------------------
    @property
    def indicators_ready(self):
        """Check if all of the indicators used are ready (warmed up)."""
        # Loop through all indicators
        for indicator, update_method in self.indicators:
            # Return False if the indicator is not ready
            if not indicator.IsReady:
                return False
        # Check rolling windows
        if not self.bar_window.IsReady:
            return False
        # Otherwise return True
        return True

#-------------------------------------------------------------------------------
    def get_market_volatility(self, atr_pct):
        """Return the volatility label for the ATR/close ratio ``atr_pct``.

        Returns one of 'nan', 'quiet', 'normal', 'volatile', 'very volatile'.
        """
        # Check if nan
        if math.isnan(atr_pct):
            return 'nan'
        # Check if the current level is at or below the quiet level
        elif atr_pct <= self.VOLATILITY_QUIET_MAX:
            return 'quiet'
        # Otherwise check if the current level is at or below the normal max level
        elif atr_pct <= self.VOLATILITY_NORMAL_MAX:
            return 'normal'
        # Otherwise check if the current level is at or below the volatile max level
        elif atr_pct <= self.VOLATILITY_VOLATILE_MAX:
            return 'volatile'
        # Otherwise very volatile
        return 'very volatile'

#-------------------------------------------------------------------------------
    def get_market_direction(self, sqn):
        """Return the direction label for the SQN value ``sqn``.

        Returns one of 'nan', 'strong bull', 'bull', 'neutral', 'bear',
        'strong bear'.
        """
        # Check if nan
        if math.isnan(sqn):
            return 'nan'
        # Check if the current level is above the strong bull level
        elif sqn > self.DIRECTION_STRONG_BULL:
            return 'strong bull'
        # Otherwise check if the current level is at or above the bull level
        elif sqn >= self.DIRECTION_BULL:
            return 'bull'
        # Otherwise check if the current level is at or above 0
        elif sqn >= 0:
            return 'neutral'
        # Otherwise check if the current level is at or above the strong bear level
        elif sqn >= self.DIRECTION_STRONG_BEAR:
            return 'bear'
        # Otherwise strong bear
        return 'strong bear'

################################################################################
class SQN:
    """Custom 'System Quality Number' indicator.

    Value = mean(returns) / stdev(returns) * min(10, sqrt(period)), computed
    over the last ``period`` close-to-close percent changes.
    """
    def __init__(self, period):
        """Initialize the indicator."""
        self.Name = "SQN" # may want to customize this
        self.Time = DT.datetime.min
        self.Value = 0
        self.IsReady = False

        # Create required indicators to calculate this indicator
        self.percent_changes = RollingWindow[float](period)
        self.previous_close = None
        self.mean = None
        self.std = None
        self.multiplier = min(10, math.sqrt(period))

#-------------------------------------------------------------------------------
    def __repr__(self):
        """
        Returns the object representation in string format.
        Called via repr() on the object.
        """
        return f"{self.Name} -> IsReady: {self.IsReady}. Time: {self.Time}. " + \
            f"Value: {self.Value}"

#-------------------------------------------------------------------------------
    def Reset(self):
        """Reset the indicator to its freshly constructed state."""
        # Reset the rolling window
        self.percent_changes.Reset()
        # Also drop the remaining state. Keeping previous_close would make
        # the first return after a reset compare two unrelated bars, and
        # keeping IsReady/Value would report a stale number as current.
        self.previous_close = None
        self.mean = None
        self.std = None
        self.Value = 0
        self.IsReady = False
        self.Time = DT.datetime.min

#-------------------------------------------------------------------------------
    def Update(self, input):
        """
        Update the indicator with the input.
        This is a required function for custom indicators!

        Args:
            input: A bar exposing ``Close`` and ``EndTime``.

        Returns:
            True once the indicator has a full window of percent changes.
        """
        # Get the price
        price = input.Close
        # Check if we have a previous close
        if self.previous_close:
            # Calculate the percent change and add to the rolling window
            percent_change = (price-self.previous_close)/self.previous_close
            self.percent_changes.Add(percent_change)

            # Check if we have enough percent changes
            if self.percent_changes.IsReady:
                # Calculate the SQN
                percent_changes = list(self.percent_changes)
                # percent_changes.reverse() # to put in oldest to newest order
                self.mean = statistics.mean(percent_changes)
                self.std = statistics.stdev(percent_changes)
                if self.std == 0:
                    # Identical returns across the window: the ratio is
                    # undefined, report nan rather than dividing by zero
                    self.sqn = float('nan')
                else:
                    self.sqn = (self.mean/self.std)*self.multiplier
                self.IsReady = True
                self.Value = self.sqn

        # Save the close and the time of the latest bar
        self.previous_close = price
        self.Time = input.EndTime
        return self.IsReady
# Imports
from AlgorithmImports import *
from scipy.stats import norm
import datetime as dt
import pickle

# File imports - n/a

# Global variables
OS_KEY = "LiveAlgo"

###############################################################################
class MyPortfolioConstructionModel(PortfolioConstructionModel):
    '''
    Portfolio construction model that splits capital between the ZScore and
    TTM strategies.

    Symbols held by the ZScore strategy share ``algo.zscore_weight`` equally.
    TTM symbols share ``algo.ttm_weight`` equally across the sides (long /
    short) that are currently allowed by ``algo.ttm_squeeze_long`` and
    ``algo.ttm_squeeze_short``. For insights of direction InsightDirection.Up,
    long targets are returned and for insights of direction
    InsightDirection.Down, short targets are returned.
    '''
    def __init__(
        self, algo, rebalance=Resolution.Hour,
        portfolio_bias=PortfolioBias.LongShort
    ):
        '''
        Initialize a new instance of MyPortfolioConstructionModel
        Args:
            algo: QCAlgorithm instance.
            rebalance: Rebalancing parameter. If it is a timedelta, date rules
                        or Resolution, it will be converted into a function.
                        If None will be ignored. The function returns the next
                        expected rebalance time for a given algorithm UTC
                        DateTime. The function returns null if unknown, in
                        which case the function will be called again in the
                        next loop. Returning current time will trigger
                        rebalance.
            portfolio_bias: Specifies the bias of the portfolio
                            (Short, LongShort, Long)
        '''
        super().__init__()
        self.portfolio_bias = portfolio_bias
        self.algo = algo

        # If the argument is an instance of Resolution or Timedelta
        # Redefine rebalancingFunc
        rebalancingFunc = rebalance
        if isinstance(rebalance, int):
            rebalance = Extensions.ToTimeSpan(rebalance)
        if isinstance(rebalance, timedelta):
            rebalancingFunc = lambda dt: dt + rebalance
        if rebalancingFunc:
            self.SetRebalancingFunc(rebalancingFunc)

    def DetermineTargetPercent(self, activeInsights):
        '''
        Will determine the target percent for each insight
        Args:
            activeInsights: The active insights to generate a target for.
                NOTE: since the 9/15 change the weights are computed from
                ALL insights in ``algo.Insights``, not from this argument.
        Returns:
            dict mapping insight -> signed target weight. TTM insights are
            only included while ``algo.update_models`` is True.
        '''
        result = {}
        algo = self.algo

        # Debugging
        if algo.LiveMode:
            algo.MyLog(f"Checking portfolio target percents...")

        # Collect the distinct symbols per strategy / direction
        zscore_symbols = []
        short_symbols = []
        long_symbols = []
        # Loop through ALL insights (9/15 change)
        for x in algo.Insights:
            # Skip if flat insight
            if x.Direction == InsightDirection.Flat:
                continue
            symbol = x.Symbol
            # Check if for zscore symbol
            if symbol in algo.zscore_symbols \
            or symbol in algo.previous_zscore_symbols:
                if symbol not in zscore_symbols:
                    zscore_symbols.append(symbol)
            # TTM signals
            elif x.Direction == InsightDirection.Down:
                if symbol not in short_symbols:
                    short_symbols.append(symbol)
            elif x.Direction == InsightDirection.Up:
                if symbol not in long_symbols:
                    long_symbols.append(symbol)

        # Handle for Zscore: equal share of the zscore weight per symbol
        zscore_count = len(zscore_symbols)
        zscore_percent = 0 if zscore_count == 0 else 1.0/zscore_count
        zscore_percent = zscore_percent * algo.zscore_weight

        # Handle for TTM: equal share of the ttm weight per symbol on the
        # sides that are currently allowed to trade
        count = 0
        if algo.ttm_squeeze_short:
            count += len(short_symbols)
        if algo.ttm_squeeze_long:
            count += len(long_symbols)
        percent = 0 if count == 0 else 1.0 / count
        short_percent = percent * algo.ttm_weight \
            if algo.ttm_squeeze_short else 0
        long_percent = percent * algo.ttm_weight \
            if algo.ttm_squeeze_long else 0

        # TTM targets are only emitted when it is time for the daily update
        skip_ttm_result = not algo.update_models

        # Loop through ALL insights (9/15 change)
        target_allocation = {}
        for insight in algo.Insights:
            symbol = insight.Symbol
            symbol_object_str = str(symbol.ID)
            direction = (
                insight.Direction if self.RespectPortfolioBias(insight)
                else InsightDirection.Flat
            )

            # Check if for zscore symbol
            if symbol in zscore_symbols:
                weight = direction * zscore_percent
                result[insight] = weight
                strategy = 'ZScore'
            # TTM symbol
            elif insight.Direction == InsightDirection.Down:
                weight = direction * short_percent
                if not skip_ttm_result:
                    result[insight] = weight
                strategy = 'TTM'
            elif insight.Direction == InsightDirection.Up:
                weight = direction * long_percent
                if not skip_ttm_result:
                    result[insight] = weight
                strategy = 'TTM'
            else:
                # Flat TTM insight: no target and no allocation entry
                continue

            # Add to target allocation if weight not 0
            if weight != 0:
                target_allocation[symbol_object_str] = \
                    (strategy, weight, algo.Time)

        # Save the target allocation to the Object Store - only for live
        # trading! (Serializing is skipped in backtests, where the bytes
        # would be thrown away.)
        if algo.LiveMode:
            serialized = pickle.dumps(target_allocation)
            algo.ObjectStore.SaveBytes(OS_KEY, serialized)
            # Log message when desired
            algo.MyLog(
                f"Target Allocation saved to the Object Store: "
                f"{target_allocation}"
            )

        return result

    def RespectPortfolioBias(self, insight):
        '''
        Method that will determine if a given insight respects the portfolio
        bias.
        Args:
            insight: The insight to create a target for
        '''
        return self.portfolio_bias == PortfolioBias.LongShort \
            or insight.Direction == self.portfolio_bias
# Imports
from AlgorithmImports import *
import datetime as dt

# File imports - n/a

###############################################################################
class MultipleStrategyRiskManagement(RiskManagementModel):
    '''
    Risk management model that liquidates any holding whose unrealized
    profit percent falls below a per-strategy threshold (one threshold for
    ZScore symbols, one for all other, i.e. TTM, symbols).

    REF: https://github.com/QuantConnect/Lean/blob/master/Algorithm.Framework/
        Risk/MaximumDrawdownPercentPerSecurity.py
    '''

    def __init__(self, algo):
        '''
        Initializes a new instance of the MultipleStrategyRiskManagement class
        Args:
            algo: The main algorithm instance whose strategy state
                  (zscore symbols, update flag, benchmark hours) is read.
        '''
        self.algo = algo
        # Thresholds are compared against UnrealizedProfitPercent
        self.zscore_dd_pct = -0.05
        self.ttm_dd_pct = -0.15

    def ManageRisk(self, algorithm, targets):
        '''
        Manages the algorithm's risk at each time step
        Args:
            algorithm: The algorithm instance (unused, see below)
            targets: The current portfolio targets to be assessed for risk
                     (unused; only liquidation targets are returned)
        Returns:
            List of zero-quantity PortfolioTarget objects for every holding
            whose stop was triggered.

        DO NOT USE algorithm - QuantConnect.Algorithm.QCAlgorithm object
        INSTEAD USE self.algo - the main algorithm object
        '''
        algo = self.algo
        liquidations = []

        # ZScore stops are only evaluated during regular trading hours,
        # with the market close (15:59 / 16:00) explicitly allowed
        regular_session = algo.bm_hours.IsOpen(
            algo.Time,extendedMarketHours=False
        )
        at_market_close = (algo.Time.hour, algo.Time.minute) in (
            (16, 0), (15, 59)
        )
        skip_zscore = not (regular_session or at_market_close)

        # Debugging
        if algo.LiveMode:
            algo.MyLog(f"Checking risk management model...")

        # Loop through securities
        for entry in algo.Securities:
            holding = entry.Value
            if not holding.Invested:
                continue
            symbol_object = holding.Symbol
            profit_pct = holding.Holdings.UnrealizedProfitPercent

            belongs_to_zscore = (
                symbol_object in algo.zscore_symbols
                or symbol_object in algo.previous_zscore_symbols
            )
            if belongs_to_zscore:
                # Skip if not time for update
                if skip_zscore:
                    continue
                threshold = self.zscore_dd_pct
            elif algo.update_models:
                # Otherwise TTM strategy, checked only on the daily update
                threshold = self.ttm_dd_pct
            else:
                continue

            # Check for stop triggered
            if profit_pct < threshold:
                # Cancel insights
                algo.Insights.Cancel([symbol_object])
                # Liquidate
                liquidations.append(PortfolioTarget(symbol_object, 0))

        return liquidations
from AlgorithmImports import *
import talib
from datetime import timedelta

###############################################################################
class TTMAlphaModel(AlphaModel):
    '''
    Alpha model that emits TTM-squeeze style insights.

    For every active universe security (except SPY) it compares Bollinger
    Bands with KAMA/ATR based Keltner channels on two daily lookbacks
    (55 bars for long signals, 30 bars for short signals) and combines the
    squeeze state with a smoothed momentum reading.
    '''
    Name = "TTM"

    def __init__(self, algo, period=20):
        '''
        Args:
            algo: The main algorithm instance. Update reads its
                update_models, LiveMode, zscore_symbols and
                previous_zscore_symbols attributes and calls MyLog.
            period: Time period passed to the Bollinger Band and KAMA
                indicators.
        '''
        self.algo = algo
        self.period = period
        # Not read anywhere in this class; kept so the attribute still
        # exists for any external code that may reference it.
        self.prev_squeeze = {}

    def _window_state(self, history):
        '''
        Compute the squeeze and momentum state for one history window.

        Args:
            history: DataFrame with 'close', 'high' and 'low' columns.

        Returns:
            None when the window has fewer than 5 rows, otherwise a tuple
            (squeeze_now, squeeze_prev, mom_now, mom_prev) where the squeeze
            flags say whether the Bollinger Bands sit inside the Keltner
            channel on the last / second-to-last bar and the mom values are
            the last two 5-bar means of the 20-bar momentum.
        '''
        close = history['close']
        bb_upper, _, bb_lower = talib.BBANDS(close, timeperiod=self.period)
        kama = talib.KAMA(close, timeperiod=self.period)
        atr = talib.ATR(history['high'], history['low'], close,
                        timeperiod=20)
        mom = talib.MOM(close, timeperiod=20)
        if len(mom) < 5:
            return None

        smoothed = mom.rolling(5).mean()
        kc_upper = kama + (1.5 * atr)
        kc_lower = kama - (1.5 * atr)

        def in_squeeze(pos):
            # .iloc makes the positional access explicit; the history index
            # is not an integer index, so a plain [-1] only works through
            # pandas' positional fallback.
            return (bb_upper.iloc[pos] < kc_upper.iloc[pos]
                    and bb_lower.iloc[pos] > kc_lower.iloc[pos])

        return (in_squeeze(-1), in_squeeze(-2),
                smoothed.iloc[-1], smoothed.iloc[-2])

    def Update(self, algorithm, data):
        '''
        Generate TTM insights for the active universe.

        Args:
            algorithm: Framework-supplied algorithm object; used for the
                universe, history requests and portfolio holdings.
            data: The new data slice (not used).

        Returns:
            A list of insights: Up/Down with a 30-day period for new squeeze
            signals, and Flat with a 1-day period for invested non-ZScore
            positions whose momentum turned against them. The list is empty
            when algo.update_models is falsy.
        '''
        insights = []
        algo = self.algo

        # Skip if not time for update
        if not algo.update_models:
            return insights

        # Debugging
        if algo.LiveMode:
            algo.MyLog(f"Checking for TTM signals...")

        for kvp in algorithm.UniverseManager.ActiveSecurities:
            symbol = kvp.Value.Symbol
            ticker = symbol.Value
            # Ignore SPY
            if ticker == 'SPY':
                continue

            # Best effort per symbol: one failing symbol must not prevent
            # signals for the rest of the universe.
            try:
                # Optimal for long signals
                history = algorithm.History(symbol, 55, Resolution.Daily)
                # Optimal for short signals
                history2 = algorithm.History(symbol, 30, Resolution.Daily)
                if history.empty or history2.empty:
                    continue

                long_state = self._window_state(history)
                short_state = self._window_state(history2)
                if long_state is None or short_state is None:
                    continue
                squeeze, prev_squeeze, mom_now, mom_prev = long_state
                squeeze2, prev_squeeze2, mom2_now, mom2_prev = short_state

                # Blue: momentum rising and positive on both bars
                mom_bullish = (mom_now > mom_prev
                               and mom_now > 0 and mom_prev > 0)
                # Dark blue: momentum falling
                mom_bullish_stop = mom_now < mom_prev
                # Red: momentum falling and negative on both bars
                mom_bearish = (mom2_now < mom2_prev
                               and mom2_now < 0 and mom2_prev < 0)
                # Yellow: momentum rising
                mom_bearish_stop = mom2_now > mom2_prev

                # Check for entry signals
                if mom_bullish and squeeze and prev_squeeze:
                    insights.append(Insight.Price(
                        symbol, timedelta(30), InsightDirection.Up))
                    algo.MyLog(f'TTM Up Insight for {ticker}')
                if mom_bearish and squeeze2 and prev_squeeze2:
                    insights.append(Insight.Price(
                        symbol, timedelta(30), InsightDirection.Down))
                    algo.MyLog(f'TTM Down Insight for {ticker}')

                # Only check for flat signal if it's not a zscore symbol!
                # (This check used an undefined name before, so it raised a
                # NameError that was silently swallowed and no flat insight
                # was ever produced.)
                if symbol in algo.zscore_symbols \
                        or symbol in algo.previous_zscore_symbols:
                    continue

                holding = algorithm.Portfolio[symbol]
                if not holding.Invested:
                    continue
                if (holding.IsLong and mom_bullish_stop) \
                        or (holding.IsShort and mom_bearish_stop):
                    insights.append(Insight.Price(
                        symbol, timedelta(1), InsightDirection.Flat))
                    algo.MyLog(f'TTM Flat Insight for {ticker}')
            except Exception as err:
                # Report instead of hiding the failure, then move on.
                algo.MyLog(f'TTM update skipped for {ticker}: {err!r}')
                continue

        return insights
# Imports
from AlgorithmImports import *
from scipy.stats import norm
import datetime as dt
# File imports - n/a

# Inactive order statuses
INACTIVE_ORDERS = \
    [OrderStatus.Canceled, OrderStatus.Invalid, OrderStatus.Filled]

###############################################################################
class ZScoreAlphaModel(AlphaModel):
    '''Alpha model that uses a Z score of the close price to create insights'''
    Name = "ZScore"

    def __init__(self, algo, lookupPeriod = 30, resolution = Resolution.Daily):
        '''Initializes a new instance of the ZScoreAlphaModel class
        Args:
            algo: The main algorithm instance. Update reads its Securities,
                bm, Time, trade_zscore, LiveMode and zscore_symbols
                attributes, calls MyLog, and writes zscore_symbols,
                previous_zscore_symbols and signals.
            lookupPeriod: Number of bars of history used for the Z score
            resolution: Resolution of the history
        '''
        self.algo = algo
        self.lookupPeriod = lookupPeriod
        self.resolution = resolution
        # Despite its name this is a plain list of the tracked Symbols.
        self.symbolDataBySymbol = []

    def Update(self, algorithm, data):
        '''
        Updates this alpha model with the latest data from the algorithm.
        This is called each time the algorithm receives data for subscribed
        securities.

        Args:
            algorithm: Framework-supplied algorithm object. Not used here;
                self.algo is used instead.
            data: The new data available (not used)
        Returns:
            The new insights generated: one Down insight with a one hour
            period for every tracked symbol whose last close is more than
            3 standard deviations above its lookback mean. The insight
            weight is the Z score minus 3.
        '''
        algo = self.algo
        insights = []

        # Skip if not regular trading hours
        # (but still allowed at the two timestamps around the close)
        hours = algo.Securities[algo.bm].Exchange.Hours
        if not hours.IsOpen(algo.Time, extendedMarketHours=False):
            now = algo.Time
            if (now.hour, now.minute) not in ((16, 0), (15, 59)):
                return insights

        # Skip if the Zscore algo is not being traded
        if not algo.trade_zscore:
            return insights

        # Debugging
        if algo.LiveMode:
            algo.MyLog(f"Checking for ZScore signals...")

        # Nothing to request history for yet
        if not self.symbolDataBySymbol:
            return insights

        # Get historical data
        df = algo.History(
            self.symbolDataBySymbol, self.lookupPeriod, self.resolution
        )
        if df.empty:
            return insights

        # One column of closes per symbol on a single time index
        closes = df.close.unstack(level=0)
        # Z score of the last close against the lookback mean / std
        last_close = closes.iloc[-1]
        z_score = (last_close.subtract(closes.mean())).divide(closes.std())

        # Snapshot the previous signals. Binding the same list object and
        # then appending to it (as was done before) also added the new
        # signals to the "previous" list.
        previous_symbols = list(algo.zscore_symbols)

        signals = []
        for symbol in z_score.index:
            score = z_score[symbol]
            if score > 3:
                insights.append(
                    Insight.Price(
                        symbol=symbol,
                        period=timedelta(hours=1),
                        direction=InsightDirection.Down,
                        magnitude=1,
                        confidence=1,
                        sourceModel=None,
                        weight=score-3
                    )
                )
                algo.MyLog(
                    f'Down Insight for {symbol}, Zscore: {score}')
                signals.append(symbol)

        algo.previous_zscore_symbols = previous_symbols
        algo.zscore_symbols = signals
        algo.signals = signals
        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''
        Event fired each time the we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance (not used)
            changes: The security additions and removals from the algorithm
        '''
        for added in changes.AddedSecurities:
            if added.Symbol in self.symbolDataBySymbol:
                continue
            # Ignore SPY
            if added.Symbol.Value == 'SPY':
                continue
            self.symbolDataBySymbol.append(added.Symbol)

        for removed in changes.RemovedSecurities:
            if removed.Symbol in self.symbolDataBySymbol:
                self.symbolDataBySymbol.remove(removed.Symbol)