Overall Statistics |
Total Trades 616 Average Win 1.80% Average Loss -0.42% Compounding Annual Return 138.261% Drawdown 18.800% Expectancy 0.385 Net Profit 46.647% Sharpe Ratio 2.381 Probabilistic Sharpe Ratio 72.535% Loss Rate 74% Win Rate 26% Profit-Loss Ratio 4.33 Alpha 0 Beta 0 Annual Standard Deviation 0.4 Annual Variance 0.16 Information Ratio 2.381 Tracking Error 0.4 Treynor Ratio 0 Total Fees $2356.66 Estimated Strategy Capacity $1900000.00 Lowest Capacity Asset BLU X7LKIX7ELMQT Portfolio Turnover 84.34% |
# Imports from AlgorithmImports import * from scipy.stats import norm import datetime as dt # File imports from notes_and_inputs import * # Inactive order statuses INACTIVE_ORDERS = \ [OrderStatus.Canceled, OrderStatus.Invalid, OrderStatus.Filled] ############################################################################### class MyZScoreAlphaModel(AlphaModel): '''Alpha model that uses an Z score to create insights''' def __init__(self, short_entry_std, long_entry_std, algo): """ Initializes a new instance of the ZScoreAlphaModel class. Args: short_entry_std: Standard deviation zscore to trigger short signals algo: QCAlgorithm instance """ self.algo = algo # self.lookupPeriod = lookup_period self.short_entry_std = short_entry_std self.long_entry_std = long_entry_std self.symbols = [] def Update(self, algorithm, data): """ Updates this alpha model with the latest data from the algorithm. This is called each time the algorithm receives data for subscribed securities. Args: algorithm: The algorithm instance data: The new data available Returns: The new insights generated DO NOT USE algorithm - QuantConnect.Algorithm.QCAlgorithm object INSTEAD USE self.algo - main.MeanReversionAlgorithm object """ insights = [] algo = self.algo # # Skip if minute not in allowed list # if algo.Time.minute not in self.algo.allowed_minutes: # return insights # Skip if not on a new hour if algo.Time.minute != 0: return insights # Skip at market close 16:00 elif algo.Time.hour == 16: return insights # Create an empty list of signals signals = [] # Loop though SymbolData class instances for ticker, sd in algo.symbol_data.items(): # Check if we do not want to reenter a trade for the symbol if algo.do_not_reenter.get(sd.symbol_object): # Skip if the required time has not passed stop_time = algo.do_not_reenter.get(sd.symbol_object) delta = dt.timedelta(days=algo.WAIT_DAYS_FOR_REENTRY) restart_time = stop_time + delta if algo.Time < restart_time: continue else: # Pop off the symbol object from the dictionary algo.do_not_reenter.pop(sd.symbol_object) # Check for short alpha signal if sd.target_weight < 0: insights.append( Insight.Price( symbol=sd.symbol_object, period=timedelta(minutes=self.algo.BAR_MINUTES), direction=InsightDirection.Down, magnitude=1, #magnitude[symbol], this isn't used confidence=1, #confidence[symbol], this isn't used sourceModel=None, weight=sd.target_weight ) ) symbol = str(sd.symbol_object).split(" ")[0] signals.append(symbol) if algo.LiveMode: algo.MyLog( f'Short Insight for {symbol}: ' f'Zscore={sd.zscore}, ' f'Weight: {sd.target_weight}' ) # Check for long alpha signal if sd.target_weight > 0: insights.append( Insight.Price( symbol=sd.symbol_object, period=timedelta(minutes=self.algo.BAR_MINUTES), direction=InsightDirection.Up, magnitude=1, #magnitude[symbol], this isn't used confidence=1, #confidence[symbol], this isn't used sourceModel=None, weight=sd.target_weight ) ) symbol = str(sd.symbol_object).split(" ")[0] signals.append(symbol) if algo.LiveMode: algo.MyLog( f'Long Insight for {symbol}: ' f'Zscore={sd.zscore}, ' f'Weight: {sd.target_weight}' ) algo.signals = signals return insights def OnSecuritiesChanged(self, algorithm, changes): """ Event fired each time the we add/remove securities from the data feed. Args: algorithm: The algorithm instance changes: The security additions and removals from the algorithm """ for added in changes.AddedSecurities: if added.Symbol not in self.symbols: self.symbols.append(added.Symbol) for removed in changes.RemovedSecurities: if removed.Symbol in self.symbols: self.symbols.remove(removed.Symbol) ############################################################################### class MyPortfolioConstructionModel(PortfolioConstructionModel): ''' Provides an implementation of IPortfolioConstructionModel that gives equal weighting to all securities. The target percent holdings of each security is 1/N where N is the number of securities. For insights of direction InsightDirection.Up, long targets are returned and for insights of direction InsightDirection.Down, short targets are returned. ''' def __init__( self, algo, rebalance=Resolution.Hour, portfolio_bias=PortfolioBias.LongShort ): ''' Initialize a new instance of EqualWeightingPortfolioConstructionModel Args: algo: QCAlgorithm instance. rebalance: Rebalancing parameter. If it is a timedelta, date rules or Resolution, it will be converted into a function. If None will be ignored. The function returns the next expected rebalance time for a given algorithm UTC DateTime. The function returns null if unknown, in which case the function will be called again in the next loop. Returning current time will trigger rebalance. portfolio_bias: Specifies the bias of the portfolio (Short, LongShort, Long) ''' super().__init__() self.portfolio_bias = portfolio_bias self.algo = algo # If the argument is an instance of Resolution or Timedelta # Redefine rebalancingFunc rebalancingFunc = rebalance if isinstance(rebalance, int): rebalance = Extensions.ToTimeSpan(rebalance) if isinstance(rebalance, timedelta): rebalancingFunc = lambda dt: dt + rebalance if rebalancingFunc: self.SetRebalancingFunc(rebalancingFunc) def DetermineTargetPercent(self, activeInsights): ''' Will determine the target percent for each insight Args: activeInsights: The active insights to generate a target for ''' # if self.algo.Time.day >= 10: # print('debug') result = {} # give equal weighting to each security count = sum( x.Direction != InsightDirection.Flat \ and self.respect_portfolio_bias(x) for x in activeInsights ) if count == 0: percent = 0 else: percent = min(self.algo.MAX_WEIGHT, 1.0/count) for insight in activeInsights: result[insight] = ( insight.Direction if self.respect_portfolio_bias(insight) \ else InsightDirection.Flat ) * percent return result def respect_portfolio_bias(self, insight): ''' Method that will determine if a given insight respects the portfolio bias. Args: insight: The insight to create a target for ''' return self.portfolio_bias == PortfolioBias.LongShort \ or insight.Direction == self.portfolio_bias ############################################################################### class MyMaximumDrawdownPercentPerSecurity(RiskManagementModel): ''' Provides an implementation of IRiskManagementModel that limits the drawdown per holding to the specified percentage REF: https://github.com/QuantConnect/Lean/blob/master/Algorithm.Framework/ Risk/MaximumDrawdownPercentPerSecurity.py ''' def __init__(self, algo, max_dd_pct=0.05, constant_check_multiple=2.0): ''' Initializes a new instance of the MaximumDrawdownPercentPerSecurity class Args: max_dd_pct: The maximum percentage drawdown allowed for any single security holding ''' self.algo = algo self.max_dd_pct = -abs(max_dd_pct) self.constant_check_multiple = constant_check_multiple def ManageRisk(self, algorithm, targets): ''' Manages the algorithm's risk at each time step Args: algorithm: The algorithm instance targets: The current portfolio targets to be assessed for risk DO NOT USE algorithm - QuantConnect.Algorithm.QCAlgorithm object INSTEAD USE self.algo - main.MeanReversionAlgorithm object ''' algo = self.algo targets = [] # Always check for a larger drawdown # Loop through securities for kvp in algo.Securities: security = kvp.Value symbol_object = security.Symbol symbol = str(symbol_object).split(" ")[0] if not security.Invested: continue pnl = security.Holdings.UnrealizedProfitPercent if (pnl < self.constant_check_multiple*self.max_dd_pct): # liquidate targets.append(PortfolioTarget(symbol_object, 0)) # Add symbol to do not re-enter dictionary algo.do_not_reenter[symbol_object] = algo.Time # Log message if algo.LiveMode: algo.MyLog( f"{symbol} max risk hit!, pnl={pnl}, so " f"liquidating the position" ) # # Skip if minute not in allowed list # if algo.Time.minute not in self.algo.allowed_minutes: # return targets # Skip if not on a new hour if algo.Time.minute != 0: return targets # # Skip at market close 16:00 # elif algo.Time.hour == 16: # return targets # Loop through securities for kvp in algo.Securities: security = kvp.Value symbol_object = security.Symbol symbol = str(symbol_object).split(" ")[0] if not security.Invested: continue pnl = security.Holdings.UnrealizedProfitPercent if (pnl < self.max_dd_pct) \ or (symbol not in self.algo.signals): # liquidate # if symbol == 'XON': # print('debug') targets.append(PortfolioTarget(symbol_object, 0)) # Add symbol to do not re-enter dictionary if pnl < self.max_dd_pct: algo.do_not_reenter[symbol_object] = 0 # Log message if algo.LiveMode: algo.MyLog( f"{symbol} max risk hit!, pnl={pnl}, so " f"liquidating the position" ) return targets ############################################################################### class MyOrderExecutionModel(ExecutionModel): ''' Provides an implementation of IExecutionModel that submits orders to achieve the desired portfolio targets. REF: https://github.com/QuantConnect/Lean/blob/master/Algorithm/Execution/ ImmediateExecutionModel.py ''' def __init__(self, algo, max_spread_pct, limit_order_pct): '''Initializes a new instance of the ImmediateExecutionModel class''' self.targets_collection = PortfolioTargetCollection() self.algo = algo self.max_spread_pct = abs(max_spread_pct) self.limit_order_pct = limit_order_pct self.limit_orders = [] def Execute(self, algorithm, targets): ''' Submits orders for the specified portfolio targets. Args: algorithm: The algorithm instance targets: The portfolio targets to be ordered DO NOT USE algorithm - QuantConnect.Algorithm.QCAlgorithm object INSTEAD USE self.algo - main.MeanReversionAlgorithm object ''' algo = self.algo # # Skip if minute not in allowed list # if algo.Time.minute not in self.algo.allowed_minutes: # return # Skip if not on a new hour if algo.Time.minute != 0: return # # Skip at market close 16:00 # elif algo.Time.hour == 16: # return # First cancel any open limit orders # Loop through a copy of limit orders for ticket in self.limit_orders[:]: # Get the status of the order status = ticket.Status # Remove from list if no longer active if status in INACTIVE_ORDERS: self.limit_orders.remove(ticket) # Otherwise try to cancel the order else: response = ticket.Cancel('Cancel from Execution Model') if response.IsSuccess: # Remove from list self.limit_orders.remove(ticket) else: raise ValueError( f"MyOrderExecutionModel error trying to cancel open " f"order: {ticket}" ) # For performance we check count value, OrderByMarginImpact and # ClearFulfilled are expensive to call self.targets_collection.AddRange(targets) if not self.targets_collection.IsEmpty: for target in self.targets_collection.OrderByMarginImpact(algo): symbol = target.Symbol # Calculate remaining quantity to be ordered remaining_qty = OrderSizing.GetUnorderedQuantity(algo, target) # Check order entry conditions if remaining_qty != 0: # Get the current quantity current_qty = self.algo.Portfolio[symbol].Quantity # Check for rebalance order if current_qty != 0 \ and (abs(remaining_qty) != abs(current_qty)): # Skip if no rebalancing if not REBALANCE: continue # Get security information security = algo.Securities[symbol] # Use a market order if the spread is favorable if self.favorable_spread(security): # # Make sure we have enough margin for the order # price = security.Price # value = price*remaining_qty # margin_remaining = algo.Portfolio.MarginRemaining # max_shares = int(margin_remaining/price) # if abs(remaining_qty) > max_shares: # # Reduce the number of shares to max_shares # if remaining_qty < 0: # sell order # order_shares = -max_shares # else: # buy order # order_shares = max_shares # else: # order_shares = remaining_qty # Place the order order = algo.MarketOrder( symbol, remaining_qty, asynchronous=True ) # # Check for order rejected # if order.Status == OrderStatus.Invalid: # # Now let's try to cut the order in 1/2 # order_shares = int(order_shares/2) # # Try to place the order again # order2 = algo.MarketOrder( # symbol, order_shares, asynchronous=True # ) # # Check for order rejected # if order2.Status == OrderStatus.Invalid: # # Not sure what to do now other than logs # pass # Otherwise use a limit order else: if remaining_qty > 0: # buy order price = security.Price*(1+self.limit_order_pct) else: # sell order price = security.Price*(1-self.limit_order_pct) # Round the price to the nearest tick value ticker = target.Symbol.Value sd = algo.symbol_data.get(ticker) if sd: price = sd.round_price(price) else: price = round(price,2) # Place the limit order order = algo.LimitOrder(symbol, remaining_qty, price) self.limit_orders.append(order) # # Get the target security # security = algo.Securities[target.Symbol] # # Get the current price # price = security.Price # # Get the current quantity # open_quantity = sum( # [x.Quantity \ # for x in algo.Transactions.GetOpenOrders(target.Symbol) # ] # ) # current_qty = int(security.Holdings.Quantity + open_quantity) # # Calculate estimated remaining quantity to be ordered # desired_qty = target.Quantity # estimated_order_qty = int(desired_qty-current_qty) # # Calculate the desired order value # desired_value = price*desired_qty # # Current model is using price to calculate desired qty # # Instead we will adjust the price # bid = security.BidPrice # ask = security.AskPrice # mid = (bid+ask)*0.5 # # Check for buy order # if estimated_order_qty > 0: # # Buy at ask if close enough to last price # if abs(ask-price)/price < 0.01: # price = ask # # Check for sell order # elif estimated_order_qty < 0: # # Sell at bid if close enough to last price # if abs(bid-price)/price < 0.01: # price = bid # # Round the price to the nearest tick value # ticker = target.Symbol.Value # sd = algo.symbol_data.get(ticker) # if sd: # price = sd.round_price(price) # else: # price = round(price,2) # # Recalculate the order quantity using the new price # desired_qty = int(desired_value/price) # order_qty = int(desired_qty-current_qty) # # Verify an order is required # if order_qty != 0: # aboveMinimumPortfolio = \ # BuyingPowerModelExtensions.AboveMinimumOrderMarginPortfolioPercentage( # security.BuyingPowerModel, # security, # order_qty, # algo.Portfolio, # algo.Settings.MinimumOrderMarginPortfolioPercentage # ) # if aboveMinimumPortfolio: # algo.LimitOrder(target.Symbol, order_qty, price) self.targets_collection.ClearFulfilled(algo) def favorable_spread(self, security): '''Determines if the spread is in desirable range.''' # Price has to be larger than zero to avoid zero division error, # or negative price causing the spread percentage < 0 by error # Has to be in opening hours of exchange to avoid extreme spread # in OTC period. if security.Exchange.ExchangeOpen \ and security.Price > 0 \ and security.AskPrice > 0 \ and security.BidPrice > 0: spread_pct = (security.AskPrice-security.BidPrice) / security.Price return spread_pct <= self.max_spread_pct else: return False ############################################################################### class CustomSecurityInitializer(BrokerageModelSecurityInitializer): def __init__( self, brokerage_model: IBrokerageModel, security_seeder: ISecuritySeeder ) -> None: super().__init__(brokerage_model, security_seeder) def Initialize(self, security: Security) -> None: """ Define models to be used for securities as they are added to the algorithm's universe. """ # First, call the superclass definition # Sets the reality models of each security using the default models # of the brokerage model super().Initialize(security) # Define the data normalization mode if DATA_MODE == 'ADJUSTED': security.SetDataNormalizationMode(DataNormalizationMode.Adjusted) else: security.SetDataNormalizationMode(DataNormalizationMode.Raw) # Define the fee model to use for the security # security.SetFeeModel() # Define the slippage model to use for the security # security.SetSlippageModel() # Define the fill model to use for the security # security.SetFillModel() # Define the buying power model to use for the security # security.SetBuyingPowerModel() # Next, overwrite some of the reality models # security.SetFeeModel(ConstantFeeModel(0, "USD"))
############################################################################### # Standard library imports import datetime as dt import pickle import traceback # QuantConnect specific imports from AlgorithmImports import * import QuantConnect as qc # File imports from notes_and_inputs import * from custom_models import * from symbol_data import SymbolData ############################################################################### class MeanReversionAlgorithm(QCAlgorithm): def Initialize(self): """Initialize algorithm.""" self.SetBacktestDetails() self.AddVariables() self.AddInstrumentData() self.ScheduleFunctions() self.SetFrameworkModels() #------------------------------------------------------------------------------- def SetBacktestDetails(self): """Set the backtest details.""" self.SetStartDate(START_DT.year, START_DT.month, START_DT.day) if END_DATE: self.SetEndDate(END_DT.year, END_DT.month, END_DT.day) self.SetCash(CASH) self.SetTimeZone(TIMEZONE) # Setup trading framework # Transaction and submit/execution rules will use IB models # brokerages: '''https://github.com/QuantConnect/Lean/blob/master/Common/Brokerages /BrokerageName.cs''' # account types: AccountType.Margin, AccountType.Cash self.SetBrokerageModel( BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin ) # Configure all universe securities # This sets the data normalization mode, which we will verify here if self.LiveMode: # Make sure we are using 'RAW' data normalization if DATA_MODE != 'RAW': raise ValueError( "We must use 'RAW' data normalization when running live!" ) # You can also set custom fee, slippage, fill, and buying power models self.SetSecurityInitializer( CustomSecurityInitializer( self.BrokerageModel, FuncSecuritySeeder(self.GetLastKnownPrices) ) ) #------------------------------------------------------------------------------- def AddVariables(self): """Add variables used by the algo.""" self.symbol_data = {} self.signals = [] self.do_not_reenter = {} # Try to get the parameters from the GetParameter() list. If it doesn't # exist, it will get the value from the notes_and_inputs.py sheet. # Universe variables try: self.HIGHEST_DOLLAR_VOLUME = \ int(self.GetParameter("HIGHEST_DOLLAR_VOLUME")) except: self.HIGHEST_DOLLAR_VOLUME = HIGHEST_DOLLAR_VOLUME try: self.MIN_TIME_IN_UNIVERSE = \ int(self.GetParameter("MIN_TIME_IN_UNIVERSE")) except: self.MIN_TIME_IN_UNIVERSE = MIN_TIME_IN_UNIVERSE # Alpha model variables try: self.Z_MODEL_PERIOD = float(self.GetParameter("Z_MODEL_PERIOD")) except: self.Z_MODEL_PERIOD = Z_MODEL_PERIOD try: self.ZSCORE_SHORT_ENTRY_STD = \ float(self.GetParameter("ZSCORE_SHORT_ENTRY_STD")) except: self.ZSCORE_SHORT_ENTRY_STD = ZSCORE_SHORT_ENTRY_STD try: self.ZSCORE_LONG_ENTRY_STD = \ float(self.GetParameter("ZSCORE_LONG_ENTRY_STD")) except: self.ZSCORE_LONG_ENTRY_STD = ZSCORE_LONG_ENTRY_STD try: # Get from list for optimization testing i = int(self.GetParameter("BAR_MINUTES")) bar_list = [1,2,3,5,10,15,20,30,60] # 9 total -> 0-8 values self.BAR_MINUTES = bar_list[i] except: self.BAR_MINUTES = BAR_MINUTES try: self.MA_PERIOD_FAST = int(self.GetParameter("MA_PERIOD_FAST")) except: self.MA_PERIOD_FAST = MA_PERIOD_FAST try: self.MA_SLOW_MULTIPLE = int(self.GetParameter("MA_SLOW_MULTIPLE")) except: self.MA_SLOW_MULTIPLE = MA_SLOW_MULTIPLE # Execution model variables try: self.MAX_SPREAD_PCT = float(self.GetParameter("MAX_SPREAD_PCT")) except: self.MAX_SPREAD_PCT = MAX_SPREAD_PCT try: self.LIMIT_ORDER_PCT = float(self.GetParameter("LIMIT_ORDER_PCT")) except: self.LIMIT_ORDER_PCT = LIMIT_ORDER_PCT # Risk model variables try: self.MAX_TRADE_PCT_LOSS = \ float(self.GetParameter("MAX_TRADE_PCT_LOSS")) except: self.MAX_TRADE_PCT_LOSS = MAX_TRADE_PCT_LOSS try: self.WAIT_DAYS_FOR_REENTRY = \ int(self.GetParameter("WAIT_DAYS_FOR_REENTRY")) except: self.WAIT_DAYS_FOR_REENTRY = WAIT_DAYS_FOR_REENTRY # Portfolio Construction model variables try: self.MAX_WEIGHT = int(self.GetParameter("MAX_WEIGHT")) except: self.MAX_WEIGHT = MAX_WEIGHT try: self.MAX_WEIGHT_OVERNIGHT = \ float(self.GetParameter("MAX_WEIGHT_OVERNIGHT")) except: self.MAX_WEIGHT_OVERNIGHT = MAX_WEIGHT_OVERNIGHT # try: # self.PORTFOLIO_SL_PCT = float(self.GetParameter("PORTFOLIO_SL_PCT")) # except: # self.PORTFOLIO_SL_PCT = PORTFOLIO_SL_PCT # try: # self.REBALANCE_MINUTES = int(self.GetParameter("REBALANCE_MINUTES")) # except: # self.REBALANCE_MINUTES = REBALANCE_MINUTES # Save daily portfolio values self.times = [] self.navs = [] # Get desired minutes we can take actions based on BAR_MINUTES self.allowed_minutes = [] for i in range(1,61): # Loop from 1 to 59 # Check if modulus is zero if i % BAR_MINUTES == 0: self.allowed_minutes.append(i) # Replace 60 with 0 if 60 in self.allowed_minutes: self.allowed_minutes.remove(60) self.allowed_minutes.append(0) #------------------------------------------------------------------------------- def AddInstrumentData(self): """Add instrument data to the algo.""" # Add benchmark data self.bm = self.AddEquity(BENCHMARK, Resolution.Hour).Symbol # Add the coarse and fine universe filters self.AddUniverse(self.CoarseSelectionFunction) # Set the universe data properties desired # Use minutely data instead of hourly data! self.UniverseSettings.Resolution = Resolution.Minute # Set the universe extended hours and min time properties self.UniverseSettings.ExtendedMarketHours = USE_EXTENDED_HOURS self.UniverseSettings.MinimumTimeInUniverse = self.MIN_TIME_IN_UNIVERSE #------------------------------------------------------------------------------- def ScheduleFunctions(self): """Scheduling the functions required by the algo.""" # Check if we exit (or reduce size) at the end of the day if END_OF_DAY_EXIT or END_OF_DAY_REDUCE_SIZE: self.Schedule.On( self.DateRules.EveryDay(self.bm), self.TimeRules.BeforeMarketClose(self.bm, 1), self.EndOfDayExit ) #------------------------------------------------------------------------------- def MyLog(self, message): """Add algo time to log if live trading. Otherwise just log message.""" # Log all messages in live trading mode with local time added if self.LiveMode: self.Log(f'{self.Time}: {message}') else: self.Log(message) #------------------------------------------------------------------------------- def EndOfDayExit(self): """Exit any open positions at the end of the day.""" # Liquidate the portfolio if we want a full end of day exit if END_OF_DAY_EXIT: self.MyLog("Time for end of day full liquidation.") self.Liquidate() # Check if we want to reduce the weights at the end of the day elif END_OF_DAY_REDUCE_SIZE: self.MyLog("Time for end of day position size reduction.") # Get the current nav value nav = float(self.Portfolio.TotalPortfolioValue) # Loop through the open positions for symbol in self.Portfolio.Keys: # Check for open position qty = self.Portfolio[symbol].Quantity if qty != 0: # Get the current value and % of portfolio price = self.Portfolio[symbol].Price value = abs(qty)*price pct_portfolio = value/nav # Check if the position size is too large if pct_portfolio > self.MAX_WEIGHT_OVERNIGHT: self.MyLog( f"Reducing {symbol.Value}'s position size to " f"{round(100*self.MAX_WEIGHT_OVERNIGHT,2)}%." ) # Reduce the position size to the max allowed if qty < 0: # short self.SetHoldings(symbol, -self.MAX_WEIGHT_OVERNIGHT) else: # long self.SetHoldings(symbol, self.MAX_WEIGHT_OVERNIGHT) #------------------------------------------------------------------------------- def SetFrameworkModels(self): """Set the desired framework models for the algo to use.""" # Set IAlgorithm Settings self.Settings.RebalancePortfolioOnSecurityChanges = False self.Settings.RebalancePortfolioOnInsightChanges = True # self.Settings.MaxAbsolutePortfolioTargetPercentage = 1.0 self.Settings.MinAbsolutePortfolioTargetPercentage = \ 0.00000000000000000000001 # self.Settings.MinimumOrderMarginPortfolioPercentage = 0 # self.Settings.FreePortfolioValue = 0 # self.Settings.FreePortfolioValuePercentage = 0.025 # self.Settings.LiquidateEnabled = True # self.Settings.DataSubscriptionLimit = 50 # self.Settings.StalePriceTimeSpan = dt.timedelta(minutes=60) # self.Settings.WarmupResolution = self.resolution # Add our ZScoreAlphaModel (generates signals) self.AddAlpha( MyZScoreAlphaModel( short_entry_std = self.ZSCORE_SHORT_ENTRY_STD, long_entry_std = self.ZSCORE_LONG_ENTRY_STD, algo = self ) ) # Use custom models that we can easily update # Set the portfolio construction model (creates target portfolio) self.SetPortfolioConstruction( MyPortfolioConstructionModel(self) ) # Set the execution model (executes the target portfolio) self.SetExecution( MyOrderExecutionModel( algo=self, max_spread_pct=self.MAX_SPREAD_PCT, limit_order_pct=self.LIMIT_ORDER_PCT ) ) # Set the risk management model (manages risk for open positions) self.AddRiskManagement( MyMaximumDrawdownPercentPerSecurity( algo=self, max_dd_pct=self.MAX_TRADE_PCT_LOSS, constant_check_multiple=2.0 ) ) # self.Portfolio.MarginCallModel = MarginCallModel.Null #------------------------------------------------------------------------------- def CoarseSelectionFunction(self, coarse): """ Perform coarse filters on universe. Called once per day. Returns all stocks meeting the desired criteria. Attributes available: .AdjustedPrice .DollarVolume .HasFundamentalData .Price -> always the raw price! .Volume """ # Get the highest volume stocks stocks = [x for x in coarse if x.HasFundamentalData] sorted_by_dollar_volume = sorted( stocks, key=lambda x: x.DollarVolume, reverse=True ) top = self.HIGHEST_DOLLAR_VOLUME symbols = [x.Symbol for x in sorted_by_dollar_volume[:top]] # Print universe details when live mode if self.LiveMode: self.MyLog(f"Coarse filter returned {len(symbols)} stocks.") return symbols #------------------------------------------------------------------------------- def OnSecuritiesChanged(self, changes): """Event handler for changes to our universe.""" # Loop through securities added to the universe for security in changes.AddedSecurities: # Get the security symbol object and ticker (str) symbol_object = security.Symbol ticker = symbol_object.Value # Create a new SymbolData class object for the security if symbol_object != self.bm: self.symbol_data[ticker] = SymbolData(self, symbol_object) if self.LiveMode: self.MyLog(f"Created a new SymbolData object for {ticker}") # Loop through securities removed from the universe for security in changes.RemovedSecurities: # Get the security symbol object and ticker (str) symbol_object = security.Symbol ticker = symbol_object.Value # Liquidate removed securities if security.Invested: self.Liquidate(symbol_object) # Remove from symbol_data dictionary if ticker in self.symbol_data: # Remove bar consolidators for security self.SubscriptionManager.RemoveConsolidator( symbol_object, self.symbol_data[ticker].daily_consolidator ) self.SubscriptionManager.RemoveConsolidator( symbol_object, self.symbol_data[ticker].minute_consolidator ) # Remove ticker from symbol data dictionary self.symbol_data.pop(ticker) if self.LiveMode: self.MyLog(f"Removed SymbolData object for {ticker}") #------------------------------------------------------------------------------- def OnData(self, data): """Built-in event handler for new data.""" # Skip if DATA_MODE is Adjusted if DATA_MODE == 'ADJUSTED': return # Otherwise check for new dividends or splits for the active securities if data.Dividends.Count > 0 or data.Splits.Count > 0: # Loop through the symbol_data objects for ticker, symbol_data in self.symbol_data.items(): # SymbolData.symbol_object symbol_object = symbol_data.symbol_object # Check for a dividend if data.Dividends.get(symbol_object): # Get the dividend info dividend = data.Dividends[symbol_object].Distribution # Get last 2 daily prices hist = self.History([symbol_object], 2, Resolution.Daily) price = hist.iloc[-1]['close'] # [-1] for last previous_close = hist.iloc[0]['close'] # [0] for first # Calculate the dividend adjustment factor af = (previous_close-dividend)/previous_close # Adjust the SymbolData class indicators symbol_data.adjust_indicators(af) self.MyLog( f"Adjusted {ticker} indicators for dividend={dividend}, " f"with an adjustment factor={af}" ) # Check for a split if data.Splits.get(symbol_object): # Make sure the split has occured and not just the warning # split.Type == 0 for warning # split.Type == 1 for split occured if data.Splits[symbol_object].Type == 1: split = data.Splits[symbol_object].SplitFactor # Adjust the SymbolData class indicators symbol_data.adjust_indicators(split) self.MyLog( f"Adjusted {ticker} indicators for split={split}" ) #------------------------------------------------------------------------------- def ResubmitOrder(self, order, msg): """Built-in event handler for orders.""" if type(order) == qc.Orders.MarketOrder: order_type = 'Market' elif type(order) == qc.Orders.LimitOrder: order_type = 'Limit' # Get the limit price limit_price = order.LimitPrice else: self.MyLog( f"Invalid Order, but not a market or limit order! Order type=" f"{type(order)}" ) return # Get the order message, symbol, and qty self.MyLog( f"Invalid {order_type} Order! error: {msg}" ) symbol = order.Symbol order_qty = int(order.Quantity) # Check for insufficient buying power if 'Insufficient buying power' in msg: # Get the initial margin and free margin initial_margin = float( msg.split("Initial Margin: ")[1].split(",")[0] ) free_margin = float( msg.split("Free Margin: ")[1].split(",")[0].strip('.') ) # Get the max allowed position size margin_per_share = abs(initial_margin/order_qty) max_shares = int(abs((0.95*free_margin/margin_per_share))) # Get new qty if order_qty < 0: order_qty = -abs(max_shares) else: order_qty = abs(max_shares) self.MyLog( f"Initial number of shares exceeds margin " f"requirements! Reducing order qty to " f"{order_qty}" ) # Resubmit an order with a reduced qty if type(order) == qc.Orders.MarketOrder: self.MarketOrder(symbol, order_qty, asynchronous=True) elif type(order) == qc.Orders.LimitOrder: self.MyLog(f"Limit price={limit_price}") self.LimitOrder(symbol, order_qty, limit_price) #------------------------------------------------------------------------------- def OnOrderEvent(self, orderEvent): """Built-in event handler for orders.""" # Log message if self.LiveMode: self.MyLog( f"New order event: {orderEvent}, Status={orderEvent.Status}" ) # Catch invalid order if orderEvent.Status == OrderStatus.Invalid: try: # Resubmit a new order order = self.Transactions.GetOrderById(orderEvent.OrderId) # ticket = self.Transactions.GetOrderTicket(orderEvent.OrderId) # response = ticket.GetMostRecentOrderResponse() msg = orderEvent.get_Message() self.ResubmitOrder(order, msg) except: if self.LiveMode: self.MyLog( f'OnOrderEvent() exception: {traceback.format_exc()}' ) #------------------------------------------------------------------------------- def OnEndOfDay(self): """Event called at the end of the day.""" # Save the daily ending portfolio value self.times.append(self.Time) self.navs.append(self.Portfolio.TotalPortfolioValue) #------------------------------------------------------------------------------- def OnEndOfAlgorithm(self): """Built-in event handler for end of the backtest.""" # Save the portfolio values to the object store so we can evaluate # them in the Research environment key = 'ZScore' d = { 'time': self.times, 'value': self.navs, } serialized = pickle.dumps(d) self.ObjectStore.SaveBytes(key, serialized)
from AlgorithmImports import * # from Portfolio.MeanReversionPortfolioConstructionModel import * from scipy.stats import norm, zscore """ AE Changes 1. Trading securities with significant overlap/correlation. E.g. QQQ, TQQQ, SQQQ Added HasFundamentalData filter in CoarseSelectionFunction to only trade stocks. 2. changed number_of_symbols to be an input parameter 3. added z_model_period as input parameter Best performance via 30 symbol s and z model period = 50 """ class MeanReversionPortfolioAlgorithm(QCAlgorithm): # number_of_symbols = 10 def Initialize(self): # Set starting date, cash and ending date of the backtest self.SetStartDate(2022, 12, 1) # baseline from 1-1-2017 # self.SetEndDate(2017, 1, 18) self.SetCash(100000) self.Settings.RebalancePortfolioOnInsightChanges = True self.Settings.RebalancePortfolioOnSecurityChanges = False self.SetSecurityInitializer( lambda security: security.SetMarketPrice( self.GetLastKnownPrice(security) ) ) self.UniverseSettings.Resolution = Resolution.Hour # Requesting data self.AddUniverse(self.CoarseSelectionFunction) # Feed in data for 100 trading hours before the start date # self.SetWarmUp(100, Resolution.Hour) # Get the input parameters self.number_of_symbols = 50 #int(self.GetParameter("number_of_symbols")) self.z_model_period = 30 #int(self.GetParameter("z_model_period")) self.AddAlpha( ZScoreAlphaModel( lookupPeriod = self.z_model_period, resolution = Resolution.Daily ) ) ''' self.SetPortfolioConstruction( InsightWeightingPortfolioConstructionModel(Resolution.Hour) ) ''' self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel(Resolution.Hour)) self.Settings.MinAbsolutePortfolioTargetPercentage = \ 0.00000000000000000000001 self.AddRiskManagement(MaximumDrawdownPercentPerSecurity()) #self.AddRiskManagement(TrailingStopRiskManagementModel()) self.SetExecution(ImmediateExecutionModel()) def CoarseSelectionFunction(self, coarse): hasdatasymbols = list(filter(lambda x: x.HasFundamentalData, coarse)) sortedByDollarVolume = sorted( hasdatasymbols, key=lambda x: x.DollarVolume, reverse=True ) highest_volume_symbols = \ [x.Symbol for x in sortedByDollarVolume[:self.number_of_symbols]] return highest_volume_symbols def OnSecuritiesChanged(self, changes): # liquidate removed securities for security in changes.RemovedSecurities: if security.Invested: self.Liquidate(security.Symbol) class ZScoreAlphaModel(AlphaModel): '''Alpha model that uses an Z score to create insights''' def __init__(self, lookupPeriod = 20, resolution = Resolution.Daily, max_weight = 0.2): '''Initializes a new instance of the ZScoreAlphaModel class Args: lookupPeriod: Look up period of history resolution: Resoultion of the history''' self.lookupPeriod = lookupPeriod self.resolution = resolution self.predictionInterval = \ Time.Multiply(Extensions.ToTimeSpan(resolution), lookupPeriod) self.symbolDataBySymbol = [] def Update(self, algorithm, data): '''Updates this alpha model with the latest data from the algorithm. This is called each time the algorithm receives data for subscribed securities Args: algorithm: The algorithm instance data: The new data available Returns: The new insights generated''' insights = [] df = algorithm.History( self.symbolDataBySymbol, self.lookupPeriod, self.resolution ) if df.empty: return insights # Make all of them into a single time index. df = df.close.unstack(level=0) # Mean of the stocks df_mean = df.mean() # standard deviation df_std = df.std() # get last prices df = df.iloc[-1] # calculate z_score z_score = (df.subtract(df_mean)).divide(df_std) magnitude = -z_score*df_std/df confidence = (-z_score).apply(norm.cdf) algorithm.Log(f'{algorithm.Time}: Average Z Score: {z_score.mean()}') algorithm.Log(f'{algorithm.Time}: Max Z Score: {z_score.max()}') algorithm.Log(f'{algorithm.Time}: Min Z Score: {z_score.min()}') # weight = confidence - 1 / (magnitude + 1) weights = {} for symbol in z_score.index: if z_score[symbol] > 3: insights.append( Insight.Price( symbol=symbol, period=timedelta(hours=1), direction=InsightDirection.Down, magnitude=magnitude[symbol], confidence=confidence[symbol], sourceModel=None, weight=z_score[symbol] - 3 ) ) algorithm.Log(f'Down Insight for {symbol}, Zscore: {z_score[symbol]}, Magnitude: {magnitude[symbol]}, Confidence: {confidence[symbol]},Weight: {z_score[symbol] - 3}') # elif z_score[symbol] < -3: # insights.append( # Insight.Price( # symbol=symbol, # period=timedelta(hours=1), # direction=InsightDirection.Up, # magnitude=magnitude[symbol], # confidence=confidence[symbol], # sourceModel=None, # weight= abs(z_score[symbol]) - 3 # ) # ) # algorithm.Log(f'Up Insight for {symbol}, Zscore: {z_score[symbol]}, Magnitude: {magnitude[symbol]}, Confidence: {confidence[symbol]},Weight: {z_score[symbol] - 3}') return insights def OnSecuritiesChanged(self, algorithm, changes): '''Event fired each time the we add/remove securities from the data feed Args: algorithm: The algorithm instance that experienced the change in securities changes: The security additions and removals from the algorithm''' for added in changes.AddedSecurities: if added.Symbol not in self.symbolDataBySymbol: #symbolData = SymbolData(added, self.fastPeriod, self.slowPeriod, algorithm, self.resolution) self.symbolDataBySymbol.append(added.Symbol) for removed in changes.RemovedSecurities: if removed.Symbol in self.symbolDataBySymbol: data = self.symbolDataBySymbol.remove(removed.Symbol)
#region imports from AlgorithmImports import * #endregion ''' ZScore Algorithm Platform: QuantConnect By: Aaron Eller www.excelintrading.com aaron@excelintrading.com Revision Notes: 1.0.0 (01/05/2022) - Initial 1.0.3 (03/27/2023) - Added ResubmitOrder() and OnOrderEvent() to QCAlgorithm. Useful References: -QC (Lean) Class List https://lean-api-docs.netlify.app/annotated.html ''' # BACKTEST DETAILS CASH = 100000 START_DATE = '12-01-2022' # '01-01-2011' # MM-DD-YYYY format END_DATE = None # MM-DD-YYYY format (or None for to date) TIMEZONE = 'US/Eastern' # e.g. 'US/Eastern', 'US/Central', 'US/Pacific' # Set the data normalizaton mode for price data # either 'ADJUSTED' (backtesting), or 'RAW' (necessary for live trading) # To best replicate real life trading - use RAW data normalization DATA_MODE = 'RAW' # Turn on/off using extended market hours USE_EXTENDED_HOURS = False #------------------------------------------------------------------------------ # GLOBAL VARIABLE INPUTS # Universe variables HIGHEST_DOLLAR_VOLUME = 50 MIN_TIME_IN_UNIVERSE = 1 # Alpha model (entry signal) variables Z_MODEL_PERIOD = 30 ZSCORE_SHORT_ENTRY_STD = 3.0 ZSCORE_LONG_ENTRY_STD = 3.0 LONGS_ALLOWED = False SHORTS_ALLOWED = True BAR_MINUTES = 30 MA_PERIOD_FAST = 3 MA_SLOW_MULTIPLE = 4 # MA_SLOW_PERIOD = this multiple x fast # Execution model variables MAX_SPREAD_PCT = 0.03 LIMIT_ORDER_PCT = 0 # Risk model variables MAX_TRADE_PCT_LOSS = 0.05 WAIT_DAYS_FOR_REENTRY = 2 # Position sizing variables MAX_WEIGHT = 1.0 END_OF_DAY_REDUCE_SIZE = False MAX_WEIGHT_OVERNIGHT = 0.10 END_OF_DAY_EXIT = False REBALANCE = True # simpler / faster testing without # Set the benchmark for scheduling functions / not traded BENCHMARK = 'SPY' # PORTFOLIO_SL_PCT = 0.10 # not using # REBALANCE_MINUTES = 0 ############################################################################### ############################ END OF ALL USER INPUTS ########################### ############################################################################### # VALIDATE USER INPUTS - DO NOT CHANGE BELOW!!! import datetime as dt #------------------------------------------------------------------------------ # Verify start date try: START_DT = dt.datetime.strptime(START_DATE, '%m-%d-%Y') except: raise ValueError( f"Invalid START_DATE format ({START_DATE}). Must be in MM-DD-YYYY " f"format." ) # Verify end date try: if END_DATE: END_DT = dt.datetime.strptime(END_DATE, '%m-%d-%Y') except: raise ValueError( f"Invalid END_DATE format ({END_DATE}). Must be in MM-DD-YYYY " "format or set to None to run to date." ) # Verify data normalization mode if DATA_MODE not in ['ADJUSTED', 'RAW']: raise ValueError( f"Invalid DATA_MODE ({DATA_MODE}). Must be 'ADJUSTED' or 'RAW'." )
# Standard library imports import datetime as DT # import numpy as np import pandas as pd import pytz import statistics # QuantConnect specific imports from AlgorithmImports import * # File imports from notes_and_inputs import * ################################################################################ class SymbolData(object): """Class to store data for a specific symbol.""" def __init__(self, algo, symbol_object): """Initialize SymbolData object.""" # Save a reference to the QCAlgorithm class self.algo = algo # Save the .Symbol object and the symbol's string self.symbol_object = symbol_object self.symbol = symbol_object.Value # Get the symbol's exchange market info self.get_exchange_info() # Get the symbol's specifications self.get_contract_specs() # Add strategy specific variables self.add_strategy_variables() # Add the bars and indicators required self.add_bars() self.add_indicators() # We want to immediately check for signals # self.check_for_signals(bar=None) #------------------------------------------------------------------------------- def get_exchange_info(self): """Get the security's exchange info.""" # Get the SecurityExchangeHours Class object for the symbol self.exchange_hours = \ self.algo.Securities[self.symbol_object].Exchange.Hours # Create a datetime I know the market was open for the full day dt = DT.datetime(2021, 1, 4) # Get the next open datetime from the SecurityExchangeHours Class mkt_open_dt = self.exchange_hours.GetNextMarketOpen(dt, False) # Save the typical (regualar session) market open and close times self.mkt_open = mkt_open_dt.time() mkt_close_dt = self.exchange_hours.GetNextMarketClose(dt, False) self.mkt_close = mkt_close_dt.time() # Get the exchange timezone self.mkt_tz = pytz.timezone(str(self.exchange_hours.TimeZone)) # Create pytz timezone objects for the exchange tz and local tz exchange_tz = self.mkt_tz local_tz = pytz.timezone(TIMEZONE) # Get the difference in the timezones # REF: http://pytz.sourceforge.net/#tzinfo-api # for pytz timezone.utcoffset() method # 3600 seconds/hour exchange_utc_offset_hrs = int(exchange_tz.utcoffset(dt).seconds/3600) local_utc_offset_hrs = int(local_tz.utcoffset(dt).seconds/3600) self.offset_hrs = exchange_utc_offset_hrs-local_utc_offset_hrs # NOTE: offset hours are very helpful if you want to schedule functions # around market open/close times # Get the market close time for the local time zone self.mkt_close_local_tz = \ (mkt_close_dt-DT.timedelta(hours=self.offset_hrs)).time() #------------------------------------------------------------------------------- def get_contract_specs(self): """Get the symbol's contract specs.""" # Get the SymbolProperties of the symbol symbol_properties = \ self.algo.Securities[self.symbol_object].SymbolProperties # Get and save the contract specs self.min_tick = symbol_properties.MinimumPriceVariation #------------------------------------------------------------------------------- def add_strategy_variables(self): """Add other required variables for the strategy.""" # Keep track of calendar initialized self.calendar_initialized = False # Initialize the ZScore, mean, and std to 0 self.zscore = 0 self.mean = 0 self.std = 0 # Initialize the target weight to 0 self.target_weight = 0 #------------------------------------------------------------------------------- def add_bars(self): """Add bars required.""" # Create the desired zscore bar consolidator for the symbol daily_consolidator = TradeBarConsolidator(self.daily_calendar) # Create an event handler to be called on each new consolidated bar daily_consolidator.DataConsolidated += self.on_daily_consolidated # Link daily_consolidator with our symbol and add it to the algo manager self.algo.SubscriptionManager.AddConsolidator( self.symbol_object, daily_consolidator ) # Save daily_consolidator link so we can remove it when necessary self.daily_consolidator = daily_consolidator # Create the desired minutely bar consolidator for the symbol minute_consolidator = TradeBarConsolidator( timedelta(minutes=self.algo.BAR_MINUTES) ) # Create an event handler to be called on each new consolidated bar minute_consolidator.DataConsolidated += self.on_minutely_consolidated # Link minute_consolidator with our symbol and add it to the algo manager self.algo.SubscriptionManager.AddConsolidator( self.symbol_object, minute_consolidator ) # Save consolidator link so we can remove it when necessary self.minute_consolidator = minute_consolidator #------------------------------------------------------------------------------- def add_indicators(self): """Add indicators and other required variables.""" # Keep a list of indicators self.indicators = [] # To calculate the ZScore we need to track the last # self.algo.Z_MODEL_PERIOD bar closes. # Also in order to update indicators later in the event of a split or # dividend we need the bars to calculate the indicators # Do both of these with a rolling window for the TradeBars self.min_bars = max([self.algo.Z_MODEL_PERIOD]) self.bar_window = RollingWindow[TradeBar](self.min_bars) # Create 2 moving averages for the smaller bar self.ma_fast = SimpleMovingAverage(self.algo.MA_PERIOD_FAST) slow_period = int(self.algo.MA_PERIOD_FAST*self.algo.MA_SLOW_MULTIPLE) self.ma_slow = SimpleMovingAverage(slow_period) self.indicators.append(self.ma_fast) self.indicators.append(self.ma_slow) # Keep the last slow_period minute bars self.minute_bar_window = RollingWindow[TradeBar](slow_period) # Warm up the indicators with historical data self.warmup_indicators() # Calculate the initial ZScore self.update_zscore() #------------------------------------------------------------------------------- def adjust_indicators(self, adjustment): """Adjust all indicators for splits or dividends.""" # Get a list of the current bars bars = list(self.bar_window) minute_bars = list(self.minute_bar_window) # Current order is newest to oldest (default for rolling window) # Reverse the list to be oldest to newest bars.reverse() minute_bars.reverse() # Reset all indicators self.reset_indicators() # Loop through the bars from oldest to newest for bar in bars: # Adjust the bar by the adjustment factor bar.Open *= adjustment bar.High *= adjustment bar.Low *= adjustment bar.Close *= adjustment # Use the bar to update the indicators # This also adds the bar to the rolling window self.update_daily_indicators(bar) for bar in minute_bars: # Adjust the bar by the adjustment factor bar.Open *= adjustment bar.High *= adjustment bar.Low *= adjustment bar.Close *= adjustment # Use the bar to update the indicators # This also adds the bar to the rolling window self.update_signal_bar_indicators(bar) #------------------------------------------------------------------------------- def reset_indicators(self): """Manually reset all of the indicators.""" # Loop through all indicators and reset them for indicator in self.indicators: indicator.Reset() # Reset the bar window self.bar_window.Reset() self.minute_bar_window.Reset #------------------------------------------------------------------------------- def daily_calendar(self, dt): """ Set up daily consolidator calendar info. This should return a start datetime object that is timezone unaware with a valid date/time for the desired securities' exchange's time zone. Useful Refs: datetime.replace() method: https://docs.python.org/3/library/datetime.html#datetime.datetime.replace """ # Need to handle case where algo initializes and this function is called # for the first time. if not self.calendar_initialized: # Since this doesn't matter, we'll pass dt as start and one day # as the timedelta until end_dt start_dt = dt end_dt = start_dt + DT.timedelta(1) self.calendar_initialized = True return CalendarInfo(start_dt, end_dt-start_dt) # Create a datetime.datetime object to represent the market open for the # **EXCHANGE** timezone start = dt.replace( hour=self.mkt_open.hour, minute=self.mkt_open.minute, second=0, microsecond=0 ) # Get today's end time from the SecurityExchangeHours Class object end = self.exchange_hours.GetNextMarketClose(start, False) # Catch when start is after the passed dt # QC now throws an error in this case if start > dt: # To handle the QC error, pass period for no data # Set the end to be the next desired start end = start # And set start to dt to avoid QC throwing error start = dt # This will result in the next dt being the desired start time # start = dt.replace(hour=9, minute=30, second=0, microsecond=0) # end = dt.replace(hour=16, minute=0, second=0, microsecond=0) # Return the start datetime and the consolidation period return CalendarInfo(start, end-start) #------------------------------------------------------------------------------- def warmup_indicators(self): """Warm up indicators using historical data.""" # Update warmup variable, so we don't try to take any signals self.warming_up = True # Get historical daily trade bars daily_bars = self.algo.History[TradeBar]( self.symbol_object, int(1.5*self.min_bars), Resolution.Daily ) # Loop through the bars and update the consolidator for bar in daily_bars: # self.consolidator1.Update(bar) # don't use method with daily bars # Instead pass the bar directly to the event handler self.on_daily_consolidated(None, bar) # Get historical minute trade bars num_minutes = int(2.0*self.algo.BAR_MINUTES*self.min_bars) minute_bars = self.algo.History[TradeBar]( self.symbol_object, num_minutes, Resolution.Minute ) # Loop through the bars and update the consolidator for bar in minute_bars: self.minute_consolidator.Update(bar) # Update warmup variable back to False self.warming_up = False #------------------------------------------------------------------------------- def on_daily_consolidated(self, sender, bar): """Event handler for daily bars.""" # Manually update all of the daily indicators self.update_daily_indicators(bar) #------------------------------------------------------------------------------- def on_minutely_consolidated(self, sender, bar): """Event handler for desired minute bar.""" # Update the signal bar indicators self.update_signal_bar_indicators(bar) # If not warming up, check for signals if not self.warming_up: self.check_for_signals(bar) #------------------------------------------------------------------------------- def update_daily_indicators(self, bar): """Manually update all of the symbol's daily bar indicators.""" # Add the bar to the bar window self.bar_window.Add(bar) # Update the ZScore self.update_zscore() #------------------------------------------------------------------------------- def update_signal_bar_indicators(self, bar): """Manually update all of the symbol's signal bar indicators.""" # Add the bar to the bar window self.minute_bar_window.Add(bar) # Update the moving averages self.ma_fast.Update(bar.EndTime, bar.Close) self.ma_slow.Update(bar.EndTime, bar.Close) # Update the ZScore - to be sure self.update_zscore() #------------------------------------------------------------------------------- def update_zscore(self): """Manually calculate the z score.""" # Check if we can calculate the ZScore if self.bar_window.IsReady: # Get the bars as a list # Only get the bars for the ZScore period bars = list(self.bar_window)[:self.algo.Z_MODEL_PERIOD] # Now get the closes only bar_closes = [bar.Close for bar in bars] # Calculate the mean self.mean = statistics.mean(bar_closes) # Calculate the standard deviation self.std = statistics.stdev(bar_closes) # Calculate the ZScore self.last_close = bar_closes[0] # ordered in newest to oldest in list # Could update this to save mean and std # then reference them and calculate zscore during the day with latest price # instead of only doing this calculation once at the end of the day. if self.std != 0: self.zscore = (self.last_close-self.mean)/self.std else: self.zscore = 0 #------------------------------------------------------------------------------- def check_for_signals(self, bar): """Check for signals.""" # Ignore checks at 9:31 -> based on stale data if self.algo.Time.hour == 9 and self.algo.Time.minute == 31: return # # Cancel the open entry order if it was not filled # if self.entry_order: # response = self.entry_order.Cancel() # if response.IsSuccess: # self.entry_order = None # Reset the target weight each time we check for new signals # self.target_weight = 0 # Get the current qty qty = self.current_qty # Check for long position if qty > 0: # Check for a valid long exit signal if self.long_exit_signal(bar): # Go flat self.go_flat(bar) # Check for short position elif qty < 0: # Check for a valid short exit signal if self.short_exit_signal(bar): # Go flat self.go_flat(bar) # Otherwise flat else: # Check for a valid long entry signal if self.long_entry_signal(bar): # Go long self.go_long(bar) # Check for a valid short entry signal elif self.short_entry_signal(bar): # Go short self.go_short(bar) #------------------------------------------------------------------------------- def long_entry_signal(self, bar): """Check for a valid long entry signal.""" # Longs must be allowed if not LONGS_ALLOWED: return False # Must be oversold (zscore) if not self.oversold: return False # # Require the fast ma to be above the slow ma # if self.ma_fast.Current.Value <= self.ma_slow.Current.Value: # return False # If all criteria above passes, signal is True return True #------------------------------------------------------------------------------- def long_exit_signal(self, bar): """Check for a valid long exit signal.""" # Valid when long entry is no longer valid return not self.long_entry_signal(bar) #------------------------------------------------------------------------------- def short_entry_signal(self, bar): """Check for a valid short entry signal.""" # if self.symbol == 'NTRB': # print('debug') # Shorts must be allowed if not SHORTS_ALLOWED: return False # Must be overbought (zscore) if not self.overbought: return False # # Require the fast ma to be below the slow ma # if self.ma_fast.Current.Value >= self.ma_slow.Current.Value: # return False # If all criteria above passes, signal is True return True #------------------------------------------------------------------------------- def short_exit_signal(self, bar): """Check for a valid short exit signal.""" # Valid when short entry is no longer valid return not self.short_entry_signal(bar) #------------------------------------------------------------------------------- def go_long(self, bar): """Go long the symbol.""" # Handle the orders at the QCAlgorithm level # Update the target weight target_weight = abs(self.zscore)-self.algo.ZSCORE_LONG_ENTRY_STD self.target_weight = target_weight #------------------------------------------------------------------------------- def go_short(self, bar): """Go short the symbol.""" # Handle the orders at the QCAlgorithm level # Update the target weight target_weight = -(self.zscore-self.algo.ZSCORE_SHORT_ENTRY_STD) self.target_weight = target_weight #------------------------------------------------------------------------------- def go_flat(self, bar): """Go flat the symbol.""" # Handle the orders at the QCAlgorithm level # Update target weight self.target_weight = 0 #------------------------------------------------------------------------------- @property def indicators_ready(self): """Check if all of the indicators used are ready (warmed up).""" # Loop through the indicators & return False if one is not ready for indicator in self.indicators: if not indicator.IsReady: return False # Check the bar windows if not self.bar_window.IsReady: return False # Otherwise True return True #------------------------------------------------------------------------------- @property def current_qty(self): """Return the current quantity held in the portfolio.""" return self.algo.Portfolio[self.symbol_object].Quantity #------------------------------------------------------------------------------- @property def price(self): """Return the current price.""" return self.algo.Portfolio[self.symbol_object].Price #------------------------------------------------------------------------------- @property def overbought(self): """Return's whether the zscore suggests the symbol is overbought.""" return self.zscore > self.algo.ZSCORE_SHORT_ENTRY_STD #------------------------------------------------------------------------------- @property def oversold(self): """Return's whether the zscore suggests the symbol is oversold.""" return self.zscore < -self.algo.ZSCORE_LONG_ENTRY_STD #------------------------------------------------------------------------------- def round_price(self, price): """Round price to the nearest tick value.""" # Get the priced rounded to the nearest tick value price = round(price/self.min_tick)*self.min_tick # Get numbers to the left of the decimal place # if 'e' in str(price): # # handle scientific notation # pass # else: num_left_decimal = str(price).index('.') # Get desired number of decimals num_decimals = len(str(price))-num_left_decimal-1 # Return the truncated price truncated_price = float(str(price)[:num_left_decimal+num_decimals+1]) # self.algo.Debug(f'price {price}, min tick={self.min_tick}, num_decimals=' # f'{num_decimals} rounded to {truncated_price}') return truncated_price #------------------------------------------------------------------------------- def on_order_event(self, order_event): """New order event.""" # Get the order details order = self.algo.Transactions.GetOrderById(order_event.OrderId) order_qty = int(order.Quantity) avg_fill = order_event.FillPrice tag = order.Tag # Get current qty of symbol qty = self.current_qty