Overall Statistics |
Total Trades 533 Average Win 0.36% Average Loss -0.15% Compounding Annual Return 3.072% Drawdown 8.400% Expectancy 0.201 Net Profit 8.877% Sharpe Ratio 0.687 Probabilistic Sharpe Ratio 25.770% Loss Rate 65% Win Rate 35% Profit-Loss Ratio 2.46 Alpha 0 Beta 0 Annual Standard Deviation 0.032 Annual Variance 0.001 Information Ratio 0.687 Tracking Error 0.032 Treynor Ratio 0 Total Fees $2595.97 Estimated Strategy Capacity $5800000000.00 Lowest Capacity Asset ZC XUBT0M6O6LNP |
# Standard library imports
from AlgorithmImports import *
import datetime as DT
from dateutil.parser import parse

# Import from files
from notes_and_inputs import *

################################################################################
class MyCustomData(PythonData):
    """
    Custom data type that reads price bars from a remote CSV file.

    REFs:
    https://www.quantconnect.com/forum/discussion/4079/python-best-practise-for-using-consolidator-on-custom-data/p1
    """

    def GetSource(self, config, date, isLiveMode):
        """Return the remote-file data source for the symbol in `config`."""
        # Must use dictionary.get() method because this will be called on
        # initialization without a valid symbol.
        # The symbol must also always be all caps, because QC converts it to
        # all caps!
        csv_link = CSV_FILES.get(config.Symbol.Value, '')
        return SubscriptionDataSource(
            csv_link, SubscriptionTransportMedium.RemoteFile
        )

    def Reader(self, config, line, date, isLiveMode):
        """
        Parse one comma-separated line of the CSV file into a data point.

        Expected columns (example):
            <Date>   <Time>   <Open> <High> <Low> <Close> <Volume>
            2/1/2018 10:30:00 13.59  13.78  13.41 13.67   19817603

        Returns None for blank lines and for lines whose first character is
        not a digit (e.g. a header row). No exception handling is applied, so
        a row with missing or non-numeric fields raises.
        """
        # New object tied to the requested symbol
        bar = MyCustomData()
        bar.Symbol = config.Symbol

        # Skip blank lines and lines that do not start with a digit
        if not line.strip() or not line[0].isdigit():
            return None

        columns = line.split(',')

        # Combine the date and time columns into one DT.datetime object
        bar_date = parse(columns[0])
        bar_time = parse(columns[1]).time()
        bar.Time = DT.datetime.combine(bar_date, bar_time)

        # Set the value used for filling positions / Using the open price
        bar.Value = float(columns[2])

        # Store the prices and volume (columns 2 through 6)
        field_names = ("Open", "High", "Low", "Close", "Volume")
        for column_index, field_name in enumerate(field_names, start=2):
            bar[field_name] = float(columns[column_index])

        return bar
# Standard library imports
from AlgorithmImports import *
import datetime as DT

# Import from files
from notes_and_inputs import *
from symbol_data import *
from custom_data import *

################################################################################
class EquitiesStrategyAlgorithm(QCAlgorithm):
    """
    Futures strategy algorithm.

    Sets up the backtest, subscribes to every root symbol in FUTURES (either
    QC continuous futures or custom CSV data), creates one SymbolData object
    per root, and reacts to order fills by logging them and placing the
    protective stop order for new entries.
    """

    def Initialize(self):
        """Initialize algorithm."""
        # Set backtest details
        self.set_backtest_details()
        # Initialize algo parameters
        self.initialize_parameters()
        # Add all desired instruments to the algo
        self.add_instruments()
        self.SetWarmUp(DT.timedelta(days = 305))

#-------------------------------------------------------------------------------
    def set_backtest_details(self):
        """Set the backtest details."""
        # Set the start and end date (if applicable)
        self.SetStartDate(
            BACKTEST_START_DT.year,
            BACKTEST_START_DT.month,
            BACKTEST_START_DT.day
        )
        if END_DATE:
            self.SetEndDate(END_DT.year, END_DT.month, END_DT.day)
        # Set the starting cash amount
        self.SetCash(CASH)
        # Set the timezone for algo logs
        self.SetTimeZone(TIMEZONE)

        # Setup trading framework
        # Transaction and submit/execution rules will use IB models
        # Cannot use the built-in models with the custom data
        if DATA_SOURCE == 'QC':
            self.SetBrokerageModel(
                BrokerageName.InteractiveBrokersBrokerage,
                AccountType.Margin
            )
            # Will override the desired reality methods with this function
            # Configure all algorithm securities
            self.SetSecurityInitializer(self.custom_security_initializer)

#-------------------------------------------------------------------------------
    def initialize_parameters(self):
        """Read all algo input parameters and set up all others required."""
        # Read user inputs
        self.min_p1 = MIN_P1
        self.min_p2 = MIN_P2
        self.level_a = LEVEL_A
        self.level_b = LEVEL_B
        self.level_c = LEVEL_C
        self.stop_ix = STOP_IX
        self.length = LENGTH
        self.stop_ix_r = STOP_IX_R
        self.stop_dollar = STOP_DOLLAR
        self.atr_length_r = ATR_LENGTH_R
        self.risk_pct = RISK_PCT
        self.lookback = LOOKBACK

        # Create an empty list to hold order info
        # This will hold (contract, qty, tag) tuples for the desired orders when
        # data was not available at the time an order was initially desired.
        self.orders = []

#-------------------------------------------------------------------------------
    def add_instruments(self):
        """Add desired instrument data to the algo."""
        # Create a dictionary to hold all symbol data objects
        self.symbol_data = {}

        # Loop through all futures
        for root in FUTURES:
            # Check if 'QC' data is used
            if DATA_SOURCE == 'QC':
                # Set the continuous future contract mapping based on input
                if CONTINUOUS_MAPPING == 'LastTradingDay':
                    contract_mapping = DataMappingMode.LastTradingDay
                elif CONTINUOUS_MAPPING == 'FirstDayMonth':
                    contract_mapping = DataMappingMode.FirstDayMonth
                elif CONTINUOUS_MAPPING == 'OpenInterest':
                    contract_mapping = DataMappingMode.OpenInterest

                # Set the continuous future pricing based on input
                if CONTINUOUS_PRICE_SCALE == 'BackwardsPanamaCanal':
                    price_scaling = DataNormalizationMode.BackwardsPanamaCanal
                elif CONTINUOUS_PRICE_SCALE == 'BackwardsRatio':
                    price_scaling = DataNormalizationMode.BackwardsRatio
                elif CONTINUOUS_PRICE_SCALE == 'Raw':
                    price_scaling = DataNormalizationMode.Raw

                # Add future data and save link to the continuous future
                f = self.AddFuture(
                    root,
                    extendedMarketHours=True, # defaults to False
                    dataNormalizationMode = price_scaling,
                    dataMappingMode = contract_mapping,
                    contractDepthOffset = 0
                )
                # Set future contract filter
                days = int(10*ROLL_DAYS_BEFORE_EXPIRY)
                f.SetFilter(DT.timedelta(0), DT.timedelta(days))

                # Set data resolution based on input
                # Will be referenced in SymbolData class.
                if DATA_RESOLUTION == 'SECOND':
                    self.resolution = Resolution.Second
                elif DATA_RESOLUTION == 'MINUTE':
                    self.resolution = Resolution.Minute
                elif DATA_RESOLUTION == 'HOUR':
                    self.resolution = Resolution.Hour

            # Otherwise 'CUSTOM' data is used
            else:
                f = self.AddData(MyCustomData, root)

            # Create symbol data object for the root symbol
            self.symbol_data[root] = SymbolData(self, root, f)

#-------------------------------------------------------------------------------
    def custom_security_initializer(self, security):
        """
        Define models to be used for securities as they are added to the
        algorithm's universe.
        """
        # Define the data normalization mode
        security.SetDataNormalizationMode(DataNormalizationMode.Adjusted)

#-------------------------------------------------------------------------------
    def OnData(self, data):
        """Built-in event handler for new data."""
        # Check if there are any queued orders to try to place
        if self.orders:
            # Process orders in the queue
            self.ProcessOrders()

