Overall Statistics
Total Trades
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Net Profit
0%
Sharpe Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
-1.692
Tracking Error
0.141
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
Portfolio Turnover
0%
#region imports
from AlgorithmImports import *
#endregion
from QuantConnect.Data.UniverseSelection import *
import math
import numpy as np
import pandas as pd
import scipy as sp
# import statsmodels.api as sm

class InOutWithFundamentalFactorAlgorithm(QCAlgorithm):

    def Initialize(self):

        self.SetStartDate(2023, 1, 1)  #Set Start Date
        #self.SetEndDate(2022, 11, 27)  #Set Start Date       
        self.SetCash(100000)            #Set Strategy Cash
        
        res = Resolution.Minute
        # Feed-in constants
        self.INI_WAIT_DAYS = 15  # out for 3 trading weeks
        
        # Market and list of signals based on ETFs
        self.MRKT = self.AddEquity('SPY', res).Symbol  # market
        self.PRDC = self.AddEquity('XLI', res).Symbol  # production (industrials)
        self.METL = self.AddEquity('DBB', res).Symbol  # input prices (metals)
        self.NRES = self.AddEquity('IGE', res).Symbol  # input prices (natural res)
        self.DEBT = self.AddEquity('SHY', res).Symbol  # cost of debt (bond yield)
        self.USDX = self.AddEquity('UUP', res).Symbol  # safe haven (USD)
        self.GOLD = self.AddEquity('GLD', res).Symbol  # gold
        self.SLVA = self.AddEquity('SLV', res).Symbol  # vs silver
        self.UTIL = self.AddEquity('XLU', res).Symbol  # utilities
        self.INDU = self.PRDC  # vs industrials
        self.SHCU = self.AddEquity('FXF', res).Symbol  # safe haven currency (CHF)
        self.RICU = self.AddEquity('FXA', res).Symbol  # vs risk currency (AUD)

        self.FORPAIRS = [self.GOLD, self.SLVA, self.UTIL, self.SHCU, self.RICU]
        self.SIGNALS = [self.PRDC, self.METL, self.NRES, self.DEBT, self.USDX]

        # Initialize variables
        ## 'In'/'out' indicator
        self.be_in = 999 #initially, set to an arbitrary value different from 1 (in) and 0 (out)
        ## Day count variables
        self.dcount = 0  # count of total days since start
        self.outday = 0  # dcount when self.be_in=0
        ## Flexi wait days
        self.WDadjvar = self.INI_WAIT_DAYS
        self.symbols = []
        self.symbolDataBySymbol = {}
        self.trade = True
        
        for symbol in self.symbols:
            self.AddEquity(symbol, Resolution.Hour).Symbol
            ema7 = self.EMA(symbol, 7, Resolution.Daily, Field.Close)
            sma20 = self.SMA(symbol, 20, Resolution.Daily, Field.Close)
            ema20 = self.EMA(symbol, 20, Resolution.Daily, Field.Close)
            sma50 = self.SMA(symbol, 50, Resolution.Daily, Field.Close)
            ema50 = self.EMA(symbol, 50, Resolution.Daily, Field.Close)
            sma100 = self.SMA(symbol, 50, Resolution.Daily, Field.Close)
            sma200 = self.SMA(symbol, 200, Resolution.Daily, Field.Close)
            rsi = self.RSI(symbol, 14, Resolution.Daily)
            macd = self.MACD(symbol, 12, 26, 9, MovingAverageType.Exponential, Resolution.Daily)
            macdSlow = self.MACD(symbol, 26, 48, 12, MovingAverageType.Exponential, Resolution.Daily)
            self.high = self.MAX(symbol, 5, Resolution.Daily, Field.High)
            self.midhigh = self.MAX(symbol, 3, Resolution.Daily, Field.High)
            self.buylow = self.MIN(symbol, 4, Resolution.Daily, Field.Low)
            self.low = self.MIN(symbol, 5, Resolution.Daily, Field.Low)
            self.stoplow = self.MIN(symbol, 20, Resolution.Daily, Field.Low)
            DCHindicator = self.DCH(symbol, 20, 20)
            self.Trend0Tickers = Trend0('Trend0 Single Equity', period=10, exponent=1.5)

            RSIConsolidate = self.RSI(symbol, 14, MovingAverageType.Wilders)
            dayConsolidator = TradeBarConsolidator(timedelta(days=5))
            self.SubscriptionManager.AddConsolidator(symbol, dayConsolidator)
            self.RegisterIndicator(symbol, RSIConsolidate, dayConsolidator)
            
            smaConsolidate = ExponentialMovingAverage(200, MovingAverageType.Simple)
            WeekConsolidator = TradeBarConsolidator(timedelta(days=10))
            self.SubscriptionManager.AddConsolidator(symbol, WeekConsolidator)
            self.RegisterIndicator(symbol, smaConsolidate, WeekConsolidator)
            
            symbolData = SymbolData(symbol, ema7, sma20, ema20, sma50, ema50, sma100, sma200, rsi, macd, macdSlow, DCHindicator, RSIConsolidate, smaConsolidate) 
            self.symbolDataBySymbol[symbol] = symbolData


        # set a warm-up period to initialize the indicator
        self.SetWarmUp(timedelta(350))
        
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.MomentumSelectionFunction, self.FundamentalSelectionFunction)
        self.num_screener = 100
        self.num_stocks = 10
        self.formation_days = 200
        self.lowmom = False
        
        # rebalance the universe selection once a month
        self.rebalence_flag = 0
        # make sure to run the universe selection at the start of the algorithm even it's not the manth start
        self.flip_flag = 0
        self.first_month_trade_flag = 1
        self.trade_flag = 0 
        
        
        self.Schedule.On(
            self.DateRules.EveryDay(),
            self.TimeRules.AfterMarketOpen('SPY', 120),
            self.rebalance_when_out_of_the_market
        )

        self.Schedule.On(
            self.DateRules.EveryDay(),
            self.TimeRules.AfterMarketOpen('SPY', 30),
            self.buySignals
        )

    def rebalance_when_out_of_the_market(self):
        # Returns sample to detect extreme observations
        hist = self.History(
            self.SIGNALS + [self.MRKT] + self.FORPAIRS, 252, Resolution.Daily)['close'].unstack(level=0).dropna()
        hist_shift = hist.apply(lambda x: (x.shift(65) + x.shift(64) + x.shift(63) + x.shift(62) + x.shift(
            61) + x.shift(60) + x.shift(59) + x.shift(58) + x.shift(57) + x.shift(56) + x.shift(55)) / 11)

        returns_sample = (hist / hist_shift - 1)
        # Reverse code USDX: sort largest changes to bottom
        returns_sample[self.USDX] = returns_sample[self.USDX] * (-1)
        # For pairs, take returns differential, reverse coded
        returns_sample['G_S'] = -(returns_sample[self.GOLD] - returns_sample[self.SLVA])
        returns_sample['U_I'] = -(returns_sample[self.UTIL] - returns_sample[self.INDU])
        returns_sample['C_A'] = -(returns_sample[self.SHCU] - returns_sample[self.RICU])    
        self.pairlist = ['G_S', 'U_I', 'C_A']

        # Extreme observations; statist. significance = 1%
        pctl_b = np.nanpercentile(returns_sample, 1, axis=0)
        extreme_b = returns_sample.iloc[-1] < pctl_b

        # Determine waitdays empirically via safe haven excess returns, 50% decay
        self.WDadjvar = int(
            max(0.50 * self.WDadjvar,
                self.INI_WAIT_DAYS * max(1,
                                         np.where((returns_sample[self.GOLD].iloc[-1]>0) & (returns_sample[self.SLVA].iloc[-1]<0) & (returns_sample[self.SLVA].iloc[-2]>0), self.INI_WAIT_DAYS, 1),
                                         np.where((returns_sample[self.UTIL].iloc[-1]>0) & (returns_sample[self.INDU].iloc[-1]<0) & (returns_sample[self.INDU].iloc[-2]>0), self.INI_WAIT_DAYS, 1),
                                         np.where((returns_sample[self.SHCU].iloc[-1]>0) & (returns_sample[self.RICU].iloc[-1]<0) & (returns_sample[self.RICU].iloc[-2]>0), self.INI_WAIT_DAYS, 1)
                                         ))
        )
        adjwaitdays = min(60, self.WDadjvar)

        # self.Debug('{}'.format(self.WDadjvar))

        # Determine whether 'in' or 'out' of the market
        if (extreme_b[self.SIGNALS + self.pairlist]).any():
            self.be_in = False
            self.outday = self.dcount
            for symbol in self.Portfolio.Keys:
                if symbol.Value != "TLT":
                    self.Liquidate(symbol.Value)
            self.AddEquity("TLT")
            self.SetHoldings("TLT", 1)
            #self.Debug("Be OUT TRIGGERED: " + str(self.Time))
        if self.dcount >= self.outday + adjwaitdays:
            #So this logic is best put as: we are about to flip the be_in to TRUE 
            #so right before that do the rebalance otherwise everyday it be_in is true it will try to rebalance
            if not self.be_in:
                self.flip_flag = 1
                self.rebalance()
                self.flip_flag = 0
            self.be_in = True
            #self.Debug("Be IN TRIGGERED: " + str(self.Time))
        self.dcount += 1

        self.Plot("In Out", "in_market", int(self.be_in))
        self.Plot("In Out", "num_out_signals", extreme_b[self.SIGNALS + self.pairlist].sum())
        self.Plot("Wait Days", "waitdays", adjwaitdays)
        
        
 
    def MomentumSelectionFunction(self, momentum):
        #self.Debug(str(self.Time) + "MomentumSelectionFunction: be_in:" + str(self.be_in) + " flip_flag:" + str(self.flip_flag))
        if (self.rebalence_flag or self.first_month_trade_flag) and (self.be_in or self.flip_flag):
            # drop stocks which have no fundamental data or have too low prices
            selected = [x for x in momentum if (x.HasFundamentalData) and (float(x.Price) > 5)]
            # rank the stocks by dollar volume 
            filtered = sorted(selected, key=lambda x: x.DollarVolume, reverse=True)
            return [ x.Symbol for x in filtered[:200]]
        else:
            return self.symbols


    def FundamentalSelectionFunction(self, fundamental):
        #self.Debug(str(self.Time) + "FundamentalSelectionFunction: be_in:" + str(self.be_in) + " flip_flag:" + str(self.flip_flag))
        if (self.rebalence_flag or self.first_month_trade_flag) and (self.be_in or self.flip_flag):
            hist = self.History([i.Symbol for i in fundamental], 1, Resolution.Daily)
            try:
                filtered_fundamental = [x for x in fundamental if (x.ValuationRatios.EVToEBITDA > 0) 
                                                and (x.EarningReports.BasicAverageShares.ThreeMonths > 0) 
                                                and float(x.EarningReports.BasicAverageShares.ThreeMonths * hist.loc[str(x.Symbol)]['close'][0] > 2e9)
                                                and (x.EarningReports.BasicAverageShares.ThreeMonths > 0) 
                                                and float(x.EarningReports.BasicAverageShares.ThreeMonths) * x.Price > 2e9
                                                and x.SecurityReference.IsPrimaryShare
                                                and x.SecurityReference.SecurityType == "ST00000001"
                                                and x.SecurityReference.IsDepositaryReceipt == 0
                                                and x.CompanyReference.IsLimitedPartnership == 0
                                                and x.EarningReports.BasicEPS.OneMonth > x.EarningReports.BasicEPS.SixMonths
                                                #and x.ValuationRatios.ForwardPE > 0
                                                and ((x.FinancialStatements.IncomeStatement.TotalRevenue.SixMonths -x.FinancialStatements.IncomeStatement.TotalRevenue.OneYear)/ (x.FinancialStatements.IncomeStatement.TotalRevenue.SixMonths)) >0.20
                                                and x.OperationRatios.RevenueGrowth.ThreeYears > 0
                                                and x.EarningReports.NormalizedBasicEPS.ThreeMonths > 0.1  
                                                and x.OperationRatios.OperationIncomeGrowth.ThreeYears > 0
                                                and x.AssetClassification.GrowthGrade == "B" or x.AssetClassification.GrowthGrade == "A"
                                                and x.AssetClassification.FinancialHealthGrade == "B" or x.AssetClassification.FinancialHealthGrade == "A"
                                                and x.ValuationRatios.EVToEBITDA < 50 # Good quality ratio
                                                and x.ValuationRatios.FCFYield > 0]
            except:
                filtered_fundamental = [x for x in fundamental if (x.ValuationRatios.EVToEBITDA > 0) 
                                                and (x.EarningReports.BasicAverageShares.ThreeMonths > 0)] 

            top = sorted(filtered_fundamental, key = lambda x: x.ValuationRatios.EVToEBITDA, reverse=True)[:self.num_screener]
            self.symbols = [x.Symbol for x in top]
            self.rebalence_flag = 0
            self.first_month_trade_flag = 0
            self.trade_flag = 1
            return self.symbols
        else:
            return self.symbols
    
    def OnData(self, data):
        pass

    def rebalance(self):
        self.rebalence_flag = 1
        #self.Debug(str(self.Time) + "rebalance: be_in:" + str(self.be_in) + " flip_flag:" + str(self.flip_flag))
        spy_hist = self.History([self.MRKT], 120, Resolution.Daily).loc[str(self.MRKT)]['close']
        if self.Securities[self.MRKT].Price < spy_hist.mean():
            for symbol in self.Portfolio.Keys:
                if symbol.Value != "TLT":
                    self.Liquidate(symbol.Value)
            self.AddEquity("TLT")
            self.SetHoldings("TLT", 1)

        if self.symbols is None: return
        chosen_df = self.calc_return(self.symbols)
        chosen_df = chosen_df.iloc[:self.num_stocks]
        
        self.existing_pos = 0
        add_symbols = []
        for symbol in self.Portfolio.Keys:
            if symbol.Value == 'SPY': continue
            if (symbol.Value not in chosen_df.index):
                self.SetHoldings(symbol, 0)
            elif (symbol.Value in chosen_df.index): 
                self.existing_pos += 1
        #self.Debug(self.symbols)

        weight = 0.99/len(chosen_df)
        for symbol in chosen_df.index:
            self.AddEquity(symbol)
            self.EnterTrade = True
            self.Debug(symbol)
            #self.SetHoldings(symbol, weight)    
                
    def calc_return(self, stocks):
        hist = self.History(stocks, self.formation_days, Resolution.Daily)
        current = self.History(stocks, 1, Resolution.Minute)
        
        self.price = {}
        ret = {}
     
        for symbol in stocks:
            if str(symbol) in hist.index.levels[0] and str(symbol) in current.index.levels[0]:
                self.price[symbol.Value] = list(hist.loc[str(symbol)]['close'])
                self.price[symbol.Value].append(current.loc[str(symbol)]['close'][0])
        for symbol in self.price.keys():
            ret[symbol] = (self.price[symbol][-1] - self.price[symbol][0]) / self.price[symbol][0]
        df_ret = pd.DataFrame.from_dict(ret, orient='index')
        df_ret.columns = ['return']
        sort_return = df_ret.sort_values(by = ['return'], ascending = self.lowmom)
        
        return sort_return

    def buySignals(self):
        if self.trade_flag == 0:
            return
        for symbol, symbolData in self.symbolDataBySymbol.items():
            if self.Portfolio[symbol].Invested and (self.Securities[symbol].Close < symbolData.sma20.Current.Value):
                self.SetHoldings(symbol, .2, False, "Buy Signal")

class SymbolData:
    def __init__(self, symbol, ema7, sma20, ema20, sma50, ema50, sma100, sma200, rsi, macd, macdSlow, DCHindicator, RSIConsolidate, smaConsolidate):
        self.Symbol = symbol
        self.ema7 = ema7
        self.sma20 = sma20
        self.sma50 = sma50
        self.ema50 = ema50
        self.sma100 = sma100
        self.sma200 = sma200
        self.rsi = rsi
        self.macd = macd
        self.macdSlow = macdSlow
        self.DCHindicator = DCHindicator

        self.RSIConsolidate = RSIConsolidate
        self.smaConsolidate = smaConsolidate