Overall Statistics |
Total Orders 548 Average Win 38.15% Average Loss -10.03% Compounding Annual Return 31.282% Drawdown 67.100% Expectancy 1.264 Start Equity 10000 End Equity 1032095.91 Net Profit 10220.959% Sharpe Ratio 0.78 Sortino Ratio 0.831 Probabilistic Sharpe Ratio 9.279% Loss Rate 53% Win Rate 47% Profit-Loss Ratio 3.80 Alpha 0.166 Beta 1.306 Annual Standard Deviation 0.33 Annual Variance 0.109 Information Ratio 0.733 Tracking Error 0.256 Treynor Ratio 0.197 Total Fees $261.08 Estimated Strategy Capacity $4000000.00 Lowest Capacity Asset CUM R735QTJ8XC9X Portfolio Turnover 0.44% |
# region imports
from AlgorithmImports import *
# endregion
"""
SEL(stock selection part)
Based on the 'Momentum Strategy with Market Cap and EV/EBITDA' strategy introduced by Jing Wu, 6 Feb 2018
adapted and recoded by Jack Simonson, Goldie Yalamanchi, Vladimir, Peter Guenther, Leandro Maia, Simone Pantaleoni, Mirko Vari(Strongs)
"""
from QuantConnect.Data.UniverseSelection import *
import math
import numpy as np
import pandas as pd
import scipy as sp