#-------------------------------------------------------------------------------
    def ProcessOrders(self):
        """
        Process any orders in the queue once data is available.

        Each queued entry is a (contract, qty, tag) tuple. An entry is sent as
        a market order and removed from the queue as soon as the contract's
        security reports HasData; entries without data stay queued.
        """
        # Loop through a copy so entries can be removed while iterating
        for queued_order in self.orders[:]:
            contract, qty, tag = queued_order

            # Skip (and keep queued) until data is available for the contract
            if not self.Securities[contract].HasData:
                continue

            # Log message when desired
            self.Log(
                f"Algo now has data for {contract}. Placing {tag} order "
                f"for {qty} contracts."
            )
            # Place the desired MarketOrder
            self.MarketOrder(contract, qty, tag=tag)
            # Remove from list
            self.orders.remove(queued_order)

#-------------------------------------------------------------------------------
    def OnOrderEvent(self, orderEvent):
        """
        Built-in event handler for orders.

        Only fully filled orders are handled. A fill is classified as:
          - a stop fill, when its id matches the SymbolData's active stop order
            (the stop order reference is then cleared),
          - an exit fill, when the resulting position quantity is zero,
          - otherwise an entry fill: the fill price is saved as the cost basis
            and a new stop order is placed via SymbolData.PlaceStopOrder().
        """
        # Skip if not filled
        if orderEvent.Status != OrderStatus.Filled:
            return

        # Get the order details
        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        order_qty = order.Quantity
        # Get current qty
        qty = self.Portfolio[order.Symbol].Quantity
        # Get the current order's average fill price
        avg_fill = round(orderEvent.FillPrice, 4)

        # Get the symbol_data object for the root symbol
        if DATA_SOURCE == 'QC':
            symbol = str(order.Symbol)
            root = order.Symbol.ID.Symbol
            symbol_data = self.symbol_data[root]
        else:
            # Remove the .MyCustomData
            symbol = str(order.Symbol).split(".")[0]
            symbol_data = self.symbol_data[symbol]

        # Get active stop order id (0 when there is no active stop order)
        if symbol_data.stop_order:
            stop_order_id = symbol_data.stop_order.OrderId
        else:
            stop_order_id = 0

        # Check for filled stop order
        if orderEvent.OrderId == stop_order_id:
            # Filled stop loss order - set to None
            symbol_data.stop_order = None
            # Log message when desired
            if PRINT_ORDERS:
                self.Log(
                    f"{symbol} filled stop order for {order_qty} shares @ "
                    f"{avg_fill}"
                )
            # Set mp to 0
            # symbol_data.mp = 0
            return

        # Check for a market exit order
        if qty == 0:
            # Filled exit order
            # symbol_data.stop_order = None
            # Log message when desired
            if PRINT_ORDERS:
                self.Log(f"{symbol} filled exit order for {order_qty} shares @ "
                    f"{avg_fill}")
            # Set mp to 0
            # symbol_data.mp = 0
            return

        # Otherwise qty not 0, so entry order
        # Save the actual fill price as the cost basis for the position
        symbol_data.cost_basis = avg_fill
        # Log message when desired
        if PRINT_ORDERS:
            if qty > 0:
                self.Log(
                    f"{symbol} filled long entry order for {order_qty} "
                    f"shares @ {avg_fill}, new qty = {qty}"
                )
            elif qty < 0:
                self.Log(
                    f"{symbol} filled short entry order for {order_qty} "
                    f"shares @ {avg_fill}, new qty = {qty}"
                )
        # Immediately place new stop market order
        symbol_data.PlaceStopOrder(qty)
#region imports
from AlgorithmImports import *
#endregion
"""
Matt Custom Strategy
Version 1.0.7 for Futures
Platform: QuantConnect
By: Aaron Eller
For: Matt Blonc
www.excelintrading.com
aaron@excelintrading.com

Revision Notes:
    1.0.0 (09/08/2021) - Initial
    1.0.1 (09/10/2021) - Added 'CUSTOM_TIMES' BAR option.
    1.0.2 (09/17/2021) - Added DATA_SOURCE input and custom data logic.
                       - Modified symbol_data.set_indicators() to handle
                          custom data consolidators.
                       - Modified set_backtest_details() and OnOrderEvent()
                          to handle custom data symbols.
                       - Modified to work with futures.
    1.0.3 (09/22/2021) - Updated PlaceStopOrder() to use atr_r.
                       - Updated up/down count and up/down
                          support/resistance logic.
    1.0.4 (09/23/2021) - Added logic to prevent reversal entry and exit on
                          same bar that results in doubling the position size.
    1.0.5 (09/24/2021) - Updated to work with custom future's data.
    1.0.6 (12/29/2022) - Added CONTINUOUS_MAPPING and CONTINUOUS_PRICE_SCALE.
                       - Updated SymbolData.CustomCalendar().
                       - Replaced SymbolData.DailyUSEquityCalendar() with
                          DailyFutureCalendar.
                       - Added OPEN_TIMES and CLOSE_TIMES inputs necessary
                          when custom data is used.
    1.0.7 (01/06/2023) - Changed 'QC' data to subscribe to the active contract
                          to be traded.
                       - Changed contract roll time check to be handled in
                          SymbolData class via scheduled function rather than
                          in QCAlgorithm.OnData().
                       - Added DATA_RESOLUTION input and logic.
                       - Added ROLL_TO_NEW_CONTRACT input and logic. This also
                          added QCAlgorithm.orders list and logic.
                       - Updated logic to ignore orders for zero quantity to
                          avoid errors.

References:
-Continuous Futures
    https://www.quantconnect.com/forum/discussion/12644/
    continuous-futures-support/p1
-QC (Lean) Class List
    https://lean-api-docs.netlify.app/annotated.html
-Creating a Custom Indicator
    https://www.quantconnect.com/forum/discussion/3383/custom-indicator-in-python-algorithm/p1
"""
################################################################################
import datetime as DT

# USER INPUTS

# Backtest details
START_DATE = "01-01-2019" # MM-DD-YYYY format
END_DATE = "10-21-2021" #"12-31-2021" # MM-DD-YYYY format (or None to run to the current date)
CASH = 400000 # starting portfolio cash amount
TIMEZONE = "US/Central" # e.g. "US/Eastern", "US/Central", "US/Pacific"

# Set the futures to trade
# Be sure to use ALL CAPS
FUTURES = ['ZC','ZS']

# Must tell QC the multiplier if CUSTOM data is used
MULTIPLIERS = {}
MULTIPLIERS['CL'] = 1000
# MULTIPLIERS['ZF'] = ?

# Set the number of days until expiry to trade the next contract
ROLL_DAYS_BEFORE_EXPIRY = 10

#-------------------------------------------------------------------------------
# CONTINUOUS FUTURES INPUTS

# Define the data resolution for the futures to be fed to the algorithm
# Must be "SECOND", "MINUTE", or "HOUR" for futures
# NOTE: 'SECOND' is very slow so 'MINUTE' is optimal for faster testing
DATA_RESOLUTION = 'HOUR'

# Define the desired continuous futures' contract mapping
# This is when contracts are rolled
# Must be one of the following:
#  "LastTradingDay": The contract maps on the previous day of expiration of the
#    front month.
#  "FirstDayMonth": The contract maps on the first date of the delivery month
#    of the front month. If the contract expires prior to this date, then it
#    rolls on the contract's last trading date instead.
#  "OpenInterest": The contract maps when the back month contract has a higher
#    traded volume than the current front month.
CONTINUOUS_MAPPING = 'OpenInterest'

# Define the desired continuous futures' price scaling
# Must be one of the following:
#  "BackwardsPanamaCanal": Eliminates price jumps between two consecutive
#    contracts, adding a factor based on the difference of their prices. Last
#    contract is the true one, factor 0
#  "BackwardsRatio": Eliminates price jumps between two consecutive contracts,
#    multiplying the prices by their ratio. Last contract is the true one,
#    factor 1.
#  "Raw": No price adjustment is made.
CONTINUOUS_PRICE_SCALE = 'BackwardsPanamaCanal'

# When the algo switches to a new contract, should we immediately take the same
# position that we had with the old contract in the new one?
ROLL_TO_NEW_CONTRACT = True

