Overall Statistics |
Total Orders 5300 Average Win 0.23% Average Loss -0.21% Compounding Annual Return 4.720% Drawdown 22.700% Expectancy 0.056 Start Equity 640000 End Equity 870697.30 Net Profit 36.046% Sharpe Ratio 0.123 Sortino Ratio 0.119 Probabilistic Sharpe Ratio 1.612% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 1.09 Alpha -0.01 Beta 0.696 Annual Standard Deviation 0.131 Annual Variance 0.017 Information Ratio -0.182 Tracking Error 0.114 Treynor Ratio 0.023 Total Fees $3639.99 Estimated Strategy Capacity $3400000.00 Lowest Capacity Asset DESP WO1JW2NPW6ZP Portfolio Turnover 4.33% |
#region imports from AlgorithmImports import * #endregion # https://www.quantconnect.com/docs/algorithm-reference/machine-learning # https://quantpedia.com/strategies/post-earnings-announcement-drift-combined-with-strong-momentum/ # # The investment universe consists of all stocks from NYSE, AMEX and NASDAQ with a price greater than $5. Each quarter, all stocks are # sorted into deciles based on their 12 months past performance. The investor then uses only stocks from the top momentum decile and # goes long on each stock 5 days before the earnings announcement and closes the long position at the close of the announcement day. # Subsequently, at the close of the announcement day, he/she goes short and he/she closes his short position on the 5th day after the # earnings announcement. # # QC Implementation: # - Investment universe consist of stocks, which have earnings date. from pandas.tseries.offsets import BDay from collections import OrderedDict class PostEarningsAnnouncement(QCAlgorithm): def Initialize(self): # self.SetStartDate(2008, 1, 1) # Earnings days data begin at 2008. self.SetStartDate(2017, 12, 30) # self.SetStartDate(2021, 1, 1) # self.SetStartDate(2022, 4, 29) # self.SetStartDate(2021, 11, 24) # self.SetStartDate(2022, 4, 1) # self.SetEndDate(2021, 11, 30) self.SetEndDate(2024, 8, 31) # self.SetCash(100000) # self.SetCash(1000000) self.SetCash(640000) # Cash in paper trading account self.coarse_count = 1000 # Earning data parsing self.earnings = {} # days_before_earnings = [] self.days_before_earnings = [] self.earnings_universe = [] # Stored earnings symbols # earnings_set = set() self.earnings_set = set() # earnings_data = self.Download('data.quantpedia.com/backtesting_data/economic/earning_dates.csv') earnings_data = self.Download("https://www.dropbox.com/s/tacy2gwti1mu879/quantpedia_marketwatch_earnings_cal_3.8.csv?dl=1") # For Testing Only # earnings_data = self.Download("https://www.dropbox.com/s/vu8b27y9nqz1eut/quantpedia_marketwatch_earnings_cal_3.7.csv?dl=1") # earnings_data = self.Download("https://www.dropbox.com/s/lx1sl7rxw9hw257/quantpedia_marketwatch_earnings_cal_3.8.csv?dl=1") # earnings_data = self.Download("https://www.dropbox.com/s/3631k5cc31z8pg5/quantpedia_marketwatch_earnings_cal_3.9.csv?dl=1") # earnings_data = self.Download("https://www.dropbox.com/s/lrbgzuazci2vjgn/quantpedia_marketwatch_earnings_cal_3.9.csv?dl=1") # earnings_data = self.Download("https://www.dropbox.com/s/lrmbn6z1s40qv04/quantpedia_marketwatch_earnings_cal_4.0.csv?dl=1") # earnings_data_slices = earnings_data.split('\r\n') earnings_data_slices = earnings_data.split('\n') # This splittig works with the MarketWatch Scrape # self.Debug(str(earnings_data_slices)) # self.Log(str(earnings_data_slices)) for earnings_date in earnings_data_slices: data = earnings_date.split(';') date = data[0] # self.Log(str(date)) if date == '' : continue date = datetime.strptime(date, "%Y-%m-%d").date() self.earnings[date] = [] # days_before_earnings.append(date - BDay(5)) self.days_before_earnings.append(date - BDay(5)) index = 1 while index < len(data): ticker = data[index] self.earnings[date].append(ticker) # earnings_set.add(ticker) self.earnings_set.add(ticker) index += 1 # for ticker in earnings_set: for ticker in self.earnings_set: symbol = self.AddEquity(ticker, Resolution.Daily).Symbol # symbol = self.AddEquity(ticker, Resolution.Minute).Symbol # For Minute Resolution self.earnings_universe.append(symbol) # self.Debug(str(self.Time) + " Count of Earnings Universe 1 " + str(len(self.earnings_universe))) self.Log(str(self.Time) + " Count of Earnings Universe 1 " + str(len(self.earnings_universe))) self.symbol = self.AddEquity('SPY', Resolution.Daily).Symbol # self.symbol = self.AddEquity('SPY', Resolution.Minute).Symbol # For Minute Resolution # To Chart a second Benchmark # self.MKT = self.AddEquity("SPY", Resolution.Daily).Symbol self.mkt = [] # Use to Plot Benchmark # self.AddEquity("SPY", Resolution.Daily) # self.benchmarkTicker = 'SPY' self.AddEquity("FTLS", Resolution.Daily) self.benchmarkTicker = 'FTLS' self.SetBenchmark(self.benchmarkTicker) # self.initBenchmarkPrice = None # Use if Plotting Long or Short Position of Benchmark self.initBenchmarkPrice = 0 # self.benchmarkExposure = -0.5 # self.benchmarkExposure = -1 self.benchmarkExposure = 1 self.period = 12 * 21 # Need twelve months performance # Daily price data self.data = {} self.selected_symbols = [] # 50 equally weighted brackets for traded symbols. self.managed_symbols_size = 50 self.managed_symbols = [] # This is critical to enable Dropbox to feed dates out of order from the scrape # Another possible solution would be to use self.Schedule.On(self.DateRules.EveryDay() , self.TimeRules.AfterMarketOpen(self.symbol), self.DaysBefore) # days_before_earnings = sorted(days_before_earnings) self.days_before_earnings = sorted(self.days_before_earnings) # self.selection_flag = False self.selection_flag = True # To trade immediately with no delay # self.first_run_flag = True # For Minute Resolution self.UniverseSettings.Resolution = Resolution.Daily # self.UniverseSettings.Resolution = Resolution.Minute # For Minute Resolution self.AddUniverse(self.CoarseSelectionFunction) # Events on earnings days, before and after earning days. self.Schedule.On(self.DateRules.EveryDay(self.symbol), self.TimeRules.AfterMarketOpen(self.symbol), self.DaysBefore) # self.Schedule.On(self.DateRules.On(days_before_earnings), self.TimeRules.AfterMarketOpen(self.symbol), self.DaysBefore) # self.Schedule.On(self.DateRules.On(self.days_before_earnings), self.TimeRules.AfterMarketOpen(self.symbol), self.DaysBefore) self.Schedule.On(self.DateRules.MonthStart(self.symbol), self.TimeRules.AfterMarketOpen(self.symbol), self.Selection) # Schedule re-download of dropbox file # self.Schedule.On(self.DateRules.EveryDay(self.symbol), self.TimeRules.AfterMarketOpen(self.symbol, 90), self.EveryDayAfterMarketOpen) # self.Schedule.On(self.DateRules.EveryDay(self.symbol), self.TimeRules.AfterMarketOpen(self.symbol), self.EveryDayAfterMarketOpen) # self.Schedule.On(self.DateRules.EveryDay(self.symbol), self.TimeRules.BeforeMarketClose(self.symbol), self.EveryDayAfterMarketOpen) # self.Schedule.On(self.DateRules.EveryDay(self.symbol), self.TimeRules.BeforeMarketClose(self.symbol, 30), self.EveryDayAfterMarketOpen) # self.Schedule.On(self.DateRules.On(2021, 11, 29), self.TimeRules.BeforeMarketClose(self.symbol), self.EveryDayAfterMarketOpen) # self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday), self.TimeRules.AfterMarketOpen(self.symbol, 0), self.EveryDayAfterMarketOpen) # self.Schedule.On(self.DateRules.WeekStart(self.symbol), self.TimeRules.BeforeMarketClose(self.symbol), self.EveryDayAfterMarketOpen) self.settings.daily_precise_end_time = False def OnSecuritiesChanged(self, changes): for security in changes.AddedSecurities: # security.SetFeeModel(CustomFeeModel(self)) # Only for Backtesting security.SetFeeModel(CustomFeeModel()) # Only for Backtesting security.SetLeverage(5) def CoarseSelectionFunction(self, coarse): # Update stock prices for stock in coarse: symbol = stock.Symbol # Store monthly price. if symbol in self.data: self.data[symbol].update(stock.AdjustedPrice) if not self.selection_flag: return Universe.Unchanged self.selection_flag = False # For Scalable Version sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True) # filtered = [ x.Symbol for x in sortedByDollarVolume if x.Price > 10 and x.DollarVolume > 10000000 ] selected = [x.Symbol for x in sortedByDollarVolume if x.HasFundamentalData and x.Market == 'usa' and x.Price > 5 and x.DollarVolume > 10000000 # My addition to help with scale and x.Symbol in self.earnings_universe] selected = selected[:2000] # For Original Version # selected = [x.Symbol for x in coarse if x.HasFundamentalData # and x.Market == 'usa' and x.Price > 5 # # and x.DollarVolume > 10000000 # My addition to help with scale # and x.Symbol in self.earnings_universe] # self.Log(str(self.Time) + " Mid Level Selected Symbols Length " + str(len(selected))) # Warming up prices for rolling window. for symbol in selected: if symbol in self.data: continue self.data[symbol] = SymbolData(self.period) history = self.History(symbol, self.period, Resolution.Daily) if history.empty: self.Log(f"Not enough data for {symbol} yet") continue closes = history.loc[symbol].close # for time, close in closes.iteritems(): for time, close in closes.items(): self.data[symbol].update(close) momentum = {} # Calculate momentum for each stock in self.earnings_universe for symbol in selected: if symbol in self.data and self.data[symbol].is_ready(): momentum[symbol] = self.data[symbol].performance() if len(momentum) < 10: self.selected_symbols = [] return Universe.Unchanged # Momentum sorting decile = int(len(momentum) / 10) sorted_by_mom = [x[0] for x in sorted(momentum.items(), key=lambda item: item[1])] # The investor uses only stocks from the top momentum decile self.selected_symbols = sorted_by_mom[-decile:] # self.Debug(str(self.Time) + " Selected Symbols Length " + str(len(self.selected_symbols))) # self.Log(str(self.Time) + " Selected Symbols Length " + str(len(self.selected_symbols))) return self.selected_symbols def DaysBefore(self): # Every day check if 5 days from now is any earnings day. earnings_date = (self.Time + BDay(5)).date() date_to_liquidate = (earnings_date + BDay(6)).date() if earnings_date in self.earnings: # Iterate over momentum selected symbols. for symbol in self.selected_symbols: ticker = symbol.Value # Is there any symbol which has earnings in 5 days. if ticker in self.earnings[earnings_date]: # self.Debug(str(self.Time) + "Ticker: " + str(ticker)) # If there's a place for new trade. if len(self.managed_symbols) < self.managed_symbols_size: # if len(self.managed_symbols) < self.managed_symbols_size or self.first_run_flag: # For Minute Resolution self.SetHoldings(symbol, 1 / self.managed_symbols_size) # NOTE: Must offset date to switch position by one day due to midnight execution of OnData function. # Alternatively, there's is a possibility to switch to BeforeMarketClose function. self.managed_symbols.append(ManagedSymbol(symbol, earnings_date + BDay(1), date_to_liquidate)) # self.Debug(str(self.Time) + " Managed Symbols Length " + str(len(self.managed_symbols))) # self.Log(str(self.Time) + " Managed Symbols Length " + str(len(self.managed_symbols))) # if len(self.managed_symbols) > 0: # self.Log(str(self.Time) + " Managed Symbols: " + str([symbol.symbol.Value for symbol in self.managed_symbols])) # self.first_run_flag = False # For Minute Resolution def OnData(self, data): self.UpdateBenchmarkValue() self.Plot('Strategy Equity', self.benchmarkTicker, self.benchmarkValue) # To Chart a Second Benchmark mkt_price = self.Securities[self.symbol].Price self.mkt.append(mkt_price) mkt_perf = 640000 * self.mkt[-1] / self.mkt[0] self.Plot('Strategy Equity', self.symbol, mkt_perf) # To Plot all Line Charts in separate Chart # Portfolio Value self.Plot("Equity", 'Earnings', self.Portfolio.TotalPortfolioValue) # Benchmarks self.Plot('Equity', self.benchmarkTicker, self.benchmarkValue) self.Plot('Equity', self.symbol, mkt_perf) # Switch positions on earnings days. today = self.Time.date() # current_time = self.Time # For Minute Resolution symbols_to_delete = [] for symbol in self.managed_symbols: # Switch from long to short. if symbol.date_to_switch == today: self.SetHoldings(symbol.symbol, -1 / self.managed_symbols_size) # if (current_time.hour == 9 and current_time.minute == 45): # For Minute Resolution # if (current_time.hour == 9 and current_time.minute == 32): # For Minute Resolution # self.SetHoldings(symbol.symbol, -1 / self.managed_symbols_size) # For Minute Resolution # Liquidate elif symbol.date_to_liquidate <= today: self.Liquidate(symbol.symbol) symbols_to_delete.append(symbol) # if (current_time.hour == 9 and current_time.minute == 31): # For Minute Resolution # Allocate 0% of portfolio value to Symbol and do not liquidate other holdings before starting # If you want to liquidate other holdings before starting, set the bool to True # self.SetHoldings(symbol.symbol, 0, False, "Liquidated") # symbols_to_delete.append(symbol) # self.SetHoldings(symbol.symbol, 0, False, "Liquidated") # Remove symbols from management. for symbol in symbols_to_delete: self.managed_symbols.remove(symbol) # self.Log(str(type(self.managed_symbols))) # DRAFT of code to clear out symbols if price data is missing on above attempt all_symbols = [ x.Value for x in self.Portfolio.Keys ] for ticker in all_symbols: # if self.Portfolio[ticker].Invested and ticker not in [symbol.symbol.Value for symbol in self.managed_symbols]: # self.SetHoldings(ticker, 0, False, "Liquidated") if self.Portfolio[ticker].Invested and ticker in [symbol.symbol.Value for symbol in self.managed_symbols]: # self.Log(str(ticker)) # self.Log(str(self.Time) + " Managed Symbols: " + str(ticker)) continue def UpdateBenchmarkValue(self): ''' Simulate buy and hold the Benchmark ''' # if self.initBenchmarkPrice is None: if self.initBenchmarkPrice == 0: # Use if Plotting Short Position of Benchmark self.initBenchmarkCash = self.Portfolio.Cash self.initBenchmarkPrice = self.Benchmark.Evaluate(self.Time) self.benchmarkValue = self.initBenchmarkCash else: currentBenchmarkPrice = self.Benchmark.Evaluate(self.Time) # self.benchmarkValue = (currentBenchmarkPrice / self.initBenchmarkPrice) * self.initBenchmarkCash # Use if Plotting Short Position of Benchmark lastReturn = ((currentBenchmarkPrice / self.initBenchmarkPrice) - 1) * self.benchmarkExposure self.benchmarkValue = (1 + lastReturn) * self.initBenchmarkCash def Selection(self): # Quarter selection if self.Time.month in [3, 6, 9, 12]: # This works only if MonthStart is used self.selection_flag = True def EveryDayAfterMarketOpen(self): #Nice to have to help prevent variable from growing too large. #If we trim the Dropbox file, we will naturally have a smaller amount of data stored in this variable. self.earnings = {} #Nice to have to help prevent variable from growing too large. #If we trim the Dropbox file, we will naturally have a smaller amount of data stored in this variable. # self.Debug(str(self.Time) + " Count of Earnings Universe " + str(len(self.earnings_universe))) # self.Log(str(self.Time) + " Count of Earnings Universe " + str(len(self.earnings_universe))) self.earnings_universe = [] # Stored earnings symbols # earnings_data = None # Daily price data # self.data = {} # days_before_earnings = [] #Nice to have to help prevent variable from growing too large. #If we trim the Dropbox file, we will naturally have a smaller amount of data stored in this variable. self.earnings_set = set() # Ultimately, this is used to subscribe to data through AddEquity function # earnings_set = set() earnings_data = self.Download("https://www.dropbox.com/s/tacy2gwti1mu879/quantpedia_marketwatch_earnings_cal_3.8.csv?dl=1") # For Testing Only # earnings_data = self.Download("https://www.dropbox.com/s/lx1sl7rxw9hw257/quantpedia_marketwatch_earnings_cal_3.8.csv?dl=1") # earnings_data = self.Download("https://www.dropbox.com/s/3631k5cc31z8pg5/quantpedia_marketwatch_earnings_cal_3.9.csv?dl=1") # earnings_data = self.Download("https://www.dropbox.com/s/lrbgzuazci2vjgn/quantpedia_marketwatch_earnings_cal_3.9.csv?dl=1") # earnings_data = self.Download("https://www.dropbox.com/s/lrmbn6z1s40qv04/quantpedia_marketwatch_earnings_cal_4.0.csv?dl=1") # earnings_data_slices = earnings_data.split('\r\n') earnings_data_slices = earnings_data.split('\n') # This splittig works with the MarketWatch Scrape # self.Debug(str(earnings_data_slices)) # self.Log(str(earnings_data_slices)) # for earnings_date in earnings_data_slices[-10:]: for earnings_date in earnings_data_slices: data = earnings_date.split(';') date = data[0] # self.Log(str(date)) if date == '' : continue date = datetime.strptime(date, "%Y-%m-%d").date() self.earnings[date] = [] # days_before_earnings.append(date - BDay(5)) self.days_before_earnings.append(date - BDay(5)) index = 1 while index < len(data): ticker = data[index] self.earnings[date].append(ticker) # earnings_set.add(ticker) self.earnings_set.add(ticker) index += 1 # self.Debug(str(self.Time) + "Last Earnings Date: " + str(list(self.earnings.items())[-1])) # self.Debug(str(self.Time) + "Last Ticker: " + str(list(self.earnings_set)[-1])) # self.Debug(str(self.Time) + "Last Ticker: " + str(list(earnings_set)[-1])) # for ticker in earnings_set: for ticker in self.earnings_set: symbol = self.AddEquity(ticker, Resolution.Daily).Symbol # symbol = self.AddEquity(ticker, Resolution.Minute).Symbol # For Minute Resolution self.earnings_universe.append(symbol) # This is critical to enable Dropbox to feed dates out of order from the scrape # Another possible solution would be to use self.Schedule.On(self.DateRules.EveryDay() , self.TimeRules.AfterMarketOpen(self.symbol), self.DaysBefore) # days_before_earnings = sorted(days_before_earnings) self.days_before_earnings = sorted(self.days_before_earnings) self.selection_flag = True # VERY IMPORTANT to trade immediately with updated earnings_universe # self.Debug(str(self.Time) + "Purchasing AAPL: " + str(slice["SPY"].Price)) # self.Debug(str(self.Time) + "Reached End of Dropbox Update: " + str(self.days_before_earnings[-1:])) class SymbolData(): def __init__(self, period): self.Closes = RollingWindow[float](period) def update(self, close): self.Closes.Add(close) def is_ready(self): return self.Closes.IsReady def performance(self): closes = [x for x in self.Closes] return (closes[0] - closes[-1]) / closes[-1] class ManagedSymbol(): def __init__(self, symbol, date_to_switch, date_to_liquidate): self.symbol = symbol self.date_to_switch = date_to_switch self.date_to_liquidate = date_to_liquidate # Custom fee model # class CustomFeeModel(FeeModel): class CustomFeeModel: def GetOrderFee(self, parameters): fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005 return OrderFee(CashAmount(fee, "USD"))