Overall Statistics |
Total Trades 1143 Average Win 0.64% Average Loss -0.71% Compounding Annual Return -69.106% Drawdown 85.000% Expectancy -0.460 Net Profit -84.876% Sharpe Ratio -2.315 Probabilistic Sharpe Ratio 0% Loss Rate 72% Win Rate 28% Profit-Loss Ratio 0.90 Alpha -0.672 Beta 0.284 Annual Standard Deviation 0.262 Annual Variance 0.069 Information Ratio -2.701 Tracking Error 0.31 Treynor Ratio -2.138 Total Fees $1431.68 Estimated Strategy Capacity $1600000.00 Lowest Capacity Asset DNJR WSZRSZV9QR1H |
from time import time from pandas import DataFrame from datetime import timedelta import datetime as datetime import statistics as statistics class PensiveTanSalamander(QCAlgorithm): def Initialize(self): #Start date self.SetStartDate(2020, 1, 1) # Set Strategy Cash self.SetCash(100000) #Universe selection self.AddUniverse(self.Coarse, self.Fine) #Set data normalization mode to raw self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw #Resolution.Minute for all data self.UniverseSettings.Resolution = Resolution.Minute #Adding spy for scheduled events self.spy = self.AddEquity("SPY", Resolution.Minute) #Extended hours enabled self.UniverseSettings.ExtendedMarketHours = True #Turn fillforward to false self.UniverseSettings.FillForward = False #Five minute count self.five_minute_count = False #Long limit self.long_limit = False #Short limit self.short_limit = False #Self count for consolidating 5-minute data self.count = 1 #Multiplier self.multiplier = 3 #Short trailing prices self.trailing_prices_short = {} #Value self.Value = {} #Dictionary to keep track of volatility self.volatility = {} #HeikinAshi Close Window self.heikinashi_close_window = {} #HeikinAshi High Window self.heikinashi_high_window = {} #HeikinAshi Low Window self.heikinashi_low_window = {} #HeikinAshi Open Window self.heikinashi_open_window = {} #ATR Window self.stocks_atrWindow = {} #ATR Down Window self.stocks_atrDown = {} #ATR Up Window self.stocks_atrUp = {} #Accumulator to store five-minute volume data self.ticker_volume_accumulator = {} #Accumulator to store five minute highs self.five_minute_high_accumulator = {} #Accumulator to store minute low of stocks self.five_minute_low_accumulator = {} #Accumulator to store minute close of stocks self.five_minute_close_accumulator = {} #Dictionary to store trailing prices self.trailing_prices_long = {} #Storing five minute opening prices self.five_minute_open = {} #Storing five minute high prices self.five_minute_high = {} #Storing five minute low prices self.five_minute_low = {} #Storing five minute closing prices self.five_minute_close = {} #Storing five minute volume self.five_minute_volume = {} #Storing long buy tickets self.long_buy_tickets = {} #Storing long sell tickets self.long_sell_tickets = {} #Storing short sell tickets self.short_sell_tickets = {} #Storing short cover tickets self.short_cover_tickets = {} #tickers list to store all tickers in universe self.tickers = [] #List to store filtered stocks based on earnings reports self.filtered = [] #List to store securities that have collected sufficient data self.ticker_tradable = [] #List to store selling stocks self.selling_stocks = [] #List to store top ten stocks self.top_ten_sorted = [] #List to store stocks that pass criteria self.underscore = [] #Clear lists and dictionaries that need to be cleared self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(0, 0), self.Reset) #Start collecting data at 4.00 am in the morning self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", -330), self.is_market_hours) #Last collection of data happens 1 minute after 8.00 pm, hence -241 before market close self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", -241), self.not_market_hours) def Coarse(self, coarse): filtered = [x for x in coarse if x.HasFundamentalData and x.Price > 10 and x.DollarVolume > 20000000] #and x.Price < 10 sortedStocks = sorted(filtered, key = lambda x: x.DollarVolume, reverse = True) return [x.Symbol for x in sortedStocks] def Fine(self, fine): return [x.Symbol for x in fine if self.Time > x.EarningReports.FileDate + timedelta(days=14) and x.EarningReports.FileDate != datetime.time()][:50] def OnSecuritiesChanged(self, changes): for stock in changes.RemovedSecurities: symbol = stock.Symbol self.Liquidate(symbol) #If symbol is filtered out of universe if symbol in self.trailing_prices_long: self.trailing_prices_long.pop(symbol) if symbol in self.trailing_prices_short: self.trailing_prices_short.pop(symbol) if symbol in self.tickers: #Remove candlebar data self.five_minute_close.pop(symbol, None) self.five_minute_open.pop(symbol, None) self.five_minute_high.pop(symbol, None) self.five_minute_low.pop(symbol, None) self.five_minute_volume.pop(symbol, None) #Remove HA data self.heikinashi_close_window.pop(symbol, None) self.heikinashi_high_window.pop(symbol, None) self.heikinashi_low_window.pop(symbol, None) self.heikinashi_open_window.pop(symbol, None) #Remove ATR data self.stocks_atrWindow.pop(symbol, None) self.stocks_atrDown.pop(symbol, None) self.stocks_atrUp.pop(symbol, None) #Remove accumulator data self.ticker_volume_accumulator.pop(symbol, None) self.five_minute_high_accumulator.pop(symbol, None) self.five_minute_low_accumulator.pop(symbol, None) self.five_minute_close_accumulator.pop(symbol, None) #Remove ticker from tickers list self.tickers.remove(symbol) #Value self.Value.pop(symbol, None) history = self.History([stock.Symbol for stock in changes.AddedSecurities], 300, Resolution.Minute) for stock in changes.AddedSecurities: symbol = stock.Symbol if symbol in history.index: if symbol not in self.tickers: if symbol == self.spy.Symbol: continue #Add to candlebar data self.five_minute_close[symbol] = [] self.five_minute_low[symbol] = [] self.five_minute_high[symbol] = [] self.five_minute_open[symbol] = [] self.five_minute_volume[symbol] = [] #Add to HA data self.heikinashi_close_window[symbol] = [] self.heikinashi_high_window[symbol] = [] self.heikinashi_low_window[symbol] = [] self.heikinashi_open_window[symbol] = [] #Add to ATR data self.stocks_atrWindow[symbol] = [] self.stocks_atrDown[symbol] = [] self.stocks_atrUp[symbol] = [] #Add to accumulator data self.ticker_volume_accumulator[symbol] = 0 self.five_minute_low_accumulator[symbol] = [] self.five_minute_high_accumulator[symbol] = [] self.five_minute_close_accumulator[symbol] = [] #Add symbol to tickers list self.tickers.append(symbol) #Value self.Value[symbol] = 0 def OnData(self, data): #Start logic if self.market_hours == True: ### Check if there has been a long order submitted if len(self.long_buy_tickets) > 0: ### Loop through long ticket storage for i in self.long_buy_tickets: #Check if order for ticket has been filled if self.long_buy_tickets[i].Status == OrderStatus.Filled: pass #Check if partially filled, if so, cancel order elif self.long_buy_tickets[i].Status == OrderStatus.PartiallyFilled: #Cancel order self.long_buy_tickets[i].Cancel("Cancelled order") else: 0 self.long_buy_tickets.clear() ### Check if long sell order submitted if len(self.long_sell_tickets) > 0: ### Loop through long ticket storage for i in self.long_sell_tickets: #Check if order for ticket has been filled if self.long_sell_tickets[i].Status == OrderStatus.Filled: pass #Check if partially filled, if so, cancel order elif self.long_sell_tickets[i].Status == OrderStatus.PartiallyFilled: #Cancel order self.long_sell_tickets[i].Cancel("Cancelled order") else: 0 self.long_sell_tickets.clear() ### Check if short sell order submitted if len(self.short_sell_tickets) > 0: ### Loop through long ticket storage for i in self.short_sell_tickets: #Check if order for ticket has been filled if self.short_sell_tickets[i].Status == OrderStatus.Filled: pass #Check if partially filled, if so, cancel order elif self.short_sell_tickets[i].Status == OrderStatus.PartiallyFilled: #Cancel order self.short_sell_tickets[i].Cancel("Cancelled order") else: 0 self.short_sell_tickets.clear() ### Check if short cover order submitted if len(self.short_cover_tickets) > 0: ### Loop through long ticket storage for i in self.short_cover_tickets: #Check if order for ticket has been filled if self.short_cover_tickets[i].Status == OrderStatus.Filled: pass #Check if partially filled, if so, cancel order elif self.short_cover_tickets[i].Status == OrderStatus.PartiallyFilled: #Cancel order self.short_cover_tickets[i].Cancel("Cancelled order") else: 0 self.short_cover_tickets.clear() #Add count by 1 everytime data is received self.count += 1 #Trigger 5 minute count if count is divisable by 5 if self.count % 5 == 0: self.five_minute_count = True #Store data received in a global variable self.data = data #Collect tradebars self.tradebars = data.Bars #Store data from self.data into smaller pieces for processing #Loop through self.tickers for i in self.tickers: #Check if round of data contains data for ticker[i] if not self.data.ContainsKey(i): continue #Check if item contains trade bar data if not self.data.Bars.ContainsKey(i): continue #Create temporary variable to store ticker volume ticker_volume = self.tradebars[i].Volume #Update five minute volume accumulator for ticker self.ticker_volume_accumulator[i] += ticker_volume #Update minute high of stocks self.five_minute_high_accumulator[i].append(self.tradebars[i].High) #Update minute low of stocks self.five_minute_low_accumulator[i].append(self.tradebars[i].Low) #Update minute close of stocks self.five_minute_close_accumulator[i].append(self.tradebars[i].Close) #Check if it is the first minute that this stock has a trade if len(self.five_minute_high_accumulator[i]) == 1: #Save opening prices self.five_minute_open[i].append(self.tradebars[i].Open) #If five minute count is true if self.five_minute_count == True: #Loop through securities for i in self.tickers: #If there has been a trade if self.ticker_volume_accumulator[i] > 0: #Get highest point from five minute high accumulator x = max(self.five_minute_high_accumulator[i]) #Update five minute high list self.five_minute_high[i].append(x) #Reset accumulator self.five_minute_high_accumulator[i] = [] del x #Get lowest point from five minute low accumulator y = min(self.five_minute_low_accumulator[i]) #Update five minute low list self.five_minute_low[i].append(y) #Reset accumulator self.five_minute_low_accumulator[i] = [] del y #Update five minute volume tracker self.five_minute_volume[i].append(self.ticker_volume_accumulator[i]) #Reset accumulator self.ticker_volume_accumulator[i] = 0 #Update last close price last_closing_price = self.five_minute_close_accumulator[i][-1] self.five_minute_close[i].append(last_closing_price) #Reset accumulator self.five_minute_close_accumulator[i] = [] #Check if length of ticker[i] price tracker in dictionary is greater than 30 if len(self.five_minute_close[i]) > 30: self.five_minute_close[i] = self.five_minute_close[i][-30:] else: continue #Check if length of ticker[i] open tracker in dictionary is greater than 30 if len(self.five_minute_open[i]) > 30: self.five_minute_open[i] = self.five_minute_open[i][-30:] else: continue #Check if length of ticker[i] high tracker in dictionary is greater than 30 if len(self.five_minute_high[i]) > 30: self.five_minute_high[i] = self.five_minute_high[i][-30:] else: continue #Check if length of ticker[i] low tracker in dictionary is greater than 30 if len(self.five_minute_low[i]) > 30: self.five_minute_low[i] = self.five_minute_low[i][-30:] else: continue #Check if length of ticker[i] volume tracker in dictionary is greater than 30 if len(self.five_minute_volume[i]) > 30: self.five_minute_volume[i] = self.five_minute_volume[i][-30:] else: continue ### Heikin Ashi bars HA_open = (self.five_minute_open[i][-2] + self.five_minute_close[i][-2]) / 2 HA_close = (self.five_minute_open[i][-1] + self.five_minute_close[i][-1] + self.five_minute_high[i][-1] + self.five_minute_low[i][-1]) / 4 HA_high = self.five_minute_high[i][-1] HA_low = self.five_minute_low[i][-1] self.heikinashi_close_window[i].append(HA_close) self.heikinashi_high_window[i].append(HA_high) self.heikinashi_low_window[i].append(HA_low) self.heikinashi_open_window[i].append(HA_open) #Check for length of HA close, high, low, open window if len(self.heikinashi_close_window[i]) > 10: self.heikinashi_close_window[i] = self.heikinashi_close_window[i][-10:] else: continue if len(self.heikinashi_high_window[i]) > 10: self.heikinashi_high_window[i] = self.heikinashi_high_window[i][-10:] else: continue if len(self.heikinashi_low_window[i]) > 10: self.heikinashi_low_window[i] = self.heikinashi_low_window[i][-10:] else: continue if len(self.heikinashi_open_window[i]) > 10: self.heikinashi_open_window[i] = self.heikinashi_open_window[i][-10:] else: continue #Create create ATR and update bars = {} bars["open"] = self.five_minute_open[i] bars["close"] = self.five_minute_close[i] bars["high"] = self.five_minute_high[i] bars["low"] = self.five_minute_low[i] ATR = DataFrame (bars, columns = ["open","close","high","low"]) ATR["high_low"] = abs(ATR['high'] - ATR['low']) ATR["high_close"] = abs(ATR['high'] - ATR['close'].shift(1)) ATR["low_close"] = abs(ATR['low'] - ATR['close'].shift(1)) ATR["True_Range"] = ATR[["high_low","high_close","low_close"]].max(axis=1,skipna=False) ATR["ATR"] = ATR["True_Range"].rolling(2).mean() self.stocks_atrWindow[i] = ATR["ATR"].to_list()[-10:] #Condition for atrDown hl = (self.heikinashi_high_window[i][-1] + self.heikinashi_low_window[i][-1]) / 2 hltwo = (self.heikinashi_high_window[i][-1] + self.heikinashi_low_window[i][-1]) / 2 hltwoPrev = (self.heikinashi_high_window[i][-2] + self.heikinashi_low_window[i][-2]) / 2 downNow = hltwo - (self.multiplier * self.stocks_atrWindow[i][-1]) downPrev = hltwoPrev - (self.multiplier * self.stocks_atrWindow[i][-2]) if self.heikinashi_close_window[i][-2] > downPrev: atrDown = max(downPrev, downNow) else: atrDown = downNow self.stocks_atrDown[i].append(atrDown) #Condition for atrUp upNow = hltwo + (self.multiplier * self.stocks_atrWindow[i][-1]) upPrev = hltwoPrev + (self.multiplier * self.stocks_atrWindow[i][-2]) if self.heikinashi_close_window[i][-2] < upPrev: atrUp = min(upNow, upPrev) else: atrUp = upNow self.stocks_atrUp[i].append(atrUp) #Check atrUp and atrDown length if len(self.stocks_atrUp[i]) > 10: self.stocks_atrUp[i] = self.stocks_atrUp[i][-10:] else: continue if len(self.stocks_atrDown[i]) > 10: self.stocks_atrDown[i] = self.stocks_atrDown[i][-10:] else: continue #Value if self.heikinashi_close_window[i][-1] > self.stocks_atrUp[i][-2]: self.Value[i] = self.stocks_atrDown[i][-1] if self.heikinashi_close_window[i][-1] < self.stocks_atrDown[i][-2]: self.Value[i] = self.stocks_atrUp[i][-1] #Check value if self.Value[i] == 0: continue #Only trade if there are at least 5 atrDown or atrUp items self.ticker_tradable.append(i) #Check how many stocks we are short in portfolio, we're doing 2 as maximum limit count = 0 for kvp in self.Portfolio: security_holding = kvp.Value if security_holding.Invested: quantity = security_holding.Quantity if quantity < 0: count += 1 if count >= 2: self.short_limit = True else: self.short_limit = False ### Sell signal long if self.five_minute_count == True: #Start looping through owned securities for kvp in self.Portfolio: security_holding = kvp.Value if security_holding.Invested: symbol = security_holding.Symbol quantity = security_holding.Quantity price = security_holding.AveragePrice #If symbol is long if quantity > 0: ## Order submitted for ticker previously but not filled, try again at much lower price if symbol not in self.trailing_prices_long and symbol in self.tickers: self.long_sell_tickets[symbol] = self.LimitOrder( str(symbol), -quantity, self.five_minute_close[symbol][-1] * 0.80) ### Trailing stop loss 3% long elif symbol in self.trailing_prices_long: if self.five_minute_close[symbol][-1] > self.trailing_prices_long[symbol]: self.trailing_prices_long[symbol] = self.five_minute_close[symbol][-1] elif self.five_minute_close[symbol][-1] < self.trailing_prices_long[symbol]: percent_loss = self.five_minute_close[symbol][-1] / self.trailing_prices_long[symbol] if percent_loss < 0.97: #Exit long position at 5% discount self.long_sell_tickets[symbol] = self.LimitOrder( str(symbol), -quantity, self.five_minute_close[symbol][-1] * 0.95) self.trailing_prices_long.pop(symbol) # Submit short order at 3% discount if short limit is not matched if self.short_limit == False: self.short_sell_tickets[symbol] = self.LimitOrder( str(symbol), -quantity * 0.75, self.five_minute_close[symbol][-1] * 0.97) self.trailing_prices_short[symbol] = self.five_minute_close[symbol][-1] else: 0 ### Profit target 10% elif symbol in self.trailing_prices_long and symbol not in self.long_sell_tickets: if (self.five_minute_close[symbol][-1] / price) > 1.1: #Exit long position at 5% discount self.long_sell_tickets[symbol] = self.LimitOrder( str(symbol), -quantity, self.five_minute_close[symbol][-1] * 0.95) self.trailing_prices_long.pop(symbol) #Submit short order at 3% discount if self.short_limit == False: self.short_sell_tickets[symbol] = self.LimitOrder( str(symbol), -quantity * 0.75, self.five_minute_close[symbol][-1] * 0.97) self.trailing_prices_short[symbol] = self.five_minute_close[symbol][-1] else: 0 ### Buy to cover signal short if self.five_minute_count == True: for kvp in self.Portfolio: security_holding = kvp.Value if security_holding.Invested: symbol = security_holding.Symbol quantity = security_holding.Quantity price = security_holding.AveragePrice #If symbol is short if quantity < 0: ### Order submitted for ticker previously but not filled, try again at much higher price if symbol not in self.trailing_prices_short and symbol in self.tickers: self.short_cover_tickets[symbol] = self.LimitOrder( str(symbol), -quantity, self.five_minute_close[symbol][-1] * 1.2) ### Trailing stop loss 3% short elif symbol in self.trailing_prices_short: if self.five_minute_close[symbol][-1] < self.trailing_prices_short[symbol]: self.trailing_prices_short[symbol] = self.five_minute_close[symbol][-1] elif self.five_minute_close[symbol][-1] > self.trailing_prices_short[symbol]: percent_loss = self.five_minute_close[symbol][-1] / self.trailing_prices_short[symbol] if percent_loss > 1.03: #Exit short position at 5% premium self.short_cover_tickets[symbol] = self.LimitOrder( str(symbol), -quantity, self.five_minute_close[symbol][-1] * 1.05) self.trailing_prices_short.pop(symbol) else: 0 ### Profit target 10% elif symbol in self.trailing_prices_short and symbol not in self.short_cover_tickets: if (self.five_minute_close[symbol][-1] / price) < 0.9: #Exit short position at 5% discount self.short_cover_tickets[symbol] = self.LimitOrder( str(symbol), -quantity, self.five_minute_close[symbol][-1] * 1.05) self.trailing_prices_short.pop(symbol) else: 0 ### Buy signal if self.five_minute_count == True and len(self.ticker_tradable) > 0 and self.long_limit == False: ### Before running buy signal, check how many stocks we are long in portfolio, we're doing 2 long as maximum limit count = 0 already_owned_stocks = [] for kvp in self.Portfolio: security_holding = kvp.Value if security_holding.Invested: symbol = security_holding.Symbol already_owned_stocks.append(symbol) quantity = security_holding.Quantity if count >= 2: self.long_limit = True else: self.long_limit = False ### Loop through tradable tickers for i in self.ticker_tradable: #Check for momentum - if mean of last 3 close prices less than 3% of mean of last 47 prices, pass if statistics.mean(self.five_minute_close[i][-3:]) < (statistics.mean(self.five_minute_close[i][:-3]) * 1.03): continue #Check for volume growth - if mean of last 3 volume amounts less than mean of last 47 volume amounts, pass if statistics.mean(self.five_minute_volume[i][-3:]) < (statistics.mean(self.five_minute_volume[i][:-3]) * 1.03): continue #Check for volatility - volatility measured through a comparison of standard deviation over all stocks witthin universe self.volatility[i] = statistics.stdev(self.five_minute_close[i]) #Sort volatility in descending order in top ten list for i in self.volatility: if len(self.top_ten_sorted) == 0: self.top_ten_sorted.append(i) elif self.volatility[i] > self.volatility[self.top_ten_sorted[0]]: self.top_ten_sorted.insert(0,i) else: 0 #Get top ten ticker symbols in terms of volatility if len(self.top_ten_sorted) > 10: self.top_ten_sorted = self.top_ten_sorted[:10] #Loop through top ten list to run buy signal given by Ajmal for i in self.top_ten_sorted[:10]: if self.Securities[i].Price < self.Value[i] and i not in already_owned_stocks: self.long_buy_tickets[i] = self.LimitOrder( str(i), self.CalculateOrderQuantity(i , 0.2), self.five_minute_close[i][-1] * 1.03) self.Debug("bought") self.trailing_prices_long[i] = self.five_minute_close[i][-1] #If count divisable by 5, clear data if self.five_minute_count == True: #Set 5 minute count to false self.five_minute_count = False #Clear tradable list self.ticker_tradable.clear() #Clear underscore self.underscore.clear() #Clear top ten self.top_ten_sorted.clear() #Clear volatility self.volatility.clear() def Reset(self): self.count = 0 self.ticker_tradable.clear() self.underscore.clear() self.five_minute_count = False def is_market_hours(self): self.market_hours = True def not_market_hours(self): self.market_hours = False