#-------------------------------------------------------------------------------
# BAR TO TRACK FOR EACH FUTURE

# Tell algorithm to use QC data or custom data
DATA_SOURCE = 'QC' # must be 'QC' or 'CUSTOM'

# Must tell QC the open/close times if CUSTOM data is used
OPEN_TIMES = {}
CLOSE_TIMES = {}
OPEN_TIMES['CL'] = DT.time(18,0)
CLOSE_TIMES['CL'] = DT.time(17,0)

#-------------------------------------------------------------------------------
# The following inputs are used for 'QC' data:

# Use the following format: '5 min', '1 hr', '1 day'
# Or to use CUSTOM_TIMES below, set BAR = 'CUSTOM_TIMES'
BAR = '1 hr'

# Define the custom intraday minutely bars to track
# List specific bar start/stop times
# Only used when BAR above is set to be 'CUSTOM_TIMES'
CUSTOM_TIMES = [
    '1800-2200', # 4hrs at market open
    '2200-0200', # 4hrs
    '0200-0600', # 4hrs
    '0600-1000', # 4hrs
    '1000-1400', # 4hrs
    '1400-1700', # 3hrs to end at market close
    ]

#-------------------------------------------------------------------------------
# The following inputs are used for 'CUSTOM' data

# Create a dictionary to hold all links to csv custom data
CSV_FILES = {}
# Add link for all FUTURES listed above
# Note that the link must automatically download the data as a csv file
# If using dropbox remember to add the &dl=1 to trigger a download
CSV_FILES['CL'] = 'https://www.dropbox.com/s/oe1zop5jsmy3uij/QCL%23C.csv?dl=1'

# Set period for custom data
CUSTOM_DATA_PERIOD = DT.timedelta(minutes=60)
# 'days', 'minutes', 'hours' are all possible arguments for DT.timedelta

#-------------------------------------------------------------------------------
# INDICATOR INPUTS
MIN_P1 = 3
MIN_P2 = 8
LEVEL_A = .7
LEVEL_B = .8
LEVEL_C = 1.7
STOP_IX = 6
LENGTH = 40
LOOKBACK = 305 # default setting for MaxBarsBack in Multicharts

# POSITION SIZING INPUTS
# STOP_IX_R = 2
# ATR_LENGTH_R = 160
STOP_IX_R = 6
ATR_LENGTH_R = 40
RISK_PCT = 0.0014
STOP_DOLLAR = 1400

#-------------------------------------------------------------------------------
# Turn on/off logs
PRINT_CX_UPDATES = True # print logs for updating futures' contracts
PRINT_SIGNALS = True # print entry/exit signals
PRINT_ORDERS = True # print order details

################################################################################
############################ END OF ALL USER INPUTS ############################
################################################################################

################################################################################
# VALIDATE USER INPUTS - DO NOT CHANGE BELOW!!!

# Verify start date
try:
    START_DT = DT.datetime.strptime(START_DATE, '%m-%d-%Y')
except (TypeError, ValueError) as err:
    raise ValueError("Invalid START_DATE format ({}). Must be in MM-DD-YYYY "
        "format.".format(START_DATE)) from err

# Verify end date
# END_DT is only defined when END_DATE is set (None means run to date)
if END_DATE:
    try:
        END_DT = DT.datetime.strptime(END_DATE, '%m-%d-%Y')
    except (TypeError, ValueError) as err:
        raise ValueError("Invalid END_DATE format ({}). Must be in MM-DD-YYYY "
            "format or set to None to run to date.".format(END_DATE)) from err

#-------------------------------------------------------------------------------
# Verify BAR
# First check if 'CUSTOM_TIMES' is used
if BAR == 'CUSTOM_TIMES':
    # Create empty list of times to fill
    BAR_TIMES = []
    # Custom bars are always tracked in minutes
    # (set up front so BAR_UNIT is defined even if CUSTOM_TIMES is empty)
    BAR_UNIT = 'min'
    # Loop through CUSTOM_TIMES
    for time_str in CUSTOM_TIMES:
        try:
            # Get bar start hour and minutes
            start_str = time_str.split('-')[0]
            start_hr = int(start_str[:2])
            start_min = int(start_str[-2:])
            # Get bar end hour and minutes
            end_str = time_str.split('-')[1]
            end_hr = int(end_str[:2])
            end_min = int(end_str[-2:])
            # Create a datetime.time object for start and end times
            time_start = DT.time(
                hour=start_hr, minute=start_min, second=0, microsecond=0)
            time_end = DT.time(
                hour=end_hr, minute=end_min, second=0, microsecond=0)
        except (AttributeError, IndexError, TypeError, ValueError) as err:
            raise ValueError(
                "Invalid CUSTOM_TIMES entry: {}".format(time_str)) from err
        # Add (start time, end time) tuple to BAR_TIMES list
        BAR_TIMES.append((time_start, time_end))
else:
    try:
        # Get the bar integer and bar unit
        BAR_INT = int(BAR.split(" ")[0])
        BAR_UNIT = BAR.split(" ")[1].lower()
    except (AttributeError, IndexError, TypeError, ValueError) as err:
        raise ValueError(
            "Invalid BAR: {}. Use '5 min', '1 hr', or '1 day' format".format(
                BAR)) from err

    # The checks below are outside the try block so that their specific
    # messages reach the user instead of being replaced by the generic one.
    # Verify bar unit is valid
    if BAR_UNIT not in ['min', 'hr', 'day']:
        raise ValueError(
            "Invalid BAR ({}). Unit must be 'min', 'hr', or 'day'.".format(
                BAR))
    # Check for hourly bar unit
    elif BAR_UNIT == 'hr':
        # Convert hr bar int to be minutes
        BAR_INT *= 60
        # Change the BAR_UNIT to now be 'min'
        BAR_UNIT = 'min'
    # Only allow '1 day' bars
    elif BAR_UNIT == 'day' and BAR_INT != 1:
        raise ValueError(
            "Invalid BAR ({}). Multiple 'day' bars are not allowed.".format(
                BAR))

# Verify data source
if DATA_SOURCE == 'QC':
    pass
elif DATA_SOURCE == 'CUSTOM':
    # Make sure all futures have a data source
    for future in FUTURES:
        if future not in CSV_FILES:
            raise ValueError("Using 'CUSTOM' data and no CSV_FILES link defined"
                " for {}.".format(future))
else:
    raise ValueError("Invalid DATA_SOURCE ({}). Must be 'QC' or "
        "'CUSTOM'.".format(DATA_SOURCE))

#-------------------------------------------------------------------------------
# Verify DATA_RESOLUTION input
DATA_RESOLUTION = DATA_RESOLUTION.upper()
resolutions = ['SECOND', 'MINUTE', 'HOUR']
if DATA_RESOLUTION not in resolutions:
    raise ValueError(
        f"Invalid DATA_RESOLUTION ({DATA_RESOLUTION}). Must be: {resolutions}"
    )

#-------------------------------------------------------------------------------
# Verify CONTINUOUS_MAPPING input
maps = ["LastTradingDay", "FirstDayMonth", "OpenInterest"]
if CONTINUOUS_MAPPING not in maps:
    raise ValueError(
        f"Invalid CONTINUOUS_MAPPING ({CONTINUOUS_MAPPING}). Must be: {maps}"
    )

#-------------------------------------------------------------------------------
# Verify CONTINUOUS_PRICE_SCALE input
scales = ["BackwardsPanamaCanal", "BackwardsRatio", "Raw"]
if CONTINUOUS_PRICE_SCALE not in scales:
    raise ValueError(
        f"Invalid CONTINUOUS_PRICE_SCALE ({CONTINUOUS_PRICE_SCALE}). Must be: "
        f"{scales}"
    )

#-------------------------------------------------------------------------------
# Calculate the minimum number of bars required to warm up the indicators
WARMUP_DAYS = int(1*max(ATR_LENGTH_R, LENGTH))

