Overall Statistics |
Total Orders 18814 Average Win 0.20% Average Loss -0.39% Compounding Annual Return 18.719% Drawdown 75.000% Expectancy 0.051 Start Equity 100000 End Equity 198549.89 Net Profit 98.550% Sharpe Ratio 0.502 Sortino Ratio 0.557 Probabilistic Sharpe Ratio 11.756% Loss Rate 30% Win Rate 70% Profit-Loss Ratio 0.51 Alpha 0.074 Beta 1.442 Annual Standard Deviation 0.38 Annual Variance 0.145 Information Ratio 0.394 Tracking Error 0.279 Treynor Ratio 0.132 Total Fees $7637.92 Estimated Strategy Capacity $44000000.00 Lowest Capacity Asset PYPL W2CQDXKV7WPX Portfolio Turnover 26.27% |
#region imports
from AlgorithmImports import *
#endregion
import talib
import numpy as np

# Score contribution of each candlestick pattern, keyed by the TA-Lib function
# name without its "CDL" prefix. Positive weights raise the score, negative
# weights lower it.
weights = {}
weights["TASUKIGAP"] = 1.5
weights["SEPARATINGLINES"] = 1
weights["GAPSIDESIDEWHITE"] = .5
weights["HARAMI"] = 1.5
weights["HIKKAKE"] = 1.5
weights["HOMINGPIGEON"] = 1
weights["HAMMER"] = .5
weights["MARUBOZU"] = .5
weights["DARKCLOUDCOVER"] = -1.5
weights["3LINESTRIKE"] = -1.5
weights["ENGULFING"] = -1
weights["SHOOTINGSTAR"] = -.5

# Patterns evaluated when the trend is +1 or -1; their weight is multiplied by
# the trend so they push the score in the direction of the trend.
_CONTINUATION_PATTERNS = (
    ("TASUKIGAP", talib.CDLTASUKIGAP),
    ("SEPARATINGLINES", talib.CDLSEPARATINGLINES),
    ("GAPSIDESIDEWHITE", talib.CDLGAPSIDESIDEWHITE),
)

# Patterns evaluated when the trend is -0.5.
_REVERSAL_TO_BULL_PATTERNS = (
    ("HARAMI", talib.CDLHARAMI),
    ("HIKKAKE", talib.CDLHIKKAKE),
    ("HOMINGPIGEON", talib.CDLHOMINGPIGEON),
    ("HAMMER", talib.CDLHAMMER),
    ("MARUBOZU", talib.CDLMARUBOZU),
)

# Patterns evaluated when the trend is +0.5.
_REVERSAL_TO_BEAR_PATTERNS = (
    ("DARKCLOUDCOVER", talib.CDLDARKCLOUDCOVER),
    ("3LINESTRIKE", talib.CDL3LINESTRIKE),
    ("ENGULFING", talib.CDLENGULFING),
    ("SHOOTINGSTAR", talib.CDLSHOOTINGSTAR),
)


def get_score(rolling_window, size, trend):
    """Return the weighted candlestick-pattern score for the latest bars.

    Args:
        rolling_window: Indexable collection of bars exposing ``Open``,
            ``High``, ``Low`` and ``Close``. Index 0 is assumed to be the
            most recent bar, as in a QuantConnect ``RollingWindow``.
        size: Number of bars (indices ``0 .. size - 1``) to read.
        trend: Trend code selecting the pattern group: ``1`` / ``-1`` use the
            continuation patterns (weight multiplied by the trend), ``-.5``
            uses the reversal-to-bull patterns, ``.5`` uses the
            reversal-to-bear patterns. Any other value scores 0.

    Returns:
        The sum of ``weights`` entries for every pattern of the selected
        group that TA-Lib reports at least once in the window, or 0 when no
        pattern fires.
    """
    # TA-Lib expects chronological (oldest-first) arrays, whereas the window
    # is read newest-first, so flip the bars before building the series.
    bars = [rolling_window[i] for i in range(size)][::-1]
    O = np.array([bar.Open for bar in bars], dtype=float)
    H = np.array([bar.High for bar in bars], dtype=float)
    L = np.array([bar.Low for bar in bars], dtype=float)
    C = np.array([bar.Close for bar in bars], dtype=float)

    if trend == 1 or trend == -1:
        patterns, multiplier = _CONTINUATION_PATTERNS, trend
    elif trend == -.5:
        patterns, multiplier = _REVERSAL_TO_BULL_PATTERNS, 1
    elif trend == .5:
        patterns, multiplier = _REVERSAL_TO_BEAR_PATTERNS, 1
    else:
        return 0

    final_weight = 0
    # Iterate over *every* pattern of the group; the previous
    # ``range(len(patterns) - 1)`` silently dropped the last one.
    for name, detect in patterns:
        # A pattern counts when any bar in the window has a non-zero signal;
        # the sign of the TA-Lib output is deliberately not inspected here.
        if detect(O, H, L, C).any():
            final_weight += weights[name] * multiplier
    return final_weight
