Overall Statistics |
Total Trades 68 Average Win 0.26% Average Loss -0.21% Compounding Annual Return 53.163% Drawdown 3.900% Expectancy 0.402 Net Profit 5.274% Sharpe Ratio 1.978 Probabilistic Sharpe Ratio 61.805% Loss Rate 38% Win Rate 62% Profit-Loss Ratio 1.26 Alpha 0.052 Beta 0.531 Annual Standard Deviation 0.177 Annual Variance 0.031 Information Ratio -1.203 Tracking Error 0.176 Treynor Ratio 0.66 Total Fees $334.61 Estimated Strategy Capacity $1000.00 Lowest Capacity Asset AWRE R735QTJ8XC9X |
from AlgorithmImports import * from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel from itertools import groupby from math import ceil class MonthlyDollarVolumeSelection(FundamentalUniverseSelectionModel): '''Defines the QC500 universe as a universe selection model for framework algorithm For details: https://github.com/QuantConnect/Lean/pull/1663''' def __init__(self, filterFineData = True, universeSettings = None, numOfUniverse = 500): '''Initializes a new default instance of the QC500UniverseSelectionModel''' super().__init__(filterFineData, universeSettings) self.numberOfSymbolsCoarse = 1000 self.numberOfSymbolsFine = numOfUniverse self.dollarVolumeBySymbol = {} self.lastMonth = -1 def SelectCoarse(self, algorithm, coarse): '''Performs coarse selection for the QC500 constituents. The stocks must have fundamental data The stock must have positive previous-day close price The stock must have positive volume on the previous trading day''' if algorithm.Time.month == self.lastMonth: return Universe.Unchanged sortedByDollarVolume = sorted( [x for x in coarse if x.HasFundamentalData and x.Volume > 0 and x.Price > 0], key = lambda x: x.DollarVolume, reverse=True )[:self.numberOfSymbolsCoarse] self.dollarVolumeBySymbol = {x.Symbol:x.DollarVolume for x in sortedByDollarVolume} # If no security has met the QC500 criteria, the universe is unchanged. # A new selection will be attempted on the next trading day as self.lastMonth is not updated if len(self.dollarVolumeBySymbol) == 0: return Universe.Unchanged # return the symbol objects our sorted collection return list(self.dollarVolumeBySymbol.keys()) def SelectFine(self, algorithm, fine): '''Performs fine selection for the QC500 constituents The company's headquarter must in the U.S. The stock must be traded on either the NYSE or NASDAQ At least half a year since its initial public offering The stock's market cap must be greater than 500 million''' sortedBySector = sorted( [x for x in fine if x.CompanyReference.CountryId == "USA" and x.CompanyReference.PrimaryExchangeID in ["NYS","NAS"] and (algorithm.Time - x.SecurityReference.IPODate).days > 180 and x.MarketCap > 5e8], key = lambda x: x.CompanyReference.IndustryTemplateCode ) count = len(sortedBySector) # If no security has met the QC500 criteria, the universe is unchanged. # A new selection will be attempted on the next trading day as self.lastMonth is not updated if count == 0: return Universe.Unchanged # Update self.lastMonth after all QC500 criteria checks passed self.lastMonth = algorithm.Time.month percent = self.numberOfSymbolsFine / count sortedByDollarVolume = [] # select stocks with top dollar volume in every single sector for code, g in groupby(sortedBySector, lambda x: x.CompanyReference.IndustryTemplateCode): y = sorted( g, key = lambda x: self.dollarVolumeBySymbol[x.Symbol], reverse = True ) c = ceil(len(y) * percent) sortedByDollarVolume.extend(y[:c]) sortedByDollarVolume = sorted( sortedByDollarVolume, key = lambda x: self.dollarVolumeBySymbol[x.Symbol], reverse=True ) invested_symbol_list = [s.Key for s in algorithm.Portfolio if s.Value.Invested] filtered_universe = [x.Symbol for x in sortedByDollarVolume[:self.numberOfSymbolsFine]] return list(set(invested_symbol_list + filtered_universe)) ############################################################ # Growth Investor FinViz Universe # https://medium.com/the-investors-handbook/the-best-finviz-screens-for-growth-investors-72795f507b91 ############################################################ class GrowthFinVizUniverseSelection(FundamentalUniverseSelectionModel): def __init__(self, filterFineData = True, universeSettings = None,): super().__init__(filterFineData, universeSettings) def SelectCoarse(self, algorithm, coarse): return [x.Symbol for x in coarse if x.HasFundamentalData] def SelectFine(self, algorithm, fine): # Tech industry filteredFine = [x for x in fine if x.AssetClassification.MorningstarSectorCode == MorningstarSectorCode.Technology] sortedByPERatio = sorted(filteredFine, key=lambda f: f.ValuationRatios.PERatio, reverse=True)[:int(0.6*len(filteredFine))] sortedByEPS = sorted(filteredFine, key=lambda f: f.EarningReports.BasicEPS.ThreeMonths, reverse=True)[:int(0.6*len(filteredFine))] sortedByROE = sorted(filteredFine, key=lambda f: f.OperationRatios.ROE.ThreeMonths, reverse=True)[:int(0.6*len(filteredFine))] sortedByNetIncome = sorted(filteredFine, key=lambda f: f.FinancialStatements.IncomeStatement.NetIncome.ThreeMonths, reverse=True)[:int(0.6*len(filteredFine))] s = set() s = set(sortedByPERatio) & set(sortedByEPS) & set(sortedByROE) & set(sortedByNetIncome) # Sort the list by pb ratio and not ADR self._sorted_portfolio_target = [x.Symbol for x in sorted(list(s), key=lambda f: f.ValuationRatios.PBRatio, reverse=True) if not x.SecurityReference.IsDepositaryReceipt] algorithm.Debug(f'List: {len(self._sorted_portfolio_target)}') invested_list = [s.Key for s in algorithm.Portfolio if s.Value.Invested] algorithm.Debug(f'Invested: {len(invested_list)}') self._sorted_portfolio_target += invested_list # self.Debug(f'FineFilter: {[x.Value for x in self._sorted_portfolio_target]}') return self._sorted_portfolio_target
from AlgorithmImports import * class SymbolData(): def __init__(self, algorithm, symbol) -> None: self.symbol = symbol self.algorithm = algorithm self._macd_indicator = self.algorithm.MACD( symbol, 12, 26, 9, MovingAverageType.Simple, Resolution.Daily ) self._macd_rolling_window = RollingWindow[float](2) self._awesome_osc_indicator = algorithm.AO( symbol, 5, 34, MovingAverageType.Simple, Resolution.Daily ) self._awesome_osc_rolling_window = RollingWindow[float](4) self._rsi_indicator = algorithm.RSI( symbol, 14, MovingAverageType.Simple, Resolution.Daily ) self._rsi_rolling_window = RollingWindow[float](3) warmUpData = self.History(symbol, 40, Resolution.Daily) for bar in warmUpData.loc[symbol, :].itertuples(): tradebar = TradeBar(bar.Index, symbol, bar.open, bar.high, bar.low, bar.close, bar.volume) self._macd_indicator.Update(bar.Index, bar.close) self._awesome_osc_indicator.Update(tradebar) self._rsi_indicator.Update(bar.Index, bar.close) def update(self): # Update rolling windows self._macd_rolling_window.Add(self._macd_indicator.Histogram.Current.Value) self._awesome_osc_rolling_window.Add(self._awesome_osc_indicator.Current.Value) self._rsi_rolling_window.Add(self._rsi_indicator.Current.Value) def remove(self): pass
from AlgorithmImports import * import talib as ta import numpy as np from ExtendFundamentalUniverseSelectionModel import GrowthFinVizUniverseSelection class AOSIGNALTYPE(Enum): SIMPLE = 1 # HISTORY = 2 # SAUCER = 3 LAME = 4 NONE = 5 class RSISIGNALTYPE(Enum): SMA = 0 SQUARE = 1 STOCH = 2 STOCH_MINE = 3 NONE = 4 class MacdRsiAlgorithm(QCAlgorithm): ################################################################### # Initialization def Initialize(self): self.SetStartDate(2019, 1 ,10) self.SetEndDate(2019, 2, 24) self._init_cash = 100000 self.SetCash(self._init_cash) self.SetBenchmark("SPY") self.UniverseSettings.Resolution = Resolution.Daily # self.AddUniverse(self.CoarseFilter, self.FineFilter) self.AddUniverseSelection(GrowthFinVizUniverseSelection()) self._changes = None self._capacity = 10 self._sorted_portfolio_target = [] self._macd_indicator = dict() self._macd_rolling_window = dict() self._awesome_osc_indicator = dict() self._awesome_osc_rolling_window = dict() self._rsi_indicator = dict() self._rsi_rolling_window = dict() self._rsi_previous_stoch_rsi = dict() self._ao_buy_strategy = AOSIGNALTYPE.LAME self._ao_sell_strategy = AOSIGNALTYPE.NONE self._rsi_buy_strategy = RSISIGNALTYPE.NONE self._rsi_sell_strategy = RSISIGNALTYPE.STOCH self._my_chart = Chart('AvailCash') self._my_chart.AddSeries(Series("Cash", SeriesType.Line, 0)) self.AddChart(self._my_chart) self._my_chart_2 = Chart('HeldPositions') self._my_chart_2.AddSeries(Series("Positions", SeriesType.Line, 0)) self.AddChart(self._my_chart_2) self._benchmark_symbol = self.AddEquity('SPY', Resolution.Daily).Symbol self._benchmark_queue = [] self.Schedule.On( self.DateRules.Every( DayOfWeek.Monday, DayOfWeek.Tuesday, DayOfWeek.Wednesday, DayOfWeek.Thursday, DayOfWeek.Friday ), self.TimeRules.At(9, 30), # self.TimeRules.At(14, 40), self.MonitorAndExit ) self.Schedule.On( self.DateRules.Every( DayOfWeek.Monday, DayOfWeek.Tuesday, DayOfWeek.Wednesday, DayOfWeek.Thursday, DayOfWeek.Friday ), self.TimeRules.At(9, 35), # self.TimeRules.At(14, 50), self.MonitorAndEnter ) self.Schedule.On( self.DateRules.Every( DayOfWeek.Monday, DayOfWeek.Tuesday, DayOfWeek.Wednesday, DayOfWeek.Thursday, DayOfWeek.Friday ), self.TimeRules.At(8, 30), self.BeforeMarketOpen ) ################################################################### # Enter and exit market def MonitorAndExit(self): self.Debug("MonitorAndExit fired at: {0}".format(self.Time)) invested_list = [x.Key for x in self.Portfolio if x.Value.Invested] for symbol in invested_list: if self.isMACDSignalSell(symbol) \ and (self.isAOSignalSell(symbol) or self.isRSISignalSell(symbol)): self.SetHoldings(symbol, 0) self.Debug(f'-Sell in security {symbol}') # Risk Control # for symbol in invested_list: # if self.Portfolio[symbol].Quantity != 0: # self.Debug(f'Symbol: {symbol.Value} - Unrealized profit: {self.Portfolio[symbol].UnrealizedProfitPercent}') # if self.Portfolio[symbol].UnrealizedProfitPercent < -0.15: # self.SetHoldings(symbol, 0) # self.Debug(f'-Sell in security {symbol}') def MonitorAndEnter(self): self.Debug("MonitorAndEnter fired at: {0}".format(self.Time)) invested_list = [x.Key for x in self.Portfolio if x.Value.Invested] invested = len(invested_list) open_order = 0 for symbol in self.ActiveSecurities.Keys: if invested + open_order >= self._capacity: # self.Debug(f'[Debug]: Reaching capacity') break if symbol in invested_list: # self.Debug(f'[Debug]: {symbol} is already in the portfolio') continue if self.isMACDSignalBuy(symbol) \ and self.isAOSignalBuy(symbol) \ and self.isRSISignalBuy(symbol): self.SetHoldings(symbol, 1/self._capacity) self.Debug(f'+Buy in security {symbol}') open_order += 1 def BeforeMarketOpen(self): pass ################################################################### # Coarse filter and fine filter def CoarseFilter(self, coarse): return [x.Symbol for x in coarse if x.HasFundamentalData] def FineFilter(self, fine): # Tech industry filteredFine = [x for x in fine if x.AssetClassification.MorningstarSectorCode == MorningstarSectorCode.Technology] sortedByPERatio = sorted(filteredFine, key=lambda f: f.ValuationRatios.PERatio, reverse=True)[:int(0.6*len(filteredFine))] sortedByEPS = sorted(filteredFine, key=lambda f: f.EarningReports.BasicEPS.ThreeMonths, reverse=True)[:int(0.6*len(filteredFine))] sortedByROE = sorted(filteredFine, key=lambda f: f.OperationRatios.ROE.ThreeMonths, reverse=True)[:int(0.6*len(filteredFine))] sortedByNetIncome = sorted(filteredFine, key=lambda f: f.FinancialStatements.IncomeStatement.NetIncome.ThreeMonths, reverse=True)[:int(0.6*len(filteredFine))] s = set() s = set(sortedByPERatio) & set(sortedByEPS) & set(sortedByROE) & set(sortedByNetIncome) # Sort the list by pb ratio and not ADR self._sorted_portfolio_target = [x.Symbol for x in sorted(list(s), key=lambda f: f.ValuationRatios.PBRatio, reverse=True) if not x.SecurityReference.IsDepositaryReceipt] self.Debug(f'List: {len(self._sorted_portfolio_target)}') invested_list = [s.Key for s in self.Portfolio if s.Value.Invested] self._sorted_portfolio_target += invested_list self.Debug(f'Invested: {len(invested_list)}') # self.Debug(f'FineFilter: {[x.Value for x in self._sorted_portfolio_target]}') return self._sorted_portfolio_target def OnSecuritiesChanged(self, changes): self._changes = changes def OnData(self, data): if len(data.Bars) <= 0: self.Debug(f'Time is {self.Time}: Data is empty') return else: self.Debug(f'Time to update: {self.Time}') self.Debug(f'Data.keys: {len(data.Keys)}') self.Debug(f'Activate: {len(self.ActiveSecurities.Keys)}') for delisting in data.Delistings: symbol = delisting.Key value = delisting.Value if value.Type == DelistingType.Warning: self.Debug(f"[Warning] OnData(Delistings): {self.Time}: {symbol} will be delisted at end of day today.") # liquidate on delisting warning self.SetHoldings(symbol, 0) if value.Type == DelistingType.Delisted: self.Debug(f"[Error] OnData(Delistings): {self.Time}: {symbol} has been delisted.") # Fails because the security has already been delisted and is no longer tradable self.SetHoldings(symbol, 1) if self._changes is None: # Update rolling windows for symbol in self._macd_rolling_window.keys(): self._macd_rolling_window[symbol].Add(self._macd_indicator[symbol].Histogram.Current.Value) for symbol in self._awesome_osc_rolling_window.keys(): self._awesome_osc_rolling_window[symbol].Add(self._awesome_osc_indicator[symbol].Current.Value) for symbol in self._rsi_rolling_window.keys(): self._rsi_rolling_window[symbol].Add(self._rsi_indicator[symbol].Current.Value) else: # self.Debug(f'AddedSecurities: {len(self._changes.AddedSecurities)}') for x in self._changes.RemovedSecurities: symbol = x.Symbol invested_list = [s.Key for s in self.Portfolio if s.Value.Invested] if symbol in list(self._macd_indicator.keys()) and symbol not in invested_list: del self._macd_indicator[symbol] if symbol in list(self._awesome_osc_indicator.keys()) and symbol not in invested_list: del self._awesome_osc_indicator[symbol] if symbol in list(self._rsi_indicator.keys()) and symbol not in invested_list: del self._rsi_indicator[symbol] if symbol in list(self._macd_rolling_window.keys()) and symbol not in invested_list: del self._macd_rolling_window[symbol] if symbol in list(self._awesome_osc_rolling_window.keys()) and symbol not in invested_list: del self._awesome_osc_rolling_window[symbol] if symbol in list(self._rsi_rolling_window.keys()) and symbol not in invested_list: del self._rsi_rolling_window[symbol] for symbol in self.ActiveSecurities.Keys: if symbol not in self._macd_indicator: self._macd_indicator[symbol] = self.MACD( symbol, 12, 26, 9, MovingAverageType.Simple, Resolution.Daily ) self._awesome_osc_indicator[symbol] = self.AO( symbol, 34, 5, MovingAverageType.Simple, Resolution.Daily ) self._rsi_indicator[symbol] = self.RSI( symbol, 14, # MovingAverageType.Simple, MovingAverageType.Exponential, Resolution.Daily ) self._macd_rolling_window[symbol] = RollingWindow[float](2) self._awesome_osc_rolling_window[symbol] = RollingWindow[float](4) self._rsi_rolling_window[symbol] = RollingWindow[float](4) warmUpData = self.History(symbol, 50, Resolution.Daily) for bar in warmUpData.loc[symbol, :].itertuples(): tradebar = TradeBar(bar.Index, symbol, bar.open, bar.high, bar.low, bar.close, bar.volume) self._macd_indicator[symbol].Update(bar.Index, bar.close) self._awesome_osc_indicator[symbol].Update(tradebar) self._rsi_indicator[symbol].Update(bar.Index, bar.close) # Update rolling windows if self._macd_indicator[symbol].IsReady: self._macd_rolling_window[symbol].Add(self._macd_indicator[symbol].Histogram.Current.Value) if self._awesome_osc_indicator[symbol].IsReady: self._awesome_osc_rolling_window[symbol].Add(self._awesome_osc_indicator[symbol].Current.Value) if self._rsi_indicator[symbol].IsReady: self._rsi_rolling_window[symbol].Add(self._rsi_indicator[symbol].Current.Value) else: # Update rolling windows if self._macd_indicator[symbol].IsReady: self._macd_rolling_window[symbol].Add(self._macd_indicator[symbol].Histogram.Current.Value) if self._awesome_osc_indicator[symbol].IsReady: self._awesome_osc_rolling_window[symbol].Add(self._awesome_osc_indicator[symbol].Current.Value) if self._rsi_indicator[symbol].IsReady: self._rsi_rolling_window[symbol].Add(self._rsi_indicator[symbol].Current.Value) # for sss in self.ActiveSecurities.Keys: # if sss.Value == 'ADSK': # self.Debug(f'[{sss.Value}] AO: {self._awesome_osc_rolling_window[sss][0]}') # self.Debug(f'[{sss.Value}] AO: {self._awesome_osc_rolling_window[sss][1]}') # self.Debug(f'[{sss.Value}] AO: {self._awesome_osc_rolling_window[sss][2]}') # self.Debug(f'[{sss.Value}] RSI: {self._rsi_rolling_window[sss][0]}') # historic_rsis = list(self._rsi_rolling_window[sss]) # stoch_rsi = None # if (max(historic_rsis[1:]) - min(historic_rsis[1:])) == 0: # stoch_rsi = 100 # else: # stoch_rsi = (historic_rsis[0] - min(historic_rsis[1:])) / (max(historic_rsis[1:]) - min(historic_rsis[1:])) * 100 # self.Debug(f'[{sss.Value}] STOCH: {stoch_rsi}') self._changes = None ################################################################### # MACD signal and Awesome Oscillator signal def isMACDSignalBuy(self, symbol): if not self._macd_indicator[symbol].IsReady: self.Debug(f'[Warning] MACD {symbol} warm up not ready') return False if self._macd_indicator[symbol].Histogram.Current.Value > 0: return True return False def isMACDSignalSell(self, symbol): if not self._macd_indicator[symbol].IsReady: self.Debug(f'[Warning] MACD {symbol} warm up not ready') return False if self._macd_indicator[symbol].Signal.Current.Value < 0: return True return False def isAOSignalBuy(self, symbol): if self._ao_buy_strategy == AOSIGNALTYPE.SIMPLE: if not self._awesome_osc_indicator[symbol].IsReady: self.Debug(f'[Warning] AO {symbol} warm up not ready') return False if self._awesome_osc_indicator[symbol].Current.Value > 0: return True elif self._ao_buy_strategy == AOSIGNALTYPE.LAME: if not self._awesome_osc_rolling_window[symbol].IsReady: return False if -self._awesome_osc_rolling_window[symbol][1] < -self._awesome_osc_rolling_window[symbol][0] \ and -self._awesome_osc_rolling_window[symbol][2] > -self._awesome_osc_rolling_window[symbol][1]: return True elif self._ao_buy_strategy == AOSIGNALTYPE.NONE: return True return False def isAOSignalSell(self, symbol): if self._ao_sell_strategy == AOSIGNALTYPE.SIMPLE: if not self._awesome_osc_indicator[symbol].IsReady: self.Debug(f'[Warning] AO {symbol} warm up not ready') return False if self._awesome_osc_indicator[symbol].Current.Value < 0: return True elif self._ao_sell_strategy == AOSIGNALTYPE.LAME: if not self._awesome_osc_rolling_window[symbol].IsReady: return False if -self._awesome_osc_rolling_window[symbol][1] > -self._awesome_osc_rolling_window[symbol][0] \ and -self._awesome_osc_rolling_window[symbol][2] < -self._awesome_osc_rolling_window[symbol][1]: return True elif self._ao_sell_strategy == AOSIGNALTYPE.NONE: return True return False def isRSISignalBuy(self, symbol): rsi_threshold = 30 if not self._rsi_indicator[symbol].IsReady: self.Debug(f'[Warning] RSI {symbol} indicator warm up not ready') return False if not self._rsi_rolling_window[symbol].IsReady: self.Debug(f'[Warning] RSI {symbol} rolling window warm up not ready') return False if self._rsi_buy_strategy == RSISIGNALTYPE.SMA: # RSI SMA1 if self._rsi_indicator[symbol].Current.Value <= rsi_threshold: return True ################################################ # RSI SMA2 # rsi_threshold = 30 # if self._rsi_indicator[symbol].Current.Value <= rsi_threshold \ # and self._rsi_indicator[symbol].Current.Value >= (\ # (self._rsi_rolling_window[symbol][0] + \ # self._rsi_rolling_window[symbol][1] + \ # self._rsi_rolling_window[symbol][2])/3): # 33.33 # return True ################################################ # RSI Stack # if self._rsi_rolling_window[symbol][0] <= 33.33\ # and self._rsi_rolling_window[symbol][1] <= 33.33: #\ # # and self._rsi_rolling_window[symbol][2] <= 33.33: # return True ################################################ # RSI square # rsi_square = ta.RSI(np.array(list(self._rsi_rolling_window[symbol]), dtype='double'), timeperiod=3)[-1] # if (rsi_square <= 30) and (self._rsi_indicator[symbol].Current.Value <= 30): # return True elif self._rsi_buy_strategy == RSISIGNALTYPE.SQUARE: # RSI square 2 rsi_square_2 = ta.RSI(np.array(list(self._rsi_rolling_window[symbol]), dtype='double'), timeperiod=3)[-1] if rsi_square_2 <= rsi_threshold: # 10, 20, 30 return True elif self._rsi_buy_strategy == RSISIGNALTYPE.STOCH: # Stochastic RSI historic_rsis = list(self._rsi_rolling_window[symbol]) stoch_rsi = None if (max(historic_rsis[1:]) - min(historic_rsis[1:])) == 0: stoch_rsi = 100 else: stoch_rsi = (historic_rsis[0] - min(historic_rsis[1:])) / (max(historic_rsis[1:]) - min(historic_rsis[1:])) * 100 if stoch_rsi <= rsi_threshold: return True elif self._rsi_buy_strategy == RSISIGNALTYPE.STOCH_MINE: # Mine RSI historic_rsis = list(self._rsi_rolling_window[symbol]) stoch_rsi = None rsi_threshold = 30 # 5, 10, 20, 30 if (max(historic_rsis[1:]) - min(historic_rsis[1:])) == 0: stoch_rsi = 100 else: stoch_rsi = (historic_rsis[0] - min(historic_rsis[1:])) / (max(historic_rsis[1:]) - min(historic_rsis[1:])) * 100 if symbol in self._rsi_previous_stoch_rsi: if self._rsi_previous_stoch_rsi[symbol] < rsi_threshold and stoch_rsi > rsi_threshold: self._rsi_previous_stoch_rsi[symbol] = stoch_rsi return True self._rsi_previous_stoch_rsi[symbol] = stoch_rsi elif self._rsi_buy_strategy == RSISIGNALTYPE.NONE: return True return False def isRSISignalSell(self, symbol): rsi_threshold = 80 if not self._rsi_indicator[symbol].IsReady: self.Debug(f'[Warning] RSI {symbol} indicator warm up not ready') return False if not self._rsi_rolling_window[symbol].IsReady: self.Debug(f'[Warning] RSI {symbol} rolling window warm up not ready') return False if self._rsi_sell_strategy == RSISIGNALTYPE.SMA: # RSI SMA1 if self._rsi_indicator[symbol].Current.Value <= rsi_threshold: return True elif self._rsi_sell_strategy == RSISIGNALTYPE.SQUARE: # RSI square 2 rsi_square_2 = ta.RSI(np.array(list(self._rsi_rolling_window[symbol]), dtype='double'), timeperiod=3)[-1] if rsi_square_2 <= rsi_threshold: return True elif self._rsi_sell_strategy == RSISIGNALTYPE.STOCH: # Stochastic RSI historic_rsis = list(self._rsi_rolling_window[symbol]) stoch_rsi = None # if (max(historic_rsis[1:]) - min(historic_rsis[1:])) == 0: # stoch_rsi = 100 # else: # stoch_rsi = (historic_rsis[0] - min(historic_rsis[1:])) / (max(historic_rsis[1:]) - min(historic_rsis[1:])) * 100 if (max(historic_rsis) - min(historic_rsis)) == 0: stoch_rsi = 100 else: stoch_rsi = (historic_rsis[0] - min(historic_rsis)) / (max(historic_rsis) - min(historic_rsis)) * 100 # if stoch_rsi <= rsi_threshold: # if stoch_rsi >= rsi_threshold: if stoch_rsi >= rsi_threshold or stoch_rsi == 0: return True elif self._rsi_sell_strategy == RSISIGNALTYPE.STOCH_MINE: # Mine RSI historic_rsis = list(self._rsi_rolling_window[symbol]) stoch_rsi = None if (max(historic_rsis[1:]) - min(historic_rsis[1:])) == 0: stoch_rsi = 100 else: stoch_rsi = (historic_rsis[0] - min(historic_rsis[1:])) / (max(historic_rsis[1:]) - min(historic_rsis[1:])) * 100 if symbol in self._rsi_previous_stoch_rsi: if self._rsi_previous_stoch_rsi[symbol] >= rsi_threshold and stoch_rsi < rsi_threshold: self._rsi_previous_stoch_rsi[symbol] = stoch_rsi return True self._rsi_previous_stoch_rsi[symbol] = stoch_rsi elif self._rsi_sell_strategy == RSISIGNALTYPE.NONE: return True return False ################################################################### # Log the end of day prices and plot the diagram def OnEndOfDay(self, symbol): self.Plot('AvailCash', 'Cash', self.Portfolio.Cash) self.Plot('AvailCash', 'Portfolio', self.Portfolio.TotalPortfolioValue) self.Plot('HeldPositions', "Long Pos", len([x.Key for x in self.Portfolio if x.Value.Invested and x.Value.IsLong])) self.Plot('HeldPositions', "Short Pos", len([x.Key for x in self.Portfolio if x.Value.Invested and x.Value.IsShort])) self.Plot('Portfolio Value', "Long Profit", sum([x.Value.UnrealizedProfit for x in self.Portfolio if x.Value.Invested and x.Value.IsLong])) self.Plot('Portfolio Value', "Short Profit", sum([x.Value.UnrealizedProfit for x in self.Portfolio if x.Value.Invested and x.Value.IsShort])) self.Plot('Portfolio Value', "Total Profit", sum([x.Value.UnrealizedProfit for x in self.Portfolio if x.Value.Invested])) # Plot the benchmark return hist = self.History(self._benchmark_symbol, 2, Resolution.Daily)['close'].unstack(level= 0).dropna() self._benchmark_queue.append(hist[self._benchmark_symbol].iloc[-1]) spy_perf = self._benchmark_queue[-1] / self._benchmark_queue[0] * self._init_cash self.Plot('Strategy Equity', 'SPY', spy_perf)