class EarningsFactorWithMomentum_InOut(QCAlgorithm):
    """Stock picker (F-Score screen + momentum rank) gated by an 'In & Out' signal.

    Every trading day ``rebalance_when_out_of_the_market`` compares trailing
    returns of a handful of ETF pairs. When the bear condition holds, tracked
    positions are sold and a single safe asset is bought; once the wait period
    has elapsed, the stock portfolio is rebuilt by ``rebalance``.

    Share counts of everything the strategy trades are tracked in
    ``self.quantity`` (symbol -> signed number of shares).
    """

    def Initialize(self):
        """Configure dates, cash, subscriptions, parameters and schedules."""
        self.SetStartDate(2008, 1, 1)  # Set Start Date
        # self.SetEndDate(2021, 5, 1)  # Set End Date
        self.cap = 10000
        self.SetCash(self.cap)
        # symbol -> SymbolDataVolume, filled lazily by UniverseCoarseFilter
        self.averages = {}
        res = Resolution.Hour

        # Holdings
        ### 'Out' holdings
        self.BND1 = self.AddEquity('TLT', res).Symbol  # safe asset while inflation reading <= threshold
        self.BND2 = self.AddEquity('UUP', res).Symbol  # safe asset while inflation reading > threshold
        self.quantity = {self.BND1: 0, self.BND2: 0}

        ##### In & Out parameters #####
        # Feed-in constants
        self.INI_WAIT_DAYS = 4  # initial wait period, in scheduled-handler runs
        self.wait_days = self.INI_WAIT_DAYS

        # Market and list of signals based on ETFs
        self.MRKT = self.AddEquity('SPY', res).Symbol  # market
        self.GOLD = self.AddEquity('GLD', res).Symbol  # gold
        self.SLVA = self.AddEquity('SLV', res).Symbol  # vs silver
        self.UTIL = self.AddEquity('XLU', res).Symbol  # utilities
        self.INDU = self.AddEquity('XLI', res).Symbol  # vs industrials
        self.METL = self.AddEquity('DBB', res).Symbol  # input prices (metals)
        self.USDX = self.AddEquity('UUP', res).Symbol  # safe haven (USD)
        self.FORPAIRS = [self.GOLD, self.SLVA, self.UTIL, self.INDU, self.METL, self.USDX]

        # Specific variables
        self.DISTILLED_BEAR = 1
        self.BE_IN = 1
        self.BE_IN_PRIOR = 0
        self.VOLA_LOOKBACK = 126
        self.WAITD_CONSTANT = 85
        self.DCOUNT = 0  # number of completed runs of rebalance_when_out_of_the_market
        # DCOUNT value at the latest bear signal. With DCOUNT == 0 and
        # wait_days == INI_WAIT_DAYS this start value satisfies
        # DCOUNT == OUTDAY + wait_days - 1, so the first coarse pass selects a universe.
        self.OUTDAY = (-self.INI_WAIT_DAYS + 1)

        # set a warm-up period to initialize the indicator
        self.SetWarmUp(timedelta(350))

        ##### Momentum & fundamentals strategy parameters #####
        self.UniverseSettings.Resolution = res
        self.AddUniverse(self.UniverseCoarseFilter, self.UniverseFundamentalsFilter)
        self.num_coarse = 100    # symbols kept by the coarse filter
        self.num_screener = 20   # symbols kept by the fundamentals filter
        self.num_stocks = 5      # symbols actually held
        self.formation_days = 126
        self.lowmom = False      # False -> rank momentum descending
        self.data = {}           # symbol -> SymbolData
        self.setrebalancefreq = 60  # X days, update universe and momentum calculation
        self.updatefinefilter = 0
        self.symbols = None
        self.reb_count = 0
        self.current_inflation = 0
        self.fredcode = "MEDCPIM158SFRBCLE"
        self.cpi = self.add_data(Fred, self.fredcode, Resolution.DAILY).Symbol

        self.Schedule.On(
            self.DateRules.EveryDay(),
            self.TimeRules.AfterMarketOpen('SPY', 60),  # reduced time
            self.rebalance_when_out_of_the_market)

        self.Schedule.On(
            self.DateRules.EveryDay(),
            self.TimeRules.BeforeMarketClose('SPY', 0),
            self.record_vars)

        # Benchmark = record SPY
        self.spy = []

        # Setup daily consolidation: one consolidator per signal symbol
        symbols = [self.MRKT] + self.FORPAIRS
        for symbol in symbols:
            consolidator = TradeBarConsolidator(timedelta(days=1))
            consolidator.DataConsolidated += self.consolidation_handler
            self.SubscriptionManager.AddConsolidator(symbol, consolidator)

        # Warm up history: rows = bar end time, columns = symbol, values = close
        self.lookback = 126
        self.history = self.History(symbols, self.lookback, Resolution.Daily)
        if self.history.empty or 'close' not in self.history.columns:
            return
        self.history = self.history['close'].unstack(level=0).dropna()

    def UniverseCoarseFilter(self, coarse):
        """Return up to ``num_coarse`` symbols ranked by average dollar volume.

        The universe is refreshed when at least ``setrebalancefreq`` handler
        runs have passed since the last refresh, or one run before the wait
        period ends; otherwise ``Universe.Unchanged`` is returned.
        """
        # '>=' rather than '==' so a skipped selection day cannot disable
        # scheduled refreshes for good.
        due_by_schedule = (self.DCOUNT - self.reb_count) >= self.setrebalancefreq
        due_before_reentry = self.DCOUNT == self.OUTDAY + self.wait_days - 1
        if not (due_by_schedule or due_before_reentry):
            self.updatefinefilter = 0
            return Universe.Unchanged

        self.updatefinefilter = 1

        # drop stocks which have no fundamental data or have too low prices
        selected = [x for x in coarse if x.HasFundamentalData and float(x.Price) > 5]

        # Rank only the symbols that passed today's screen; entries left in
        # self.averages by earlier passes must not leak into the result.
        ranked = []
        for cf in selected:
            avg = self.averages.get(cf.Symbol)
            if avg is None:
                avg = SymbolDataVolume(cf.Symbol, 21, 5, 504)
                self.averages[cf.Symbol] = avg
            # Feed the latest price and dollar volume into the averages
            avg.update(cf.EndTime, cf.AdjustedPrice, cf.DollarVolume)
            if avg.smaw.Current.Value > 0:
                ranked.append(avg)

        ranked.sort(key=lambda sd: sd.smaw.Current.Value, reverse=True)

        # we need to return only the symbol objects
        return [sd.symbol for sd in ranked[:self.num_coarse]]

    @staticmethod
    def _fscore_inputs(x):
        """Return the fundamentals of ``x`` in ``FScore`` constructor order."""
        ratios = x.OperationRatios
        statements = x.FinancialStatements
        return (
            statements.IncomeStatement.NetIncome.TwelveMonths,
            statements.CashFlowStatement.CashFlowFromContinuingOperatingActivities.TwelveMonths,
            ratios.ROA.ThreeMonths,
            ratios.ROA.OneYear,
            statements.BalanceSheet.ShareIssued.ThreeMonths,
            statements.BalanceSheet.ShareIssued.TwelveMonths,
            ratios.GrossMargin.ThreeMonths,
            ratios.GrossMargin.OneYear,
            ratios.LongTermDebtEquityRatio.ThreeMonths,
            ratios.LongTermDebtEquityRatio.OneYear,
            ratios.CurrentRatio.ThreeMonths,
            ratios.CurrentRatio.OneYear,
            ratios.AssetsTurnover.ThreeMonths,
            ratios.AssetsTurnover.OneYear,
        )

    def UniverseFundamentalsFilter(self, fundamental):
        """Keep securities with F-Score > 6, then the lowest normalized P/E.

        Securities for which any scoring input or the P/E ratio is falsy
        (zero / missing) are dropped before scoring. Stores the result in
        ``self.symbols`` and records the refresh in ``self.reb_count``.
        """
        if self.updatefinefilter == 0:
            return Universe.Unchanged

        high_score = []
        for x in fundamental:
            inputs = self._fscore_inputs(x)
            # Skip incomplete records: a missing P/E of 0 would otherwise
            # rank first in the ascending sort below.
            if not all(inputs) or not x.ValuationRatios.NormalizedPERatio:
                continue
            if FScore(*inputs).ObjectiveScore() > 6:
                high_score.append(x)

        top = sorted(high_score,
                     key=lambda x: x.ValuationRatios.NormalizedPERatio,
                     reverse=False)[:self.num_screener]

        self.symbols = [x.Symbol for x in top]

        self.updatefinefilter = 0
        self.reb_count = self.DCOUNT

        return self.symbols

    def OnSecuritiesChanged(self, changes):
        """Create and warm up a ``SymbolData`` for every added security."""
        added_symbols = []
        for security in changes.AddedSecurities:
            added_symbols.append(security.Symbol)
            # Create the momentum tracker only once per symbol
            if security.Symbol not in self.data:
                self.data[security.Symbol] = SymbolData(security.Symbol, self.formation_days, self)

        if not added_symbols:
            return

        history = self.History(added_symbols, 1 + self.formation_days, Resolution.DAILY)
        if history.empty:
            return

        for symbol in added_symbols:
            try:
                # Only warm up symbols that actually appear in the history frame
                symbol_history = history.loc[symbol] if symbol in history.index else None
                if symbol_history is not None:
                    self.data[symbol].warmup(symbol_history)
            except KeyError as e:
                self.Debug(f"KeyError for symbol {symbol}: {str(e)}")
                continue

    def consolidation_handler(self, sender, consolidated):
        """Append the daily close to ``self.history`` and trim to ``lookback`` rows."""
        self.history.loc[consolidated.EndTime, consolidated.Symbol] = consolidated.Close
        self.history = self.history.iloc[-self.lookback:]

    def derive_vola_waitdays(self):
        """Derive the wait period and return lookback from market volatility.

        Returns:
            tuple ``(wait_days, returns_lookback)`` of ints, both scaled by
            ``WAITD_CONSTANT``.
        """
        # Select the column as a Series so that .std() is a scalar; int() on a
        # one-element Series is deprecated in recent pandas.
        daily_log_returns = np.log1p(self.history[self.MRKT].pct_change())
        volatility = 0.6 * daily_log_returns.std() * np.sqrt(252)
        wait_days = int(volatility * self.WAITD_CONSTANT)
        returns_lookback = int((1.0 - volatility) * self.WAITD_CONSTANT)
        return wait_days, returns_lookback

    def _target_shares(self, symbol, weight):
        """Return the whole-share count for ``weight`` of portfolio value.

        Returns ``None`` when the security has no positive close price yet.
        """
        price = self.Securities[symbol].Close
        if not price or price <= 0:
            return None
        return math.floor(self.Portfolio.TotalPortfolioValue * weight / price)

    def _has_current_data(self, symbol):
        """Return True when the current slice holds data for ``symbol``."""
        return self.current_slice.contains_key(symbol) and self.current_slice[symbol] is not None

    def _move_to_safe_asset(self):
        """Sell every tracked position except ``self.safe``, then buy ``self.safe``."""
        for symbol in list(self.quantity):
            if symbol == self.safe:
                continue  # keep the safe asset
            held = self.quantity[symbol]
            # Sell the tracked quantity; a zero-size order would be rejected
            # and would leave the position orphaned once its entry is deleted.
            if held != 0:
                self.Order(symbol, -held)
                self.Debug([str(self.Time), str(symbol), str(-held)])
            del self.quantity[symbol]

        shares = self._target_shares(self.safe, 1)
        if not shares:
            # No price or less than one share affordable: stay in cash and
            # retry on the next run.
            return
        self.quantity[self.safe] = shares
        self.Order(self.safe, shares)
        self.Debug([str(self.Time), str(self.safe), str(shares)])

    def rebalance_when_out_of_the_market(self):
        """Daily scheduled handler: evaluate the bear signal and trade on it."""
        # NOTE(review): this handler is scheduled unconditionally; confirm
        # whether it should be skipped while the algorithm is warming up.
        self.wait_days, returns_lookback = self.derive_vola_waitdays()

        livinfla = 6  # inflation reading above which UUP replaces TLT as safe asset
        current_inflation = self.securities[self.cpi].price
        self.safe = self.BND2 if current_inflation > livinfla else self.BND1

        ## Check for Bears
        returns = self.history.pct_change(returns_lookback).iloc[-1]

        silver_returns = returns[self.SLVA]
        gold_returns = returns[self.GOLD]
        industrials_returns = returns[self.INDU]
        utilities_returns = returns[self.UTIL]
        metals_returns = returns[self.METL]
        dollar_returns = returns[self.USDX]

        # The former inflation-dependent second clause only repeated these
        # three comparisons, so it is logically identical to this conjunction.
        self.DISTILLED_BEAR = bool((gold_returns > silver_returns) and
                                   (utilities_returns > industrials_returns) and
                                   (metals_returns < dollar_returns))

        # Determine whether 'in' or 'out' of the market
        if self.DISTILLED_BEAR:
            self.BE_IN = False
            self.OUTDAY = self.DCOUNT
            # Switch only when the safe asset is not already held
            if self.quantity.get(self.safe, 0) == 0:
                self._move_to_safe_asset()

        if self.DCOUNT >= self.OUTDAY + self.wait_days:
            self.BE_IN = True

        # Update stock ranking/holdings, when switching from 'out' to 'in' plus
        # every X days when 'in' (set rebalance frequency)
        if (self.BE_IN and not self.BE_IN_PRIOR) or (self.BE_IN and (self.DCOUNT == self.reb_count)):
            self.rebalance()

        self.BE_IN_PRIOR = self.BE_IN
        self.DCOUNT += 1

    def rebalance(self):
        """Sell the safe assets and equal-weight the top momentum stocks."""
        if self.symbols is None:
            return
        chosen_df = self.calc_return(self.symbols).iloc[:self.num_stocks]
        if chosen_df.empty:
            # No momentum reading available: leave holdings untouched
            return

        # Liquidate whichever safe asset is currently tracked
        safe_assets = (self.BND1, self.BND2)
        for bond in safe_assets:
            held = self.quantity.get(bond, 0)
            if held > 0:
                self.Order(bond, -held)
                self.quantity[bond] = 0

        weight = 1 / self.num_stocks

        # Pass 1: exit dropped names, resize names that stay
        for symbol in list(self.quantity):
            if symbol in safe_assets:
                continue
            if not self._has_current_data(symbol):
                continue
            held = self.quantity[symbol]
            if symbol not in chosen_df.index:
                if held != 0:
                    self.Order(symbol, -held)
                del self.quantity[symbol]
            else:
                target = self._target_shares(symbol, weight)
                if target is not None and target != held:
                    self.Order(symbol, target - held)
                    self.quantity[symbol] = target

        # Pass 2: open positions in newly chosen names
        for symbol in chosen_df.index:
            if not self._has_current_data(symbol):
                continue
            if symbol not in self.quantity:
                target = self._target_shares(symbol, weight)
                if target is None:
                    continue
                self.quantity[symbol] = target
                if target != 0:
                    self.Order(symbol, target)

    def calc_return(self, stocks):
        """Return a one-column ('return') DataFrame of ROC values, sorted.

        Symbols without a ``SymbolData`` entry are logged and skipped. The
        sort is ascending when ``self.lowmom`` is true, descending otherwise.
        The frame is empty (but still has the 'return' column) when no symbol
        could be read.
        """
        ret = {}
        for symbol in stocks:
            symbol_data = self.data.get(symbol)
            if symbol_data is None:
                self.Debug(str(symbol))
                continue
            ret[symbol] = symbol_data.Roc.Current.Value

        # Passing columns= keeps the frame well-formed even when ret is empty
        df_ret = pd.DataFrame.from_dict(ret, orient='index', columns=['return'])
        return df_ret.sort_values(by=['return'], ascending=self.lowmom)

    def record_vars(self):
        """Plot the SPY benchmark (scaled to starting cash) and account leverage."""
        self.spy.append(self.history[self.MRKT].iloc[-1])
        spy_perf = self.spy[-1] / self.spy[0] * self.cap
        self.Plot('Strategy Equity', 'SPY', spy_perf)

        account_leverage = self.Portfolio.TotalHoldingsValue / self.Portfolio.TotalPortfolioValue
        self.Plot('Holdings', 'leverage', round(account_leverage, 0))


