Overall Statistics
Total Trades
184
Average Win
2.88%
Average Loss
-2.14%
Compounding Annual Return
160.009%
Drawdown
20.600%
Expectancy
0.442
Net Profit
160.009%
Sharpe Ratio
3.754
Probabilistic Sharpe Ratio
94.111%
Loss Rate
39%
Win Rate
61%
Profit-Loss Ratio
1.35
Alpha
1.322
Beta
0.008
Annual Standard Deviation
0.353
Annual Variance
0.124
Information Ratio
2.886
Tracking Error
0.37
Treynor Ratio
160.951
Total Fees
$1237.53
from System.Collections.Generic import List
from QuantConnect.Data.UniverseSelection import *
import operator
from math import ceil,floor
from scipy import stats
import numpy as np
from datetime import timedelta

class Piotroski(QCAlgorithm):
    '''Monthly-rebalanced equity strategy driven by the Piotroski F-score.

    Once per calendar month the universe is narrowed in two stages
    (liquidity/price, then fundamentals scored with ``FScore``).  On the
    scheduled monthly rebalance, the remaining symbols are ranked by a
    regression intercept against the benchmark and the top ones are bought.
    '''

    def Initialize(self):
        '''Set the backtest window, cash, universe, schedule and state.'''
        # Backtesting parameters
        self.SetStartDate(2019, 1, 1)
        self.SetEndDate(2020, 1, 1)
        self.SetCash(50000)

        # Universe settings
        self.benchmark = Symbol.Create("SPY", SecurityType.Equity, Market.USA)
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
        self.topScoreSymbolsCoarse = 10000   # max symbols kept by the coarse filter
        self.topScoreSymbolsFine = 30        # max symbols kept by the fine filter

        # Schedule settings
        self.AddEquity("SPY", Resolution.Daily)
        self.SetBenchmark("SPY")
        self.Schedule.On(self.DateRules.MonthStart("SPY"), self.TimeRules.AfterMarketOpen("SPY"), Action(self.Rebalance))

        # Other settings
        self.month = -1            # month of the last universe refresh (-1 = never)
        self.symbols = []          # Symbols returned by the last fine selection
        self.changes = []
        self.qualityStocks = []    # fine-fundamental objects with F-score > 6
        self.lookback = 7          # bars of daily history used for the regression

        # Retained for compatibility; no longer read anywhere in this class.
        self.initiated = -1

    def CoarseSelectionFunction(self, coarse):
        '''Return the most liquid symbols that have fundamental data.

        Runs the filter only when the calendar month has changed since the
        last fine selection; otherwise the previous selection is returned.
        '''
        if self.month == self.Time.month:
            return self.symbols

        candidates = [x for x in coarse
                      if x.HasFundamentalData
                      and x.Price > 5]
        candidates.sort(key=lambda x: x.DollarVolume, reverse=True)
        return [x.Symbol for x in candidates[:self.topScoreSymbolsCoarse]]

    @staticmethod
    def _FScoreInputs(x):
        '''Return the 14 fundamental values of `x` in `FScore.__init__` order.'''
        statements = x.FinancialStatements
        ratios = x.OperationRatios
        return (statements.IncomeStatement.NetIncome.TwelveMonths,
                statements.CashFlowStatement.CashFlowFromContinuingOperatingActivities.TwelveMonths,
                ratios.ROA.ThreeMonths,
                ratios.ROA.OneYear,
                statements.BalanceSheet.ShareIssued.ThreeMonths,
                statements.BalanceSheet.ShareIssued.TwelveMonths,
                ratios.GrossMargin.ThreeMonths,
                ratios.GrossMargin.OneYear,
                ratios.LongTermDebtEquityRatio.ThreeMonths,
                ratios.LongTermDebtEquityRatio.OneYear,
                ratios.CurrentRatio.ThreeMonths,
                ratios.CurrentRatio.OneYear,
                ratios.AssetsTurnover.ThreeMonths,
                ratios.AssetsTurnover.OneYear)

    def FineSelectionFunction(self, fine):
        '''Keep stocks with an F-score above 6 and return the first 30 by rank.

        Updates `self.month`, `self.qualityStocks` (every stock scoring above
        6) and `self.symbols` (the returned Symbols).  When the month has not
        changed, the previous selection is returned untouched.
        '''
        if self.month == self.Time.month:
            return self.symbols
        self.month = self.Time.month

        # Score each stock exactly once; stocks with any missing (falsy)
        # required field are skipped before scoring.
        scored = []
        for x in fine:
            inputs = self._FScoreInputs(x)
            hasAllFields = (all(inputs)
                            and x.ValuationRatios.NormalizedPERatio
                            and x.EarningReports.BasicAverageShares.ThreeMonths
                            and x.EarningReports.BasicEPS.TwelveMonths)
            if not hasAllFields:
                continue
            score = FScore(*inputs).ObjectiveScore()
            if score > 6:
                scored.append((score, x))

        self.qualityStocks = [x for _, x in scored]

        # The Piotroski score identifies quality; the normalized P/E is used
        # as a secondary, value-oriented sort key.
        # NOTE(review): the sort is ascending on the F-score, so among the
        # qualifying stocks the lower scores rank first.  Kept as in the
        # original; confirm whether highest-score-first was intended.
        ranked = sorted(scored,
                        key=lambda pair: (pair[0], pair[1].ValuationRatios.NormalizedPERatio))

        self.symbols = [x.Symbol for _, x in ranked[:self.topScoreSymbolsFine]]
        return self.symbols

    def Rebalance(self):
        '''Liquidate holdings that dropped out of the quality list and buy the picks.'''
        # Fetch the historical data to perform the linear regression
        history = self.History(
            self.symbols + [self.benchmark],
            self.lookback,
            Resolution.Daily)
        if history.empty:
            # Without price history there is no `close` column to rank on.
            self.Debug("Rebalance skipped: no history returned")
            return
        history = history.close.unstack(level=0)

        symbols = self.SelectSymbols(history)

        # Liquidate positions that are no longer quality stocks.
        # `qualityStocks` holds fine-fundamental objects, so compare against
        # their Symbols; testing a Symbol against the objects never matches
        # and would liquidate every position on each rebalance.
        qualitySymbols = {x.Symbol for x in self.qualityStocks}
        for holdings in self.Portfolio.Values:
            symbol = holdings.Symbol
            if holdings.Invested and symbol not in qualitySymbols:
                self.Liquidate(symbol)

        # Target 25% of portfolio value in each selected symbol.  Up to 10
        # symbols are selected, so the summed targets can exceed 100%.
        for symbol in symbols:
            self.SetHoldings(symbol, .25)

    def SelectSymbols(self, history):
        '''Return up to 10 of `self.symbols` with the largest regression intercept.

        `history` is a frame of closing prices with one column per symbol.
        For each symbol, benchmark returns are fitted by least squares on
        the symbol's returns plus a constant; the fitted constant is the
        ranking value.
        NOTE(review): a conventional alpha regresses the symbol on the
        benchmark; the direction here is kept exactly as originally written.
        '''
        if self.benchmark not in history:
            return []

        alphas = dict()

        # Get the benchmark returns
        benchmark = history[self.benchmark].pct_change().dropna()

        # Conduct the linear regression for each symbol and save the intercept
        for symbol in self.symbols:
            if symbol not in history:
                continue

            # Design matrix: one column of security returns, one of ones
            returns = history[symbol].pct_change().dropna()
            returns = np.vstack([returns, np.ones(len(returns))]).T
            if len(returns) != len(benchmark):
                # Mismatched sample sizes cannot be regressed against each other
                continue

            # Simple linear regression function in Numpy
            result = np.linalg.lstsq(returns, benchmark)
            alphas[symbol] = result[0][1]

        # Select symbols with the highest intercept
        selected = sorted(alphas.items(), key=lambda x: x[1], reverse=True)[:10]
        return [x[0] for x in selected]
         