# region imports
from datetime import datetime
from AlgorithmImports import *
from collections import deque
import numpy as np
from candlestickScore import get_score
from trendCalculator import get_trend


class bollinger_holder:
    """Snapshot of the lower, middle and upper Bollinger Band at one bar."""

    def __init__(self, lower, middle, upper):
        self.lower = lower
        self.middle = middle
        self.upper = upper


class CompetitionAlgorithm(QCAlgorithm):
    """Hourly multi-factor scoring strategy.

    For every active symbol a final score is built as a weighted sum of a
    candlestick score, a MACD score, a Bollinger Band score, an RSI score and
    a price-trend factor. Symbols scoring at least ``min_buy_score`` are held
    in proportion to their score; symbols scoring at most ``max_sell_score``
    are liquidated.
    """

    class bollinger_holder:
        """Snapshot of the lower, middle and upper Bollinger Band at one bar."""

        def __init__(self, lower, middle, upper):
            self.lower = lower
            self.middle = middle
            self.upper = upper

    class macd_holder:
        """Snapshot of the MACD fast, slow, signal and MACD values at one bar."""

        def __init__(self, fast, slow, signal, macd):
            self.fast = fast
            self.slow = slow
            self.signal = signal
            self.macd = macd

    # RSI score indexed by (price trend, RSI trend).
    _RSI_SCORES = {
        (1, 1): .8, (1, -1): .45, (1, .5): .66, (1, -.5): .5,
        (-1, 1): .55, (-1, -1): .25, (-1, .5): .45, (-1, -.5): .35,
        (.5, 1): .66, (.5, -1): .35, (.5, .5): .55, (.5, -.5): .45,
        (-.5, 1): .6, (-.5, -1): .35, (-.5, .5): .55, (-.5, -.5): .45,
    }

    # Price-trend contribution to the final score, indexed by price trend.
    _PRICE_TREND_FACTORS = {-1: .4, 1: .6, .5: .55, -.5: .45}

    def Initialize(self):
        """Configure the backtest, strategy parameters, state and universe."""
        self.SetStartDate(2020, 1, 1)
        self.SetEndDate(2024, 1, 1)
        self.SetCash(100000)
        self.SetWarmUp(15)

        # Parameters:
        self.final_universe_size = 25
        self.candles_history_size = 11  # 1.5 days
        self.clear_old_scores_days = 1
        self.min_buy_score = float(self.get_parameter("min_buy_score"))
        self.max_sell_score = float(self.get_parameter("max_sell_score"))
        self.macd_lookback = 21  # 3 days
        self.min_macd = .45
        self.Bollinger_window_size = 14  # 2 days
        self.SMA_rolling_window_length = 280  # 2 months
        self.trend_order = 10
        self.K_order = 2
        self.RSIS_rolling_window_length = 500  # 2.5 months

        # Score weights; the candlestick weight is whatever remains so that
        # the five weights always sum to 1.
        self.mcd = float(self.get_parameter("mcd"))
        self.bb = float(self.get_parameter("bb"))
        self.rs = float(self.get_parameter("rs"))
        self.ptf = float(self.get_parameter("ptf"))
        self.ccs = 1 - self.mcd - self.bb - self.rs - self.ptf

        self.trend_history_size = 500  # 2.5 month
        self.macd_score_info = [-35, 38]
        # [min, max] used to normalise the cumulative candlestick score.
        self.candlestick_score_info = [-20, 20]

        # Data holders, all keyed by symbol.
        self.rollingWindows = {}
        self.trend_rolling_windows = {}
        self.price_movements = {}
        self.check_agains = []
        self.MACDS = {}
        self.MACDS_rolling_windows = {}
        self.Bollingers = {}
        self.Bollingers_rolling_windows = {}
        self.SMAS200 = {}
        self.SMAS50 = {}
        self.SMAS200_rolling_windows = {}
        self.SMAS50_rolling_windows = {}
        self.RSIS = {}
        self.RSIS_rolling_windows = {}

        # Portfolio management
        self.candleStickScores = {}
        self.finalScores = {}
        self.portfolioTargets = []
        self.candlestick_test_dates_to_sell = {}

        # Universe selection
        self.rebalanceTime = self.time
        self.activeStocks = set()

        # Define universe
        self.universe_type = self.get_parameter("universe_type")
        if self.universe_type == "crypto":
            self.add_universe(CryptoUniverse.coinbase(self.crypto_filter))
        elif self.universe_type == "equity":
            self.AddUniverse(self.CoarseFilter, self.FineFilter)
        # NOTE(review): the original indentation was lost; this is assumed to
        # apply to both universe types rather than to equities only.
        self.UniverseSettings.Resolution = Resolution.Hour

        # Debug: running history of the scores, summarised by
        # log_score_details.
        self.macd_scores = []
        self.candlestick_scores = []
        self.final_scores = []

    def crypto_filter(self, crypto):
        """Return the symbols of the 10 entries with the highest volume."""
        sortedByVolume = sorted(crypto, key=lambda x: x.Volume, reverse=True)
        return [x.Symbol for x in sortedByVolume][:10]

    def CoarseFilter(self, coarse):
        """Return up to 1000 symbols with fundamental data, by dollar volume.

        After the first selection ``rebalanceTime`` is pushed 10000 days into
        the future, so every later call returns ``Universe.Unchanged``.
        """
        if self.Time <= self.rebalanceTime:
            return self.Universe.Unchanged
        self.rebalanceTime = self.Time + timedelta(days=10000)

        sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
        return [x.Symbol for x in sortedByDollarVolume if x.HasFundamentalData][:1000]

    def FineFilter(self, fine):
        """Return the top ``final_universe_size`` symbols with a market cap."""
        sortedbyVolume = sorted(fine, key=lambda x: x.DollarVolume, reverse=True)
        return [x.Symbol for x in sortedbyVolume if x.MarketCap > 0][:self.final_universe_size]

    def OnSecuritiesChanged(self, changes):
        """Liquidate removed securities and set up state for added ones."""
        # Close positions in removed securities.
        for x in changes.RemovedSecurities:
            self.Liquidate(x.Symbol)
            # discard: a symbol that was never tracked must not raise.
            self.activeStocks.discard(x.Symbol)

        # Can't open positions here since data might not be added correctly yet.
        for x in changes.AddedSecurities:
            symbol = x.Symbol
            self.activeStocks.add(symbol)

            if self.universe_type == "crypto":
                self.rollingWindows[symbol] = RollingWindow[QuoteBar](self.candles_history_size)
            elif self.universe_type == "equity":
                self.rollingWindows[symbol] = RollingWindow[TradeBar](self.candles_history_size)
            self.trend_rolling_windows[symbol] = RollingWindow[float](self.trend_history_size)

            self.MACDS[symbol] = self.MACD(symbol, 12, 26, 9, MovingAverageType.Simple, Resolution.HOUR)
            self.MACDS_rolling_windows[symbol] = deque(maxlen=self.macd_lookback)
            self.Bollingers[symbol] = self.BB(symbol, 20, 2, MovingAverageType.Simple, Resolution.HOUR)
            self.Bollingers_rolling_windows[symbol] = deque(maxlen=self.Bollinger_window_size)
            self.SMAS200[symbol] = self.SMA(symbol, 200, Resolution.HOUR)
            self.SMAS50[symbol] = self.SMA(symbol, 50, Resolution.HOUR)
            self.SMAS200_rolling_windows[symbol] = RollingWindow[float](self.SMA_rolling_window_length)
            self.SMAS50_rolling_windows[symbol] = RollingWindow[float](self.SMA_rolling_window_length)
            self.RSIS[symbol] = self.RSI(symbol, 14, MovingAverageType.Simple, Resolution.Hour)
            self.RSIS_rolling_windows[symbol] = RollingWindow[float](self.RSIS_rolling_window_length)

            if self.universe_type == "crypto":
                history = self.History[QuoteBar](symbol, self.trend_history_size, Resolution.Hour)
            elif self.universe_type == "equity":
                history = self.History[TradeBar](symbol, self.trend_history_size, Resolution.Hour)

            # Replay history so windows and indicators start out populated.
            for bar in history:
                self.rollingWindows[symbol].Add(bar)
                self.trend_rolling_windows[symbol].Add(bar.Close)

                self.MACDS[symbol].Update(bar.EndTime, bar.Close)
                self.MACDS_rolling_windows[symbol].append(self._macd_snapshot(symbol))

                self.Bollingers[symbol].Update(bar.EndTime, bar.Close)
                self.Bollingers_rolling_windows[symbol].append(self._bollinger_snapshot(symbol))

                self.SMAS200[symbol].Update(bar.EndTime, bar.Close)
                self.SMAS50[symbol].Update(bar.EndTime, bar.Close)
                self.RSIS[symbol].Update(bar.EndTime, bar.Close)
                self.RSIS_rolling_windows[symbol].Add(self.RSIS[symbol].Current.Value)

    def _macd_snapshot(self, symbol):
        """Capture the current MACD indicator values of ``symbol``."""
        macd = self.MACDS[symbol]
        return self.macd_holder(macd.Fast.Current.Value,
                                macd.Slow.Current.Value,
                                macd.Signal.Current.Value,
                                macd.Current.Value)

    def _bollinger_snapshot(self, symbol):
        """Capture the current Bollinger Band values of ``symbol``."""
        bands = self.Bollingers[symbol]
        return self.bollinger_holder(bands.LowerBand.Current.Value,
                                     bands.MiddleBand.Current.Value,
                                     bands.UpperBand.Current.Value)

    @staticmethod
    def _classify_swings(recent_swing, total_swing, default):
        """Map the signs of the recent and total swing to a trend code.

        Returns -1 (both down), 1 (both up), .5 (recent down, total up) or
        -.5 (recent up, total down); ``default`` when no rule matches (only
        possible for NaN input).
        """
        if recent_swing <= 0 and total_swing <= 0:
            return -1
        if recent_swing >= 0 and total_swing >= 0:
            return 1
        if recent_swing <= 0 and total_swing >= 0:
            return .5
        if recent_swing >= 0 and total_swing <= 0:
            return -.5
        return default

    def OnData(self, data):
        """Score every active symbol that has a bar, then trade."""
        self.finalScores = {}
        for symbol in self.activeStocks:
            # Skip only this symbol; returning here would abandon every
            # remaining symbol and the trading step for the whole bar.
            if not data.ContainsKey(symbol) or data[symbol] is None:
                continue
            if not self.rollingWindows[symbol].IsReady:
                self.rollingWindows[symbol].Add(data[symbol])
                continue
            if not self.MACDS[symbol].IsReady:
                continue
            self._score_symbol(symbol, data)

        # NOTE(review): the original indentation was lost; the 10:00 check is
        # assumed to gate only the debug summary, with trading on every bar.
        if self.time.hour == 10 and self.time.minute == 0:
            self.log_score_details()
        self.BuyAndSell()

    def _score_symbol(self, symbol, data):
        """Update the state of ``symbol`` and store its final score."""
        bar = data[symbol]
        self.rollingWindows[symbol].Add(bar)
        self.trend_rolling_windows[symbol].Add(bar.Close)
        self.Bollingers_rolling_windows[symbol].append(self._bollinger_snapshot(symbol))
        self.MACDS_rolling_windows[symbol].append(self._macd_snapshot(symbol))
        self.SMAS200_rolling_windows[symbol].Add(self.SMAS200[symbol].Current.Value)
        self.SMAS50_rolling_windows[symbol].Add(self.SMAS50[symbol].Current.Value)
        self.RSIS_rolling_windows[symbol].Add(self.RSIS[symbol].Current.Value)

        self.plot_indicators(symbol, data)

        rolling_data = list(self.trend_rolling_windows[symbol])
        self.Log("rolling_Data: " + str(rolling_data))

        # Price trend
        price_trend_info = get_trend(self.trend_rolling_windows[symbol], self.trend_order, self.K_order)
        price_trend = self._classify_swings(price_trend_info[1], price_trend_info[2], price_trend_info[0])
        price_trend_factor = self._PRICE_TREND_FACTORS.get(price_trend, .5)
        self.Log("Trend: " + str(price_trend))
        self.Plot("trend", "price_trend", price_trend * 100)

        # RSI trend
        rsi_trend_info = get_trend(self.RSIS_rolling_windows[symbol], self.trend_order, self.K_order)
        rsi_trend = self._classify_swings(rsi_trend_info[1], rsi_trend_info[2], rsi_trend_info[0])
        self.plot("trend", "rsi_trendd", rsi_trend * 100)

        rsi_score = self.get_rsi_score(price_trend, rsi_trend)
        self.Plot("rsi score", "score", rsi_score)

        # Candlestick score: accumulate the per-bar scores of the last
        # clear_old_scores_days days.
        candlestick_score = get_score(self.rollingWindows[symbol], self.candles_history_size, price_trend)
        scores = self.candleStickScores.setdefault(symbol, [])
        scores.append((self.time, candlestick_score))
        # Rebuild the list instead of removing while iterating, which skipped
        # the element following each removed entry.
        max_age = timedelta(days=self.clear_old_scores_days)
        scores = [entry for entry in scores if not self.time - entry[0] > max_age]
        self.candleStickScores[symbol] = scores

        cumulative_candlestick_score = sum(entry[1] for entry in scores)
        low, high = self.candlestick_score_info
        cumulative_candlestick_score = (cumulative_candlestick_score - low) / (high - low)
        self.Plot("candlestick score", "score", cumulative_candlestick_score)
        self.Plot("price", "price", bar.Close)
        if cumulative_candlestick_score != 0:
            self.candlestick_scores.append(cumulative_candlestick_score)

        # MACD score
        macd_score = self.get_macd_score(symbol, price_trend)
        self.Log("macd score after normalization: " + str(macd_score))
        if macd_score != 0:
            self.macd_scores.append(macd_score)
        self.Plot("macd score", "score", macd_score)

        # Bollinger score
        bollinger_score = float(self.get_bollinger_score(symbol, price_trend))
        self.Plot("bollinger score", "score", bollinger_score)

        final_score = (self.ccs * cumulative_candlestick_score
                       + self.mcd * macd_score
                       + self.bb * bollinger_score
                       + self.rs * rsi_score
                       + self.ptf * price_trend_factor)
        self.finalScores[symbol] = final_score
        self.final_scores.append(final_score)
        self.Log("Final score: " + str(final_score))
        self.Plot("final score", "score", final_score)

    def BuyAndSell(self):
        """Hold qualifying symbols pro rata to their score; sell weak ones."""
        score_total = sum(score for score in self.finalScores.values()
                          if score >= self.min_buy_score)

        for symbol, score in self.finalScores.items():
            if score >= self.min_buy_score:
                # score_total can only be zero here if the qualifying scores
                # cancel out; skip rather than divide by zero.
                if score_total != 0:
                    self.SetHoldings(symbol, score / score_total)
                    self.Log("Buying " + str(symbol) + " with score: " + str(score))
            elif score <= self.max_sell_score:
                # Selling does not depend on whether anything qualified to buy.
                self.Liquidate(symbol)
                self.Log("Selling " + str(symbol) + " with score: " + str(score))

    def get_rsi_score(self, price_trend, rsi_trend):
        """Return the RSI score for a (price trend, RSI trend) pair.

        Both arguments are trend codes (1, -1, .5 or -.5). Returns 0 for any
        combination that is not in the table.
        """
        return self._RSI_SCORES.get((price_trend, rsi_trend), 0)

    def get_macd_score(self, symbol, trend):
        """Return the MACD score of ``symbol``.

        For a recognised trend code the stored MACD values (excluding the
        most recent one, as before) are scanned oldest-first: the first sign
        change from negative to non-negative yields .65, from non-negative to
        negative yields .45. Without a sign change, or for an unrecognised
        trend, the current MACD value is mapped through ``value / 5 + .5``,
        which is not clamped.
        """
        current = self.MACDS[symbol].Current.Value
        self.Log("macd: " + str(current))
        self.Plot("real_macd: ", "macd", current)

        if trend in (1, -1, .5, -.5):
            macds = [x.macd for x in self.MACDS_rolling_windows[symbol]]
            is_above = None
            for value in macds[:-1]:
                if value >= 0:
                    if is_above is None:
                        is_above = True
                    elif is_above is False:
                        return .65
                elif value < 0:
                    if is_above is None:
                        is_above = False
                    elif is_above is True:
                        return .45

        return current / 5 + .5

    def get_bollinger_score(self, symbol, trend):
        """Return the Bollinger Band score of ``symbol`` for ``trend``.

        Each recent price is classified by where it sits relative to the band
        values recorded for the same bar, and the zone counts are combined
        with trend-specific weights and divided by the number of samples.
        Returns 0 for an unrecognised trend or when there are no samples.
        """
        bands = list(self.Bollingers_rolling_windows[symbol])
        prices = [x for x in self.trend_rolling_windows[symbol]]
        self.Log("length of prices: " + str(len(prices)))
        prices = prices[:self.Bollinger_window_size]
        self.Log("Length of prices after slicing: " + str(len(prices)))

        # The band deque is oldest-first while the price window is
        # newest-first; keep the newest common samples and flip the prices so
        # each price is compared with the bands of its own bar.
        sample_count = min(len(bands), len(prices))
        bands = bands[len(bands) - sample_count:]
        prices = prices[:sample_count][::-1]

        above_upper = 0
        middle_upper = 0
        lower_middle = 0
        below_lower = 0
        for band, price in zip(bands, prices):
            if price >= band.upper:
                above_upper += 1
            elif price >= band.middle:
                middle_upper += 1
            elif price >= band.lower:
                lower_middle += 1
            else:
                below_lower += 1

        self.Log("above_upper: " + str(above_upper))
        self.Log("middle_upper: " + str(middle_upper))
        self.Log("lower_middle: " + str(lower_middle))
        self.Log("below_lower: " + str(below_lower))

        score = 0
        if trend == 1:
            self.Log("trend is 1, calculating bollinger score")
            score = middle_upper + .66 * above_upper + .33 * lower_middle + 0
        elif trend == -1:
            self.Log("trend is -1, calculating bollinger score")
            score = .33 * below_lower + .66 * middle_upper + above_upper + 0
        elif trend == .5:
            score = 0 + .33 * lower_middle + .5 * middle_upper + .66 * above_upper
        elif trend == -.5:
            # NOTE(review): middle_upper appears twice and above_upper is
            # unused; the first term may have been meant for above_upper.
            score = middle_upper * .85 + .55 * middle_upper + .33 * lower_middle + 0

        score = score / sample_count if sample_count else 0
        self.Log("Bollinger score: " + str(score))
        return score

    def plot_indicators(self, symbol, data):
        """Plot the current MACD, SMA and Bollinger values of ``symbol``."""
        self.Plot("macd", "macd", self.MACDS[symbol].Current.Value)

        self.Plot("smas", "50", self.SMAS50[symbol].Current.Value)
        self.Plot("smas", "200", self.SMAS200[symbol].Current.Value)

        self.Plot("bollinger", "lower", self.Bollingers[symbol].LowerBand.Current.Value)
        self.Plot("bollinger", "middle", self.Bollingers[symbol].MiddleBand.Current.Value)
        self.Plot("bollinger", "upper", self.Bollingers[symbol].UpperBand.Current.Value)
        self.Plot("bollinger", "price", data[symbol].Close)

    def _log_stats(self, label, scores):
        """Log average, min, max, median and std of ``scores``, if any."""
        if len(scores) == 0:
            # np.min / np.max raise on empty input.
            return
        self.Log(label + " average: " + str(np.mean(scores)))
        self.Log(label + " min: " + str(np.min(scores)))
        self.Log(label + " max: " + str(np.max(scores)))
        self.Log(label + " median: " + str(np.median(scores)))
        self.Log(label + " std: " + str(np.std(scores)))

    def log_score_details(self):
        """Log summary statistics of the MACD, candlestick and final scores."""
        self._log_stats("MACD", self.macd_scores)
        self._log_stats("Candlestick", self.candlestick_scores)
        self._log_stats("Final", self.final_scores)
