Overall Statistics
Total Orders
5300
Average Win
0.23%
Average Loss
-0.21%
Compounding Annual Return
4.720%
Drawdown
22.700%
Expectancy
0.056
Start Equity
640000
End Equity
870697.30
Net Profit
36.046%
Sharpe Ratio
0.123
Sortino Ratio
0.119
Probabilistic Sharpe Ratio
1.612%
Loss Rate
50%
Win Rate
50%
Profit-Loss Ratio
1.09
Alpha
-0.01
Beta
0.696
Annual Standard Deviation
0.131
Annual Variance
0.017
Information Ratio
-0.182
Tracking Error
0.114
Treynor Ratio
0.023
Total Fees
$3639.99
Estimated Strategy Capacity
$3400000.00
Lowest Capacity Asset
DESP WO1JW2NPW6ZP
Portfolio Turnover
4.33%
#region imports
from AlgorithmImports import *
#endregion
# https://www.quantconnect.com/docs/algorithm-reference/machine-learning

# https://quantpedia.com/strategies/post-earnings-announcement-drift-combined-with-strong-momentum/
#
# The investment universe consists of all stocks from NYSE, AMEX and NASDAQ with a price greater than $5. Each quarter, all stocks are
# sorted into deciles based on their 12 months past performance. The investor then uses only stocks from the top momentum decile and 
# goes long on each stock 5 days before the earnings announcement and closes the long position at the close of the announcement day. 
# Subsequently, at the close of the announcement day, he/she goes short and he/she closes his short position on the 5th day after the
# earnings announcement.
#
# QC Implementation:
#   - The investment universe consists of stocks that have an earnings date.

from pandas.tseries.offsets import BDay
from collections import OrderedDict

