Overall Statistics |
Total Trades 89 Average Win 0.08% Average Loss -0.06% Compounding Annual Return -0.548% Drawdown 1.100% Expectancy -0.219 Net Profit -0.582% Sharpe Ratio -0.695 Probabilistic Sharpe Ratio 4.556% Loss Rate 64% Win Rate 36% Profit-Loss Ratio 1.16 Alpha -0.006 Beta 0.002 Annual Standard Deviation 0.008 Annual Variance 0 Information Ratio -1.9 Tracking Error 0.146 Treynor Ratio -2.46 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset AAPL.MyCustomData 2S |
# Standard library imports from AlgorithmImports import * import datetime as DT from dateutil.parser import parse # Import from files from notes_and_inputs import * ################################################################################ class MyCustomData(PythonData): """ Custom Data Class REFs: https://www.quantconnect.com/forum/discussion/4079/python-best-practise-for-using-consolidator-on-custom-data/p1 """ def GetSource(self, config, date, isLiveMode): # Get file specific to the asset symbol symbol = config.Symbol.Value # Must use dictionary.get() method because this will be called on initialization # without a valid symbol # symbol must also always be all caps, because QC converts it to all caps! file = CSV_FILES.get(symbol, '') return SubscriptionDataSource( file, SubscriptionTransportMedium.RemoteFile) def Reader(self, config, line, date, isLiveMode): # New object asset = MyCustomData() asset.Symbol = config.Symbol # try: # Example File Format: # <Date> <Time> <Open> <High> <Low> <Close> <Volume> # 2/1/2018 10:30:00 13.59 13.78 13.41 13.67 19817603 # If first character is not a digit, return if not (line.strip() and line[0].isdigit()): return None data = line.split(',') # Get the date date = parse(data[0]) time = parse(data[1]).time() # Combine date/time to get DT.datetime object asset.Time = DT.datetime.combine(date, time) # Set the value used for filling positions / Using the open price value = float(data[2]) asset.Value = value # Get the prices asset["Open"] = float(data[2]) asset["High"] = float(data[3]) asset["Low"] = float(data[4]) asset["Close"] = float(data[5]) asset["Volume"] = float(data[6]) # except ValueError: # # Do nothing # return None return asset
# Standard library imports from AlgorithmImports import * import datetime as DT # Import from files from notes_and_inputs import * from symbol_data import * from custom_data import * ################################################################################ class EquitiesStrategyAlgorithm(QCAlgorithm): def Initialize(self): """Initialize algorithm.""" # Set backtest details self.set_backtest_details() # Initialize algo parameters self.initialize_parameters() # Add all desired instruments to the algo self.add_instruments() # Warm up the algorithm # self.SetWarmUp(WARMUP_DAYS) #------------------------------------------------------------------------------- def set_backtest_details(self): """Set the backtest details.""" # Set the start and end date (if applicable) self.SetStartDate( BACKTEST_START_DT.year, BACKTEST_START_DT.month, BACKTEST_START_DT.day) if END_DATE: self.SetEndDate(END_DT.year, END_DT.month, END_DT.day) # Set the starting cash amount self.SetCash(CASH) # Set the timezone for algo logs self.SetTimeZone(TIMEZONE) # Setup trading framework # Transaction and submit/execution rules will use IB models # Cannot use the built-in models with the custom data if DATA_SOURCE == 'QC': self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin) # Will override the desired reality methods with this function # Configure all algorithm securities self.SetSecurityInitializer(self.custom_security_initializer) #------------------------------------------------------------------------------- def initialize_parameters(self): """Read all algo input parameters and set up all others required.""" # Read user inputs self.min_p1 = MIN_P1 self.min_p2 = MIN_P2 self.level_a = LEVEL_A self.level_b = LEVEL_B self.level_c = LEVEL_C self.stop_ix = STOP_IX self.length = LENGTH self.stop_ix_r = STOP_IX_R self.stop_dollar = STOP_DOLLAR self.atr_length_r = ATR_LENGTH_R self.risk_pct = RISK_PCT self.lookback = LOOKBACK #------------------------------------------------------------------------------- def add_instruments(self): """Add desired instrument data to the algo.""" # Create a dictionary to hold all symbol data objects self.symbol_data = {} # Loop through all equities for equity in EQUITIES: # Check if 'QC' data is used if DATA_SOURCE == 'QC': # Add price data to the algo and save QC symbol object equity_symbol = self.AddEquity(equity, Resolution.Minute).Symbol # Otherwise 'CUSTOM' data is used else: equity_symbol = self.AddData(MyCustomData, equity).Symbol # Create symbol data object for the equity self.symbol_data[equity] = SymbolData(self, equity, equity_symbol) #------------------------------------------------------------------------------- def custom_security_initializer(self, security): """ Define models to be used for securities as they are added to the algorithm's universe. """ # Define the data normalization mode security.SetDataNormalizationMode(DataNormalizationMode.Adjusted) #------------------------------------------------------------------------------- def OnOrderEvent(self, orderEvent): """Built-in event handler for orders.""" # Skip if not filled if orderEvent.Status != OrderStatus.Filled: return # Get the order details order = self.Transactions.GetOrderById(orderEvent.OrderId) order_qty = order.Quantity symbol = str(order.Symbol) # Remove the .MyCustomData if custom data is used if DATA_SOURCE == 'CUSTOM': symbol = symbol.split(".")[0] # Get current qty qty = self.Portfolio[symbol].Quantity # Get the current order's average fill price avg_fill = round(orderEvent.FillPrice, 4) # Get the symbol data object for the symbol symbol_data = self.symbol_data[symbol] # Get active stop order id if symbol_data.stop_order: stop_order_id = symbol_data.stop_order.OrderId else: stop_order_id = 0 # Check for filled stop order if orderEvent.OrderId == stop_order_id: # Filled stop loss order - set to None symbol_data.stop_order = None # Log message when desired if PRINT_ORDERS: self.Log(f"{symbol} filled stop order for {order_qty} shares @ " f"{avg_fill}") # Set mp to 0 # symbol_data.mp = 0 return # Check for a market exit order elif qty == 0: # Filled exit order # symbol_data.stop_order = None # Log message when desired if PRINT_ORDERS: self.Log(f"{symbol} filled exit order for {order_qty} shares @ " f"{avg_fill}") # Set mp to 0 # symbol_data.mp = 0 return # Otherwise qty not 0, so entry order else: # Filled entry order # Get and save the cost basis for the position cost_basis = avg_fill symbol_data.cost_basis = cost_basis # Log message when desired if PRINT_ORDERS: if qty > 0: self.Log(f"{symbol} filled long entry order for " f"{order_qty} shares @ {avg_fill}, new qty = {qty}") elif qty < 0: self.Log(f"{symbol} filled short entry order for " f"{order_qty} shares @ {avg_fill}, new qty = {qty}") # Immediately place new stop market order symbol_data.PlaceStopOrder(qty)
""" Matt Custom Strategy Version 1.0.2 Platform: QuantConnect By: Aaron Eller For: Matt Blonc www.excelintrading.com aaron@excelintrading.com Revision Notes: 1.0.0 (09/08/2021) - Initial 1.0.1 (09/10/2021) - Added 'CUSTOM_TIMES' BAR option. 1.0.2 (09/17/2021) - Added DATA_SOURCE input and custom data logic. - Modified symbol_data.set_indicators() to handle custom data consolidators. - Modified set_backtest_details() and OnOrderEvent() to handle custom data symbols. 1.0.3 (09/22/2021) - Updated PlaceStopOrder() to use atr_r. - Updated up/down count and up/down support/resistance logic. References: -Creating a Custom Indicator https://www.quantconnect.com/forum/discussion/3383/custom-indicator-in-python-algorithm/p1 """ ################################################################################ import datetime as DT # USER INPUTS # Backtest details START_DATE = "09-01-2020" # MM-DD-YYYY format END_DATE = None #"12-31-2021" # MM-DD-YYYY format (or None for to current date) CASH = 400000 # starting portfolio cash amount TIMEZONE = "US/Eastern" # e.g. "US/Eastern", "US/Central", "US/Pacific" # Set the equities to trade # Be sure to use ALL CAPS EQUITIES = ['AAPL'] #, 'T', 'AA'] #------------------------------------------------------------------------------- # BAR TO TRACK FOR EACH EQUITY # Tell algorithm to use QC data or custom data DATA_SOURCE = 'CUSTOM' # must be 'QC' or 'CUSTOM' #------------------------------------------------------------------------------- # The following inputs are used for 'QC' data: # Use the following format: '5 min', '1 hr', '1 day' # Or to use CUSTOM_TIMES below, set BAR = 'CUSTOM_TIMES' BAR = 'CUSTOM_TIMES' # Define the custom intraday minutely bars to track # List specific bar start/stop times # Only used when BAR above is set to be 'CUSTOM_TIMES' CUSTOM_TIMES = [ '0930-1030', '1030-1130', '1130-1230', '1230-1330', '1330-1430', '1430-1530', '1530-1600', ] #------------------------------------------------------------------------------- # The following inputs are used for 'CUSTOM' data # Create a dictionary to hold all links to csv custom data CSV_FILES = {} # Add link for all EQUITIES listed above # Note that the link must automatically download the data as a csv file # If using dropbox remember to add the &dl=1 to trigger a download CSV_FILES['AAPL'] = 'https://www.dropbox.com/s/sx0nhylvd1olbo4/AAPL%2060%20Minutes.csv?dl=1' # Set period for custom data CUSTOM_DATA_PERIOD = DT.timedelta(minutes=60) # 'days', 'minutes', 'hours' are all possible arguments for DT.timedelta #------------------------------------------------------------------------------- # INDICATOR INPUTS MIN_P1 = 3 MIN_P2 = 8 LEVEL_A = .7 LEVEL_B = .8 LEVEL_C = 1.7 STOP_IX = 6 LENGTH = 40 LOOKBACK = 34 # default setting for MaxBarsBack in TradeStation = 50 # POSITION SIZING INPUTS STOP_IX_R = 2 ATR_LENGTH_R = 160 STOP_DOLLAR = 1100 RISK_PCT = 0.0025 #------------------------------------------------------------------------------- # Turn on/off logs PRINT_SIGNALS = True # print entry/exit signals PRINT_ORDERS = True # print order details ################################################################################ ############################ END OF ALL USER INPUTS ############################ ################################################################################ ################################################################################ # VALIDATE USER INPUTS - DO NOT CHANGE BELOW!!! # Verify start date try: START_DT = DT.datetime.strptime(START_DATE, '%m-%d-%Y') except: raise ValueError("Invalid START_DATE format ({}). Must be in MM-DD-YYYY " "format.".format(START_DATE)) # Verify end date try: if END_DATE: END_DT = DT.datetime.strptime(END_DATE, '%m-%d-%Y') except: raise ValueError("Invalid END_DATE format ({}). Must be in MM-DD-YYYY " "format or set to None to run to date.".format(END_DATE)) #------------------------------------------------------------------------------- # Verify BAR # First check if 'CUSTOM_TIMES' is used if BAR == 'CUSTOM_TIMES': # Create empty list of times to fill BAR_TIMES = [] # Loop through CUSTOM_TIMES for time_str in CUSTOM_TIMES: try: # Get bar start hour and minutes start_str = time_str.split('-')[0] start_hr = int(start_str[:2]) start_min = int(start_str[-2:]) # Get bar end hour and minutes end_str = time_str.split('-')[1] end_hr = int(end_str[:2]) end_min = int(end_str[-2:]) # Create a datetime.time object for start and end times time_start = DT.time( hour=start_hr, minute=start_min, second=0, microsecond=0) time_end = DT.time( hour=end_hr, minute=end_min, second=0, microsecond=0) # Add (start time, end time) tuple to BAR_DTS list BAR_TIMES.append((time_start, time_end)) # Set BAR_UNIT to be minutes BAR_UNIT = 'min' except: raise ValueError("Invalid CUSTOM_TIMES entry: {}".format(time_str)) else: try: # Get the bar integer and bar unit BAR_INT = int(BAR.split(" ")[0]) BAR_UNIT = BAR.split(" ")[1].lower() # Verify bar unit is valid if BAR_UNIT not in ['min', 'hr', 'day']: raise ValueError( "Invalid BAR ({}). Unit must be 'min', 'hr', or 'day'.".format( BAR)) # Check for hourly bar unit elif BAR_UNIT == 'hr': # Convert hr bar int to be minutes BAR_INT *= 60 # Change the BAR_UNIT to now be 'min' BAR_UNIT = 'min' # Only allow '1 day' bars elif BAR_UNIT == 'day' and BAR_INT != 1: raise ValueError( "Invalid BAR ({}). Multiple 'day' bars are not allowed.".format( BAR)) except: raise ValueError( "Invalid BAR: {}. Use '5 min', '1 hr', or '1 day' format".format( BAR)) # Verify data source if DATA_SOURCE == 'QC': pass elif DATA_SOURCE == 'CUSTOM': # Make sure all equities have a data source for equity in EQUITIES: if equity not in CSV_FILES: raise ValueError("Using 'CUSTOM' data and no CSV_FILES link defined" " for {}.".format(equity)) else: raise ValueError("Invalid DATA_SOURCE ({}). Must be 'QC' or " "'CUSTOM'.".format(DATA_SOURCE)) #------------------------------------------------------------------------------- # Calculate the minimum number of bars required to warm up the indicators WARMUP_DAYS = int(1*max(ATR_LENGTH_R, LENGTH)) # Set the start date based on the desired days to warm up the algo # approximately 252 market days per 365 calendar days # CALENDAR_DAYS = int(WARMUP_DAYS*(365/252)) CALENDAR_DAYS = 1 BACKTEST_START_DT = START_DT - DT.timedelta(days=CALENDAR_DAYS)
# Standard library imports import datetime as DT import math import numpy as np import pytz import random # Import from files from notes_and_inputs import * ################################################################################ class SymbolData(object): """Class to store data for a specific security symbol.""" def __init__(self, algo, symbol, security_symbol): """Initialize SymbolData object.""" self.algo = algo self.symbol = symbol self.security = security_symbol # Set the indicators for the security self.set_indicators() #------------------------------------------------------------------------------- def set_indicators(self): """Set up the security's indicators to be updated on the desired BAR.""" # Set up the ATRs for the equity # self.atr = AverageTrueRange( # self.algo.length, MovingAverageType.Exponential) self.atr = AverageTrueRange( self.algo.length, MovingAverageType.Simple) # self.atr_r = AverageTrueRange( # self.algo.atr_length_r, MovingAverageType.Exponential) self.atr_r = AverageTrueRange( self.algo.atr_length_r, MovingAverageType.Simple) # REF: https://lean-api-docs.netlify.app/MovingAverageTypeExtensions_8cs_source.html # Check if QC data is used if DATA_SOURCE == 'QC': # Set up the desired bar to update the indicators # Create a trade bar consolidator if BAR_UNIT == 'min': if BAR == 'CUSTOM_TIMES': c = TradeBarConsolidator(self.CustomCalendar) else: c = TradeBarConsolidator(timedelta(minutes=BAR_INT)) elif BAR_UNIT == 'day': c = TradeBarConsolidator(self.DailyUSEquityCalendar) # Event handler to be called on each new consolidated bar c.DataConsolidated += self.OnDataConsolidated # Link the consolidator with our contract and add it to the manager self.algo.SubscriptionManager.AddConsolidator(self.security, c) # Otherwise custom data else: # Create a tradebar consolidator based on custom data c = self.algo.ResolveConsolidator(self.symbol, CUSTOM_DATA_PERIOD) # Event handler to be called on each new consolidated bar c.DataConsolidated += self.OnDataConsolidated # Link the consolidator with our contract and add it to the manager self.algo.SubscriptionManager.AddConsolidator(self.symbol, c) # Keep a rolling window of the last length bars self.lookback = self.algo.lookback self.bars = RollingWindow[TradeBar](self.lookback) # Keep a link to the active stop loss order self.stop_order = None # Save the positions active cost basis # Will use this actual amount for the stop loss instead of using the # assumed fill price at the bar's open self.cost_basis = None # Initialize other variables required for the algo self.mp = 0 self.up_s_exit = -99999 self.dn_s_exit = 99999 self.up_f_exit = -99999 self.dn_f_exit = 99999 self.up_signal = 0 self.dn_signal = 0 self.up_fix = 0 self.dn_fix = 0 self.up_ct = 1 self.dn_ct = 1 self.up_res = 99999 self.dn_res = 99999 self.up_sup = -99999 self.dn_sup = -99999 self.p1_up_ready = False self.p1_dn_ready = False self.up_f = -99999 self.dn_f = 99999 self.p1_up_f = 99999 self.p1_dn_f = -99999 self.p1_up_e = 0 self.p1_dn_e = 0 self.p1_up_noise_a = 0 self.p1_dn_noise_a = 0 self.p1_up_noise_b = 0 self.p1_dn_noise_b = 0 self.p1_up_noise_c = 0 self.p1_dn_noise_c = 0 self.p1_up_entry = 99999 self.p1_dn_entry = -99999 self.reversal = 0 self.p2_up_ready = False self.p2_dn_ready = False self.p2_up_e = 0 self.p2_dn_e = 0 self.p2_up_noise_a = 0 self.p2_dn_noise_a = 0 self.p2_up_noise_b = 0 self.p2_dn_noise_b = 0 self.p2_up_noise_c = 0 self.p2_dn_noise_c = 0 self.up_swing = 0 self.dn_swing = 0 self.up_e = 0 self.dn_e = 0 self.up_noise_a = 0 self.dn_noise_a = 0 self.up_noise_b = 0 self.dn_noise_b = 0 self.up_noise_c = 0 self.dn_noise_c = 0 self.up_entry = 99999 self.dn_entry = -99999 self.up_ap = 99999 self.dn_ap = 99999 self.up_am = -99999 self.dn_am = -99999 self.up_bp = 99999 self.dn_bp = 99999 self.up_bm = -99999 self.dn_bm = -99999 self.up_cp = 99999 self.dn_cp = 99999 self.up_cm = -99999 self.dn_cm = -99999 self.contracts = 0 #------------------------------------------------------------------------------- def OnDataConsolidated(self, sender, bar): """Event handler for desired custom bars.""" # Get the symbol of the new custom bar symbol = str(bar.Symbol) # Get previous atr value before updating it try: previous_atr = self.atr.Current.Value except: # Exception happens on the very first bar fed to the algo previous_atr = None # Update the 2 ATRs self.atr.Update(bar) self.atr_r.Update(bar) # Do not continue if an atr is not ready or not to backtest start date if not self.atr.IsReady or not self.atr_r.IsReady \ or self.algo.Time < START_DT: # Add bar to rolling window and return self.bars.Add(bar) return # Check for a previous bar to compare to if len(list(self.bars)) > 0: # Get last bar for reference # Most recent bar is at beginning of the rolling window, so use [0] last_bar = self.bars[0] # Get previous values before updating them # These are all of the TradeStation variables with [1] reference previous_mp = self.mp previous_reversal = self.reversal previous_p1_up_ready = self.p1_up_ready previous_p1_dn_ready = self.p1_dn_ready previous_up_ap = self.up_ap previous_dn_am = self.dn_am previous_up_swing = self.up_swing previous_dn_swing = self.dn_swing previous_up_cm = self.up_cm previous_dn_cp = self.dn_cp previous_p1_up_entry = self.p1_up_entry previous_p1_dn_entry = self.p1_dn_entry previous_p1_up_f = self.p1_up_f previous_p1_dn_f = self.p1_dn_f previous_p1_up_e = self.p1_up_e previous_p1_dn_e = self.p1_dn_e previous_p1_up_noise_a = self.p1_up_noise_a previous_p1_up_noise_b = self.p1_up_noise_b previous_p1_up_noise_c = self.p1_up_noise_c previous_p1_dn_noise_a = self.p1_dn_noise_a previous_p1_dn_noise_b = self.p1_dn_noise_b previous_p1_dn_noise_c = self.p1_dn_noise_c # 1. Market Position # Exit # up_s_exit, up_f_exit, dn_s_exit, and dn_f_exit have not changed # yet, so not saving the "previous values" if previous_mp == 1 and bar.Low <= self.up_s_exit: self.mp = 0 # Log message when desired if PRINT_SIGNALS: self.algo.Log("Previous MP=1, Low <= UpSExit, so now MP=0") if previous_mp == 1 and last_bar.Low < self.up_f_exit: self.mp = 0 if PRINT_SIGNALS: self.algo.Log("Previous MP=1, Previous Low < UpFExit, so " "now MP=0") if previous_mp == -1 and bar.High >= self.dn_s_exit: self.mp = 0 # Log message when desired if PRINT_SIGNALS: self.algo.Log("Previous MP=-1, High >= DnSExit, so now MP=0") if previous_mp == -1 and last_bar.High > self.dn_f_exit: self.mp = 0 if PRINT_SIGNALS: self.algo.Log("Previous MP=-1, Previous High > DnFExit, so " "now MP=0") # Entry # up_signal and dn_signal have not changed yet, so not saving the # "previous values" if previous_mp < 1 and self.up_signal > 0: self.mp = 1 if PRINT_SIGNALS: self.algo.Log("Previous MP<1, Previous UpSignal, so now " "MP=1") if previous_mp > -1 and self.dn_signal < 0: self.mp = -1 if PRINT_SIGNALS: self.algo.Log("Previous MP>-1, Previous DnSignal, so now " "MP=-1") # 2. Index atr = self.atr.Current.Value self.up_fix = 0 self.dn_fix = 0 # Up: check for new higher high if bar.High > last_bar.High: self.up_ct = 1 # Loop through a list of historical bars historical_bars = list(self.bars) # NOTE: rolling window to list has bars in newest to oldest order # Loop through a list of historical bars for historical_bar in historical_bars: if (bar.High >= historical_bar.High) \ and (self.up_ct < self.lookback): # Increment up count self.up_ct += 1 else: break # Check if up count is only 1 if self.up_ct == 1: bars = [bar] else: # Get last up_ct bars-1 bars = historical_bars[:self.up_ct-1] # And add the current bar bars.append(bar) # Get up support and resistance # self.up_res = max([bar.High for bar in bars]) self.up_res = bar.High self.up_sup = min([bar.Low for bar in bars]) # Update up fix self.up_fix = (self.up_res-self.up_sup)/atr # Down: check for a new lower low if bar.Low < last_bar.Low: self.dn_ct = 1 # Loop through a list of historical bars historical_bars = list(self.bars) # NOTE: rolling window to list has bars in newest to oldest order # Loop through a list of historical bars for historical_bar in historical_bars: if (bar.Low <= historical_bar.Low) \ and (self.dn_ct < self.lookback): # Increment down count self.dn_ct += 1 else: break # Check if down count is only 1 if self.dn_ct == 1: bars = [bar] else: # Get last dn_ct bars-1 bars = historical_bars[:self.dn_ct] # And add the current bar bars.append(bar) # Get down support and resistance self.dn_res = max([bar.High for bar in bars]) # self.dn_sup = min([bar.Low for bar in bars]) self.dn_sup = bar.Low # Update dn fix self.dn_fix = (self.dn_res-self.dn_sup)/atr # 3. Setup # Up P1 if self.dn_ct >= self.lookback: self.p1_up_ready = False else: if self.dn_fix >= self.algo.min_p1: if self.up_f <= self.dn_res: self.p1_up_ready = True self.p1_up_f = self.dn_res self.p1_up_e = (self.dn_res+self.dn_sup)*0.50 dist = self.dn_res-self.dn_sup self.p1_up_noise_a = dist * self.algo.level_a self.p1_up_noise_b = dist * self.algo.level_b self.p1_up_noise_c = dist * self.algo.level_c self.p1_up_entry = self.p1_up_e \ + self.p1_up_noise_a*0.50 # Down P1 if self.up_ct >= self.lookback: self.p1_dn_ready = False else: if self.up_fix >= self.algo.min_p1: if self.dn_f >= self.up_sup: self.p1_dn_ready = True self.p1_dn_f = self.up_sup self.p1_dn_e = (self.up_res+self.up_sup)*0.50 dist = self.up_res-self.up_sup self.p1_dn_noise_a = dist * self.algo.level_a self.p1_dn_noise_b = dist * self.algo.level_b self.p1_dn_noise_c = dist * self.algo.level_c self.p1_dn_entry = self.p1_dn_e \ - self.p1_dn_noise_a*0.50 # Up P2 if self.up_fix >= self.algo.min_p2: if previous_reversal == 0: self.reversal = 1 if previous_reversal == -1: self.reversal = 1 if self.mp < 1: self.p2_up_ready = True self.p2_up_e = bar.High - self.algo.min_p2*atr*0.50 self.p2_up_noise_a = \ self.algo.min_p2*atr*self.algo.level_a self.p2_up_noise_b = \ self.algo.min_p2*atr*self.algo.level_b self.p2_up_noise_c = \ self.algo.min_p2*atr*self.algo.level_c # Down P2 if self.dn_fix >= self.algo.min_p2: if previous_reversal == 0: self.reversal = -1 if previous_reversal == 1: self.reversal = -1 if self.mp > -1: self.p2_dn_ready = True self.p2_dn_e = bar.Low + self.algo.min_p2*atr*0.50 self.p2_dn_noise_a = \ self.algo.min_p2*atr*self.algo.level_a self.p2_dn_noise_b = \ self.algo.min_p2*atr*self.algo.level_b self.p2_dn_noise_c = \ self.algo.min_p2*atr*self.algo.level_c # 4. Entry self.up_signal = 0 self.dn_signal = 0 # Up P1 if previous_p1_up_ready and bar.High >= previous_p1_up_entry: self.up_swing = 1 self.up_signal = 1 self.up_f = previous_p1_up_f self.up_e = previous_p1_up_e self.up_noise_a = previous_p1_up_noise_a self.up_noise_b = previous_p1_up_noise_b self.up_noise_c = previous_p1_up_noise_c self.up_entry = previous_p1_up_entry self.up_ap = self.up_e + self.up_noise_a*0.50 self.up_bp = self.up_e + self.up_noise_b*0.50 self.up_cp = self.up_e + self.up_noise_c*0.50 self.up_am = self.up_e - self.up_noise_a*0.50 self.up_bm = self.up_e - self.up_noise_b*0.50 self.up_cm = self.up_e - self.up_noise_c*0.50 self.p1_up_ready = False self.p2_up_ready = False # Down P1 if previous_p1_dn_ready and bar.Low <= previous_p1_dn_entry: self.dn_swing = -1 self.dn_signal = -1 self.dn_f = previous_p1_dn_f self.dn_e = previous_p1_dn_e self.dn_noise_a = previous_p1_dn_noise_a self.dn_noise_b = previous_p1_dn_noise_b self.dn_noise_c = previous_p1_dn_noise_c self.dn_entry = previous_p1_dn_entry self.dn_ap = self.dn_e + self.dn_noise_a*0.50 self.dn_bp = self.dn_e + self.dn_noise_b*0.50 self.dn_cp = self.dn_e + self.dn_noise_c*0.50 self.dn_am = self.dn_e - self.dn_noise_a*0.50 self.dn_bm = self.dn_e - self.dn_noise_b*0.50 self.dn_cm = self.dn_e - self.dn_noise_c*0.50 self.p1_dn_ready = False self.p2_dn_ready = False # Up P2 if self.p2_up_ready: self.up_swing = 2 self.up_signal = 2 self.up_f = -99999 self.up_e = self.p2_up_e self.up_noise_a = self.p2_up_noise_a self.up_noise_b = self.p2_up_noise_b self.up_noise_c = self.p2_up_noise_c self.up_entry = bar.High self.up_ap = self.up_e + self.up_noise_a*0.50 self.up_bp = self.up_e + self.up_noise_b*0.50 self.up_cp = self.up_e + self.up_noise_c*0.50 self.up_am = self.up_e - self.up_noise_a*0.50 self.up_bm = self.up_e - self.up_noise_b*0.50 self.up_cm = self.up_e - self.up_noise_c*0.50 self.p1_up_ready = False self.p2_up_ready = False # Down P2 if self.p2_dn_ready: self.dn_swing = -2 self.dn_signal = -2 self.dn_f = 99999 self.dn_e = self.p2_dn_e self.dn_noise_a = self.p2_dn_noise_a self.dn_noise_b = self.p2_dn_noise_b self.dn_noise_c = self.p2_dn_noise_c self.dn_entry = bar.Low self.dn_ap = self.dn_e + self.dn_noise_a*0.50 self.dn_bp = self.dn_e + self.dn_noise_b*0.50 self.dn_cp = self.dn_e + self.dn_noise_c*0.50 self.dn_am = self.dn_e - self.dn_noise_a*0.50 self.dn_bm = self.dn_e - self.dn_noise_b*0.50 self.dn_cm = self.dn_e - self.dn_noise_c*0.50 self.p1_dn_ready = False self.p2_dn_ready = False # 5. Re-entry # Up if self.up_signal == 0 and self.mp < 1 and previous_mp < 1: if self.up_swing > 0 and bar.High >= previous_up_ap: self.up_signal = 3 # Down if self.dn_signal == 0 and self.mp > -1 and previous_mp > -1: if self.dn_swing < 0 and bar.Low <= previous_dn_am: self.dn_signal = -3 # 6. End # Up if self.up_signal == 0 and previous_up_swing > 0: if (self.dn_fix >= min(self.algo.min_p1, self.algo.min_p2)) \ and (bar.Low < previous_up_cm): self.up_swing = 0 self.up_f = -99999 # Down if self.dn_signal == 0 and previous_dn_swing < 0: if (self.up_fix >= min(self.algo.min_p1, self.algo.min_p2)) \ and (bar.High > previous_dn_cp): self.dn_swing = 0 self.dn_f = 99999 # 7. Trailing # Up if self.up_swing > 0: self.up_ap = max(self.up_ap, bar.High) self.up_bp = max(self.up_bp, bar.High) self.up_cp = max(self.up_cp, bar.High) self.up_am = self.up_ap-self.up_noise_a self.up_bm = self.up_bp-self.up_noise_b self.up_cm = self.up_cp-self.up_noise_c # Down if self.dn_swing < 0: self.dn_am = min(self.dn_am, bar.Low) self.dn_bm = min(self.dn_bm, bar.Low) self.dn_cm = min(self.dn_cm, bar.Low) self.dn_ap = self.dn_am+self.dn_noise_a self.dn_bp = self.dn_bm+self.dn_noise_b self.dn_cp = self.dn_cm+self.dn_noise_c # 8. Exit # Up if self.up_swing > 0: if self.up_entry >= self.up_bm: self.up_f_exit = min(self.up_entry, self.up_bm) else: self.up_f_exit = max(self.up_entry, self.up_cm) # Down if self.dn_swing < 0: if self.dn_entry <= self.dn_bp: self.dn_f_exit = max(self.dn_entry, self.dn_bp) else: self.dn_f_exit = min(self.dn_entry, self.dn_cp) # 9. Stop Exit # Instead of delaying the stop exit like TradeStation logic does, # the stop loss order is placed immediately after an entry order is # filled # 10. Mixed signals if self.up_signal > 0 and self.dn_signal < 0: self.up_signal = 0 self.dn_signal = 0 # 11. Entry/Re-entry/Exit current_shares = int(self.algo.Portfolio[self.security].Quantity) if self.mp < 1 and self.up_signal >= 1: # Get the signal if self.up_signal == 1: signal = "L1" elif self.up_signal == 2: signal = "L2" elif self.up_signal == 3: signal = "L3" # Get desired position size self.GetPositionSize() # Get order qty desired_shares = self.contracts order_qty = desired_shares-current_shares # Log message when desired if PRINT_SIGNALS: self.algo.Log( f"{self.symbol} {signal} Entry Signal: " f"Have {current_shares} shares, want {desired_shares}, " f"so order shares = {order_qty}" ) # Cancel previous stop order (if one) self.CancelStopOrder() # Place market buy order self.algo.MarketOrder(self.security, order_qty) elif self.mp > -1 and self.dn_signal <= -1: # Get the signal if self.dn_signal == -1: signal = "S1" elif self.dn_signal == -2: signal = "S2" elif self.dn_signal == -3: signal = "S3" # Get desired position size self.GetPositionSize() # Get order qty desired_shares = -self.contracts order_qty = desired_shares-current_shares # Log message when desired if PRINT_SIGNALS: self.algo.Log( f"{self.symbol} {signal} Entry Signal: " f"Have {current_shares} shares, want {desired_shares}, " f"so order shares = {order_qty}" ) # Cancel previous stop order (if one) self.CancelStopOrder() # Place market sell order self.algo.MarketOrder(self.security, order_qty) # Exit # Check for active long position if self.mp == 1 and current_shares > 0: # NOTE: stop loss order is placed immediately after an entry # order is filled # Check for exit signal if bar.Low < self.up_f_exit: # Log message when desired if PRINT_SIGNALS: self.algo.Log( f"{self.symbol} LXF Exit Signal: " f"Sell {current_shares} shares" ) self.ExitPosition(current_shares) # Check for active short position elif self.mp == -1 and current_shares < 0: # NOTE: stop loss order is placed immediately after an entry # order is filled # Check for exit signal if bar.High > self.dn_f_exit: # Log message when desired if PRINT_SIGNALS: self.algo.Log( f"{self.symbol} SXF Exit Signal: " f"Buy to cover {current_shares} shares" ) self.ExitPosition(current_shares) # Add bar to rolling window self.bars.Add(bar) #------------------------------------------------------------------------------- def ExitPosition(self, current_shares): """Immediately exit an active position.""" self.CancelStopOrder() # Place a market order to close position self.algo.MarketOrder(self.security, -current_shares) # # Set mp to 0 # self.mp = 0 #------------------------------------------------------------------------------- def CancelStopOrder(self): """Cancel the open stop order.""" if self.stop_order: # Log message when desired if PRINT_ORDERS: self.algo.Log(f"Cancelling {self.symbol} stop order.") self.stop_order.Cancel() # Set back to None self.stop_order = None #------------------------------------------------------------------------------- def PlaceStopOrder(self, current_shares): """Place the stop loss order.""" # Get the previous completee bar's atr atr = self.atr_r.Current.Value # Check for long position if current_shares > 0: # Use up stop stop = self.up_s_exit = \ round(self.cost_basis-self.algo.stop_ix*atr,2) position = 'long' elif current_shares < 0: # Use down stop stop = self.dn_s_exit = \ round(self.cost_basis+self.algo.stop_ix*atr,2) position = 'short' else: # Don't continue if no position return # Place stop market order for -current shares at the desired stop price self.stop_order = \ self.algo.StopMarketOrder(self.security, -current_shares, stop) # Log message when desired if PRINT_ORDERS: self.algo.Log( f"{self.symbol} {position} stop order for {-current_shares}" f" shares placed at {stop}" ) #------------------------------------------------------------------------------- def GetPositionSize(self): """Get the desired position size.""" atr_r = self.atr_r.Current.Value # risk = self.algo.risk_pct*self.algo.Portfolio.TotalPortfolioValue risk = self.algo.stop_dollar # NOTE: big point value is 1 for equities n_rough = risk/(self.algo.stop_ix_r*atr_r) nr = int(n_rough) # self.contracts = round(nr, 0) self.contracts = 100 #------------------------------------------------------------------------------- def DailyUSEquityCalendar(self, dt): """ Set up daily consolidator calendar info for the US equity market. This should return a start datetime object that is timezone unaware with a valid date/time for the desired securities' exchange's time zone. Useful Refs: datetime.replace() method: https://docs.python.org/3/library/datetime.html#datetime.datetime.replace """ # Need to handle case where algo initializes and this function is called # for the first time. When that occurs, the current datetime when you # requested a backtest is passed to this function as dt. This is the # only time that dt has a timezone attached to it. if dt.tzinfo: # Since this doesn't matter, we'll pass dt as start and one day # as the timedelta until end_dt start_dt = dt end_dt = start_dt + timedelta(1) return CalendarInfo(start_dt, end_dt-start_dt) # Set start time to be 930am ET start = dt.replace(hour=9, minute=30, second=0, microsecond=0) # Set end time to be 16:00 ET end = dt.replace(hour=16, minute=0, second=0, microsecond=0) # return start datetime and period return CalendarInfo(start, end-start) #------------------------------------------------------------------------------- def CustomCalendar(self, dt): """ Set up custom consolidator calendar info. This should return a start datetime object that is timezone unaware with a valid date/time for the desired securities' exchange's time zone. Useful Refs: datetime.replace() method: https://docs.python.org/3/library/datetime.html#datetime.datetime.replace """ # Need to handle case where algo initializes and this function is called # for the first time. When that occurs, the current datetime when you # requested a backtest is passed to this function as dt. This is the # only time that dt has a timezone attached to it. if dt.tzinfo: # Since this doesn't matter, we'll pass dt as start and one day # as the timedelta until end_dt start_dt = dt end_dt = start_dt + timedelta(1) return CalendarInfo(start_dt, end_dt-start_dt) # Set start and end to None start = None end = None # Loop through all BAR_TIMES for tup in BAR_TIMES: # Check for dt time before the start time if dt.time() <= tup[0]: # Set the start time to be the bar's start time start = dt.replace(hour=tup[0].hour, minute=tup[0].minute) # Set the end time end = dt.replace(hour=tup[1].hour, minute=tup[1].minute) # Break from loop break # Make sure that start and end are set if not start or not end: # Get the first bar start and end times start_time = BAR_TIMES[0][0] end_time = BAR_TIMES[0][1] # Set start to the next day's first start time start = dt + timedelta(1) start = start.replace( hour=start_time.hour, minute=start_time.minute, second=0, microsecond=0) # Set the end time end = start.replace( hour=end_time.hour, minute=end_time.minute, second=0, microsecond=0) # return start datetime and period return CalendarInfo(start, end-start)