# Set the start date based on the desired days to warm up the algo
# approximately 252 market days per 365 calendar days
# CALENDAR_DAYS = int(WARMUP_DAYS*(365/252))
CALENDAR_DAYS = 1
BACKTEST_START_DT = START_DT - DT.timedelta(days=CALENDAR_DAYS)
#region imports from AlgorithmImports import * #endregion # Standard library imports import datetime as DT import math import numpy as np import pytz import random # Import from files from notes_and_inputs import * ################################################################################ class SymbolData(object): """Class to store data for a specific security symbol.""" def __init__(self, algo, root, continuous_contract): """Initialize SymbolData object.""" # Save the parameters self.algo = algo self.symbol = root self.continuous_contract = continuous_contract self.symbol_object = continuous_contract.Symbol self.security = continuous_contract.Symbol # Add strategy variables self.add_strategy_variables() # Get the exchange info self.get_exchange_info() # Add indicators self.add_indicators() # Schedule functions self.schedule_functions() #------------------------------------------------------------------------------- def add_strategy_variables(self): """Add strategy variables required.""" # Create a variables to hold the contract details self.consolidator = None self.contract = None self.min_tick = None # Save the multiplier from the inputs if custom data is used if DATA_SOURCE == 'CUSTOM': self.multiplier = MULTIPLIERS[self.symbol] self.have_contract_specs = True else: # We cannot initially get the contract specs, because we don't have # access to an actual contract upon initialization. 
# So set have_contract_specs variable False self.have_contract_specs = False #------------------------------------------------------------------------------- def get_exchange_info(self): """Get the security's exchange info.""" # Get the security symbol security = self.security # Get the SecurityExchangeHours Class object for the symbol self.exchange_hours = self.algo.Securities[security].Exchange.Hours if DATA_SOURCE == 'CUSTOM': # Create a datetime I know the market was open # Add the open time for the root symbol open_time = OPEN_TIMES[self.symbol] open_dt = DT.datetime( 2021, 1, 4, hour=open_time.hour, minute=open_time.minute ) close_time = CLOSE_TIMES[self.symbol] if close_time > open_time: close_dt = DT.datetime( 2021, 1, 4, hour=close_time.hour, minute=close_time.minute ) else: close_dt = DT.datetime( 2021, 1, 5, hour=close_time.hour, minute=close_time.minute ) # Create a datetime I know the market was open (that next Monday) dt = DT.datetime(2021, 1, 4) else: # Create a datetime I know the market was closed (Saturday) dt = DT.datetime(2021, 1, 2) # Get typical open time from the SecurityExchangeHours Class open_dt = self.exchange_hours.GetNextMarketOpen( dt, extendedMarket=True) # Create a datetime I know the market was open (that next Monday) dt = DT.datetime(2021, 1, 4) # Get next close datetime and time close_dt = self.exchange_hours.GetNextMarketClose( dt, extendedMarket=True) self.mkt_open = open_dt.time() self.mkt_close = close_dt.time() # Check if there is a trading halt that needs to be considered if DATA_SOURCE == 'QC': next_open_dt = self.exchange_hours.GetNextMarketOpen( close_dt, extendedMarket=True) next_open_time = next_open_dt.time() if self.mkt_open != next_open_time: # This open time is continuing after a trading halt # Get the next period's close time and save as main close time close_dt = \ self.exchange_hours.GetNextMarketClose( next_open_dt, extendedMarket=True) self.mkt_close = close_dt.time() # Get the typical period in minutes 
self.period_minutes = (close_dt-open_dt).seconds/60 # Get the exchange timezone self.mkt_tz = pytz.timezone(str(self.exchange_hours.TimeZone)) # Create pytz timezone objects for the exchange tz and local tz exchange_tz = self.mkt_tz local_tz = pytz.timezone(TIMEZONE) # Get the difference in the timezones # REF: http://pytz.sourceforge.net/#tzinfo-api # for pytz timezone.utcoffset() method # 3600 seconds/hour exchange_utc_offset_hrs = int(exchange_tz.utcoffset(dt).seconds/3600) local_utc_offset_hrs = int(local_tz.utcoffset(dt).seconds/3600) self.offset_hrs = exchange_utc_offset_hrs-local_utc_offset_hrs # NOTE: offset hours are very helpful if you want to schedule functions # around market open/close times # Get the market open and close times for the local time zone self.mkt_open_local_tz = \ (open_dt-DT.timedelta(hours=self.offset_hrs)).time() self.mkt_close_local_tz = \ (close_dt-DT.timedelta(hours=self.offset_hrs)).time() #------------------------------------------------------------------------------- def add_indicators(self): """Set up the security's indicators to be updated on the desired BAR.""" # Set up the ATRs for the equity # self.atr = AverageTrueRange( # self.algo.length, MovingAverageType.Exponential) self.atr = AverageTrueRange( self.algo.length, MovingAverageType.Simple ) # self.atr_r = AverageTrueRange( # self.algo.atr_length_r, MovingAverageType.Exponential) self.atr_r = AverageTrueRange( self.algo.atr_length_r, MovingAverageType.Simple ) # REF: https://lean-api-docs.netlify.app/MovingAverageTypeExtensions_8cs_source.html # Keep a rolling window of the last length bars self.lookback = self.algo.lookback self.bars = RollingWindow[TradeBar](self.lookback) # Keep a link to the active stop loss order self.stop_order = None # Save the positions active cost basis # Will use this actual amount for the stop loss instead of using the # assumed fill price at the bar's open self.cost_basis = None # Initialize other variables required for the algo self.mp = 0 
self.up_s_exit = -99999 self.dn_s_exit = 99999 self.up_f_exit = -99999 self.dn_f_exit = 99999 self.up_signal = 0 self.dn_signal = 0 self.up_fix = 0 self.dn_fix = 0 self.up_ct = 1 self.dn_ct = 1 self.up_res = 99999 self.dn_res = 99999 self.up_sup = -99999 self.dn_sup = -99999 self.p1_up_ready = False self.p1_dn_ready = False self.up_f = -99999 self.dn_f = 99999 self.p1_up_f = 99999 self.p1_dn_f = -99999 self.p1_up_e = 0 self.p1_dn_e = 0 self.p1_up_noise_a = 0 self.p1_dn_noise_a = 0 self.p1_up_noise_b = 0 self.p1_dn_noise_b = 0 self.p1_up_noise_c = 0 self.p1_dn_noise_c = 0 self.p1_up_entry = 99999 self.p1_dn_entry = -99999 self.reversal = 0 self.p2_up_ready = False self.p2_dn_ready = False self.p2_up_e = 0 self.p2_dn_e = 0 self.p2_up_noise_a = 0 self.p2_dn_noise_a = 0 self.p2_up_noise_b = 0 self.p2_dn_noise_b = 0 self.p2_up_noise_c = 0 self.p2_dn_noise_c = 0 self.up_swing = 0 self.dn_swing = 0 self.up_e = 0 self.dn_e = 0 self.up_noise_a = 0 self.dn_noise_a = 0 self.up_noise_b = 0 self.dn_noise_b = 0 self.up_noise_c = 0 self.dn_noise_c = 0 self.up_entry = 99999 self.dn_entry = -99999 self.up_ap = 99999 self.dn_ap = 99999 self.up_am = -99999 self.dn_am = -99999 self.up_bp = 99999 self.dn_bp = 99999 self.up_bm = -99999 self.dn_bm = -99999 self.up_cp = 99999 self.dn_cp = 99999 self.up_cm = -99999 self.dn_cm = -99999 self.contracts = 0 self.counter = 0 ##New #------------------------------------------------------------------------------- def schedule_functions(self): """Schedule required functions for the class.""" if DATA_SOURCE == 'QC': # Schedule event to check for time to roll future contracts # Rather than checking for data.SymbolChangedEvents in OnData (slow) # check once per day, 1 minute after the open roll_time = ( DT.datetime.combine( DT.datetime.today(), self.mkt_open_local_tz ) + DT.timedelta(minutes=1) ).time() self.algo.Schedule.On( self.algo.DateRules.EveryDay(self.symbol_object), self.algo.TimeRules.At(roll_time.hour, roll_time.minute), 
self.contract_roll_check ) else: # 'CUSTOM' data # Create a tradebar consolidator based on custom data c = self.algo.ResolveConsolidator(self.symbol, CUSTOM_DATA_PERIOD) # Event handler to be called on each new consolidated bar c.DataConsolidated += self.OnDataConsolidated # Link the consolidator with our contract and add it to the manager self.algo.SubscriptionManager.AddConsolidator(self.symbol, c) #------------------------------------------------------------------------------- def contract_roll_check(self): """ Check if the current contract needs to be rolled. This is only called if we are using 'QC' data. """ # Catch if the current contract has not be set yet (start) if not self.contract: # Update the current contract self.update_current_contract() return # Check for change in the current contract if self.contract != self.continuous_contract.Mapped: # Save old contract and new contract new_contract = self.continuous_contract.Mapped old_contract = self.contract # Get the qty of the old contract (old contract still referenced) qty = self.current_qty # Log message when desired self.algo.Log( f"{self.symbol} contract to trade changing from {old_contract}" f" to {new_contract}" ) # Update the current contract self.update_current_contract(new_contract) # Check for an open position for the old contract if qty != 0: # Log message when desired self.algo.Log(f"Closing {qty} of {old_contract}") # Place order to exit the old contract self.algo.MarketOrder(old_contract, -qty, tag='roll exit') # Check if we want to roll to new contract if ROLL_TO_NEW_CONTRACT: # Log message when desired self.algo.Log(f"Opening {qty} of {new_contract}") tag = 'roll entry' # Check if there is data available for the new contract if self.algo.Securities[new_contract].HasData: # Place order to enter the new contract self.algo.MarketOrder(new_contract, qty, tag=tag) else: # Otherwise add the desired order to the list of orders # to process asap once we have data 
self.algo.orders.append((new_contract, qty, tag))