#region imports
from AlgorithmImports import *
#endregion
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy.signal import argrelextrema
from collections import deque
from matplotlib.lines import Line2D
from datetime import timedelta

'''
Much of this code is sourced at the following link: https://raposa.trade/blog/higher-highs-lower-lows-and-calculating-price-trends-in-python/
'''


def _consecutive_extrema(data, comparator, breaks_run, order, K):
    '''
    Shared scanner behind the four public extrema finders.

    Local extrema of `data` are located with argrelextrema(data, comparator,
    order=order). Walking them in order, a run of extrema indices is kept in
    a deque of length K; the run is reset whenever
    breaks_run(current, previous) is true. Every time the run holds K indices
    a copy of it is appended to the result. The very first extremum only
    seeds the run and is never reported on its own.
    '''
    extrema_idx = argrelextrema(data, comparator, order=order)[0]
    values = data[extrema_idx]

    extrema = []
    run = deque(maxlen=K)
    for i, idx in enumerate(extrema_idx):
        if i == 0:
            run.append(idx)
            continue
        if breaks_run(values[i], values[i - 1]):
            run.clear()
        run.append(idx)
        if len(run) == K:
            extrema.append(run.copy())
    return extrema


def getHigherLows(data: np.ndarray, order=5, K=2):
    '''
    Finds consecutive higher lows in price pattern.

    `order` is the number of points on each side a low is compared against;
    K determines how many consecutive lows need to be higher. Returns a list
    of deques, each holding K indices into `data`.
    '''
    # A low that is lower than the previous low breaks the run.
    return _consecutive_extrema(data, np.less, np.less, order, K)