class SymbolData(object):
    """Per-symbol rate-of-change indicator registered on a daily consolidator."""

    def __init__(self, symbol, roc, algorithm):
        """Create the ROC indicator with period ``roc`` and register it on ``algorithm``."""
        self.Symbol = symbol
        self.Roc = RateOfChange(roc)
        self.algorithm = algorithm

        self.consolidator = algorithm.ResolveConsolidator(symbol, Resolution.Daily)
        algorithm.RegisterIndicator(symbol, self.Roc, self.consolidator)

    def warmup(self, history):
        """Feed each row's 'close' (indexed by time) into the ROC indicator."""
        for index, row in history.iterrows():
            self.Roc.Update(index, row['close'])


class SymbolDataVolume(object):
    """Volume moving averages and an EMA trend flag for one symbol."""

    def __init__(self, symbol, period, periodw, periodlt):
        """Set up SMAs of length ``period``, ``periodw`` and ``periodlt`` plus 10/50 EMAs."""
        self.symbol = symbol
        self.tolerance = 0.95
        self.fast = ExponentialMovingAverage(10)
        self.slow = ExponentialMovingAverage(50)
        self.is_uptrend = False
        self.scale = 0
        self.volume = 0
        self.volume_ratio = 0
        self.volume_ratiow = 0
        self.volume_ratiol = 0
        self.sma = SimpleMovingAverage(period)
        self.smaw = SimpleMovingAverage(periodw)
        self.smalt = SimpleMovingAverage(periodlt)

    def update(self, time, value, volume):
        """Update all indicators with one observation.

        Args:
            time: timestamp of the observation.
            value: price fed to the EMAs.
            volume: volume figure fed to the SMAs.
        """
        self.volume = volume
        if self.smaw.Update(time, volume):
            # ratio of this bar's volume to the short (periodw) average
            if self.smaw.Current.Value != 0:
                self.volume_ratiow = volume / self.smaw.Current.Value

        if self.sma.Update(time, volume):
            # ratio of the short average to the medium (period) average
            if self.sma.Current.Value != 0:
                self.volume_ratio = self.smaw.Current.Value / self.sma.Current.Value

        if self.smalt.Update(time, volume):
            # ratio of the short average to the long (periodlt) average
            if self.smalt.Current.Value != 0 and self.smaw.Current.Value != 0:
                self.volume_ratiol = self.smaw.Current.Value / self.smalt.Current.Value

        # Update both EMAs before testing readiness; chaining the calls with
        # 'and' would skip the slow update until the fast EMA is ready.
        fast_ready = self.fast.Update(time, value)
        slow_ready = self.slow.Update(time, value)
        if fast_ready and slow_ready:
            fast = self.fast.Current.Value
            slow = self.slow.Current.Value
            self.is_uptrend = (fast > (slow * self.tolerance)) and (value > (fast * self.tolerance))

        if self.is_uptrend:
            self.scale = (fast - slow) / ((fast + slow) / 2.0)