# NOTE(review): the statement above is the tail of a method that begins before
# this excerpt.  The definitions below all take ``self`` and are presumably
# methods of a class whose header is also outside this excerpt; the nesting of
# every block was reconstructed from whitespace-collapsed text, so confirm the
# indentation against the original layout before relying on it.

#-------------------------------------------------------------------------------
def update_current_contract(self, new_contract=None):
    """
    Update the current contract to the new desired contract.

    This is only called if we are using 'QC' data.

    The previous consolidator (if any) is removed from the old contract, the
    new contract is subscribed, and a fresh trade bar consolidator is created
    for it and wired to ``OnDataConsolidated``.

    Args:
        new_contract: contract symbol to switch to.  When falsy, the
            continuous contract's currently mapped contract is used; if there
            is none, the method returns without doing anything.

    Raises:
        ValueError: if ``BAR_UNIT`` is neither 'min' nor 'day'.
    """
    # Check if new_contract is not passed, use the mapped contract
    if not new_contract:
        mapped = self.continuous_contract.Mapped
        if not mapped:
            return
        new_contract = mapped

    # Remove the previous trade bar consolidator (if one)
    # Have not updated self.contract yet, so referencing old contract
    if self.consolidator:
        self.algo.SubscriptionManager.RemoveConsolidator(
            self.contract, self.consolidator
        )

    # Load the contract specs once.  get_contract_specs() does nothing when
    # there is no mapped contract, so only mark the specs as loaded when a
    # mapped contract is actually available; otherwise retry on the next call.
    if not self.have_contract_specs and self.continuous_contract.Mapped:
        self.get_contract_specs()
        self.have_contract_specs = True

    # We need to subscribe to the new contract's data!
    self.algo.AddFutureContract(
        new_contract,
        self.algo.resolution,
        extendedMarketHours=True,  # defaults to False
    )

    # Create the desired trade bar consolidator.  The calendar functions
    # special-case their first call, so reset that flag for the new one.
    self.future_calendar_initialized = False
    if BAR_UNIT == 'min':
        if BAR == 'CUSTOM_TIMES':
            c = TradeBarConsolidator(self.CustomCalendar)
        else:
            c = TradeBarConsolidator(timedelta(minutes=BAR_INT))
    elif BAR_UNIT == 'day':
        c = TradeBarConsolidator(self.DailyFutureCalendar)
    else:
        # Previously this fell through and failed later on an unbound name
        raise ValueError(
            f"Unsupported BAR_UNIT {BAR_UNIT!r}; expected 'min' or 'day'"
        )

    # Create an event handler to be called on each new consolidated bar
    c.DataConsolidated += self.OnDataConsolidated
    # Link the consolidator with our contract and add it to the manager
    self.algo.SubscriptionManager.AddConsolidator(new_contract, c)

    # Save the contract consolidator for future reference
    self.consolidator = c
    self.contract = new_contract

#-------------------------------------------------------------------------------
def get_contract_specs(self):
    """
    Get the future's contract specs.

    This is only called if we are using 'QC' data.

    Reads the SymbolProperties of the continuous contract's mapped contract
    and stores ``min_tick``, ``multiplier``, ``description``, ``tick_value``
    and ``point_value`` on the instance, then logs them.  Does nothing when
    there is no mapped contract.
    """
    contract = self.continuous_contract.Mapped
    if not contract:
        return

    # Get the SymbolProperties of the current mapped contract
    symbol_properties = self.algo.Securities[contract].SymbolProperties

    # Get and save the contract specs
    self.min_tick = symbol_properties.MinimumPriceVariation
    self.multiplier = symbol_properties.ContractMultiplier
    self.description = symbol_properties.Description
    self.tick_value = self.min_tick*self.multiplier
    self.point_value = self.tick_value/self.min_tick

    # Log the specs that were just saved
    self.algo.Log(
        f"get_contract_specs() for {self.symbol}, min tick="
        f"{self.min_tick}, multiplier={self.multiplier}, description="
        f"{self.description}, tick value={self.tick_value}, point value="
        f"{self.point_value}"
    )