def getLowerHighs(data: np.ndarray, order=5, K=2):
    '''
    Finds consecutive lower highs in price pattern.

    `order` is the number of points on each side a high is compared against;
    K determines how many consecutive highs need to be lower. Returns a list
    of deques, each holding K indices into `data`.
    '''
    # A high that is higher than the previous high breaks the run.
    return _consecutive_extrema(data, np.greater, np.greater, order, K)


def getHigherHighs(data: np.ndarray, order=5, K=2):
    '''
    Finds consecutive higher highs in price pattern.

    `order` is the number of points on each side a high is compared against;
    K determines how many consecutive highs need to be higher. Returns a list
    of deques, each holding K indices into `data`.
    '''
    # A high that is lower than the previous high breaks the run.
    return _consecutive_extrema(data, np.greater, np.less, order, K)


def getLowerLows(data: np.ndarray, order=5, K=2):
    '''
    Finds consecutive lower lows in price pattern.

    `order` is the number of points on each side a low is compared against;
    K determines how many consecutive lows need to be lower. Returns a list
    of deques, each holding K indices into `data`.
    '''
    # A low that is higher than the previous low breaks the run.
    return _consecutive_extrema(data, np.less, np.greater, order, K)


def _net_swing(movements, verbose=False):
    '''
    Sum the price change (second price minus first price) of `movements`.

    Upward patterns ('hh', 'hl') and downward patterns ('ll', 'lh') are
    accumulated separately and then added. With verbose=True every
    contribution is printed.
    '''
    swing_up = 0
    swing_down = 0
    for movement in movements:
        change = movement[4] - movement[3]
        if movement[0] == 'hh' or movement[0] == 'hl':
            swing_up += change
            if verbose:
                print("hh or hl, adding: ", change)
        else:
            swing_down += change
            if verbose:
                print("ll or lh, adding: ", change)
    return swing_up + swing_down