class PostEarningsAnnouncement(QCAlgorithm):

    def Initialize(self):
        """Configure the backtest, load the earnings calendar and register schedules.

        The earnings calendar is downloaded as text with one row per
        announcement date in the form ``YYYY-MM-DD;TICKER;TICKER;...``. Every
        ticker found in it is subscribed at daily resolution so that the
        universe filter can restrict itself to stocks with a known earnings date.
        """
        # The earnings calendar starts in 2008; this run uses a shorter window.
        self.SetStartDate(2017, 12, 30)
        self.SetEndDate(2024, 8, 31)
        self.SetCash(640000) # Cash in paper trading account

        self.coarse_count = 1000

        # Earnings calendar state.
        self.earnings = {}               # announcement date -> list of tickers
        self.days_before_earnings = []   # announcement date minus 5 business days
        self.earnings_universe = []      # Symbols of every ticker in the calendar
        self.earnings_set = set()        # distinct tickers in the calendar

        earnings_data = self.Download("https://www.dropbox.com/s/tacy2gwti1mu879/quantpedia_marketwatch_earnings_cal_3.8.csv?dl=1") # For Testing Only

        # splitlines() accepts both '\n' and '\r\n' row endings, so no stray
        # '\r' ends up glued to the last ticker of a row.
        for row in earnings_data.splitlines():
            fields = [field.strip() for field in row.split(';')]
            if fields[0] == '':
                continue

            earnings_day = datetime.strptime(fields[0], "%Y-%m-%d").date()
            # Blank fields (e.g. from a trailing ';') are not tickers.
            tickers = [ticker for ticker in fields[1:] if ticker]

            # The scrape may repeat a date on several rows; merge the rows
            # instead of letting the last one overwrite the earlier ones.
            self.earnings.setdefault(earnings_day, []).extend(tickers)
            self.days_before_earnings.append(earnings_day - BDay(5))
            self.earnings_set.update(tickers)

        for ticker in self.earnings_set:
            symbol = self.AddEquity(ticker, Resolution.Daily).Symbol
            self.earnings_universe.append(symbol)

        self.Log(str(self.Time) + " Count of Earnings Universe 1 " + str(len(self.earnings_universe)))

        # SPY drives the scheduling rules and is charted as a second benchmark.
        self.symbol = self.AddEquity('SPY', Resolution.Daily).Symbol
        self.mkt = []  # SPY prices collected in OnData for the chart

        # FTLS is the benchmark used for statistics and for the simulated
        # buy-and-hold line maintained by UpdateBenchmarkValue.
        self.AddEquity("FTLS", Resolution.Daily)
        self.benchmarkTicker = 'FTLS'
        self.SetBenchmark(self.benchmarkTicker)
        self.initBenchmarkPrice = 0   # 0 means the baseline price is not captured yet
        self.benchmarkExposure = 1    # multiplier on the benchmark return (negative plots a short)

        self.period = 12 * 21 # Need twelve months performance

        # Daily price data: Symbol -> SymbolData
        self.data = {}

        self.selected_symbols = []

        # 50 equally weighted brackets for traded symbols.
        self.managed_symbols_size = 50
        self.managed_symbols = []

        # The scrape may deliver dates out of order, so sort them here.
        self.days_before_earnings = sorted(self.days_before_earnings)

        self.selection_flag = True # To trade immediately with no delay
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelectionFunction)

        # DaysBefore checks every trading day whether an earnings day is five
        # business days away; Selection re-arms the universe selection.
        self.Schedule.On(self.DateRules.EveryDay(self.symbol), self.TimeRules.AfterMarketOpen(self.symbol), self.DaysBefore)
        self.Schedule.On(self.DateRules.MonthStart(self.symbol), self.TimeRules.AfterMarketOpen(self.symbol), self.Selection)

        # NOTE(review): presumably keeps daily bars on midnight timestamps, which
        # the one-day offset of the switch date in DaysBefore relies on - confirm
        # against the QuantConnect settings documentation.
        self.settings.daily_precise_end_time = False
    
    def OnSecuritiesChanged(self, changes):
        """Give every newly added security the custom fee model and 5x leverage."""
        fee_model_factory = CustomFeeModel
        for added in changes.AddedSecurities:
            # A fresh fee model instance per security. Only for Backtesting.
            added.SetFeeModel(fee_model_factory())
            added.SetLeverage(5)
                
    def CoarseSelectionFunction(self, coarse):
        """Select the top momentum decile among liquid stocks with known earnings dates.

        Prices of already-tracked symbols are refreshed on every call. The
        universe itself is only rebuilt when ``self.selection_flag`` is set;
        the flag is cleared as soon as a rebuild starts.

        Returns:
            ``Universe.Unchanged`` when no rebuild is due or fewer than ten
            candidates have a complete price window; otherwise the list of
            symbols in the top momentum decile (also stored in
            ``self.selected_symbols``).
        """
        # Append the latest adjusted price to the window of every tracked stock.
        tracked = self.data
        for stock in coarse:
            symbol_data = tracked.get(stock.Symbol)
            if symbol_data is not None:
                symbol_data.update(stock.AdjustedPrice)

        if not self.selection_flag:
            return Universe.Unchanged
        self.selection_flag = False

        # Build the lookup once: testing membership in the earnings_universe
        # list for every coarse entry would scan the whole list each time.
        earnings_symbols = set(self.earnings_universe)

        by_dollar_volume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
        selected = [
            x.Symbol for x in by_dollar_volume
            if x.HasFundamentalData
            and x.Market == 'usa'
            and x.Price > 5
            and x.DollarVolume > 10000000  # liquidity floor added to help with scale
            and x.Symbol in earnings_symbols
        ][:2000]

        # Warm up the rolling window of symbols seen for the first time.
        for symbol in selected:
            if symbol in tracked:
                continue

            tracked[symbol] = SymbolData(self.period)
            history = self.History(symbol, self.period, Resolution.Daily)
            if history.empty:
                self.Log(f"Not enough data for {symbol} yet")
                continue
            for close in history.loc[symbol].close:
                tracked[symbol].update(close)

        # Every selected symbol is tracked at this point; only those with a
        # full window get a momentum value.
        momentum = {
            symbol: tracked[symbol].performance()
            for symbol in selected
            if tracked[symbol].is_ready()
        }

        if len(momentum) < 10:
            self.selected_symbols = []
            return Universe.Unchanged

        # Rank ascending by momentum and keep the highest tenth.
        decile = len(momentum) // 10
        ranked = sorted(momentum, key=momentum.get)
        self.selected_symbols = ranked[-decile:]

        return self.selected_symbols

    def DaysBefore(self):
        """Go long selected stocks whose earnings are five business days away.

        For each symbol in ``self.selected_symbols`` that is listed in the
        earnings calendar for that day, and while fewer than
        ``self.managed_symbols_size`` positions are managed, a long position of
        one bracket is opened and registered in ``self.managed_symbols``
        together with the dates on which OnData switches and liquidates it.
        """
        # Every day check if 5 days from now is any earnings day.
        earnings_date = (self.Time + BDay(5)).date()
        reporting_tickers = self.earnings.get(earnings_date)
        if reporting_tickers is None:
            return

        # Both dates are stored as plain datetime.date objects. Adding a BDay
        # offset yields a pandas Timestamp, and since pandas 2.0 a Timestamp
        # never compares equal to the datetime.date that OnData checks against,
        # which would silently skip the switch to the short position.
        #
        # NOTE: Must offset date to switch position by one day due to midnight execution of OnData function.
        # Alternatively, there is a possibility to switch to BeforeMarketClose function.
        date_to_switch = (earnings_date + BDay(1)).date()
        date_to_liquidate = (earnings_date + BDay(6)).date()

        bracket_weight = 1 / self.managed_symbols_size
        for symbol in self.selected_symbols:
            if symbol.Value not in reporting_tickers:
                continue
            # No free bracket left: nothing further can be opened today.
            if len(self.managed_symbols) >= self.managed_symbols_size:
                break

            self.SetHoldings(symbol, bracket_weight)
            self.managed_symbols.append(ManagedSymbol(symbol, date_to_switch, date_to_liquidate))
                    
    def OnData(self, data):
        """Refresh the benchmark charts and act on managed positions due today.

        A managed position is flipped from long to short on its switch date.
        Once its liquidation date has been reached it is closed and dropped
        from ``self.managed_symbols``.

        Args:
            data: the current data slice (not read by this handler).
        """
        def as_plain_date(value):
            # pandas Timestamps and datetimes expose .date(); plain dates do
            # not. Since pandas 2.0 a Timestamp never compares equal to a
            # datetime.date, so all comparisons below use datetime.date.
            to_date = getattr(value, 'date', None)
            return to_date() if callable(to_date) else value

        self.UpdateBenchmarkValue()
        self.Plot('Strategy Equity', self.benchmarkTicker, self.benchmarkValue)

        # To Chart a Second Benchmark: SPY scaled to the starting cash.
        # Recording only starts with the first non-zero price so the baseline
        # used as divisor can never be zero.
        mkt_price = self.Securities[self.symbol].Price
        if self.mkt or mkt_price:
            self.mkt.append(mkt_price)
        mkt_perf = 640000 * mkt_price / self.mkt[0] if self.mkt else 640000
        self.Plot('Strategy Equity', self.symbol, mkt_perf)

        # To Plot all Line Charts in separate Chart
        self.Plot("Equity", 'Earnings', self.Portfolio.TotalPortfolioValue)
        self.Plot('Equity', self.benchmarkTicker, self.benchmarkValue)
        self.Plot('Equity', self.symbol, mkt_perf)

        # Switch positions on earnings days.
        today = self.Time.date()
        bracket_weight = 1 / self.managed_symbols_size

        still_managed = []
        for managed in self.managed_symbols:
            if as_plain_date(managed.date_to_switch) == today:
                # Switch from long to short.
                self.SetHoldings(managed.symbol, -bracket_weight)
            elif as_plain_date(managed.date_to_liquidate) <= today:
                self.Liquidate(managed.symbol)
                continue  # liquidated positions leave management
            still_managed.append(managed)

        # Update in place so the list object itself stays the same.
        self.managed_symbols[:] = still_managed
            
    def UpdateBenchmarkValue(self):
        """Simulate buying and holding the benchmark with the initial cash.

        While no baseline price has been captured (``initBenchmarkPrice`` is
        0), the current cash and benchmark price become the baseline and the
        benchmark value equals that cash. Afterwards the value is the baseline
        cash grown by the benchmark return since the baseline, scaled by
        ``benchmarkExposure`` (a negative exposure plots a short position).
        """
        benchmark_price = self.Benchmark.Evaluate(self.Time)

        if self.initBenchmarkPrice == 0:
            self.initBenchmarkCash = self.Portfolio.Cash
            self.initBenchmarkPrice = benchmark_price
            self.benchmarkValue = self.initBenchmarkCash
            return

        return_since_start = (benchmark_price / self.initBenchmarkPrice) - 1
        scaled_return = return_since_start * self.benchmarkExposure
        self.benchmarkValue = (1 + scaled_return) * self.initBenchmarkCash
    
    def Selection(self):
        """Arm the universe selection when the current month closes a quarter.

        Meant to run from a MonthStart schedule, so the flag is raised once at
        the beginning of March, June, September and December.
        """
        is_quarter_end_month = self.Time.month % 3 == 0
        if is_quarter_end_month:
            self.selection_flag = True
            
    def EveryDayAfterMarketOpen(self):
        """Re-download the earnings calendar and rebuild all state derived from it.

        Intended to be run from a scheduled event. The calendar is text with
        one row per announcement date in the form
        ``YYYY-MM-DD;TICKER;TICKER;...``. After the refresh the universe
        selection is re-armed so the updated earnings universe is used at once.
        """
        # Start from empty containers so that they mirror the current file and
        # do not grow with every refresh. days_before_earnings is reset as
        # well; otherwise each refresh would append every date again.
        self.earnings = {}
        self.earnings_universe = [] # Stored earnings symbols
        self.earnings_set = set() # Ultimately, this is used to subscribe to data through AddEquity function
        self.days_before_earnings = []

        earnings_data = self.Download("https://www.dropbox.com/s/tacy2gwti1mu879/quantpedia_marketwatch_earnings_cal_3.8.csv?dl=1") # For Testing Only

        # splitlines() accepts both '\n' and '\r\n' row endings, so no stray
        # '\r' ends up glued to the last ticker of a row.
        for row in earnings_data.splitlines():
            fields = [field.strip() for field in row.split(';')]
            if fields[0] == '':
                continue

            earnings_day = datetime.strptime(fields[0], "%Y-%m-%d").date()
            # Blank fields (e.g. from a trailing ';') are not tickers.
            tickers = [ticker for ticker in fields[1:] if ticker]

            # The scrape may repeat a date on several rows; merge the rows
            # instead of letting the last one overwrite the earlier ones.
            self.earnings.setdefault(earnings_day, []).extend(tickers)
            self.days_before_earnings.append(earnings_day - BDay(5))
            self.earnings_set.update(tickers)

        for ticker in self.earnings_set:
            symbol = self.AddEquity(ticker, Resolution.Daily).Symbol
            self.earnings_universe.append(symbol)

        # The scrape may deliver dates out of order, so sort them here.
        self.days_before_earnings = sorted(self.days_before_earnings)

        self.selection_flag = True # VERY IMPORTANT to trade immediately with updated earnings_universe
        
        