#-------------------------------------------------------------------------------
def OnDataConsolidated(self, sender, bar):
    """
    Event handler for desired custom bars.

    Updates both ATR indicators with the new bar and, once they are ready and
    the backtest start has been reached, runs the signal state machine
    (market position, fractal index, setups, entries, re-entries, swing end,
    trailing levels, exits) and places the resulting market orders.  The bar
    is always appended to ``self.bars`` last, so within this handler
    ``self.bars`` holds only bars that precede ``bar``.

    Args:
        sender: the consolidator that produced the bar (unused).
        bar: the newly consolidated trade bar.
    """
    # Skip if 'QC' data and the bar is not for our current contract
    if DATA_SOURCE == 'QC' and (bar.Symbol != self.contract):
        return
    # Ignore bars with no volume -> can be invalid around holidays
    if bar.Volume == 0:
        return

    # Update the 2 ATRs
    self.atr.Update(bar)
    self.atr_r.Update(bar)

    # Do not continue if an atr is not ready or not to backtest start date
    if not self.atr.IsReady or not self.atr_r.IsReady \
            or self.algo.Time < START_DT:
        # Add bar to rolling window and return
        self.bars.Add(bar)
        return

    # NOTE: rolling window to list has bars in newest to oldest order
    historical_bars = list(self.bars)

    # Check for a previous bar to compare to
    if historical_bars:
        # Most recent bar is at beginning of the rolling window, so use [0]
        last_bar = historical_bars[0]

        # Get previous values before updating them
        # These are all of the TradeStation variables with [1] reference
        previous_mp = self.mp
        previous_reversal = self.reversal
        previous_p1_up_ready = self.p1_up_ready
        previous_p1_dn_ready = self.p1_dn_ready
        previous_up_ap = self.up_ap
        previous_dn_am = self.dn_am
        previous_up_swing = self.up_swing
        previous_dn_swing = self.dn_swing
        previous_up_cm = self.up_cm
        previous_up_bm = self.up_bm
        previous_dn_cp = self.dn_cp
        # Fixed: this used to read self.up_bp, so the down-swing "End" test
        # compared against the long side's level
        previous_dn_bp = self.dn_bp
        previous_p1_up_entry = self.p1_up_entry
        previous_p1_dn_entry = self.p1_dn_entry
        previous_p1_up_f = self.p1_up_f
        previous_p1_dn_f = self.p1_dn_f
        previous_p1_up_e = self.p1_up_e
        previous_p1_dn_e = self.p1_dn_e
        previous_p1_up_noise_a = self.p1_up_noise_a
        previous_p1_up_noise_b = self.p1_up_noise_b
        previous_p1_up_noise_c = self.p1_up_noise_c
        previous_p1_dn_noise_a = self.p1_dn_noise_a
        previous_p1_dn_noise_b = self.p1_dn_noise_b
        previous_p1_dn_noise_c = self.p1_dn_noise_c

        # 1. Market Position
        # Exit
        # up_s_exit, up_f_exit, dn_s_exit, and dn_f_exit have not changed
        # yet, so not saving the "previous values"
        if previous_mp == 1 and last_bar.Low < self.up_f_exit:
            self.mp = 0
            if PRINT_SIGNALS:
                self.algo.Log(f"{bar.Time} Previous MP=1, Previous Low < UpFExit, so "
                              "now MP=0")
        if previous_mp == -1 and last_bar.High > self.dn_f_exit:
            self.mp = 0
            if PRINT_SIGNALS:
                self.algo.Log(f"{bar.Time} Previous MP=-1, Previous High > DnFExit, so "
                              "now MP=0")

        ##EL CODE NOT CONVERTED
        # if (RRRatio<>0) then begin
        #     if (MP[1]=1 and High[1]>=UpTExit[1]) then MP=0;
        #     if (MP[1]=-1 and Low[1]<=DnTExit[1]) then MP=0;
        # end;

        # Entry
        # up_signal and dn_signal have not changed yet, so not saving the
        # "previous values"
        if previous_mp < 1 and self.up_signal > 0:
            self.mp = 1
            if PRINT_SIGNALS:
                self.algo.Log(f"{bar.Time} Previous MP<1, Previous UpSignal, so now "
                              "MP=1")
        if previous_mp > -1 and self.dn_signal < 0:
            self.mp = -1
            if PRINT_SIGNALS:
                self.algo.Log(f"{bar.Time} Previous MP>-1, Previous DnSignal, so now "
                              "MP=-1")

        # Stop
        # up_s_exit and dn_s_exit have not changed yet, so not saving the
        # "previous values"
        if self.mp == 1 and bar.Low <= self.up_s_exit:
            self.mp = 0
            if PRINT_SIGNALS:
                self.algo.Log(f"{bar.Time} current MP=1, previous Low < UpSExit, so now "
                              "MP=0")
        if self.mp == -1 and bar.High >= self.dn_s_exit:
            self.mp = 0
            if PRINT_SIGNALS:
                self.algo.Log(f"{bar.Time} current MP=-1, previous High < DnSExit, so now "
                              "MP=0")

        # 2. Fractal Index
        # Use the ATR rounded to 2 decimals unless that rounds to zero
        atr = self.atr.Current.Value
        if round(atr, 2) != 0:
            atr = round(atr, 2)
        self.up_fix = 0
        self.dn_fix = 0

        # Up FX: check for a new higher high
        if bar.High > last_bar.High:
            # Count consecutive prior bars (newest first) whose high the new
            # bar matches or exceeds, capped at the lookback
            self.up_ct = 1
            for historical_bar in historical_bars:
                if (bar.High >= historical_bar.High) \
                        and (self.up_ct < self.lookback):
                    self.up_ct += 1
                else:
                    break
            # Support is the lowest low of the newest up_ct stored bars.
            # Slicing keeps the scan inside the stored bars even when up_ct
            # exceeds their count by one.
            self.up_sup = min(
                [99999999] + [b.Low for b in historical_bars[:self.up_ct]]
            )
            # Get up resistance
            self.up_res = bar.High
            # Update up fix
            self.up_fix = (self.up_res-self.up_sup)/atr

        # Down FX: check for a new lower low
        if bar.Low < last_bar.Low:
            # Count consecutive prior bars (newest first) whose low the new
            # bar matches or undercuts, capped at the lookback
            self.dn_ct = 1
            for historical_bar in historical_bars:
                if (bar.Low <= historical_bar.Low) \
                        and (self.dn_ct < self.lookback):
                    self.dn_ct += 1
                else:
                    break
            # Resistance is the highest high of the newest dn_ct stored bars
            self.dn_res = max(
                [-99999999] + [b.High for b in historical_bars[:self.dn_ct]]
            )
            # Get down support
            self.dn_sup = bar.Low
            # Update dn fix
            self.dn_fix = (self.dn_res-self.dn_sup)/atr

        # 3. Setup
        # Up P1
        if self.dn_ct >= self.lookback:
            self.p1_up_ready = False
        elif self.dn_fix >= self.algo.min_p1 and self.up_f <= self.dn_res:
            self.p1_up_ready = True
            self.p1_up_f = self.dn_res
            self.p1_up_e = (self.dn_res+self.dn_sup)*0.50
            dist = self.dn_res-self.dn_sup
            self.p1_up_noise_a = dist * self.algo.level_a
            self.p1_up_noise_b = dist * self.algo.level_b
            self.p1_up_noise_c = dist * self.algo.level_c
            self.p1_up_entry = self.p1_up_e \
                + self.p1_up_noise_a*0.50

        # Down P1
        if self.up_ct >= self.lookback:
            self.p1_dn_ready = False
        elif self.up_fix >= self.algo.min_p1 and self.dn_f >= self.up_sup:
            self.p1_dn_ready = True
            self.p1_dn_f = self.up_sup
            self.p1_dn_e = (self.up_res+self.up_sup)*0.50
            dist = self.up_res-self.up_sup
            self.p1_dn_noise_a = dist * self.algo.level_a
            self.p1_dn_noise_b = dist * self.algo.level_b
            self.p1_dn_noise_c = dist * self.algo.level_c
            self.p1_dn_entry = self.p1_dn_e \
                - self.p1_dn_noise_a*0.50

        # Up P2
        # NOTE(review): the P2 setup is assumed to be nested under the
        # "reversal flips from -1" branch -- confirm against the original.
        if self.up_fix >= self.algo.min_p2:
            if previous_reversal == 0:
                self.reversal = 1
            if previous_reversal == -1:
                self.reversal = 1
                if self.mp < 1:
                    self.p2_up_ready = True
                    self.p2_up_e = bar.High - self.algo.min_p2*atr*0.50
                    self.p2_up_noise_a = \
                        self.algo.min_p2*atr*self.algo.level_a
                    self.p2_up_noise_b = \
                        self.algo.min_p2*atr*self.algo.level_b
                    self.p2_up_noise_c = \
                        self.algo.min_p2*atr*self.algo.level_c

        # Down P2
        # NOTE(review): same nesting assumption as Up P2 -- confirm.
        if self.dn_fix >= self.algo.min_p2:
            if previous_reversal == 0:
                self.reversal = -1
            if previous_reversal == 1:
                self.reversal = -1
                if self.mp > -1:
                    self.p2_dn_ready = True
                    self.p2_dn_e = bar.Low + self.algo.min_p2*atr*0.50
                    self.p2_dn_noise_a = \
                        self.algo.min_p2*atr*self.algo.level_a
                    self.p2_dn_noise_b = \
                        self.algo.min_p2*atr*self.algo.level_b
                    self.p2_dn_noise_c = \
                        self.algo.min_p2*atr*self.algo.level_c

        # 4. Entry
        self.up_signal = 0
        self.dn_signal = 0

        # Up P1: uses the setup values saved before this bar's setup step
        if previous_p1_up_ready and bar.High >= previous_p1_up_entry:
            self.up_swing = 1
            self.up_signal = 1
            self.up_f = previous_p1_up_f
            self.up_e = previous_p1_up_e
            self.up_noise_a = previous_p1_up_noise_a
            self.up_noise_b = previous_p1_up_noise_b
            self.up_noise_c = previous_p1_up_noise_c
            self.up_entry = previous_p1_up_entry
            self.up_ap = self.up_e + self.up_noise_a*0.50
            self.up_bp = self.up_e + self.up_noise_b*0.50
            self.up_cp = self.up_e + self.up_noise_c*0.50
            self.up_am = self.up_e - self.up_noise_a*0.50
            self.up_bm = self.up_e - self.up_noise_b*0.50
            self.up_cm = self.up_e - self.up_noise_c*0.50
            self.p1_up_ready = False
            self.p2_up_ready = False

        # Down P1
        if previous_p1_dn_ready and bar.Low <= previous_p1_dn_entry:
            self.dn_swing = -1
            self.dn_signal = -1
            self.dn_f = previous_p1_dn_f
            self.dn_e = previous_p1_dn_e
            self.dn_noise_a = previous_p1_dn_noise_a
            self.dn_noise_b = previous_p1_dn_noise_b
            self.dn_noise_c = previous_p1_dn_noise_c
            self.dn_entry = previous_p1_dn_entry
            self.dn_ap = self.dn_e + self.dn_noise_a*0.50
            self.dn_bp = self.dn_e + self.dn_noise_b*0.50
            self.dn_cp = self.dn_e + self.dn_noise_c*0.50
            self.dn_am = self.dn_e - self.dn_noise_a*0.50
            self.dn_bm = self.dn_e - self.dn_noise_b*0.50
            self.dn_cm = self.dn_e - self.dn_noise_c*0.50
            self.p1_dn_ready = False
            self.p2_dn_ready = False

        # Up P2: overrides an Up P1 signal raised on the same bar
        if self.p2_up_ready:
            self.up_swing = 2
            self.up_signal = 2
            self.up_f = -99999
            self.up_e = self.p2_up_e
            self.up_noise_a = self.p2_up_noise_a
            self.up_noise_b = self.p2_up_noise_b
            self.up_noise_c = self.p2_up_noise_c
            self.up_entry = bar.High
            self.up_ap = self.up_e + self.up_noise_a*0.50
            self.up_bp = self.up_e + self.up_noise_b*0.50
            self.up_cp = self.up_e + self.up_noise_c*0.50
            self.up_am = self.up_e - self.up_noise_a*0.50
            self.up_bm = self.up_e - self.up_noise_b*0.50
            self.up_cm = self.up_e - self.up_noise_c*0.50
            self.p1_up_ready = False
            self.p2_up_ready = False

        # Down P2: overrides a Down P1 signal raised on the same bar
        if self.p2_dn_ready:
            self.dn_swing = -2
            self.dn_signal = -2
            self.dn_f = 99999
            self.dn_e = self.p2_dn_e
            self.dn_noise_a = self.p2_dn_noise_a
            self.dn_noise_b = self.p2_dn_noise_b
            self.dn_noise_c = self.p2_dn_noise_c
            self.dn_entry = bar.Low
            self.dn_ap = self.dn_e + self.dn_noise_a*0.50
            self.dn_bp = self.dn_e + self.dn_noise_b*0.50
            self.dn_cp = self.dn_e + self.dn_noise_c*0.50
            self.dn_am = self.dn_e - self.dn_noise_a*0.50
            self.dn_bm = self.dn_e - self.dn_noise_b*0.50
            self.dn_cm = self.dn_e - self.dn_noise_c*0.50
            self.p1_dn_ready = False
            self.p2_dn_ready = False

        # 5. Re-entry
        # Up
        if self.up_signal == 0 and self.mp < 1 and previous_mp < 1:
            if self.up_swing > 0 and bar.High >= previous_up_ap:
                self.up_signal = 3
        # Down
        if self.dn_signal == 0 and self.mp > -1 and previous_mp > -1:
            if self.dn_swing < 0 and bar.Low <= previous_dn_am:
                self.dn_signal = -3

        # 6. End
        # Up
        if self.up_signal == 0 and previous_up_swing > 0:
            if (self.dn_fix >= min(self.algo.min_p1, self.algo.min_p2)) \
                    and (bar.Low < min(previous_up_bm, previous_up_cm)):
                self.up_swing = 0
                self.up_f = -99999
        # Down
        if self.dn_signal == 0 and previous_dn_swing < 0:
            if (self.up_fix >= min(self.algo.min_p1, self.algo.min_p2)) \
                    and (bar.High > max(previous_dn_bp, previous_dn_cp)):
                self.dn_swing = 0
                self.dn_f = 99999

        # 7. Trailing
        # Up
        if self.up_swing > 0:
            self.up_ap = max(self.up_ap, bar.High)
            self.up_bp = max(self.up_bp, bar.High)
            self.up_cp = max(self.up_cp, bar.High)
            self.up_am = self.up_ap-self.up_noise_a
            self.up_bm = self.up_bp-self.up_noise_b
            self.up_cm = self.up_cp-self.up_noise_c
        # Down
        if self.dn_swing < 0:
            self.dn_am = min(self.dn_am, bar.Low)
            self.dn_bm = min(self.dn_bm, bar.Low)
            self.dn_cm = min(self.dn_cm, bar.Low)
            self.dn_ap = self.dn_am+self.dn_noise_a
            self.dn_bp = self.dn_bm+self.dn_noise_b
            self.dn_cp = self.dn_cm+self.dn_noise_c

        # 8. Exit
        # Up
        if self.up_swing > 0:
            if self.up_entry >= self.up_bm:
                self.up_f_exit = min(self.up_entry, self.up_bm)
            else:
                self.up_f_exit = max(self.up_entry, self.up_cm)
        # Down
        if self.dn_swing < 0:
            if self.dn_entry <= self.dn_bp:
                self.dn_f_exit = max(self.dn_entry, self.dn_bp)
            else:
                self.dn_f_exit = min(self.dn_entry, self.dn_cp)

        # 9. Stop Exit
        # Instead of delaying the stop exit like TradeStation logic does,
        # the stop loss order is placed immediately after an entry order is
        # filled

        # 10. Mixed signals: opposing signals on the same bar cancel out
        if self.up_signal > 0 and self.dn_signal < 0:
            self.up_signal = 0
            self.dn_signal = 0

        # 11. Entry/Re-entry/Exit
        current_shares = self.current_qty
        entry = False
        if self.mp < 1 and self.up_signal >= 1:
            # Get the signal label (L1 / L2 / L3)
            signal = {1: "L1", 2: "L2", 3: "L3"}.get(
                self.up_signal, f"L{self.up_signal}"
            )
            # Get desired position size
            self.GetPositionSize()
            # Get order qty
            desired_shares = self.contracts
            order_qty = desired_shares-current_shares
            # Log message when desired
            if PRINT_SIGNALS:
                self.algo.Log(
                    f"{self.symbol} {signal} Entry Signal: "
                    f"Have {current_shares} cxs, want {desired_shares}, "
                    f"so order cxs = {order_qty}"
                )
            # Cancel previous stop order (if one)
            self.CancelStopOrder()
            # Place market buy order if order qty not 0
            if order_qty != 0:
                if DATA_SOURCE == 'QC':
                    self.algo.MarketOrder(self.contract, order_qty)
                else:
                    self.algo.MarketOrder(self.security, order_qty)
                entry = True
        elif self.mp > -1 and self.dn_signal <= -1:
            # Get the signal label (S1 / S2 / S3)
            signal = {-1: "S1", -2: "S2", -3: "S3"}.get(
                self.dn_signal, f"S{-self.dn_signal}"
            )
            # Get desired position size
            self.GetPositionSize()
            # Get order qty
            desired_shares = -self.contracts
            order_qty = desired_shares-current_shares
            # Log message when desired
            if PRINT_SIGNALS:
                self.algo.Log(
                    f"{self.symbol} {signal} Entry Signal: "
                    f"Have {current_shares} shares, want {desired_shares}, "
                    f"so order shares = {order_qty}"
                )
            # Cancel previous stop order (if one)
            self.CancelStopOrder()
            # Place market sell order if order qty not 0
            if order_qty != 0:
                if DATA_SOURCE == 'QC':
                    self.algo.MarketOrder(self.contract, order_qty)
                else:
                    self.algo.MarketOrder(self.security, order_qty)
                entry = True

        # Exit (only when no entry order was sent on this bar)
        if not entry:
            # NOTE: stop loss order is placed immediately after an entry
            # order is filled, so only the "F" exit is checked here
            if self.mp == 1 and current_shares > 0:
                # Active long position: check for exit signal
                if bar.Low < self.up_f_exit:
                    if PRINT_SIGNALS:
                        self.algo.Log(
                            f"{self.symbol} LXF Exit Signal: "
                            f"Sell {current_shares} shares"
                        )
                    self.ExitPosition(current_shares)
            elif self.mp == -1 and current_shares < 0:
                # Active short position: check for exit signal
                if bar.High > self.dn_f_exit:
                    if PRINT_SIGNALS:
                        self.algo.Log(
                            f"{self.symbol} SXF Exit Signal: "
                            f"Buy to cover {current_shares} shares"
                        )
                    self.ExitPosition(current_shares)

    # Add bar to rolling window
    self.bars.Add(bar)