def get_trend(close_data, order, K, recent_movements_length=3, total_movements_length=10):
    '''
    Get the trend of the stock

    close_data is an iterable of values given newest-first; it is reversed
    into chronological order before analysis. `order` and K are forwarded to
    the four extrema finders. The detected patterns are sorted by the
    position of their second point, most recent first, and the net price
    change is summed over the first recent_movements_length patterns and
    over the first total_movements_length patterns.

    Returns a tuple (trend, recent_swing, total_swing); trend is always 0,
    callers derive the trend from the two swings.
    '''
    close_data = [x for x in close_data]
    close_data.reverse()
    close = np.asarray(close_data, dtype=float)

    # Each entry: (type, location first price, location second price,
    #              first price, second price)
    patterns = []
    finders = (('hh', getHigherHighs), ('hl', getHigherLows),
               ('ll', getLowerLows), ('lh', getLowerHighs))
    for kind, finder in finders:
        for pattern in finder(close, order, K):
            patterns.append((kind, pattern[0], pattern[1],
                             close[pattern[0]], close[pattern[1]]))

    # Most recent pattern (by its second point) first.
    patterns.sort(key=lambda x: x[2], reverse=True)

    trend = 0

    recent_swing = _net_swing(patterns[:recent_movements_length], verbose=True)
    print("recent_swing: ", recent_swing)

    total_swing = _net_swing(patterns[:total_movements_length])
    print("total_swing: ", total_swing)

    return (trend, recent_swing, total_swing)