class FScore(object):
    """Nine-point fundamental score computed from current vs. past figures."""

    def __init__(self, netincome, operating_cashflow, roa_current,
                 roa_past, issued_current, issued_past, grossm_current, grossm_past,
                 longterm_current, longterm_past, curratio_current, curratio_past,
                 assetturn_current, assetturn_past):
        """Store the raw inputs; no computation happens here."""
        self.netincome = netincome
        self.operating_cashflow = operating_cashflow
        self.roa_current = roa_current
        self.roa_past = roa_past
        self.issued_current = issued_current
        self.issued_past = issued_past
        self.grossm_current = grossm_current
        self.grossm_past = grossm_past
        self.longterm_current = longterm_current
        self.longterm_past = longterm_past
        self.curratio_current = curratio_current
        self.curratio_past = curratio_past
        self.assetturn_current = assetturn_current
        self.assetturn_past = assetturn_past

    def ObjectiveScore(self):
        """Return the number of criteria (0-9) that the inputs satisfy."""
        fscore = 0
        fscore += np.where(self.netincome > 0, 1, 0)
        fscore += np.where(self.operating_cashflow > 0, 1, 0)
        fscore += np.where(self.roa_current > self.roa_past, 1, 0)
        # NOTE(review): this compares a cash-flow amount with a return-on-assets
        # ratio - confirm whether cash flow should be scaled by total assets.
        fscore += np.where(self.operating_cashflow > self.roa_current, 1, 0)
        fscore += np.where(self.longterm_current <= self.longterm_past, 1, 0)
        fscore += np.where(self.curratio_current >= self.curratio_past, 1, 0)
        fscore += np.where(self.issued_current <= self.issued_past, 1, 0)
        fscore += np.where(self.grossm_current >= self.grossm_past, 1, 0)
        fscore += np.where(self.assetturn_current >= self.assetturn_past, 1, 0)
        return fscore