Overall Statistics
Total Orders
9514
Average Win
0.18%
Average Loss
-0.19%
Compounding Annual Return
0.517%
Drawdown
21.300%
Expectancy
0.004
Start Equity
100000
End Equity
113710.90
Net Profit
13.711%
Sharpe Ratio
-0.339
Sortino Ratio
-0.415
Probabilistic Sharpe Ratio
0.000%
Loss Rate
48%
Win Rate
52%
Profit-Loss Ratio
0.95
Alpha
-0.014
Beta
-0.075
Annual Standard Deviation
0.051
Annual Variance
0.003
Information Ratio
-0.345
Tracking Error
0.178
Treynor Ratio
0.231
Total Fees
$317.39
Estimated Strategy Capacity
$330000000.00
Lowest Capacity Asset
GPI R735QTJ8XC9X
Portfolio Turnover
0.62%
# https://quantpedia.com/strategies/earnings-quality-factor/
#
# The investment universe consists of all non-financial stocks from NYSE, Amex and Nasdaq. Big stocks are defined as the largest stocks
# that make up 90% of the total market cap within the region, while small stocks make up the remaining 10% of the market cap. Investor defines
# breakpoints by the 30th and 70th percentiles of the multiple “Earnings Quality” ratios between large caps and small caps.
# The first “Earnings Quality” ratio is defined by cash flow relative to reported earnings. The high-quality earnings firms are characterized
# by high cash flows (relative to reported earnings) while the low-quality firms are characterized by high reported earnings (relative to cash flow).
# The second factor is based on return on equity (ROE) to exploit the well-documented “profitability anomaly” by going long high-ROE firms 
# (top 30%) and short low-ROE firms (bottom 30%). The third ratio – CF/A (cash flow to assets) factor goes long firms with high cash flow to total assets.
# The fourth ratio – D/A (debt to assets) factor goes long firms with low leverage and short firms with high leverage.
# The investor builds a scored composite quality metric by computing the percentile score of each stock on each of the four quality metrics
# (where “good” quality has a high score, so ideally a stock has low accruals, low leverage, high ROE, and high cash flow) and then add up 
# the percentiles to get a score for each stock from 0 to 400. He then forms the composite factor by going long the top 30% of small-cap 
# stocks and also large-cap stocks and short the bottom 30% of the small-cap stocks and also large-cap stocks and cap-weighting individual 
# stocks within the portfolios. The final factor portfolio is formed at the end of each June and is rebalanced yearly.
#
# QC implementation changes:
#   - Universe consists of 500 most liquid US non-financial stocks by market cap from NYSE, AMEX and NASDAQ.

#region imports
from AlgorithmImports import *
import numpy as np
from typing import List, Dict
from numpy import isnan
from dataclasses import dataclass
#endregion

