Overall Statistics |
Total Orders 2535 Average Win 0.07% Average Loss -0.04% Compounding Annual Return 126.613% Drawdown 14.500% Expectancy 0.602 Start Equity 1000000 End Equity 1228882.96 Net Profit 22.888% Sharpe Ratio 2.338 Sortino Ratio 3.523 Probabilistic Sharpe Ratio 68.953% Loss Rate 40% Win Rate 60% Profit-Loss Ratio 1.65 Alpha 0 Beta 0 Annual Standard Deviation 0.353 Annual Variance 0.125 Information Ratio 2.494 Tracking Error 0.353 Treynor Ratio 0 Total Fees $3332.33 Estimated Strategy Capacity $5300000.00 Lowest Capacity Asset USFD WAUYF0CD0JMT Portfolio Turnover 26.88% |
from AlgorithmImports import * from datetime import timedelta from collections import deque import numpy as np from trendCalculator import get_trend from macd_oracle import get_macd_score from bollinger_oracle import get_bollinger_buy_and_short from rsi_oracle import get_rsi_buy_short class custom_alpha(AlphaModel): # contain bollinger band information class bollinger_holder: def __init__(self, lower, middle, upper, price): self.lower = lower self.middle = middle self.upper = upper self.price = price # contain macd information class macd_holder: def __init__(self, fast, slow, signal, macd, hist): self.fast = fast self.slow = slow self.signal = signal self.macd = macd self.hist = hist def __init__(self, algo): self.algo = self self.plotting = False # MACD Parameters self.macd_params = {'cross_check_length': 35, 'macd_above_below_length': 28, 'long_macd_threshold': 0.25, 'short_macd_threshold': -0.25} self.macd_candles_history_size = 35 # Bollinger Band Parameters self.Bollinger_window_size = 140 self.long_threshold = 1 self.bollinger_params = {'long_threshold': self.long_threshold, 'short_threshold': self.long_threshold} # EMA parameters self.ema_rolling_window_length = 250 self.derivative_threshold = .005 # RSI Parameters self.price_rolling_window_length = 90 self.RSIS_rolling_window_length = 90 # ADX parameters self.adx_rolling_window_length = 35 self.adx_threshold = 20 # OBV parameters self.obv_rolling_window_length = 250 self.obv_threshold = .5 # maybe change around # ATR parameters self.atr_stop_multiplier = 4 # Trend Parameters self.trend_order = 5 self.K_order = 2 self.trend_history_size = 210 # 2.5 month self.rsi_trend_order = 5 self.rsi_K_order = 2 self.obv_trend_order = 2 self.obv_K_order = 2 # Portfolio Management Parameters self.look_for_entries = {} self.entry_scores = {} self.hold_length = {} self.max_position_size = .15 self.activeStocks = set() self.insight_expiry = 21 self.insight_expiry_sell = 6 self.port_bias = 1000 # Indicators self.trend_rolling_windows = {} self.MACDS = {} self.macd_consolidators = {} self.MACDS_rolling_windows = {} self.Bollingers = {} self.bollinger_consolidators = {} self.Bollingers_rolling_windows = {} self.RSIS = {} self.RSIS_trend = {} self.RSIS_rolling_windows = {} self.rsi_consolidators = {} self.EMAS = {} self.EMAS_rolling_windows = {} self.ema_consolidators = {} self.EMAS50 = {} self.EMAS50_rolling_windows = {} self.ema50_consolidators = {} self.ADX = {} self.adx_consolidators = {} self.adx_rolling = {} self.obvs = {} self.obvs_rolling = {} self.ATRS = {} self.atr_consolidators = {} self.peak_prices = {} self.universe_type = "equity" if self.universe_type != "equity": self.universe_equity = algo.AddEquity(self.universe_type, Resolution.Hour).Symbol def Update(self, algo, data): insights = [] if self.universe_type != "equity" and self.universe_equity not in self.activeStocks: self.activeStocks.add(self.universe_equity) for symbol in self.activeStocks: # region update indicators if not data.ContainsKey(symbol) or data[symbol] is None: continue if not self.MACDS[symbol].IsReady: continue self.trend_rolling_windows[symbol].Add(data[symbol].Close) self.Bollingers_rolling_windows[symbol].append(self.bollinger_holder(self.Bollingers[symbol].LowerBand.Current.Value, self.Bollingers[symbol].MiddleBand.Current.Value, self.Bollingers[symbol].UpperBand.Current.Value, data[symbol].price)) self.MACDS_rolling_windows[symbol].append(self.macd_holder(self.MACDS[symbol].Fast.Current.Value, self.MACDS[symbol].Slow.Current.Value, self.MACDS[symbol].Signal.Current.Value, self.MACDS[symbol].Current.Value, self.MACDS[symbol].histogram.Current.Value)) self.RSIS_rolling_windows[symbol].Add(self.RSIS_trend[symbol].Current.Value) self.EMAS_rolling_windows[symbol].Add(self.EMAS[symbol].Current.Value) self.EMAS50_rolling_windows[symbol].Add(self.EMAS50[symbol].Current.Value) self.obvs_rolling[symbol].Add(self.obvs[symbol].Current.Value) # endregion price_trend = get_trend(self.trend_rolling_windows[symbol], self.trend_order, self.K_order)/data[symbol].price rsi_trend = get_trend(self.RSIS_rolling_windows[symbol], self.rsi_trend_order, self.rsi_K_order)/self.RSIS[symbol].Current.Value obv_trend = get_trend(self.obvs_rolling[symbol], self.obv_trend_order, self.obv_K_order)/self.obvs[symbol].Current.Value # if 50 ema has been above 200 ema for a while, trend is up ema_trend = 0 ema50s = [x for x in self.EMAS50_rolling_windows[symbol]] ema200s = [x for x in self.EMAS_rolling_windows[symbol]] for i in range(len(ema50s)): if ema50s[i] > ema200s[i]: ema_trend += 1 bollinger_score_buy_short = get_bollinger_buy_and_short(algo, self.Bollingers_rolling_windows[symbol], 1, self.bollinger_params) macd_score = get_macd_score(self.MACDS_rolling_windows[symbol], 1, self.macd_params) rsi_score = get_rsi_buy_short(price_trend, rsi_trend) prices = [x for x in self.EMAS50_rolling_windows[symbol]] prices.reverse() derivative = np.gradient(prices)/self.EMAS50_rolling_windows[symbol][0] # buy signal if ema_trend >= 210: # if in ema uptrend, buy if price between midle and upper bollinger if bollinger_score_buy_short == 1: if macd_score == 1: if rsi_score == 1: if derivative[-1] > self.derivative_threshold: if self.ADX[symbol].Current.Value > self.adx_threshold: max_adx = max(self.adx_rolling[symbol]) current_adx = self.ADX[symbol].Current.Value if current_adx >= max_adx * .95: if obv_trend > self.obv_threshold: open_orders = algo.Transactions.GetOpenOrders(symbol) if not algo.Portfolio[symbol].Invested and len(open_orders) == 0: if symbol not in self.look_for_entries or self.look_for_entries[symbol] == 0: self.look_for_entries[symbol] = 1 self.entry_scores[symbol] = abs(int(derivative[-1] * self.ADX[symbol].Current.Value * max(price_trend, 1) * max(rsi_trend, 1) * max(obv_trend, 1) * 100 + self.port_bias)) else: if bollinger_score_buy_short == 2: if macd_score == 2: if rsi_score == 2: if derivative[-1] < -self.derivative_threshold: if self.ADX[symbol].Current.Value > self.adx_threshold: min_adx = min(self.adx_rolling[symbol]) current_adx = self.ADX[symbol].Current.Value if current_adx <= min_adx * 1.05: if obv_trend < -self.obv_threshold: open_orders = algo.Transactions.GetOpenOrders(symbol) if not algo.Portfolio[symbol].Invested and len(open_orders) == 0: self.look_for_entries[symbol] = -1 self.entry_scores[symbol] = abs(int(derivative[-1] * self.ADX[symbol].Current.Value * max(price_trend, 1) * max(rsi_trend, 1) * max(obv_trend, 1) * 100 + self.port_bias)) # generate sell signal if self.RSIS[symbol].Current.Value < 50: if algo.Portfolio[symbol].Invested and algo.Portfolio[symbol].IsLong: insight = Insight.price(symbol, timedelta(days=self.insight_expiry_sell), InsightDirection.Flat, weight = 1) insights.append(insight) if self.RSIS[symbol].Current.Value > 50: if algo.Portfolio[symbol].Invested and algo.Portfolio[symbol].IsShort: insight = Insight.price(symbol, timedelta(days=self.insight_expiry_sell), InsightDirection.Flat, weight = 1) insights.append(insight) if self.plotting: if symbol in self.peak_prices and self.peak_prices[symbol] != None: algo.Plot("price", "atr trail", self.peak_prices[symbol] - self.atr_stop_multiplier * self.ATRS[symbol].Current.Value) algo.Plot("price", "peak", self.peak_prices[symbol]) algo.Plot("atr", "atr", self.ATRS[symbol].Current.Value) algo.Plot("macd", "macd", self.MACDS[symbol].Current.Value) algo.Plot("adx", "adx", self.ADX[symbol].Current.Value) algo.Plot("obv trend", "obv trend", obv_trend) algo.Plot("obv", "obv", self.obvs[symbol].Current.Value) algo.Plot("trend", "price_trend", price_trend) algo.Plot("trend", "rsi_trend", rsi_trend) algo.Plot("rsi", "rsi", self.RSIS[symbol].Current.Value) algo.Plot("price", "ema50", self.EMAS50[symbol].Current.Value) algo.Plot("price", "ema200", self.EMAS[symbol].Current.Value) algo.Plot("bollinger_score", "bollinger_score", bollinger_score_buy_short) algo.Plot("macd_score", "macd_score", macd_score) algo.Plot("rsi_score", "rsi_score", rsi_score) algo.Plot("derivative", "derivative", derivative[-1]) algo.Plot("ema_trend: ", "ema_trend", ema_trend) algo.Plot("price", "price", data[symbol].Close) algo.Plot("price", "bollinger_middle", self.Bollingers[symbol].MiddleBand.Current.Value) algo.Plot("price", "bollinger_upper", self.Bollingers[symbol].UpperBand.Current.Value) algo.Plot("trend", "price_trend", price_trend) if len(self.look_for_entries.keys()) > 0: for key in self.look_for_entries: if self.look_for_entries[key] > 0: self.look_for_entries[key] += 1 if self.look_for_entries[key] > 70: self.look_for_entries[key] = 0 self.hold_length[key] = None else: #self.Log("Looking for entry for: " + str(key)) if data[key].price > self.Bollingers[key].MiddleBand.Current.Value: #insight = Insight(key, timedelta(days=2), InsightType.PRICE, InsightDirection.Down, self.entry_scores[key]) insight = Insight.price(key, timedelta(days=self.insight_expiry), InsightDirection.Up, weight = self.entry_scores[key]) self.peak_prices[key] = data[key].price self.hold_length[key] = 1 insights.append(insight) self.look_for_entries[key] = 0 elif self.look_for_entries[key] < 0: self.look_for_entries[key] -= 1 if self.look_for_entries[key] < -70: self.look_for_entries[key] = 0 self.hold_length[key] = None else: #self.Log("Looking for entry for: " + str(key)) if data[key].price < self.Bollingers[key].MiddleBand.Current.Value: #insight = Insight(key, timedelta(days=2), InsightType.PRICE, InsightDirection.Down, self.entry_scores[key]) insight = Insight.price(key, timedelta(days=self.insight_expiry), InsightDirection.Down, weight = self.entry_scores[key]) insights.append(insight) self.peak_prices[key] = data[key].price self.hold_length[key] = -1 self.look_for_entries[key] = 0 # endregion added_insights = self.atr_trail_stop_loss(algo, data) for insight in added_insights: insights.append(insight) return insights def atr_trail_stop_loss(self, algo, data): added_insights = [] for key in algo.Portfolio.Keys: if key in self.peak_prices and self.peak_prices[key] != None and key in self.hold_length: if self.hold_length[key] != None: self.hold_length[key] += 1 if algo.Portfolio[key].IsLong: if key in data and data[key] != None: price = data[key].price else: price = self.trend_rolling_windows[key][0] if price > self.peak_prices[key]: self.peak_prices[key] = price if price < self.peak_prices[key] - self.atr_stop_multiplier * self.ATRS[key].Current.Value: added_insights.append(Insight.price(key, timedelta(days=7), InsightDirection.Flat, weight = 1)) algo.Liquidate(key) self.hold_length[key] = None self.peak_prices[key] = None else: if key in data and data[key] != None: price = data[key].price else: price = self.trend_rolling_windows[key][0] if price < self.peak_prices[key]: self.peak_prices[key] = price if price > self.peak_prices[key] + self.atr_stop_multiplier * self.ATRS[key].Current.Value: added_insights.append(Insight.price(key, timedelta(days=7), InsightDirection.Flat, weight = 1)) algo.Liquidate(key) self.hold_length[key] = None self.peak_prices[key] = None return added_insights def OnSecuritiesChanged(self, algo, changes): # region removed securities for x in changes.RemovedSecurities: self.activeStocks.remove(x.Symbol) # can't open positions here since data might not be added correctly yet for x in changes.AddedSecurities: self.activeStocks.add(x.Symbol) self.trend_rolling_windows[x.Symbol] = RollingWindow[float](self.price_rolling_window_length) self.MACDS[x.Symbol] = MovingAverageConvergenceDivergence(12, 26, 9, MovingAverageType.Exponential) self.macd_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.MACDS[x.Symbol], self.macd_consolidators[x.Symbol]) self.MACDS_rolling_windows[x.Symbol] = deque(maxlen=self.macd_candles_history_size) self.Bollingers[x.Symbol] = BollingerBands(20, 2, MovingAverageType.Simple) self.bollinger_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.Bollingers[x.Symbol], self.bollinger_consolidators[x.Symbol]) self.Bollingers_rolling_windows[x.Symbol] = deque(maxlen=self.Bollinger_window_size) self.RSIS_trend[x.Symbol] = algo.rsi(x.Symbol, 14, Resolution.Hour) self.RSIS[x.Symbol] = RelativeStrengthIndex(14) self.rsi_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.RSIS[x.Symbol], self.rsi_consolidators[x.Symbol]) self.RSIS_rolling_windows[x.Symbol] = RollingWindow[float](self.RSIS_rolling_window_length) self.EMAS[x.Symbol] = ExponentialMovingAverage(200) self.ema_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.EMAS[x.Symbol], self.ema_consolidators[x.Symbol]) self.EMAS_rolling_windows[x.Symbol] = RollingWindow[float](self.ema_rolling_window_length) self.EMAS50[x.Symbol] = ExponentialMovingAverage(50) self.ema50_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.EMAS50[x.Symbol], self.ema50_consolidators[x.Symbol]) self.EMAS50_rolling_windows[x.Symbol] = RollingWindow[float](self.ema_rolling_window_length) self.ADX[x.Symbol] = AverageDirectionalIndex(14) self.adx_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.ADX[x.Symbol], self.adx_consolidators[x.Symbol]) self.adx_rolling[x.Symbol] = RollingWindow[float](self.adx_rolling_window_length) self.obvs[x.Symbol] = algo.obv(x.Symbol) self.obvs_rolling[x.Symbol] = RollingWindow[float](self.obv_rolling_window_length) self.ATRS[x.Symbol] = AverageTrueRange(14) self.atr_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.ATRS[x.Symbol], self.atr_consolidators[x.Symbol]) history = algo.History[TradeBar](x.Symbol,self.ema_rolling_window_length*3, Resolution.Hour) history2 = algo.History[TradeBar](x.Symbol,self.ema_rolling_window_length*3, Resolution.Daily) for bar in history: self.RSIS_trend[x.Symbol].Update(bar.EndTime, bar.Close) self.RSIS_rolling_windows[x.Symbol].Add(self.RSIS_trend[x.Symbol].Current.Value) for bar in history2: self.trend_rolling_windows[x.Symbol].Add(bar.Close) self.MACDS[x.Symbol].Update(bar.EndTime, bar.Close) new_macd = self.macd_holder(self.MACDS[x.Symbol].Fast.Current.Value, self.MACDS[x.Symbol].Slow.Current.Value, self.MACDS[x.Symbol].Signal.Current.Value, self.MACDS[x.Symbol].Current.Value, self.MACDS[x.Symbol].histogram.Current.Value) self.MACDS_rolling_windows[x.Symbol].append(new_macd) self.Bollingers[x.Symbol].Update(bar.EndTime, bar.Close) new_bol = self.bollinger_holder(self.Bollingers[x.Symbol].LowerBand.Current.Value, self.Bollingers[x.Symbol].MiddleBand.Current.Value, self.Bollingers[x.Symbol].UpperBand.Current.Value, bar.Close) self.Bollingers_rolling_windows[x.Symbol].append(new_bol) self.RSIS[x.Symbol].Update(bar.EndTime, bar.Close) self.EMAS[x.Symbol].Update(bar.EndTime, bar.Close) self.EMAS_rolling_windows[x.Symbol].Add(self.EMAS[x.Symbol].Current.Value) self.EMAS50[x.Symbol].Update(bar.EndTime, bar.Close) self.EMAS50_rolling_windows[x.Symbol].Add(self.EMAS50[x.Symbol].Current.Value) self.ADX[x.Symbol].Update(bar) self.adx_rolling[x.Symbol].Add(self.ADX[x.Symbol].Current.Value) self.obvs[x.Symbol].Update(bar) self.obvs_rolling[x.Symbol].Add(self.obvs[x.Symbol].Current.Value) self.ATRS[x.Symbol].Update(bar)
#region imports from AlgorithmImports import * #endregion def get_bollinger_buy_and_short(QCalgo, bollinger_rolling_window,trend, bollinger_params): score = 0 lowers = [x.lower for x in bollinger_rolling_window] middles = [x.middle for x in bollinger_rolling_window] uppers = [x.upper for x in bollinger_rolling_window] prices = [x.price for x in bollinger_rolling_window] lowers.reverse() middles.reverse() uppers.reverse() prices.reverse() above_upper = 0 middle_upper = 0 lower_middle = 0 below_lower = 0 amount_above = 0 amount_below = .0001 for i in range(len(lowers)): low = lowers[i] middle = middles[i] high = uppers[i] price = prices[i] if price >= high: above_upper += 1 elif price >= middle: middle_upper += 1 elif price >= low: lower_middle += 1 else: below_lower += 1 if price >= middle and amount_below == .0001: amount_above += (price-middle) elif price < middle: amount_below += (middle-price) most_recent = prices[0] current_location = None if most_recent >= uppers[0]: current_location = "above_upper" elif most_recent >= middles[0]: current_location = "middle_upper" elif most_recent >= lowers[0]: current_location = "lower_middle" else: current_location = "below_lower" if trend > 0: if current_location == "above_upper" or current_location == "middle_upper": if amount_above/amount_below >= bollinger_params['long_threshold']: score = 1 else: #QCalgo.Log("score would be one but percent upper and middle is: " + str(amount_above/amount_below)) return .5 elif trend < 0: if current_location == "lower_middle" or current_location == "below_lower": if (lower_middle + below_lower) / len(prices) >= bollinger_params['short_threshold']: score = 2 return score
#region imports from AlgorithmImports import * from tqdm import tqdm import pandas as pd #endregion def get_macd_score(macd_rolling, trend, macd_params): # last 35 macd histogram data points hists = [x.hist for x in macd_rolling][:macd_params['cross_check_length']] macds = [x.macd for x in macd_rolling][:macd_params['macd_above_below_length']] # detect a recent cross above or below 0 cross = 0 current = hists[0] if current >= 0: if any(x < 0 for x in hists): cross = 1 elif current <= 0: if any(x > 0 for x in hists): cross = -1 score = 0 if trend > 0: if all(x > macd_params['long_macd_threshold'] for x in macds) and macds[0] > macd_params['long_macd_threshold']: #if cross == 1: score = 1 elif trend < 0: if all(x > macd_params['short_macd_threshold'] for x in macds) and macds[0] > macd_params['short_macd_threshold']: #if cross == -1: score = 2 return score
# region imports from datetime import datetime from AlgorithmImports import * from alpha import custom_alpha # endregion class CompetitionAlgorithm(QCAlgorithm): def Initialize(self): # Backtest parameters self.SetStartDate(2024, 1, 18) self.SetEndDate(2024, 4, 18) self.SetCash(1000000) self.SetWarmUp(10) # Parameters: self.final_universe_size = 600 # Universe selection self.rebalanceTime = self.time self.universe_type = "equity" if self.universe_type == "equity": self.AddUniverse(self.CoarseFilter, self.FineFilter) self.UniverseSettings.Resolution = Resolution.Hour self.set_portfolio_construction(self.MyPCM()) self.set_alpha(custom_alpha(self)) self.set_execution(VolumeWeightedAveragePriceExecutionModel()) self.add_risk_management(NullRiskManagementModel()) # set account type self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin) def CoarseFilter(self, coarse): # Rebalancing monthly if self.Time <= self.rebalanceTime: return self.Universe.Unchanged self.rebalanceTime = self.Time + timedelta(days=300) sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True) return [x.Symbol for x in sortedByDollarVolume if x.HasFundamentalData][:1000] def FineFilter(self, fine): sortedbyVolume = sorted(fine, key=lambda x: x.DollarVolume, reverse=True ) fine_output = [x.Symbol for x in sortedbyVolume if x.price > 10 and x.MarketCap > 2000000000][:self.final_universe_size] return fine_output class MyPCM(RiskParityPortfolioConstructionModel): # override to set leverage higher def CreateTargets(self, algorithm, insights): targets = super().CreateTargets(algorithm, insights) return [PortfolioTarget(x.Symbol, x.Quantity * 1.85) for x in targets]
#region imports from AlgorithmImports import * #endregion def get_rsi_buy_short(price_trend, rsi_trend): if price_trend > 0: if rsi_trend > 0: return 1 elif price_trend < 0: if rsi_trend < 0: return 2 return 0 def get_rsi_sell_cover(price_trend, rsi_trend): if price_trend > 0: if rsi_trend < 0: return 1 elif price_trend < 0: if rsi_trend > 0: return 2 return 0
#region imports from AlgorithmImports import * #endregion import numpy as np import matplotlib.pyplot as plt import pandas as pd from scipy.signal import argrelextrema from collections import deque from matplotlib.lines import Line2D from datetime import timedelta ''' Much of this code is sourced at the following link: https://raposa.trade/blog/higher-highs-lower-lows-and-calculating-price-trends-in-python/ ''' def getHigherLows(data: np.array, order, K): ''' Finds consecutive higher lows in price pattern. Must not be exceeded within the number of periods indicated by the width parameter for the value to be confirmed. K determines how many consecutive lows need to be higher. ''' # Get lows low_idx = argrelextrema(data, np.less, order=order)[0] lows = data[low_idx] # Ensure consecutive lows are higher than previous lows extrema = [] ex_deque = deque(maxlen=K) for i, idx in enumerate(low_idx): if i == 0: ex_deque.append(idx) continue if lows[i] < lows[i-1]: ex_deque.clear() ex_deque.append(idx) if len(ex_deque) == K: extrema.append(ex_deque.copy()) return extrema def getLowerHighs(data: np.array, order=5, K=2): ''' Finds consecutive lower highs in price pattern. Must not be exceeded within the number of periods indicated by the width parameter for the value to be confirmed. K determines how many consecutive highs need to be lower. ''' # Get highs high_idx = argrelextrema(data, np.greater, order=order)[0] highs = data[high_idx] # Ensure consecutive highs are lower than previous highs extrema = [] ex_deque = deque(maxlen=K) for i, idx in enumerate(high_idx): if i == 0: ex_deque.append(idx) continue if highs[i] > highs[i-1]: ex_deque.clear() ex_deque.append(idx) if len(ex_deque) == K: extrema.append(ex_deque.copy()) return extrema def getHigherHighs(data: np.array, order, K): ''' Finds consecutive higher highs in price pattern. Must not be exceeded within the number of periods indicated by the width parameter for the value to be confirmed. K determines how many consecutive highs need to be higher. ''' # Get highs high_idx = argrelextrema(data, np.greater, order = order)[0] highs = data[high_idx] # Ensure consecutive highs are higher than previous highs extrema = [] ex_deque = deque(maxlen=K) for i, idx in enumerate(high_idx): if i == 0: ex_deque.append(idx) continue if highs[i] < highs[i-1]: ex_deque.clear() ex_deque.append(idx) if len(ex_deque) == K: extrema.append(ex_deque.copy()) return extrema def getLowerLows(data: np.array, order, K): ''' Finds consecutive lower lows in price pattern. Must not be exceeded within the number of periods indicated by the width parameter for the value to be confirmed. K determines how many consecutive lows need to be lower. ''' # Get lows low_idx = argrelextrema(data, np.less, order=order)[0] lows = data[low_idx] # Ensure consecutive lows are lower than previous lows extrema = [] ex_deque = deque(maxlen=K) for i, idx in enumerate(low_idx): if i == 0: ex_deque.append(idx) continue if lows[i] > lows[i-1]: ex_deque.clear() ex_deque.append(idx) if len(ex_deque) == K: extrema.append(ex_deque.copy()) return extrema def get_trend(close_data, order, K): ''' Get the trend of the stock ''' close_data = [x for x in close_data] close_data.reverse() # data set to dataframe empty data = pd.DataFrame() data['Close'] = close_data close = data['Close'].values hh = getHigherHighs(close, order, K) hl = getHigherLows(close, order, K) ll = getLowerLows(close, order, K) lh = getLowerHighs(close, order, K) # format for tuples inside patterns: [type, location first price, location second price, first price, second price] patterns = [] for pattern in hh: # append a tuple with date and "hh" patterns.append(('hh', pattern[0], pattern[1], close[pattern[0]], close[pattern[1]])) for pattern in hl: patterns.append(('hl', pattern[0], pattern[1], close[pattern[0]], close[pattern[1]])) for pattern in ll: patterns.append(('ll', pattern[0], pattern[1], close[pattern[0]], close[pattern[1]])) for pattern in lh: patterns.append(('lh', pattern[0], pattern[1], close[pattern[0]], close[pattern[1]])) # sort by the second date patterns.sort(key=lambda x: x[2], reverse=True) trend = 0 total_movements = patterns total_swing_up = 0 total_swing_down = 0 for x in total_movements: if x[0] == 'hh' or x[0] == 'hl': total_swing_up += (x[4] - x[3]) else: total_swing_down += (x[4] - x[3]) total_swing = total_swing_up + total_swing_down return total_swing