Overall Statistics
Total Trades
44
Average Win
5.59%
Average Loss
-1.36%
Compounding Annual Return
360.388%
Drawdown
18.000%
Expectancy
2.895
Net Profit
127.033%
Sharpe Ratio
7.399
Probabilistic Sharpe Ratio
98.652%
Loss Rate
24%
Win Rate
76%
Profit-Loss Ratio
4.11
Alpha
2.348
Beta
1.067
Annual Standard Deviation
0.375
Annual Variance
0.14
Information Ratio
6.911
Tracking Error
0.344
Treynor Ratio
2.599
Total Fees
$103.35
Estimated Strategy Capacity
$600000.00
from QuantConnect.Data.UniverseSelection import *
import math
import numpy as np
import pandas as pd
import scipy as sp
# import statsmodels.api as sm

class FundamentalFactorAlgorithm(QCAlgorithm):
    """Monthly fundamental-factor stock picker with an SPY trend filter.

    The universe is rebuilt once a month (and once at start-up): the coarse
    stage keeps the 1000 highest dollar-volume stocks priced above 5 that
    have fundamental data, and the fine stage scores them on ROIC, long-term
    debt/equity and FCF yield and keeps ``num_screener`` of them.

    At 10:00 on each month start ``rebalance`` compares the SPY price with
    the mean of its last 200 and last 50 daily closes.  Below both means the
    portfolio is moved into TLT; otherwise the ``num_stocks`` screened stocks
    chosen by ``calc_return`` are held at equal weight.
    """

    def Initialize(self):
        """Configure dates, cash, universe selection and the monthly schedule."""
        self.SetStartDate(2020, 10, 10)  # Set Start Date
        self.SetCash(50000)              # Set Strategy Cash

        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
        self.spy = self.AddEquity("SPY", Resolution.Hour).Symbol

        self.holding_months = 1
        self.num_screener = 40     # number of stocks kept by the fine screen
        self.num_stocks = 4        # number of stocks actually held
        self.formation_days = 126  # daily bars requested for the return ranking
        self.lowmom = False        # passed as ``ascending`` when sorting returns
        self.month_count = self.holding_months

        # When True, calc_return ranks the lowest returns first.  rebalance only
        # assigns this flag under strict inequalities, so it needs a default:
        # without one, an SPY price exactly equal to a moving average on the
        # first rebalance would leave the attribute undefined.
        self.state = False

        # 00:00: request a fresh universe selection; 10:00: trade on it.
        self.Schedule.On(self.DateRules.MonthStart("SPY"), self.TimeRules.At(0, 0), Action(self.monthly_rebalance))
        self.Schedule.On(self.DateRules.MonthStart("SPY"), self.TimeRules.At(10, 0), Action(self.rebalance))

        # rebalance the universe selection once a month
        self.rebalence_flag = 0
        # make sure to run the universe selection at the start of the
        # algorithm even if it is not the month start
        self.first_month_trade_flag = 1
        self.trade_flag = 0
        self.symbols = None

    def CoarseSelectionFunction(self, coarse):
        """Return the 1000 most liquid eligible symbols when a refresh is due.

        When neither refresh flag is set, the previously selected
        ``self.symbols`` is returned unchanged.
        """
        if not (self.rebalence_flag or self.first_month_trade_flag):
            return self.symbols

        # drop stocks which have no fundamental data or have too low prices
        eligible = [x for x in coarse if x.HasFundamentalData and float(x.Price) > 5]
        # rank the stocks by dollar volume
        by_dollar_volume = sorted(eligible, key=lambda x: x.DollarVolume, reverse=True)
        return [x.Symbol for x in by_dollar_volume[:1000]]

    def FineSelectionFunction(self, fine):
        """Score stocks on three fundamental factors and keep ``num_screener``.

        Each stock's score is ``0.4 * ROIC rank + 0.2 * long-term debt/equity
        rank + 0.4 * FCF-yield rank``, where every rank is the position in a
        descending sort of that factor.  The result is stored in
        ``self.symbols`` and the refresh flags are cleared.  When no refresh
        is due, the previous ``self.symbols`` is returned unchanged.
        """
        if not (self.rebalence_flag or self.first_month_trade_flag):
            return self.symbols

        try:
            # shares * (EPS * P/E) approximates market capitalisation; require > 2e9
            filtered_fine = [x for x in fine if (x.ValuationRatios.EVToEBITDA > 0)
                                                and (x.EarningReports.BasicAverageShares.ThreeMonths > 0)
                                                and x.EarningReports.BasicAverageShares.ThreeMonths * (x.EarningReports.BasicEPS.TwelveMonths*x.ValuationRatios.PERatio) > 2e9]
        except Exception as err:
            # Fall back to the looser filter, but never swallow
            # KeyboardInterrupt/SystemExit the way a bare ``except:`` would.
            self.Debug("Market-cap filter failed ({}); using fallback filter".format(err))
            filtered_fine = [x for x in fine if (x.ValuationRatios.EVToEBITDA > 0)
                                                and (x.EarningReports.BasicAverageShares.ThreeMonths > 0)]

        by_roic = sorted(filtered_fine, key=lambda x: x.OperationRatios.ROIC.Value, reverse=True)
        by_debt_equity = sorted(filtered_fine, key=lambda x: x.OperationRatios.LongTermDebtEquityRatio.Value, reverse=True)
        by_fcf_yield = sorted(filtered_fine, key=lambda x: x.ValuationRatios.FCFYield, reverse=True)

        # One pass per factor instead of a list.index() scan per stock.
        debt_equity_rank = self._first_positions(by_debt_equity)
        fcf_yield_rank = self._first_positions(by_fcf_yield)

        # assign a score to each stock, you can also change the rule of scoring here.
        stock_dict = {}
        for roic_rank, stock in enumerate(by_roic):
            stock_dict[stock] = sum([roic_rank*0.4,
                                     debt_equity_rank[stock]*0.2,
                                     fcf_yield_rank[stock]*0.4])

        # NOTE(review): rank 0 is the highest factor value, so reverse=True
        # puts the stocks ranked lowest on the factors first.  Kept as in the
        # original; confirm this is the intended direction.
        self.sorted_stock = sorted(stock_dict.items(), key=lambda d: d[1], reverse=True)
        self.sorted_symbol = [stock for stock, _ in self.sorted_stock]
        top = self.sorted_symbol[:self.num_screener]

        self.symbols = [x.Symbol for x in top]

        self.rebalence_flag = 0
        self.first_month_trade_flag = 0
        self.trade_flag = 1
        return self.symbols

    @staticmethod
    def _first_positions(items):
        """Map each item to the index of its first occurrence in *items*."""
        positions = {}
        for position, item in enumerate(items):
            positions.setdefault(item, position)
        return positions

    def OnData(self, data):
        """Unused: all trading happens in the scheduled ``rebalance``."""
        pass

    def monthly_rebalance(self):
        """Flag that the next universe selection should recompute the screen."""
        self.rebalence_flag = 1

    def rebalance(self):
        """Apply the SPY trend filter, then rotate into TLT or the chosen stocks.

        * SPY below both its 200- and 50-bar mean close: liquidate everything
          except TLT and set TLT to 100%.
        * Otherwise: update ``self.state`` from the two means, pick the first
          ``num_stocks`` rows of ``calc_return`` and hold them at
          ``0.99 / n`` each, closing every other non-SPY position.

        Nothing is traded when there is no screened universe yet or when no
        candidate has usable price history.
        """
        spy_hist = self.History([self.spy], 200, Resolution.Daily).loc[str(self.spy)]['close']
        spy_hist1 = self.History([self.spy], 50, Resolution.Daily).loc[str(self.spy)]['close']
        spy_price = self.Securities[self.spy].Price
        long_mean = spy_hist.mean()
        short_mean = spy_hist1.mean()

        if spy_price < long_mean and spy_price < short_mean:
            for symbol in self.Portfolio.Keys:
                if symbol.Value != "TLT":
                    # Liquidate only this symbol; the argument-less call sold
                    # the whole portfolio (TLT included) on every iteration.
                    self.Liquidate(symbol)
            self.AddEquity("TLT")
            self.SetHoldings("TLT", 1)
            return

        if spy_price < long_mean and spy_price > short_mean:
            self.state = True
        elif spy_price > long_mean:
            self.state = False

        if not self.symbols:
            return

        chosen_df = self.calc_return(self.symbols).iloc[:self.num_stocks]
        if chosen_df.empty:
            # Avoid dividing by zero below; leave current holdings untouched.
            self.Debug("rebalance skipped: no candidate has usable price history")
            return

        self.existing_pos = 0
        for symbol in self.Portfolio.Keys:
            if symbol.Value == 'SPY':
                continue
            if symbol.Value in chosen_df.index:
                self.existing_pos += 1
            else:
                self.SetHoldings(symbol, 0)

        weight = 0.99 / len(chosen_df)
        for symbol in chosen_df.index:
            self.AddEquity(symbol)
            self.SetHoldings(symbol, weight)

    def calc_return(self, stocks):
        """Return a one-column ``'return'`` DataFrame of formation-period returns.

        For every symbol present in both history requests, the return is
        measured from the first of ``formation_days`` daily closes to a close
        taken from the 10-bar minute history.  Rows are indexed by ticker and
        sorted ascending when ``self.state`` is true, otherwise with
        ``ascending=self.lowmom``.  Symbols without history, or whose first
        close is zero, are left out; the frame is empty when nothing
        qualifies.  The collected closes are kept in ``self.price``.
        """
        hist = self.History(stocks, self.formation_days, Resolution.Daily)
        current = self.History(stocks, 10, Resolution.Minute)

        self.price = {}
        # An empty history frame has no multi-level index to look symbols up in.
        if not hist.empty and not current.empty:
            for symbol in stocks:
                key = str(symbol)
                if key in hist.index.levels[0] and key in current.index.levels[0]:
                    closes = list(hist.loc[key]['close'])
                    # NOTE(review): position 0 is the first of the 10 minute
                    # bars returned, as in the original; confirm the latest
                    # bar was not intended.
                    closes.append(current.loc[key]['close'].iloc[0])
                    self.price[symbol.Value] = closes

        ret = {
            ticker: (closes[-1] - closes[0]) / closes[0]
            for ticker, closes in self.price.items()
            if closes[0] != 0  # a zero base price would yield an infinite return
        }
        # Building from a Series keeps the 'return' column even when ret is empty.
        df_ret = pd.Series(ret, dtype=float).to_frame('return')

        ascending = True if self.state else self.lowmom
        return df_ret.sort_values(by=['return'], ascending=ascending)