class EarningsQualityFactor(QCAlgorithm):

    def Initialize(self) -> None:
        self.SetStartDate(2000, 1, 1)
        self.SetCash(100_000)

        self.exchange_codes: List[str] = ['NYS', 'NAS', 'ASE']
        self.tickers_to_ignore: List[str] = ['TOPS', 'CRW']

        self.fundamental_count = 500
        self.fundamental_sorting_key = lambda x: x.DollarVolume

        self.leverage: int = 10
        self.quantile: int = 3
        self.rebalance_month: int = 7

        market: Symbol = self.AddEquity('SPY', Resolution.Daily).Symbol
        
        self.accruals_data: Dict[Symbol, AcrrualsData] = {}
        
        self.long: List[Symbol] = []
        self.short: List[Symbol] = []
        
        self.data: Dict[Symbol, StockData] = {}
        
        self.selection_flag: bool = True
        self.UniverseSettings.Leverage = self.leverage
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.FundamentalSelectionFunction)
        self.Settings.MinimumOrderMarginPortfolioPercentage = 0.
        self.Schedule.On(self.DateRules.MonthEnd(market), self.TimeRules.AfterMarketOpen(market), self.Selection)

        self.settings.daily_precise_end_time = False

    def OnSecuritiesChanged(self, changes: SecurityChanges) -> None:
        for security in changes.AddedSecurities:
            security.SetFeeModel(CustomFeeModel())

    def FundamentalSelectionFunction(self, fundamental: List[Fundamental]) -> List[Symbol]:
        if not self.selection_flag:
            return Universe.Unchanged
        
        selected: List[Fundamental] = [
            x for x in fundamental 
            if x.HasFundamentalData 
            and x.Market == 'usa'
            and x.MarketCap != 0 
            and x.SecurityReference.ExchangeId in self.exchange_codes
            and x.CompanyReference.IndustryTemplateCode != "B"
            and not isnan(x.FinancialStatements.BalanceSheet.CurrentAssets.Value) and x.FinancialStatements.BalanceSheet.CurrentAssets.Value != 0 
            and not isnan(x.FinancialStatements.BalanceSheet.CashAndCashEquivalents.Value) and x.FinancialStatements.BalanceSheet.CashAndCashEquivalents.Value != 0 
            and not isnan(x.FinancialStatements.BalanceSheet.CurrentLiabilities.Value) and x.FinancialStatements.BalanceSheet.CurrentLiabilities.Value != 0
            and not isnan(x.FinancialStatements.BalanceSheet.CurrentDebt.Value) and x.FinancialStatements.BalanceSheet.CurrentDebt.Value != 0
            and not isnan(x.FinancialStatements.IncomeStatement.DepreciationAndAmortization.Value) and x.FinancialStatements.IncomeStatement.DepreciationAndAmortization.Value != 0 
            and not isnan(x.FinancialStatements.BalanceSheet.GrossPPE.Value) and x.FinancialStatements.BalanceSheet.GrossPPE.Value != 0
            and not isnan(x.FinancialStatements.IncomeStatement.TotalRevenueAsReported.Value ) and x.FinancialStatements.IncomeStatement.TotalRevenueAsReported.Value != 0
            and not isnan(x.FinancialStatements.CashFlowStatement.OperatingCashFlow.Value) and x.FinancialStatements.CashFlowStatement.OperatingCashFlow.Value != 0
            and not isnan(x.EarningReports.BasicEPS.Value) and x.EarningReports.BasicEPS.Value != 0
            and not isnan(x.EarningReports.BasicAverageShares.Value) and x.EarningReports.BasicAverageShares.Value != 0 
            and not isnan(x.operation_ratios.debt_to_assets.Value) and x.operation_ratios.debt_to_assets.Value != 0
            and not isnan(x.OperationRatios.ROE.Value) and x.OperationRatios.ROE.Value != 0
            and x.Symbol.Value not in self.tickers_to_ignore
        ]
        
        if len(selected) > self.fundamental_count:
            selected = [x for x in sorted(selected, key=self.fundamental_sorting_key, reverse=True)[:self.fundamental_count]]

        for stock in selected:
            symbol = stock.Symbol

            if symbol not in self.accruals_data:
                # Data for previous year.
                self.accruals_data[symbol] = None
                
            # Accrual calc.
            current_accruals_data: AcrrualsData = AcrrualsData(stock.FinancialStatements.BalanceSheet.CurrentAssets.Value, stock.FinancialStatements.BalanceSheet.CashAndCashEquivalents.Value,
                                                stock.FinancialStatements.BalanceSheet.CurrentLiabilities.Value, stock.FinancialStatements.BalanceSheet.CurrentDebt.Value, stock.FinancialStatements.BalanceSheet.IncomeTaxPayable.Value,
                                                stock.FinancialStatements.IncomeStatement.DepreciationAndAmortization.Value, stock.FinancialStatements.BalanceSheet.TotalAssets.Value,
                                                stock.FinancialStatements.IncomeStatement.TotalRevenueAsReported.Value)
            
            # There is not previous accruals data.
            if not self.accruals_data[symbol]:
                self.accruals_data[symbol] = current_accruals_data
                continue
            
            current_accruals: float = self.CalculateAccruals(current_accruals_data, self.accruals_data[symbol])
            
            # cash flow to assets
            CFA: float = stock.FinancialStatements.CashFlowStatement.OperatingCashFlow.Value / (stock.EarningReports.BasicEPS.Value * stock.EarningReports.BasicAverageShares.Value)
            # debt to assets
            DA: float = stock.operation_ratios.debt_to_assets.Value
            # return on equity
            ROE: float = stock.OperationRatios.ROE.Value
            
            if symbol not in self.data:
                self.data[symbol] = None

            self.data[symbol] = StockData(current_accruals, CFA, DA, ROE)
            self.accruals_data[symbol] = current_accruals_data

        # Remove not updated symbols.
        updated_symbols: List[Symbol] = [x.Symbol for x in selected]
        not_updated: List[Symbol] = [x for x in self.data if x not in updated_symbols]
        for symbol in not_updated:
            del self.data[symbol]
            del self.accruals_data[symbol]
            
        return [x[0] for x in self.data.items()]
    
    def OnData(self, slice: Slice) -> None:
        if not self.selection_flag:
            return
        self.selection_flag = False

        # Sort stocks by four factors respectively.
        sorted_by_accruals: List[Tuple[Symbol, float]] = sorted(self.data.items(), key=lambda x: x[1].Accruals, reverse=True) # high score with low accrual 
        sorted_by_CFA: List[Tuple[Symbol, float]] = sorted(self.data.items(), key=lambda x: x[1].CFA)                       # high score with high CFA
        sorted_by_DA: List[Tuple[Symbol, float]] = sorted(self.data.items(), key=lambda x: x[1].DA, reverse=True)           # high score with low leverage
        sorted_by_ROE: List[Tuple[Symbol, float]] = sorted(self.data.items(), key=lambda x: x[1].ROE)                       # high score with high ROE
        
        score = {}

        # Assign a score to each stock according to their rank with different factors.
        for i, obj in enumerate(sorted_by_accruals):
            score_accruals = i
            score_CFA = sorted_by_CFA.index(obj)
            score_DA = sorted_by_DA.index(obj)
            score_ROE = sorted_by_ROE.index(obj)
            score[obj[0]] = score_accruals + score_CFA + score_DA + score_ROE
                
        sorted_by_score: List[Tuple[Symbol, float]] = sorted(score.items(), key = lambda x: x[1], reverse = True)
        quantile: int = int(len(sorted_by_score) / self.quantile)
        long: List[Symbol] = [x[0] for x in sorted_by_score[:quantile]]
        short: List[Symbol] = [x[0] for x in sorted_by_score[-quantile:]]
        
        # Trade execution.
        # NOTE: Skip year 2007 due to data error.
        # if self.Time.year == 2007:
        #     self.Liquidate()
        #     return
        
        targets: List[PortfolioTarget] = []
        for i, portfolio in enumerate([long, short]):
            for symbol in portfolio:
                if slice.contains_key(symbol) and slice[symbol]:
                    targets.append(PortfolioTarget(symbol, ((-1) ** i) / len(portfolio)))
        
        self.SetHoldings(targets, True)

    # Source: https://papers.ssrn.com/sol3/papers.cfm?abstract_id=3188172                
    def CalculateAccruals(self, current_accrual_data, prev_accrual_data) -> float:
        delta_assets: float = current_accrual_data.CurrentAssets - prev_accrual_data.CurrentAssets
        delta_cash: float = current_accrual_data.CashAndCashEquivalents - prev_accrual_data.CashAndCashEquivalents
        delta_liabilities: float = current_accrual_data.CurrentLiabilities - prev_accrual_data.CurrentLiabilities
        delta_debt: float = current_accrual_data.CurrentDebt - prev_accrual_data.CurrentDebt
        dep: float = current_accrual_data.DepreciationAndAmortization
        total_assets_prev_year: float = prev_accrual_data.TotalAssets
        
        acc: float = (delta_assets - delta_liabilities - delta_cash + delta_debt - dep) / total_assets_prev_year
        return acc
    
    def Selection(self) -> None:
        if self.Time.month == self.rebalance_month:
            self.selection_flag = True
        
@dataclass
class AcrrualsData():
    CurrentAssets: float
    CashAndCashEquivalents: float
    CurrentLiabilities: float
    CurrentDebt: float 
    IncomeTaxPayable: float 
    DepreciationAndAmortization: float
    TotalAssets: float 
    Sales: float

@dataclass     
class StockData():
    Accruals: AcrrualsData
    CFA: float
    DA: float 
    ROE: float

def MultipleLinearRegression(x, y):
    x = np.array(x).T
    x = sm.add_constant(x)
    result = sm.OLS(endog=y, exog=x).fit()
    return result

# Custom fee model.
class CustomFeeModel(FeeModel):
    def GetOrderFee(self, parameters: OrderFeeParameters) -> OrderFee:
        fee: float = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
        return OrderFee(CashAmount(fee, "USD"))