class SymbolData:
    """Rolling window of a stock's prices, used to measure its momentum."""

    def __init__(self, period):
        """Create an empty window with room for ``period`` prices."""
        self.Closes = RollingWindow[float](period)

    def update(self, close):
        """Add ``close`` to the window."""
        self.Closes.Add(close)

    def is_ready(self):
        """Return the window's ``IsReady`` state."""
        return self.Closes.IsReady

    def performance(self):
        """Return the relative change from the window's last item to its first.

        NOTE(review): presumably index 0 is the most recent price and the last
        item the oldest (QuantConnect RollingWindow ordering) - confirm.
        """
        prices = list(self.Closes)
        first, last = prices[0], prices[-1]
        return (first - last) / last

class ManagedSymbol:
    """One managed trade: the traded symbol plus its switch and liquidation dates."""

    def __init__(self, symbol, date_to_switch, date_to_liquidate):
        """Store the symbol and the two dates unchanged as attributes."""
        self.symbol, self.date_to_switch, self.date_to_liquidate = (
            symbol,
            date_to_switch,
            date_to_liquidate,
        )
        
# Custom fee model
# class CustomFeeModel(FeeModel):
class CustomFeeModel:
    """Fee model charging a fixed fraction of each order's traded value."""

    def GetOrderFee(self, parameters):
        """Return the order fee in USD: price * absolute quantity * 0.00005."""
        security, order = parameters.Security, parameters.Order
        fee = security.Price * order.AbsoluteQuantity * 0.00005
        return OrderFee(CashAmount(fee, "USD"))