class FScore(object):
    '''Compute a nine-point Piotroski F-score from fundamental values.

    Each `*_current` / `*_past` pair is compared to detect an improvement
    between two reporting periods; `netincome` and `operating_cashflow`
    are tested on their own and against each other.
    '''

    def __init__(self, 
                 netincome, 
                 operating_cashflow, 
                 roa_current,
                 roa_past, 
                 issued_current, 
                 issued_past, 
                 grossm_current, 
                 grossm_past,
                 longterm_current, 
                 longterm_past, 
                 curratio_current, 
                 curratio_past,
                 assetturn_current, 
                 assetturn_past):
        '''Store the raw inputs; no validation or computation happens here.'''
        self.netincome = netincome
        self.operating_cashflow = operating_cashflow
        self.roa_current = roa_current
        self.roa_past = roa_past
        self.issued_current = issued_current
        self.issued_past = issued_past
        self.grossm_current = grossm_current
        self.grossm_past = grossm_past
        self.longterm_current = longterm_current
        self.longterm_past = longterm_past
        self.curratio_current = curratio_current
        self.curratio_past = curratio_past
        self.assetturn_current = assetturn_current
        self.assetturn_past = assetturn_past

    def ObjectiveScore(self):
        '''Return the F-score: one point for each satisfied criterion (0 to 9).

        The score covers profitability; leverage, liquidity and source of
        funds; and operating efficiency.
        '''
        criteria = (
            # --- Profitability ---
            self.netincome > 0,                             # Positive net income
            self.operating_cashflow > 0,                    # Positive operating cash flow
            self.roa_current > self.roa_past,               # Return on assets improved
            # Quality of earnings: operating cash flow exceeds net income.
            # (Previously compared against ROA, a ratio, which made this
            # point almost always granted.)
            self.operating_cashflow > self.netincome,

            # --- Leverage, liquidity and source of funds ---
            self.longterm_current <= self.longterm_past,    # Long-term debt ratio did not rise
            self.curratio_current >= self.curratio_past,    # Current ratio did not fall
            self.issued_current <= self.issued_past,        # No new shares were issued

            # --- Operating efficiency ---
            self.grossm_current >= self.grossm_past,        # Gross margin did not fall
            self.assetturn_current >= self.assetturn_past,  # Asset turnover did not fall
        )

        fscore = 0
        for satisfied in criteria:
            # np.where keeps the numpy result type callers already receive
            fscore += np.where(satisfied, 1, 0)

        return fscore