#-------------------------------------------------------------------------------
def ExitPosition(self, current_shares):
    """
    Immediately exit an active position.

    Cancels the open stop order (if any) and sends a market order for the
    opposite of ``current_shares``.

    Args:
        current_shares: signed quantity currently held.
    """
    self.CancelStopOrder()
    # Place a market order to close position
    target = self.contract if DATA_SOURCE == 'QC' else self.security
    self.algo.MarketOrder(target, -current_shares)
    # # Set mp to 0
    # self.mp = 0

#-------------------------------------------------------------------------------
def CancelStopOrder(self):
    """Cancel the open stop order (if any) and clear the saved reference."""
    if not self.stop_order:
        return
    # Log message when desired
    if PRINT_ORDERS:
        self.algo.Log(f"Cancelling {self.symbol} stop order.")
    self.stop_order.Cancel()
    # Set back to None
    self.stop_order = None

#-------------------------------------------------------------------------------
def PlaceStopOrder(self, current_shares):
    """
    Place the stop loss order.

    The stop price is ``self.cost_basis`` minus (long) or plus (short)
    ``self.algo.stop_ix`` times the current ``atr_r`` value, rounded to two
    decimals, and is also saved as ``up_s_exit`` / ``dn_s_exit``.  Nothing is
    placed when ``current_shares`` is 0.

    Args:
        current_shares: signed quantity currently held.
    """
    # Get the previous completed bar's atr
    atr = self.atr_r.Current.Value

    # NOTE(review): prices are rounded to 2 decimals rather than to
    # self.min_tick -- confirm this is valid for every traded contract.
    if current_shares > 0:
        # Long position: use up stop
        stop = self.up_s_exit = \
            round(self.cost_basis-self.algo.stop_ix*atr, 2)
        position = 'long'
    elif current_shares < 0:
        # Short position: use down stop
        stop = self.dn_s_exit = \
            round(self.cost_basis+self.algo.stop_ix*atr, 2)
        position = 'short'
    else:
        # Don't continue if no position
        return

    # Place stop market order for -current shares at the desired stop price
    symbol = self.contract if DATA_SOURCE == 'QC' else self.security
    self.stop_order = \
        self.algo.StopMarketOrder(symbol, -current_shares, stop)

    # Log message when desired
    if PRINT_ORDERS:
        self.algo.Log(
            f"{self.symbol} {position} stop order for {-current_shares}"
            f" shares placed at {stop}"
        )

