Overall Statistics |
Total Orders 1364 Average Win 0.13% Average Loss -0.05% Compounding Annual Return 894.494% Drawdown 8.500% Expectancy 1.763 Start Equity 1000000 End Equity 1783760.23 Net Profit 78.376% Sharpe Ratio 10.821 Sortino Ratio 20.569 Probabilistic Sharpe Ratio 98.547% Loss Rate 25% Win Rate 75% Profit-Loss Ratio 2.71 Alpha 3.654 Beta 2.512 Annual Standard Deviation 0.398 Annual Variance 0.158 Information Ratio 11.422 Tracking Error 0.354 Treynor Ratio 1.714 Total Fees $2525.65 Estimated Strategy Capacity $6500000.00 Lowest Capacity Asset ELF WE44M4VG4BQD Portfolio Turnover 25.96% |
from AlgorithmImports import * from datetime import timedelta from collections import deque import numpy as np from trendCalculator import get_trend from macd_oracle import get_macd_score from bollinger_oracle import get_bollinger_buy_and_short from rsi_oracle import get_rsi_buy_short class custom_alpha(AlphaModel): # contain bollinger band information class bollinger_holder: def __init__(self, lower, middle, upper, price): self.lower = lower self.middle = middle self.upper = upper self.price = price # contain macd information class macd_holder: def __init__(self, fast, slow, signal, macd, hist): self.fast = fast self.slow = slow self.signal = signal self.macd = macd self.hist = hist def __init__(self, algo): self.algo = self self.plotting = False # MACD Parameters self.macd_params = {'cross_check_length': 35, 'macd_above_below_length': 28, 'long_macd_threshold': 0.25, 'short_macd_threshold': -0.25} self.macd_candles_history_size = 15 # Bollinger Band Parameters self.Bollinger_window_size = 25 #140 self.long_threshold = 1 self.bollinger_params = {'long_threshold': self.long_threshold, 'short_threshold': self.long_threshold} # EMA parameters self.ema_rolling_window_length = 250 self.derivative_threshold = .005 # RSI Parameters self.price_rolling_window_length = 30 self.RSIS_rolling_window_length = 30 # ADX parameters self.adx_rolling_window_length = 30 self.adx_threshold = 30 # OBV parameters self.obv_rolling_window_length = 150 self.obv_threshold = .5 # maybe change around # ATR parameters self.atr_stop_multiplier = 3 # Trend Parameters self.trend_order = 5 self.K_order = 2 self.trend_history_size = 30 #210 # 2.5 month self.rsi_trend_order = 5 self.rsi_K_order = 2 self.obv_trend_order = 2 self.obv_K_order = 2 # Portfolio Management Parameters self.look_for_entries = {} self.entry_scores = {} self.hold_length = {} self.max_position_size = .15 self.activeStocks = set() self.insight_expiry = 14 self.insight_expiry_sell = 6 self.port_bias = 700 # Indicators self.trend_rolling_windows = {} self.MACDS = {} self.macd_consolidators = {} self.MACDS_rolling_windows = {} self.Bollingers = {} self.bollinger_consolidators = {} self.Bollingers_rolling_windows = {} self.RSIS = {} self.RSIS_trend = {} self.RSIS_rolling_windows = {} self.rsi_consolidators = {} self.EMAS = {} self.EMAS_rolling_windows = {} self.ema_consolidators = {} self.EMAS50 = {} self.EMAS50_rolling_windows = {} self.ema50_consolidators = {} self.ADX = {} self.adx_consolidators = {} self.adx_rolling = {} self.obvs = {} self.obv_consolidators = {} self.obvs_rolling = {} self.ATRS = {} self.atr_consolidators = {} self.peak_prices = {} self.universe_type = "equity" if self.universe_type != "equity": self.universe_equity = algo.AddEquity(self.universe_type, Resolution.Hour).Symbol self.symbols_invested_in_last_iteration = set() self.symbols_invested_in_last_iteration.add(algo.AddEquity("MS", Resolution.Hour).Symbol) self.symbols_invested_in_last_iteration.add(algo.AddEquity("HOOD", Resolution.Hour).Symbol) self.symbols_invested_in_last_iteration.add(algo.AddEquity("DAL", Resolution.Hour).Symbol) self.symbols_invested_in_last_iteration.add(algo.AddEquity("TOST", Resolution.Hour).Symbol) self.symbols_invested_in_last_iteration.add(algo.AddEquity("APP", Resolution.Hour).Symbol) def Update(self, algo, data): self.nobuyreasons = [] insights = [] if self.symbols_invested_in_last_iteration != None: for symbol in self.symbols_invested_in_last_iteration: self.activeStocks.add(symbol) insight = Insight.price(symbol, timedelta(days=self.insight_expiry-4), InsightDirection.Up, weight = .75) insights.append(insight) algo.Log("added initial insight: " + str(symbol)) self.symbols_invested_in_last_iteration = None if self.universe_type != "equity" and self.universe_equity not in self.activeStocks: self.activeStocks.add(self.universe_equity) algo.Log("symbols in active stocks: " + str(len(self.activeStocks))) for symbol in self.activeStocks: # region update indicators if not data.ContainsKey(symbol) or data[symbol] is None: self.nobuyreasons.append("no data") continue if not self.MACDS[symbol].IsReady: self.nobuyreasons.append("macd not ready") continue # if it is 10:00am if data[symbol].EndTime.hour == 10 and data[symbol].EndTime.minute == 0: self.trend_rolling_windows[symbol].Add(data[symbol].Close) self.Bollingers_rolling_windows[symbol].append(self.bollinger_holder(self.Bollingers[symbol].LowerBand.Current.Value, self.Bollingers[symbol].MiddleBand.Current.Value, self.Bollingers[symbol].UpperBand.Current.Value, data[symbol].price)) self.MACDS_rolling_windows[symbol].append(self.macd_holder(self.MACDS[symbol].Fast.Current.Value, self.MACDS[symbol].Slow.Current.Value, self.MACDS[symbol].Signal.Current.Value, self.MACDS[symbol].Current.Value, self.MACDS[symbol].histogram.Current.Value)) self.RSIS_rolling_windows[symbol].Add(self.RSIS_trend[symbol].Current.Value) self.EMAS_rolling_windows[symbol].Add(self.EMAS[symbol].Current.Value) self.EMAS50_rolling_windows[symbol].Add(self.EMAS50[symbol].Current.Value) self.obvs_rolling[symbol].Add(self.obvs[symbol].Current.Value) self.adx_rolling[symbol].Add(self.ADX[symbol].Current.Value) # endregion price_trend = get_trend(self.trend_rolling_windows[symbol], self.trend_order, self.K_order)/data[symbol].price rsi_trend = get_trend(self.RSIS_rolling_windows[symbol], self.rsi_trend_order, self.rsi_K_order)/self.RSIS[symbol].Current.Value obv_trend = get_trend(self.obvs_rolling[symbol], self.obv_trend_order, self.obv_K_order)/abs(self.obvs[symbol].Current.Value) # if 50 ema has been above 200 ema for a while, trend is up ema_trend = 0 ema50s = [x for x in self.EMAS50_rolling_windows[symbol]] ema200s = [x for x in self.EMAS_rolling_windows[symbol]] for i in range(len(ema50s)): if ema50s[i] > ema200s[i]: ema_trend += 1 bollinger_score_buy_short = get_bollinger_buy_and_short(algo, self.Bollingers_rolling_windows[symbol], 1, self.bollinger_params) macd_score = get_macd_score(self.MACDS_rolling_windows[symbol], 1, self.macd_params) rsi_score = get_rsi_buy_short(price_trend, rsi_trend) prices = [x for x in self.EMAS50_rolling_windows[symbol]] prices.reverse() derivative = np.gradient(prices)/self.EMAS50_rolling_windows[symbol][0] # buy signal if ema_trend >= 210: # if in ema uptrend, buy if price between midle and upper bollinger if bollinger_score_buy_short == 1: if macd_score == 1: if rsi_score == 1: if derivative[-1] > self.derivative_threshold: if self.ADX[symbol].Current.Value > self.adx_threshold: max_adx = max(self.adx_rolling[symbol]) current_adx = self.ADX[symbol].Current.Value if current_adx >= max_adx * .95: if obv_trend > self.obv_threshold: open_orders = algo.Transactions.GetOpenOrders(symbol) if not algo.Portfolio[symbol].Invested and len(open_orders) == 0: if symbol not in self.look_for_entries or self.look_for_entries[symbol] == 0: self.look_for_entries[symbol] = 1 self.entry_scores[symbol] = abs(int(derivative[-1] * self.ADX[symbol].Current.Value * max(price_trend, 1) * max(rsi_trend, 1) * max(obv_trend, 1) * 100 + self.port_bias)) else: self.nobuyreasons.append("obv trend too low: ") else: self.nobuyreasons.append("adx not at max") else: self.nobuyreasons.append("adx below threshold") else: self.nobuyreasons.append("not in derivative uptrend") else: self.nobuyreasons.append("not in rsi uptrend") else: self.nobuyreasons.append("not in macd uptrend") else: self.nobuyreasons.append("not in bollinger uptrend") else: self.nobuyreasons.append("not in ema uptrend") if bollinger_score_buy_short == 2: if macd_score == 2: if rsi_score == 2: if derivative[-1] < -self.derivative_threshold: if self.ADX[symbol].Current.Value > self.adx_threshold: min_adx = min(self.adx_rolling[symbol]) current_adx = self.ADX[symbol].Current.Value if current_adx <= min_adx * 1.05: if obv_trend < -self.obv_threshold: open_orders = algo.Transactions.GetOpenOrders(symbol) if not algo.Portfolio[symbol].Invested and len(open_orders) == 0: self.look_for_entries[symbol] = -1 self.entry_scores[symbol] = abs(int(derivative[-1] * self.ADX[symbol].Current.Value * max(price_trend, 1) * max(rsi_trend, 1) * max(obv_trend, 1) * 100 + self.port_bias)) # generate sell signal #if self.RSIS[symbol].Current.Value < 50: ### if algo.Portfolio[symbol].Invested and algo.Portfolio[symbol].IsLong: # insight = Insight.price(symbol, timedelta(days=self.insight_expiry_sell), InsightDirection.Flat, weight = 1) # insights.append(insight) #if self.RSIS[symbol].Current.Value > 50: # if algo.Portfolio[symbol].Invested and algo.Portfolio[symbol].IsShort: # insight = Insight.price(symbol, timedelta(days=self.insight_expiry_sell), InsightDirection.Flat, weight = 1) # insights.append(insight) if self.plotting: if symbol in self.peak_prices and self.peak_prices[symbol] != None: algo.Plot("price", "atr trail", self.peak_prices[symbol] - self.atr_stop_multiplier * self.ATRS[symbol].Current.Value) algo.Plot("price", "peak", self.peak_prices[symbol]) algo.Plot("atr", "atr", self.ATRS[symbol].Current.Value) algo.Plot("macd", "macd", self.MACDS[symbol].Current.Value) algo.Plot("adx", "adx", self.ADX[symbol].Current.Value) algo.Plot("obv trend", "obv trend", obv_trend) algo.Plot("obv", "obv", self.obvs[symbol].Current.Value) algo.Plot("trend", "price_trend", price_trend) algo.Plot("trend", "rsi_trend", rsi_trend) algo.Plot("rsi", "rsi", self.RSIS[symbol].Current.Value) algo.Plot("price", "ema50", self.EMAS50[symbol].Current.Value) algo.Plot("price", "ema200", self.EMAS[symbol].Current.Value) algo.Plot("bollinger_score", "bollinger_score", bollinger_score_buy_short) algo.Plot("macd_score", "macd_score", macd_score) algo.Plot("rsi_score", "rsi_score", rsi_score) algo.Plot("derivative", "derivative", derivative[-1]) algo.Plot("ema_trend: ", "ema_trend", ema_trend) algo.Plot("price", "price", data[symbol].Close) algo.Plot("price", "bollinger_middle", self.Bollingers[symbol].MiddleBand.Current.Value) algo.Plot("price", "bollinger_upper", self.Bollingers[symbol].UpperBand.Current.Value) algo.Plot("trend", "price_trend", price_trend) # print out by order of most the nobuyreasons and their number of occurences for reason in sorted(set(self.nobuyreasons), key = lambda x: self.nobuyreasons.count(x), reverse = True): algo.Log(reason + ": " + str(self.nobuyreasons.count(reason))) if len(self.look_for_entries.keys()) > 0: for key in self.look_for_entries: if self.look_for_entries[key] > 0: self.look_for_entries[key] += 1 if self.look_for_entries[key] > 70: self.look_for_entries[key] = 0 self.hold_length[key] = None else: #self.Log("Looking for entry for: " + str(key)) if self.trend_rolling_windows[key][0] > self.Bollingers[key].MiddleBand.Current.Value: #insight = Insight(key, timedelta(days=2), InsightType.PRICE, InsightDirection.Down, self.entry_scores[key]) insight = Insight.price(key, timedelta(days=self.insight_expiry), InsightDirection.Up, weight = self.entry_scores[key]) self.peak_prices[key] = data[key].price self.hold_length[key] = 1 insights.append(insight) self.look_for_entries[key] = 0 elif self.look_for_entries[key] < 0: self.look_for_entries[key] -= 1 if self.look_for_entries[key] < -70: self.look_for_entries[key] = 0 self.hold_length[key] = None else: #self.Log("Looking for entry for: " + str(key)) if self.trend_rolling_windows[key][0] < self.Bollingers[key].MiddleBand.Current.Value: #insight = Insight(key, timedelta(days=2), InsightType.PRICE, InsightDirection.Down, self.entry_scores[key]) insight = Insight.price(key, timedelta(days=self.insight_expiry), InsightDirection.Down, weight = self.entry_scores[key]) insights.append(insight) self.peak_prices[key] = data[key].price self.hold_length[key] = -1 self.look_for_entries[key] = 0 # endregion added_insights = self.atr_trail_stop_loss(algo, data) for insight in added_insights: insights.append(insight) return insights def atr_trail_stop_loss(self, algo, data): added_insights = [] for key in algo.Portfolio.Keys: if key in self.peak_prices and self.peak_prices[key] != None and key in self.hold_length: if self.hold_length[key] != None: self.hold_length[key] += 1 if algo.Portfolio[key].IsLong: if key in data and data[key] != None: price = data[key].price else: price = self.trend_rolling_windows[key][0] if price > self.peak_prices[key]: self.peak_prices[key] = price if price < self.peak_prices[key] - self.atr_stop_multiplier * self.ATRS[key].Current.Value: added_insights.append(Insight.price(key, timedelta(days=7), InsightDirection.Flat, weight = 1)) algo.Log("liquidating long " + str(key) + " price is: " + str(price) + " peak price is: " + str(self.peak_prices[key]) + " atr is: " + str(self.ATRS[key].Current.Value)) algo.Liquidate(key) self.hold_length[key] = None self.peak_prices[key] = None else: if key in data and data[key] != None: price = data[key].price else: price = self.trend_rolling_windows[key][0] if price < self.peak_prices[key]: self.peak_prices[key] = price if price > self.peak_prices[key] + self.atr_stop_multiplier * self.ATRS[key].Current.Value: added_insights.append(Insight.price(key, timedelta(days=7), InsightDirection.Flat, weight = 1)) algo.Log("liquidating short " + str(key) + " price is: " + str(price) + " peak price is: " + str(self.peak_prices[key]) + " atr is: " + str(self.ATRS[key].Current.Value)) algo.Liquidate(key) self.hold_length[key] = None self.peak_prices[key] = None return added_insights def OnSecuritiesChanged(self, algo, changes): # region removed securities for x in changes.RemovedSecurities: if x.Symbol in self.activeStocks: self.activeStocks.remove(x.Symbol) # can't open positions here since data might not be added correctly yet for x in changes.AddedSecurities: self.activeStocks.add(x.Symbol) self.trend_rolling_windows[x.Symbol] = RollingWindow[float](self.price_rolling_window_length) self.MACDS[x.Symbol] = MovingAverageConvergenceDivergence(12, 26, 9, MovingAverageType.Exponential) self.macd_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.MACDS[x.Symbol], self.macd_consolidators[x.Symbol]) self.MACDS_rolling_windows[x.Symbol] = deque(maxlen=self.macd_candles_history_size) self.Bollingers[x.Symbol] = BollingerBands(20, 2, MovingAverageType.Simple) self.bollinger_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.Bollingers[x.Symbol], self.bollinger_consolidators[x.Symbol]) self.Bollingers_rolling_windows[x.Symbol] = deque(maxlen=self.Bollinger_window_size) self.RSIS_trend[x.Symbol] = algo.rsi(x.Symbol, 14, Resolution.Hour) self.RSIS[x.Symbol] = RelativeStrengthIndex(14) self.rsi_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.RSIS[x.Symbol], self.rsi_consolidators[x.Symbol]) self.RSIS_rolling_windows[x.Symbol] = RollingWindow[float](self.RSIS_rolling_window_length) self.EMAS[x.Symbol] = ExponentialMovingAverage(200) self.ema_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.EMAS[x.Symbol], self.ema_consolidators[x.Symbol]) self.EMAS_rolling_windows[x.Symbol] = RollingWindow[float](self.ema_rolling_window_length) self.EMAS50[x.Symbol] = ExponentialMovingAverage(50) self.ema50_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.EMAS50[x.Symbol], self.ema50_consolidators[x.Symbol]) self.EMAS50_rolling_windows[x.Symbol] = RollingWindow[float](self.ema_rolling_window_length) self.ADX[x.Symbol] = AverageDirectionalIndex(14) self.adx_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.ADX[x.Symbol], self.adx_consolidators[x.Symbol]) self.adx_rolling[x.Symbol] = RollingWindow[float](self.adx_rolling_window_length) self.obvs[x.Symbol] = OnBalanceVolume() self.obv_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.obvs[x.Symbol], self.obv_consolidators[x.Symbol]) self.obvs_rolling[x.Symbol] = RollingWindow[float](self.obv_rolling_window_length) self.ATRS[x.Symbol] = AverageTrueRange(14) self.atr_consolidators[x.Symbol] = TradeBarConsolidator(timedelta(days=1)) algo.register_indicator(x.Symbol, self.ATRS[x.Symbol], self.atr_consolidators[x.Symbol]) history = algo.History[TradeBar](x.Symbol,self.ema_rolling_window_length*3, Resolution.Hour) history2 = algo.History[TradeBar](x.Symbol,self.ema_rolling_window_length*3, Resolution.Daily) for bar in history: self.RSIS_trend[x.Symbol].Update(bar.EndTime, bar.Close) self.RSIS_rolling_windows[x.Symbol].Add(self.RSIS_trend[x.Symbol].Current.Value) for bar in history2: self.trend_rolling_windows[x.Symbol].Add(bar.Close) self.MACDS[x.Symbol].Update(bar.EndTime, bar.Close) new_macd = self.macd_holder(self.MACDS[x.Symbol].Fast.Current.Value, self.MACDS[x.Symbol].Slow.Current.Value, self.MACDS[x.Symbol].Signal.Current.Value, self.MACDS[x.Symbol].Current.Value, self.MACDS[x.Symbol].histogram.Current.Value) self.MACDS_rolling_windows[x.Symbol].append(new_macd) self.Bollingers[x.Symbol].Update(bar.EndTime, bar.Close) new_bol = self.bollinger_holder(self.Bollingers[x.Symbol].LowerBand.Current.Value, self.Bollingers[x.Symbol].MiddleBand.Current.Value, self.Bollingers[x.Symbol].UpperBand.Current.Value, bar.Close) self.Bollingers_rolling_windows[x.Symbol].append(new_bol) self.RSIS[x.Symbol].Update(bar.EndTime, bar.Close) self.EMAS[x.Symbol].Update(bar.EndTime, bar.Close) self.EMAS_rolling_windows[x.Symbol].Add(self.EMAS[x.Symbol].Current.Value) self.EMAS50[x.Symbol].Update(bar.EndTime, bar.Close) self.EMAS50_rolling_windows[x.Symbol].Add(self.EMAS50[x.Symbol].Current.Value) self.ADX[x.Symbol].Update(bar) self.adx_rolling[x.Symbol].Add(self.ADX[x.Symbol].Current.Value) self.obvs[x.Symbol].Update(bar) self.obvs_rolling[x.Symbol].Add(self.obvs[x.Symbol].Current.Value) self.ATRS[x.Symbol].Update(bar) def display_rolling_window(self, rolling_window): rolling_str = "[" for x in rolling_window: if type(x) == self.macd_holder: rolling_str += str(x.macd) + ", " else: rolling_str += str(x) + ", " rolling_str += "]" return rolling_str
#region imports from AlgorithmImports import * #endregion def get_bollinger_buy_and_short(QCalgo, bollinger_rolling_window,trend, bollinger_params): score = 0 lowers = [x.lower for x in bollinger_rolling_window] middles = [x.middle for x in bollinger_rolling_window] uppers = [x.upper for x in bollinger_rolling_window] prices = [x.price for x in bollinger_rolling_window] lowers.reverse() middles.reverse() uppers.reverse() prices.reverse() above_upper = 0 middle_upper = 0 lower_middle = 0 below_lower = 0 amount_above = 0 amount_below = .0001 for i in range(len(lowers)): low = lowers[i] middle = middles[i] high = uppers[i] price = prices[i] if price >= high: above_upper += 1 elif price >= middle: middle_upper += 1 elif price >= low: lower_middle += 1 else: below_lower += 1 if price >= middle and amount_below == .0001: amount_above += (price-middle) elif price < middle: amount_below += (middle-price) most_recent = prices[0] current_location = None if most_recent >= uppers[0]: current_location = "above_upper" elif most_recent >= middles[0]: current_location = "middle_upper" elif most_recent >= lowers[0]: current_location = "lower_middle" else: current_location = "below_lower" if trend > 0: if current_location == "above_upper" or current_location == "middle_upper": if amount_above/amount_below >= bollinger_params['long_threshold']: score = 1 else: #QCalgo.Log("score would be one but percent upper and middle is: " + str(amount_above/amount_below)) return .5 elif trend < 0: if current_location == "lower_middle" or current_location == "below_lower": if (lower_middle + below_lower) / len(prices) >= bollinger_params['short_threshold']: score = 2 return score
#region imports from AlgorithmImports import * from tqdm import tqdm import pandas as pd #endregion def get_macd_score(macd_rolling, trend, macd_params): # last 35 macd histogram data points hists = [x.hist for x in macd_rolling][:macd_params['cross_check_length']] macds = [x.macd for x in macd_rolling][:macd_params['macd_above_below_length']] # detect a recent cross above or below 0 cross = 0 current = hists[0] if current >= 0: if any(x < 0 for x in hists): cross = 1 elif current <= 0: if any(x > 0 for x in hists): cross = -1 score = 0 if trend > 0: if all(x > macd_params['long_macd_threshold'] for x in macds) and macds[0] > macd_params['long_macd_threshold']: #if cross == 1: score = 1 elif trend < 0: if all(x > macd_params['short_macd_threshold'] for x in macds) and macds[0] > macd_params['short_macd_threshold']: #if cross == -1: score = 2 return score
# region imports from datetime import datetime from AlgorithmImports import * from alpha import custom_alpha # endregion class CompetitionAlgorithm(QCAlgorithm): def Initialize(self): # Backtest parameters self.SetStartDate(2024, 1, 1) self.SetEndDate(2024, 4, 1) self.SetCash(1000000) self.SetWarmUp(timedelta(days=30)) # Parameters: self.final_universe_size = 600 # Universe selection self.rebalanceTime = self.time self.universe_type = "equity" if self.universe_type == "equity": self.Log("adding equitiy universe") self.add_universe(self.equity_filter) #self.add_universe(CryptoUniverse.coinbase(self._crypto_universe_filter)) self.UniverseSettings.Resolution = Resolution.Hour self.set_portfolio_construction(self.MyPCM()) self.set_alpha(custom_alpha(self)) self.set_execution(VolumeWeightedAveragePriceExecutionModel()) self.add_risk_management(NullRiskManagementModel()) # set account type #self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin) def _crypto_universe_filter(self, data): if self.Time <= self.rebalanceTime: return self.Universe.Unchanged self.rebalanceTime = self.Time + timedelta(days=300) # Define the universe selection function sorted_by_vol = sorted(data, key=lambda x: x.volume_in_usd, reverse=True)[:30] first_of_tickers_added = [''] new_universe = [] for cf in sorted_by_vol: # remove USD and EUR from string sym_string = str(cf.symbol).replace("USDT", "").replace("USDC", "").replace("USD", "")\ .replace("EUR", "").replace("GBP", "").split(" ")[0] self.Log("sym_string: " + sym_string) if sym_string not in first_of_tickers_added: first_of_tickers_added.append(sym_string) new_universe.append(cf) sorted_by_vol = sorted(new_universe, key=lambda x: x.volume_in_usd, reverse=True) final = [cf.symbol for cf in sorted_by_vol][:10] self.Log("final: ") for i in final: self.Log(str(i)) return final def equity_filter(self, data): self.Log("in filter for equities") # Rebalancing monthly if self.Time <= self.rebalanceTime: return self.Universe.Unchanged self.rebalanceTime = self.Time + timedelta(days=300) sortedByDollarVolume = sorted(data, key=lambda x: x.DollarVolume, reverse=True) final = [x.Symbol for x in sortedByDollarVolume if x.HasFundamentalData and x.price > 10 and x.MarketCap > 2000000000][:self.final_universe_size] self.Log("coming out of course: " + str(len(final))) return final class MyPCM(InsightWeightingPortfolioConstructionModel): # override to set leverage higher def CreateTargets(self, algorithm, insights): targets = super().CreateTargets(algorithm, insights) return [PortfolioTarget(x.Symbol, x.Quantity * 1.85) for x in targets]
#region imports from AlgorithmImports import * #endregion def get_rsi_buy_short(price_trend, rsi_trend): if price_trend > 0: if rsi_trend > 0: return 1 elif price_trend < 0: if rsi_trend < 0: return 2 return 0 def get_rsi_sell_cover(price_trend, rsi_trend): if price_trend > 0: if rsi_trend < 0: return 1 elif price_trend < 0: if rsi_trend > 0: return 2 return 0
#region imports from AlgorithmImports import * #endregion import numpy as np import matplotlib.pyplot as plt import pandas as pd from scipy.signal import argrelextrema from collections import deque from matplotlib.lines import Line2D from datetime import timedelta ''' Much of this code is sourced at the following link: https://raposa.trade/blog/higher-highs-lower-lows-and-calculating-price-trends-in-python/ ''' def getHigherLows(data: np.array, order, K): ''' Finds consecutive higher lows in price pattern. Must not be exceeded within the number of periods indicated by the width parameter for the value to be confirmed. K determines how many consecutive lows need to be higher. ''' # Get lows low_idx = argrelextrema(data, np.less, order=order)[0] lows = data[low_idx] # Ensure consecutive lows are higher than previous lows extrema = [] ex_deque = deque(maxlen=K) for i, idx in enumerate(low_idx): if i == 0: ex_deque.append(idx) continue if lows[i] < lows[i-1]: ex_deque.clear() ex_deque.append(idx) if len(ex_deque) == K: extrema.append(ex_deque.copy()) return extrema def getLowerHighs(data: np.array, order=5, K=2): ''' Finds consecutive lower highs in price pattern. Must not be exceeded within the number of periods indicated by the width parameter for the value to be confirmed. K determines how many consecutive highs need to be lower. ''' # Get highs high_idx = argrelextrema(data, np.greater, order=order)[0] highs = data[high_idx] # Ensure consecutive highs are lower than previous highs extrema = [] ex_deque = deque(maxlen=K) for i, idx in enumerate(high_idx): if i == 0: ex_deque.append(idx) continue if highs[i] > highs[i-1]: ex_deque.clear() ex_deque.append(idx) if len(ex_deque) == K: extrema.append(ex_deque.copy()) return extrema def getHigherHighs(data: np.array, order, K): ''' Finds consecutive higher highs in price pattern. Must not be exceeded within the number of periods indicated by the width parameter for the value to be confirmed. K determines how many consecutive highs need to be higher. ''' # Get highs high_idx = argrelextrema(data, np.greater, order = order)[0] highs = data[high_idx] # Ensure consecutive highs are higher than previous highs extrema = [] ex_deque = deque(maxlen=K) for i, idx in enumerate(high_idx): if i == 0: ex_deque.append(idx) continue if highs[i] < highs[i-1]: ex_deque.clear() ex_deque.append(idx) if len(ex_deque) == K: extrema.append(ex_deque.copy()) return extrema def getLowerLows(data: np.array, order, K): ''' Finds consecutive lower lows in price pattern. Must not be exceeded within the number of periods indicated by the width parameter for the value to be confirmed. K determines how many consecutive lows need to be lower. ''' # Get lows low_idx = argrelextrema(data, np.less, order=order)[0] lows = data[low_idx] # Ensure consecutive lows are lower than previous lows extrema = [] ex_deque = deque(maxlen=K) for i, idx in enumerate(low_idx): if i == 0: ex_deque.append(idx) continue if lows[i] > lows[i-1]: ex_deque.clear() ex_deque.append(idx) if len(ex_deque) == K: extrema.append(ex_deque.copy()) return extrema def get_trend(close_data, order, K): ''' Get the trend of the stock ''' close_data = [x for x in close_data] close_data.reverse() # data set to dataframe empty data = pd.DataFrame() data['Close'] = close_data close = data['Close'].values hh = getHigherHighs(close, order, K) hl = getHigherLows(close, order, K) ll = getLowerLows(close, order, K) lh = getLowerHighs(close, order, K) # format for tuples inside patterns: [type, location first price, location second price, first price, second price] patterns = [] for pattern in hh: # append a tuple with date and "hh" patterns.append(('hh', pattern[0], pattern[1], close[pattern[0]], close[pattern[1]])) for pattern in hl: patterns.append(('hl', pattern[0], pattern[1], close[pattern[0]], close[pattern[1]])) for pattern in ll: patterns.append(('ll', pattern[0], pattern[1], close[pattern[0]], close[pattern[1]])) for pattern in lh: patterns.append(('lh', pattern[0], pattern[1], close[pattern[0]], close[pattern[1]])) # sort by the second date patterns.sort(key=lambda x: x[2], reverse=True) trend = 0 total_movements = patterns total_swing_up = 0 total_swing_down = 0 for x in total_movements: if x[0] == 'hh' or x[0] == 'hl': total_swing_up += (x[4] - x[3]) else: total_swing_down += (x[4] - x[3]) total_swing = total_swing_up + total_swing_down return total_swing