#-------------------------------------------------------------------------------
def GetPositionSize(self, sizing=1500):
    """
    Get the desired position size and store it in ``self.contracts``.

    The size is ``sizing / (stop_ix_r * atr_r * multiplier)``, rounded to two
    decimals and truncated to a whole number.  ``atr_r`` is rounded to two
    decimals unless that rounds to zero.  When the per-contract risk is
    exactly zero the size is set to 0 instead of dividing by zero.

    Args:
        sizing: numerator of the sizing formula (defaults to 1500, the value
            that used to be hard-coded).
    """
    # atr_r = self.atr_r.Current.Value
    # # risk = self.algo.risk_pct*self.algo.Portfolio.TotalPortfolioValue
    # risk = self.algo.stop_dollar
    # n_rough = risk/(self.algo.stop_ix_r*atr_r*self.multiplier)
    # nr = int(n_rough)
    # self.contracts = round(nr, 0)
    atr_r = self.atr_r.Current.Value
    if round(atr_r, 2) != 0:
        atr_r = round(atr_r, 2)

    risk_per_contract = self.algo.stop_ix_r*atr_r*self.multiplier
    if risk_per_contract == 0:
        # A zero ATR or multiplier gives no usable size; trade nothing
        self.contracts = 0
        return
    self.contracts = math.trunc(round(sizing/risk_per_contract, 2))

#-------------------------------------------------------------------------------
def DailyFutureCalendar(self, dt):
    """
    Set up daily consolidator calendar info for the US future market.

    This should return a start datetime object that is timezone unaware
    with a valid date/time for the desired securities' exchange's time zone.

    Args:
        dt: datetime passed in by the consolidator.

    Returns:
        CalendarInfo(start, period) for the bar that should contain ``dt``.

    Useful Refs:
        datetime.replace() method:
        https://docs.python.org/3/library/datetime.html#datetime.datetime.replace
    """
    # Need to handle case where algo initializes and this function is called
    # for the first time.
    if not self.future_calendar_initialized:
        # Since this doesn't matter, we'll pass dt as start and one day
        # as the period
        self.future_calendar_initialized = True
        return CalendarInfo(dt, DT.timedelta(1))

    # Create a datetime.datetime object to represent the market open for the
    # **EXCHANGE** timezone
    start = dt.replace(hour=self.mkt_open.hour,
                       minute=self.mkt_open.minute,
                       second=0,
                       microsecond=0)
    # Set end based on typical daily period
    end = start + DT.timedelta(minutes=self.period_minutes)

    # Catch when start is after the passed dt
    # QC now throws an error in this case
    if start > dt:
        # To handle the QC error, pass a period with no data: it runs from
        # dt up to the next desired start, so the next dt is that start
        end = start
        start = dt

    # Return the start datetime and the consolidation period
    return CalendarInfo(start, end-start)

#-------------------------------------------------------------------------------
def CustomCalendar(self, dt):
    """
    Set up custom consolidator calendar info.

    This should return a start datetime object that is timezone unaware
    with a valid date/time for the desired securities' exchange's time zone.

    The bar is the first ``(start_time, end_time)`` entry of ``BAR_TIMES``
    whose end time is after ``dt``'s time of day; when there is none, the
    first entry is used.

    Args:
        dt: datetime passed in by the consolidator.

    Returns:
        CalendarInfo(start, period) for the bar that should contain ``dt``.

    Useful Refs:
        datetime.replace() method:
        https://docs.python.org/3/library/datetime.html#datetime.datetime.replace
    """
    # Need to handle case where algo initializes and this function is called
    # for the first time.
    if not self.future_calendar_initialized:
        # Since this doesn't matter, we'll pass dt as start and one day
        # as the period
        self.future_calendar_initialized = True
        return CalendarInfo(dt, DT.timedelta(1))

    def at(clock):
        # dt's date at the given time of day; seconds and microseconds are
        # zeroed so boundaries are exact, matching DailyFutureCalendar
        return dt.replace(hour=clock.hour, minute=clock.minute,
                          second=0, microsecond=0)

    start = None
    end = None
    # Loop through all BAR_TIMES
    for bar_start, bar_end in BAR_TIMES:
        # Check for dt time before the end time
        if dt.time() < bar_end:
            start = at(bar_start)
            end = at(bar_end)
            break

    # Make sure that start and end are set
    if start is None or end is None:
        # Fall back to the first bar's start and end times
        start = at(BAR_TIMES[0][0])
        end = at(BAR_TIMES[0][1])

    # Make sure that the end is after the start
    # If not, add a day to the end
    # NOTE(review): assumed to apply to both the matched and fallback bar --
    # confirm against the original nesting.
    if end < start:
        end = end + DT.timedelta(1)

    # Catch when start is after the passed dt
    # QC now throws an error in this case
    if start > dt:
        # To handle the QC error, pass a period with no data: it runs from
        # dt up to the next desired start, so the next dt is that start
        end = start
        start = dt

    # return start datetime and period
    return CalendarInfo(start, end-start)

#-------------------------------------------------------------------------------
@property
def current_qty(self):
    """
    Return the current contract quantity held in the portfolio.

    Returns 0 when ``self.contract`` is not set; otherwise the portfolio
    quantity of ``self.contract`` ('QC' data) or ``self.security`` as an int.
    """
    # Nothing is held until the current contract is set
    if not self.contract:
        return 0
    held = self.contract if DATA_SOURCE == 'QC' else self.security
    return int(self.algo.Portfolio[held].Quantity)