Overall Statistics

| Statistic | Value |
|---|---|
| Total Orders | 2794 |
| Average Win | 1.40% |
| Average Loss | -0.72% |
| Compounding Annual Return | 29.622% |
| Drawdown | 15.200% |
| Expectancy | 0.264 |
| Start Equity | 100000 |
| End Equity | 1210998.28 |
| Net Profit | 1110.998% |
| Sharpe Ratio | 1.457 |
| Sortino Ratio | 1.431 |
| Probabilistic Sharpe Ratio | 89.420% |
| Loss Rate | 57% |
| Win Rate | 43% |
| Profit-Loss Ratio | 1.95 |
| Alpha | 0.182 |
| Beta | 0.279 |
| Annual Standard Deviation | 0.143 |
| Annual Variance | 0.02 |
| Information Ratio | 0.649 |
| Tracking Error | 0.174 |
| Treynor Ratio | 0.747 |
| Total Fees | $43811.60 |
| Estimated Strategy Capacity | $51000000.00 |
| Lowest Capacity Asset | QQQ RIWIV7K5Z9LX |
| Portfolio Turnover | 158.31% |
# region imports
from AlgorithmImports import *
# endregion

'''
1st Iteration
Many thanks again, that was once more extremely helpful and well structured!
For one or two code snippets it would help if we went through them again and I added comments.


2nd Iteration - green light for these points:

Entry logic
--------------
Here are the points from last time.
Re-entry only after at least x minutes: in my view this is not needed after all, since it can be controlled via the tolerances.

Task
------
Entry only between start and end time: please also move the end time into config.json and explain to me how that works ;)


Entry: modularization of the prior-day data
-------------------------------------------
Modularization

Task (approx. 2 h)
----------------
Please add logic that lets me define filters which I can check via an integer parameter.
I have attached my GlobalSignals class; it contains logic with a dictionary that returns True or False. You are welcome to use something leaner.
See in the code: # Arthur, these are filters that can be reused again and again - how can this be modularized, e.g. with logic similar to the Global Signals?

Task (approx. 2 h)
-----------------
Please provide access to the following data points, ideally in a reusable module or in a "monster indicator" (these are security-specific data points).
    EMA(close, daily, n days)
    EMA(close, 4 hours, n periods)
    EMA(volume, daily, n days)
    ATR(daily, n days)
    Properties value_area_high and value_area_low from the previous day's Volume Profile (https://www.quantconnect.com/docs/v2/writing-algorithms/indicators/supported-indicators/volume-profile)
    Pre-market high and low. For this you will probably have to switch to extended market hours.
    O, H, L, C of the last 5 sessions, ideally indexed, e.g. closeday[0] is yesterday's close. Would this logic work, for example, or do you have a better idea?
        self.close = self.algorithm.SMA(self.symbol, 1)
        self.high = self.algorithm.MAX(self.symbol, 1)
        self.low = self.algorithm.MIN(self.symbol, 1)
        self.close.Window.Size = 5
        self.high.Window.Size = 5
        self.low.Window.Size = 5


Exit logic
------------
Here from last time.

Task (approx. 1.5 h)
--------------------
Indicator: Please extend the tolerances to x * ATR (daily).
Architecture: Please add a module in which I can then add various exit criteria.
See in the code: # Arthur, this is only an example - how can the SL logic be modularized, e.g. as an indicator?


PCM & Execution
----------------------
Since you had used the EqualWeighting PCM, I have built in the MultiAlpha PCM, because it also builds on the EqualWeighting PCM. The results are identical.
I am basically happy with the modularization so far.

Tasks (approx. 1.5 h)
------------------------
OnData: Does this also work without OnData, or how can it be called in a better way? Could you make a code example or implement it directly?
Vola sizing: Could you please make the volatility (the daily ATR?) available? Then I could use it to build in a 'switchable' volatility sizing.
Could you also store a leverage factor, e.g. in the Security Init? In the PCM I have the following code:
    adjusted_quantity = x.Quantity * algorithm.Securities[x.Symbol].Leverage * long_short_factor


GlobalSignals
------------------
We can discuss this in the next call; I am also looking for help with modularization here.

Tasks (approx. 1 h)
------------------------
Architecture: How can the global variables be made available without adding self.algorithm everywhere? Could you make a code example or implement it directly?
Indicator: Could you please add the SMA of the VIX in a lean way?
See code: self.vix_sma = 0 # Arthur, could you please add this here? I had used a mean over a deque, but surely there is a leaner way?


3rd Iteration - we can discuss this in the next call.

"Live ready" topics - which to-dos do you still see here?
CFD
Option
Further alpha models
'''
# region imports
from typing_extensions import Annotated
from AlgorithmImports import *
from pydantic import BaseModel, Field, field_serializer
import dateutil.parser
from datetime import datetime, timedelta, timezone, date, time
# endregion

class AlgorithmConfig(BaseModel):
    start_date: str | date
    end_date: str | date
    initial_capital: Annotated[int, Field(strict=False, gt=0)]  # = 100_000
    directional_bias: int = 0  # -1=short only, 0=long and short, +1=long only
    tickers: str | list[str]  # = ["SPY"]
    trading_start_time: time  # = time(hour=9, minute=44)
    trading_end_time: time = time(hour=15, minute=59)
    eod_exit: bool = False
    costs_enabled: bool  # = True
    leverage: float = 2.0
    freePortfolioValuePercentage: float = 0.0025  # Default Value: 0.0025

    def model_post_init(self, __context) -> None:
        # Normalize string inputs: ISO-like date strings become date objects,
        # a comma-separated ticker string becomes a list of tickers.
        if isinstance(self.start_date, str):
            self.start_date = dateutil.parser.parse(self.start_date).date()
        if isinstance(self.end_date, str):
            self.end_date = dateutil.parser.parse(self.end_date).date()
        if isinstance(self.tickers, str):
            self.tickers = [*map(str.strip, self.tickers.split(','))]

    @field_serializer('start_date', 'end_date')
    def serialize_dates(self, dt: date, _info) -> str:
        # Serialize dates as ISO strings (YYYY-MM-DD)
        return dt.isoformat()

    def to_string(self):
        return self.model_dump_json()

    # Define long and short multipliers which are used in the PCM. For testing, set to 1.
    long_factor: float = 1.
    short_factor: float = 1.

    # Min order margin portfolio percentage to ignore bad orders and orders with small sizes in PCM. For testing, set to 0.
    minimumOrderMarginPortfolioPercentage: float = 0.
    #minimumOrderMarginPortfolioPercentage: float = 0.003  # 0.003 using $300 for a $100_000 portfolio

    # Min order quantity change percentage to ignore bad orders and orders with small sizes. For testing, set to 0.
    #minimumOrderQuantityChangePercentage: float = 0.
    minimumOrderQuantityChangePercentage: float = 0.1

    # Max percentage of portfolio of one security per position. For testing, set to 1.
    #max_percentage_per_position: float = 1.
    max_percentage_per_position: float = 2.0  # leverage

    # Benchmark
    myBenchmark: str = 'SPY'

    # Global Signals
    global_case_filter_condition: int = 1  # always True
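The notes above ask how the trading end time can be moved into config.json. The following is a minimal, stdlib-only sketch, assuming the JSON file stores times as ISO strings such as "15:59". The key names mirror `AlgorithmConfig`; the JSON content and the helpers `parse_times` and `entries_allowed` are hypothetical. pydantic is expected to perform an equivalent string-to-`time` conversion for fields annotated as `time`, but that step is not shown here.

```python
import json
from datetime import time

# Hypothetical config.json content; only the two time fields are shown.
CONFIG_JSON = '{"trading_start_time": "09:44", "trading_end_time": "15:59"}'


def parse_times(raw: str) -> dict[str, time]:
    """Convert ISO 'HH:MM' strings from the JSON document into datetime.time objects."""
    data = json.loads(raw)
    return {key: time.fromisoformat(value) for key, value in data.items()}


def entries_allowed(now: time, start: time, end: time) -> bool:
    """New entries are allowed from start (inclusive) up to end (exclusive)."""
    return start <= now < end


times = parse_times(CONFIG_JSON)
start, end = times["trading_start_time"], times["trading_end_time"]

print(entries_allowed(time(10, 30), start, end))  # inside the trading window
print(entries_allowed(time(15, 59), start, end))  # at the end time, no new entries
```

The half-open window matches the checks in `can_emit_insight`, which block insights before `trading_start_time` and block new entries at or after `trading_end_time`.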
# region imports
from AlgorithmImports import *
from analytics import SecurityAnalytics
# endregion

class CustomAlphaModel(AlphaModel):

    def __init__(self):
        self.name = self.__class__.__name__
        self.securities = []

    def update(self, algorithm: QCAlgorithm, data: Slice) -> list[Insight]:
        # Ask each tracked security's analytics object for a new insight
        insights = []
        for security in self.securities:
            insight = security.analytics.create_insight(algorithm, data)
            if insight:
                insights.append(insight)
        return insights

    def on_securities_changed(self, algorithm, changes):
        for security in changes.added_securities:
            if security.type is not SecurityType.EQUITY:
                continue
            if security in self.securities:
                continue
            security.analytics = SecurityAnalytics(algorithm, security)
            self.securities.append(security)

        for security in changes.removed_securities:
            if security not in self.securities:
                continue
            self.securities.remove(security)
            security.analytics.reset()
# region imports
from AlgorithmImports import *
from indicators import NoiseAreaIndicator, IntradayVWAP
from toolbox import TimeFrameHelper
# endregion

class SecurityAnalytics:

    def __init__(self, algorithm: QCAlgorithm, security: Security) -> None:
        self.algorithm = algorithm
        self.security = security
        self.symbol = security.symbol

        tf_helper = TimeFrameHelper(security, Resolution.MINUTE)

        self.insight = Insight.price(symbol=self.symbol, period=timedelta(1), direction=InsightDirection.FLAT)

        # NoiseAreaIndicator
        scaling_factor = 0.8  # reduces the noise area, as breakouts already happen for smaller noise areas than the average
        gap_stretch_factor = 1.8  # increases the noise area asymmetrically to the gap side
        period = tf_helper.quarter
        #self.noise_area_exit_tol = -0.002  # currently unused
        self.noise_area = NoiseAreaIndicator(tf_helper, period, scaling_factor, gap_stretch_factor)
        algorithm.warm_up_indicator(security.symbol, self.noise_area, Resolution.MINUTE)
        algorithm.register_indicator(security.symbol, self.noise_area, Resolution.MINUTE)

        #---------------------------------
        # Entry logic
        # Arthur, these are prior-day filters that can be reused again and again - how can this be modularized, e.g. with logic similar to the Global Signals?

        # Exclude extreme gaps
        self.gap_min_long = -0.035
        self.gap_max_short = +0.035

        # VWAP
        self.vwap_entry_tol = 0.0010  # Arthur, should be an ATR factor
        self.vwap_exit_tol = -0.0005  # Arthur, should be an ATR factor. Caution, I have changed the sign logic!
        self.vwap = IntradayVWAP()
        algorithm.warm_up_indicator(security.symbol, self.vwap, Resolution.MINUTE)
        algorithm.register_indicator(security.symbol, self.vwap, Resolution.MINUTE)

        # Regime Min, Max, MA
        regime_max_period = 3
        self.regime_max = Maximum(period=regime_max_period)
        #algorithm.warm_up_indicator(security.symbol, self.regime_max, Resolution.DAILY, Field.CLOSE)
        #algorithm.register_indicator(security.symbol, self.regime_max, Resolution.DAILY, Field.CLOSE)
        #algorithm.warm_up_indicator(security.symbol, self.regime_max, Resolution.DAILY, Field.HIGH)
        #algorithm.register_indicator(security.symbol, self.regime_max, Resolution.DAILY, Field.HIGH)
        algorithm.warm_up_indicator(security.symbol, self.regime_max, Resolution.DAILY, Field.LOW)
        algorithm.register_indicator(security.symbol, self.regime_max, Resolution.DAILY, Field.LOW)

        #regime_min_period = 3
        #self.regime_min = Maximum(period=regime_min_period)
        #algorithm.warm_up_indicator(security.symbol, self.regime_min, Resolution.DAILY, Field.CLOSE)
        #algorithm.register_indicator(security.symbol, self.regime_min, Resolution.DAILY, Field.CLOSE)
        #algorithm.warm_up_indicator(security.symbol, self.regime_min, Resolution.DAILY, Field.HIGH)
        #algorithm.register_indicator(security.symbol, self.regime_min, Resolution.DAILY, Field.HIGH)
        #algorithm.warm_up_indicator(security.symbol, self.regime_min, Resolution.DAILY, Field.LOW)
        #algorithm.register_indicator(security.symbol, self.regime_min, Resolution.DAILY, Field.LOW)

        #regime_ma_period = 5
        #self.regime_ma = SimpleMovingAverage(period=regime_ma_period)
        #algorithm.warm_up_indicator(security.symbol, self.regime_ma, Resolution.DAILY, Field.CLOSE)
        #algorithm.register_indicator(security.symbol, self.regime_ma, Resolution.DAILY, Field.CLOSE)

        #---------------------------------
        # Exit logic
        # Arthur, this is only an example - how can the SL logic be modularized, e.g. as an indicator?

        # Trailing EMA stop loss
        trailing_ema_period = 60
        self.trailing_ema_exit_tol = -0.005  # should be an ATR factor
        self.trailing_ema = SimpleMovingAverage(period=trailing_ema_period)
        algorithm.warm_up_indicator(security.symbol, self.trailing_ema, Resolution.MINUTE, Field.CLOSE)
        algorithm.register_indicator(security.symbol, self.trailing_ema, Resolution.MINUTE, Field.CLOSE)

        # Time SMA to avoid spikes to trigger stop loss
        time_sma_period = 3
        self.time_sma = SimpleMovingAverage(period=time_sma_period)
        algorithm.warm_up_indicator(security.symbol, self.time_sma, Resolution.MINUTE, Field.CLOSE)
        algorithm.register_indicator(security.symbol, self.time_sma, Resolution.MINUTE, Field.CLOSE)

        # EoD MA to allow overnight holdings in case we are on the safe side of the moving average
        eod_ma_period = 50
        self.eod_ma = SimpleMovingAverage(period=eod_ma_period)
        algorithm.warm_up_indicator(security.symbol, self.eod_ma, Resolution.DAILY, Field.CLOSE)
        algorithm.register_indicator(security.symbol, self.eod_ma, Resolution.DAILY, Field.CLOSE)
        #---------------------------------

    def create_insight(self, algorithm: QCAlgorithm, data: Slice) -> Insight | None:
        if self.noise_area.is_ready:
            algorithm.plot("Noise Area", "Upper Bound", self.noise_area.upper_bound)
            algorithm.plot("Noise Area", "Lower Bound", self.noise_area.lower_bound)
            algorithm.plot("Noise Area", "Price", self.security.price)
        if self.vwap.is_ready:
            algorithm.plot("Noise Area", "VWAP", self.vwap.value)

        if not self.can_emit_insight:
            return

        if self.insight.direction is not InsightDirection.FLAT:
            # exit
            if self.exit_conditions_met:
                self.insight = Insight.price(symbol=self.symbol, period=timedelta(1), direction=InsightDirection.FLAT)
                return self.insight
            # exit-and-reverse
            if not self.algorithm.config.eod_exit:
                if self.insight.direction is InsightDirection.DOWN and self.long_entry_conditions_met:
                    self.insight = Insight.price(symbol=self.symbol, period=timedelta(1), direction=InsightDirection.UP)
                    return self.insight
                if self.insight.direction is InsightDirection.UP and self.short_entry_conditions_met:
                    self.insight = Insight.price(symbol=self.symbol, period=timedelta(1), direction=InsightDirection.DOWN)
                    return self.insight
        else:
            # entry from a flat position
            if self.long_entry_conditions_met:
                self.insight = Insight.price(symbol=self.symbol, period=timedelta(1), direction=InsightDirection.UP)
                return self.insight
            if self.short_entry_conditions_met:
                self.insight = Insight.price(symbol=self.symbol, period=timedelta(1), direction=InsightDirection.DOWN)
                return self.insight
        return

    @property
    def long_entry_conditions_met(self) -> bool:
        gap = (self.noise_area.day_open - self.noise_area.previous_day_close) / self.noise_area.previous_day_close if self.noise_area.previous_day_close != 0 else -1.

        #---------------------------------
        # Arthur, these are prior-day filters that can be reused again and again - how can this be modularized, e.g. with logic similar to the Global Signals?
        prior_days_condition = (
            gap > self.gap_min_long and
            #self.security.price < self.regime_max.Current.Value and
            #self.security.price < self.regime_ma.Current.Value and
            #self.security.price < self.regime_min.Current.Value and
            #self.security.price > self.regime_max.Current.Value and
            #self.security.price > self.regime_ma.Current.Value and
            #self.security.price > self.regime_min.Current.Value and
            True)
        #---------------------------------

        current_day_condition = (
            self.security.price > self.noise_area.upper_bound and
            #self.time_sma.Current.Value > self.noise_area.upper_bound and
            self.security.price > self.vwap.value * (1 + self.vwap_entry_tol) and
            #self.time_sma.Current.Value > self.vwap.value * (1 + self.vwap_entry_tol) and
            True)

        exit_preventing_condition = (
            self.security.price > self.trailing_ema.Current.Value and
            #self.security.price > self.trailing_ema.Current.Value * (1 + self.trailing_ema_exit_tol) and
            True)

        # Arthur, how can I access the case filter here?
        case_filter_condition = True
        #case_filter_condition = self.algorithm.global_case_filter.check_condition(self.algorithm.config.global_case_filter_condition)

        if prior_days_condition and current_day_condition and exit_preventing_condition and case_filter_condition and self.algorithm.config.directional_bias >= 0:
            return True
        return False

    @property
    def short_entry_conditions_met(self) -> bool:
        gap = (self.noise_area.day_open - self.noise_area.previous_day_close) / self.noise_area.previous_day_close if self.noise_area.previous_day_close != 0 else +1.

        prior_days_condition = (
            gap < self.gap_max_short and
            #self.security.price < self.regime_max.Current.Value and
            #self.security.price < self.regime_ma.Current.Value and  # good result
            #self.security.price < self.regime_min.Current.Value and
            self.security.price > self.regime_max.Current.Value and
            #self.security.price > self.regime_ma.Current.Value and  # even
            #self.security.price > self.regime_min.Current.Value and
            True)

        current_day_condition = (
            self.security.price < self.noise_area.lower_bound and
            #self.time_sma.Current.Value < self.noise_area.lower_bound and
            self.security.price < self.vwap.value * (1 - self.vwap_entry_tol) and
            #self.time_sma.Current.Value < self.vwap.value * (1 - self.vwap_entry_tol) and
            True)

        exit_preventing_condition = (
            self.security.price < self.trailing_ema.Current.Value and
            #self.security.price < self.trailing_ema.Current.Value * (1 - self.trailing_ema_exit_tol) and
            True)

        case_filter_condition = True
        #case_filter_condition = self.algorithm.global_case_filter.check_condition(self.algorithm.config.global_case_filter_condition)

        if prior_days_condition and current_day_condition and exit_preventing_condition and case_filter_condition and self.algorithm.config.directional_bias <= 0:
            return True
        return False

    @property
    def exit_conditions_met(self) -> bool:
        if self.insight.direction is InsightDirection.UP:
            exit_standard_condition = (
                (self.security.price < self.trailing_ema.Current.Value * (1 + self.trailing_ema_exit_tol)) or
                False)
            exit_alpha_condition = (
                #self.security.price < self.noise_area.upper_bound * (1 + self.noise_area_exit_tol) or
                #self.security.price < self.vwap.value * (1 + self.vwap_exit_tol) or
                (self.security.price < self.vwap.value * (1 + self.vwap_exit_tol) and self.time_sma.Current.Value < self.vwap.value * (1 + self.vwap_exit_tol)) or
                False)
            if exit_standard_condition or exit_alpha_condition:
                return True

        if self.insight.direction is InsightDirection.DOWN:
            exit_standard_condition = (
                (self.security.price > self.trailing_ema.Current.Value * (1 - self.trailing_ema_exit_tol)) or
                False)
            exit_alpha_condition = (
                #self.security.price > self.noise_area.lower_bound * (1 - self.noise_area_exit_tol) or
                #self.security.price > self.vwap.value * (1 - self.vwap_exit_tol) or
                (self.security.price > self.vwap.value * (1 - self.vwap_exit_tol) and self.time_sma.Current.Value > self.vwap.value * (1 - self.vwap_exit_tol)) or
                False)
            if exit_standard_condition or exit_alpha_condition:
                return True

        if self.security.exchange.is_closing_soon(minutes_to_close=1):
            exit_eod_condition = (
                self.algorithm.config.eod_exit or
                self.insight.direction is InsightDirection.UP and self.security.price < self.eod_ma.Current.Value or
                self.insight.direction is InsightDirection.DOWN and self.security.price > self.eod_ma.Current.Value or
                False)
            if exit_eod_condition:
                return True

        return False

    @property
    def can_emit_insight(self) -> bool:
        if not self.security.is_tradable:
            return False
        if not self.security.exchange.exchange_open:
            return False
        if not self.security.has_data:
            return False
        if self.algorithm.time.time() < self.algorithm.config.trading_start_time:
            return False
        # After the end time (and in the last minute of the session) only exits are allowed, no new entries
        if self.algorithm.time.time() >= self.algorithm.config.trading_end_time and self.insight.direction is InsightDirection.FLAT:
            return False
        if self.security.exchange.is_closing_soon(minutes_to_close=1) and self.insight.direction is InsightDirection.FLAT:
            return False
        return True
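Several tolerances in the analytics class are marked as "should be an ATR factor". The sketch below illustrates one way to express a tolerance as `x * ATR` instead of a percentage of the reference level. It is plain Python and independent of the QuantConnect API. The helper names (`true_range`, `simple_atr`, `above_with_tolerance`) are hypothetical, and the ATR here is a simple mean of true ranges, which is an assumption and may differ from the smoothing a platform indicator applies.

```python
def true_range(high: float, low: float, prev_close: float) -> float:
    """Largest of the bar range and the two gaps relative to the previous close."""
    return max(high - low, abs(high - prev_close), abs(low - prev_close))


def simple_atr(bars: list[tuple[float, float, float]], period: int) -> float:
    """Mean true range over the last `period` bars.

    bars: daily (high, low, close) tuples, oldest first; needs period + 1 bars
    because each true range looks at the previous close.
    """
    if len(bars) < period + 1:
        raise ValueError("need at least period + 1 bars")
    ranges = [
        true_range(high, low, bars[i - 1][2])
        for i, (high, low, _close) in enumerate(bars)
        if i > 0
    ]
    return sum(ranges[-period:]) / period


def above_with_tolerance(price: float, reference: float, atr: float, atr_factor: float) -> bool:
    """Long-side check: price must clear the reference by atr_factor * ATR."""
    return price > reference + atr_factor * atr


daily_bars = [(10.0, 9.0, 9.5), (11.0, 10.0, 10.5), (12.0, 10.0, 11.0)]
atr = simple_atr(daily_bars, period=2)

print(atr)
print(above_with_tolerance(price=102.0, reference=100.0, atr=atr, atr_factor=1.0))
```

An absolute band scales with the instrument's recent volatility, whereas a fixed percentage such as `vwap_entry_tol = 0.0010` is equally tight on quiet and volatile days.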
#region imports
from AlgorithmImports import *
#endregion

class GlobalCaseFilter:
    """
    Applies a boolean filter based on the input variable using a dictionary of conditions.
    Condition 0 always returns False, condition 1 always returns True.
    Usage:
        def initialize(algorithm):
            algorithm.global_case_filter = GlobalCaseFilter(algorithm)
            result = algorithm.global_case_filter.check_condition(3)
    """

    def __init__(self, algorithm):
        self.algorithm = algorithm

    # Placeholder condition methods
    def condition_1(self):
        return True

    def condition_2(self):
        return False

    def check_condition(self, input_var: int) -> bool:
        conditions = {
            # Direct boolean values
            0: False,  # always False as a benchmark and for up:True / down:not False
            1: True,  # always True as a benchmark and for up:True / down:not False
            # Existing variables
            2: self.algorithm.Vix_less_SMA,
            # Method references
            98: self.condition_1,
            99: self.condition_2,
        }
        # We can (a) call the condition method reference or (b) evaluate the direct boolean condition
        if input_var in conditions:
            condition = conditions[input_var]
            return condition() if callable(condition) else condition
        else:
            return False


class GlobalSignals:
    """
    Creates global indicators and manages their update.
    Usage:
        def initialize(algorithm):
            algorithm.global_signals = GlobalSignals(algorithm)
        def OnData(algorithm, data: Slice):
            algorithm.global_signals.OnData(data)
    """

    def __init__(self, algorithm):
        self.algorithm = algorithm

        # vix
        self.vix = algorithm.AddIndex("VIX").Symbol
        self.vix_sma = 0  # Arthur, could you please add this here? I had used a mean over a deque, but surely there is a leaner way?

        # make results available globally
        self.algorithm.Vix_Value = 0
        self.algorithm.Vix_less_SMA = False

    def OnData(self, slice):
        # vix
        if slice.ContainsKey(self.vix):
            self.algorithm.Vix_Value = slice[self.vix].Close
            self.algorithm.Vix_less_SMA = self.algorithm.Vix_Value <= self.vix_sma
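The case filter above builds its dictionary on every call and evaluates each entry eagerly. A leaner variant, sketched here in plain Python, is a registry of zero-argument callables keyed by integer, so that a condition is evaluated only when it is requested. `CaseFilterRegistry` and `RollingMean` are hypothetical names used for illustration; on QuantConnect the built-in `SimpleMovingAverage` indicator would likely take the place of `RollingMean`.

```python
from collections import deque
from typing import Callable


class RollingMean:
    """Fixed-window mean that keeps a running total instead of re-summing the window."""

    def __init__(self, period: int) -> None:
        self.window: deque[float] = deque(maxlen=period)
        self.total = 0.0

    def update(self, value: float) -> float:
        if len(self.window) == self.window.maxlen:
            self.total -= self.window[0]  # this value is evicted by the append below
        self.window.append(value)
        self.total += value
        return self.value

    @property
    def value(self) -> float:
        return self.total / len(self.window) if self.window else 0.0


class CaseFilterRegistry:
    """Maps integer keys to zero-argument callables that return a bool."""

    def __init__(self) -> None:
        self._conditions: dict[int, Callable[[], bool]] = {
            0: lambda: False,  # always False
            1: lambda: True,   # always True
        }

    def register(self, key: int, condition: Callable[[], bool]) -> None:
        if key in self._conditions:
            raise KeyError(f"condition {key} is already registered")
        self._conditions[key] = condition

    def check(self, key: int) -> bool:
        condition = self._conditions.get(key)
        return bool(condition()) if condition is not None else False  # unknown keys are False


# Usage with made-up VIX closes
vix_sma = RollingMean(period=3)
state = {"vix": 0.0}
for close in (20.0, 22.0, 18.0):
    state["vix"] = close
    vix_sma.update(close)

registry = CaseFilterRegistry()
registry.register(2, lambda: state["vix"] <= vix_sma.value)
print(registry.check(2))
```

Because each entry is a callable, a missing or not yet initialized signal only causes an error when that specific condition is requested, not on every lookup.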
# region imports
from AlgorithmImports import *
from toolbox import TimeFrameHelper
# endregion

class NoiseAreaIndicator(PythonIndicator):

    def __init__(self, tf_helper: TimeFrameHelper, period=63, scaling_factor=1.0, gap_stretch_factor = 1.0):
        self.time = datetime.min
        self.value = 0
        self.period = period  # tf_helper.quarter
        self.warm_up_period = int(tf_helper.day*self.period) + 1
        self.count = 0
        self.first_bar_of_day = TradeBar(time=self.time, symbol=None, open=0, high=0, low=0, close=0, volume=0)
        self.day_open = 0
        self.previous_close = 0
        self.previous_day_open = 0
        self.previous_day_close = 0
        self.upper_bound_by_time = dict.fromkeys(range(1, tf_helper.day + 1), 0)
        self.lower_bound_by_time = dict.fromkeys(range(1, tf_helper.day + 1), 0)
        self.upper_bound = 0
        self.lower_bound = 0
        self.latest_time_for_reset = time(9,35)  # value increased
        # One moving average per minute of the session. Using itertools.repeat here would map every minute to the same SimpleMovingAverage instance.
        self.sigma_by_time = {minute: SimpleMovingAverage(self.period) for minute in range(1, tf_helper.day + 1)}
        self.scaling_factor = scaling_factor
        self.gap_stretch_factor = gap_stretch_factor

    def update(self, data: TradeBar) -> bool:
        # First bar of a new day: roll over the daily reference values
        if self.first_bar_of_day.time.day != data.end_time.day:
            if data.end_time.time() > self.latest_time_for_reset:
                # Arthur, we should output an error message here
                #return
                pass
            self.previous_day_open = self.day_open
            self.previous_day_close = self.previous_close
            self.first_bar_of_day = data
            self.day_open = self.first_bar_of_day.open

        # Average absolute move since the open, tracked per minute of the session
        abs_move = abs(data.close / self.first_bar_of_day.open - 1)
        minutes_elapsed = int((data.end_time - self.first_bar_of_day.time).total_seconds() // 60)
        self.sigma_by_time[minutes_elapsed].update(data.end_time, abs_move)

        upper_bound_reference = lower_bound_reference = self.first_bar_of_day.open
        if self.previous_day_close is not None:
            #upper_bound_reference = max(upper_bound_reference, self.previous_day_close)
            upper_bound_reference = upper_bound_reference + max(0, self.previous_day_close-upper_bound_reference) * self.gap_stretch_factor
            #lower_bound_reference = min(lower_bound_reference, self.previous_day_close)
            lower_bound_reference = lower_bound_reference - max(0, lower_bound_reference-self.previous_day_close) * self.gap_stretch_factor

        self.upper_bound = upper_bound_reference * (1 + self.sigma_by_time[minutes_elapsed].current.value * self.scaling_factor)  # scaling applied to sigma only
        self.upper_bound_by_time[minutes_elapsed] = self.upper_bound

        #lower_bound_reference = self.first_bar_of_day.open
        self.lower_bound = lower_bound_reference * (1 - self.sigma_by_time[minutes_elapsed].current.value * self.scaling_factor)  # scaling applied to sigma only
        self.lower_bound_by_time[minutes_elapsed] = self.lower_bound

        self.previous_close = data.close
        self.count += 1
        return self.is_ready

    @property
    def is_ready(self) -> bool:
        return self.count > self.warm_up_period

    def reset(self):
        self.time = datetime.min
        self.value = 0


class IntradayVWAP(PythonIndicator):

    def __init__(self, name='VWAP'):
        self.name = name
        self.value = 0
        self.time = datetime.min
        self.sum_of_volume = 0
        self.sum_of_dollar_volume = 0
        self.count = 0
        self.warm_up_period = 1

    def update(self, data: TradeBar) -> bool:
        if data.is_fill_forward:
            return self.is_ready

        # Reset the accumulators on the first bar of a new day
        if data.end_time.day != self.time.day:
            self.sum_of_volume = 0
            self.sum_of_dollar_volume = 0
            self.count = 0

        # Volume-weighted typical price (high + low + close) / 3
        avg_price = (data.high + data.low + data.close) / 3
        self.sum_of_volume += data.volume
        self.sum_of_dollar_volume += avg_price * data.volume

        if self.sum_of_volume == 0:
            self.value = data.value
            return self.is_ready

        self.value = self.sum_of_dollar_volume / self.sum_of_volume
        self.time = data.end_time
        self.count += 1
        return self.is_ready

    @property
    def is_ready(self) -> bool:
        return self.sum_of_volume > 0 and self.count >= 1
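The bound calculation in `NoiseAreaIndicator.update` can be isolated as a pure function, which makes the effect of `scaling_factor` and `gap_stretch_factor` easier to check by hand. This is a sketch that restates the arithmetic shown above; `noise_area_bounds` is a hypothetical helper and the input numbers are made up.

```python
def noise_area_bounds(
    day_open: float,
    previous_day_close: float,
    sigma: float,
    scaling_factor: float = 1.0,
    gap_stretch_factor: float = 1.0,
) -> tuple[float, float]:
    """Return (upper_bound, lower_bound) of the noise area for one minute of the day.

    sigma is the average absolute move from the open at this minute.
    The reference on the gap side is pushed out by gap_stretch_factor times the gap;
    scaling_factor is applied to sigma only.
    """
    upper_reference = day_open + max(0.0, previous_day_close - day_open) * gap_stretch_factor
    lower_reference = day_open - max(0.0, day_open - previous_day_close) * gap_stretch_factor
    upper_bound = upper_reference * (1 + sigma * scaling_factor)
    lower_bound = lower_reference * (1 - sigma * scaling_factor)
    return upper_bound, lower_bound


# Gap down: the open (100) is below the previous close (102), so only the upper reference is stretched.
upper, lower = noise_area_bounds(
    day_open=100.0,
    previous_day_close=102.0,
    sigma=0.01,
    scaling_factor=0.8,
    gap_stretch_factor=1.8,
)
print(round(upper, 4), round(lower, 4))
```

With a stretch factor above 1, the bound on the gap side ends up beyond the previous close, so a move that merely fills the gap does not count as a breakout.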
# region imports
from AlgorithmImports import *
from toolbox import read_config
from security_init import IbkrSecurityInitializer
from alpha import CustomAlphaModel
from global_signals import GlobalSignals, GlobalCaseFilter
from pcm_execution import MultiAlphaHelpers, MultiAlphaAveragingDirectionPCM, MultiAlphaMinQuantityChangeExecutionModel
# endregion

class ConcretumIntradayMomentumStrategy(QCAlgorithm):

    def initialize(algorithm):
        config = read_config(algorithm)

        # Backtest
        algorithm.set_start_date(config.start_date)
        algorithm.set_end_date(config.end_date)
        algorithm.set_cash(config.initial_capital)
        algorithm.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)
        algorithm.set_security_initializer(IbkrSecurityInitializer(algorithm, algorithm.brokerage_model, FuncSecuritySeeder(algorithm.get_last_known_price)))  # configure settings of each security individually

        # Other
        algorithm.set_risk_free_interest_rate_model(ConstantRiskFreeRateInterestRateModel(0))
        algorithm.settings.free_portfolio_value_percentage = algorithm.config.freePortfolioValuePercentage

        # Universe
        for ticker in config.tickers:
            security = algorithm.add_equity(ticker, resolution=Resolution.MINUTE, fill_forward=True, leverage=algorithm.config.leverage, extended_market_hours=False)

        # Benchmark
        algorithm.myBenchmark = algorithm.config.myBenchmark
        algorithm.SetBenchmark(algorithm.myBenchmark)

        # Alpha Models
        algorithm.add_alpha(CustomAlphaModel())
        #algorithm.set_portfolio_construction(EqualWeightingPortfolioConstructionModel(lambda t: None))
        #algorithm.set_execution(ImmediateExecutionModel())

        # Multi Alpha PCM and Execution
        algorithm.ma_helpers = MultiAlphaHelpers(algorithm)
        algorithm.set_portfolio_construction(MultiAlphaAveragingDirectionPCM(algorithm,
            rebalance=Resolution.Daily,
            portfolioBias=PortfolioBias.LongShort,
            long_factor=algorithm.config.long_factor,
            short_factor=algorithm.config.short_factor,
            use_multi_alpha_insights=True,
            use_direction_averaged_weighting=True,
            max_percentage_per_position=algorithm.config.max_percentage_per_position))
        algorithm.set_execution(MultiAlphaMinQuantityChangeExecutionModel(
            algorithm.config.minimumOrderQuantityChangePercentage))

        # Global Signals
        algorithm.global_signals = GlobalSignals(algorithm)
        algorithm.global_case_filter = GlobalCaseFilter(algorithm)

    """
    def OnData(algorithm, data: Slice):
        algorithm.ma_helpers.OnData(data)  # list stock splits and dividends
        algorithm.global_signals.OnData(data)  # update global signals
    """
# region imports from AlgorithmImports import * from collections import defaultdict # endregion #---------------------------------------------------------------------------------------- # # Multi Alpha Model Helpers # class MultiAlphaHelpers: """ Provide OnData and configure the basic settings for MultiAlphaAveragingDirectionPCM and MinQuantityChangeImmediateExecutionModel Usage: def initialize(algorithm): algorithm.ma_helpers = MultiAlphaHelpers(algorithm) def OnData(algorithm, data: Slice): algorithm.ma_helpers.OnData(data) """ def __init__(self, algorithm): self.algorithm = algorithm self.ApplyStandardSettings() def ApplyStandardSettings(self): ## PCM # Enable rebalances when the Alpha model emits insights or when insights expire in PCM. For testing, set to False. self.algorithm.Settings.RebalancePortfolioOnInsightChanges = True # Default = True # Enable rebalances when security changes occur in PCM. For testing, set to False. self.algorithm.Settings.RebalancePortfolioOnSecurityChanges = True # Default = True # Min order margin portfolio percentage to ignore bad orders and orders with small sizes in PCM. For testing, set to 0. self.algorithm.Settings.MinimumOrderMarginPortfolioPercentage = 0.003 # Default = 0.001, better to use a min order margin of $300 for a $100_000 portfolio size # Define long and short multipliers which are used in the PCM. For testing, set to 1.0. self.algorithm.long_factor = 1.0 self.algorithm.short_factor = 1.0 ## Execution # Min order quantity change to ignore bad orders and orders with small sizes in EXECUTION. For testing, set to 0. self.algorithm.minimumOrderQuantityChangePercentage = 0.10 # Custom minimum order quantity change percentage of at least 10% of the currently held quantity def OnData(self, slice: Slice): """ Test data # MLI: Forward stock split 2 for 1 on 23.10.2023 # ADXN: Reverse stock split 1 for 20 on 23.10.2023 # 2023-09-01 00:00:00 2023-09-01 00:00:00 OnSecuritiesChanged received a removal for WGOV R735QTJ8XC9X. 
# 2023-09-01 00:00:00 2023-09-01 00:00:00 SymbolData disposed a WGOV R735QTJ8XC9X with 1. TODO Delistings etc. in depth testing https://www.quantconnect.com/docs/v2/writing-algorithms/securities/asset-classes/us-equity/corporate-actions TODO If you have indicators in your algorithm, reset and warm-up your indicators with ScaledRaw data when splits occur so that the data in your indicators account for the price adjustments that the splits cause. https://www.quantconnect.com/docs/v2/writing-algorithms/indicators/key-concepts#10-Reset-Indicators For notification in live mode, please check out this doc for reference to implementation as the "# notification action" in the attached backtest. This information is provided by QuantConnect, and is still available even if you choose other brokerages such as IB, as long as you chose QuantConnect data feed which is only available on QuantConnect Cloud. Alternatively, you can subscribe to the Security Master dataset, and use Lean-CLI to update the data every day to get the splits and dividents. https://www.quantconnect.com/forum/discussion/12273/will-i-get-split-dividends-events-on-live-if-i-am-using-interactive-brokers-data-feed/p1 """ """ ## Stock splits # TODO check if we have the first candle of the day + test if events come once a day or more often if self.algorithm.Time.hour == 9 and self.algorithm.Time.minute == 31: for kvp in slice.Splits: symbol = kvp.Key #self.algorithm.Debug(f'{self.algorithm.Time} OnData received a split event for {symbol}.') ''' # Handle stock splits for all alpha models with a 'ResetAndWarmUpIndicators' method in their SymbolData # TODO in life mode: refresh all indicators daily to ensure we have most recent historical data? 
Is a reco of Jared from 2017 for alphaModel in self.algorithm.instantiated_alpha_models: if hasattr(alphaModel, 'symbol_data') and symbol in alphaModel.symbol_data and hasattr(alphaModel.symbol_data[symbol], 'ResetAndWarmUpIndicators'): modelName = getattr(alphaModel, 'Name', type(alphaModel).__name__) #self.algorithm.Debug(f'{self.algorithm.Time} OnData handled a split event for {symbol} in {modelName}.') alphaModel.symbol_data[symbol].ResetAndWarmUpIndicators() ''' ## Dividends # TODO check if we have the first candle of the day + test if events come once a day or more often if self.algorithm.Time.hour == 9 and self.algorithm.Time.minute == 31: for kvp in slice.Dividends: symbol = kvp.Key #self.algorithm.Debug(f'{self.algorithm.Time} OnData received a dividend event for {symbol}.') """ pass #---------------------------------------------------------------------------------------- # # Multi Alpha Averaging Direction PCM # class MultiAlphaAveragingDirectionPCM(PortfolioConstructionModel): """ This PCM is designed to combine active insights from multiple Alpha Models based on the 'insight.Direction' using two methods: (1) Equal weighting of each insight 1 We allocate 100% equally weighted to each active insight (2) Directional averaging of each insight per symbol 1 We allocate 100% equally weighted to each symbol 2 We multiply the symbol share with the average direction from all insights for a symbol (value from -1 .. +1) For further processing, we then distribute this result to all active insights Insight Requirements: Active: Insight must not be expired Latest Insight per Alpha Model: Insight used is the most recent insight from its Alpha Model for a given symbol 'insight.Direction': The direction property is used to caclulate the portfolio share Effects of active insights from several Alpha Models for one symbol: 'insight.Direction' is long: Vote for a bullish portfolio weight. If we have 1 long insight, the weight will be 100%. 
        'insight.Direction' is short: Vote for a bearish portfolio weight. If we have 1 long and 1 short insight, the weight will be 0%.
        'insight.Direction' is not active: Don't vote at all. If we have 2 long insights and a third Alpha Model does not vote, the weight will be 2/2 = 100%.
        'insight.Direction' is flat: Vote for a neutral portfolio weight. If we have 2 long and 1 neutral insights, the weight will be 2/3 = 66.7%.
        !!! Note: This means that insights must be emitted as long as the Alpha Model sees a certain direction, not just once for an entry!!!

    Parameters and Switches:
        'portfolioBias': Insight must align with the portfolio bias
        'long_factor' and 'short_factor': To adjust the quantity in the portfolio
        'use_multi_alpha_insights': Switch to activate the grouping of insights by symbol and alpha model
        'use_direction_averaged_weighting': Switch for (1) equal weighting or (2) directional averaging
        'max_percentage_per_position': The resulting position size must be within the specified portfolio limits

    Implementation
        It overrides all common methods of the base class. Changes are made in the GetTargetInsights and the DetermineTargetPercent methods as suggested in the QC documentation.
        https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/portfolio-construction/key-concepts
        GetTargetInsights: To combine the active insights differently, GetTargetInsights returns all active insights.
        DetermineTargetPercent: Target weights are derived from the average direction of all active insights from all Alpha Models for a symbol.
    Usage:
        self.SetPortfolioConstruction(MultiAlphaAveragingDirectionPCM(self))
    """

    def __init__(self, algorithm, rebalance=Resolution.Daily, portfolioBias=PortfolioBias.LongShort, long_factor=1., short_factor=0.6, use_multi_alpha_insights=True, use_direction_averaged_weighting=True, max_percentage_per_position=0.1):
        # Note: 'rebalance' is accepted here but is neither stored nor forwarded to the base class
        super().__init__()
        self.algorithm = algorithm
        self.portfolioBias = portfolioBias
        self.use_multi_alpha_insights = use_multi_alpha_insights
        self.use_direction_averaged_weighting = use_direction_averaged_weighting
        # Define long and short multipliers
        self.long_factor = long_factor
        self.short_factor = short_factor
        # Define max percentage of portfolio of one security per position
        self.max_percentage_per_position = max_percentage_per_position

    def CreateTargets(self, algorithm, insights):
        """
        Generates portfolio targets based on active insights from multiple Alpha Models.

        This method aggregates multiple insights per symbol into a single portfolio target, applying leverage and specified long/short factors.
        The resulting target ensures that the portfolio aligns with the combined directional insights provided by different Alpha Models
        while respecting a maximum percentage allocation per position.
        """
        ## Get targets from insights using the base model
        targets_per_insight = super().CreateTargets(algorithm, insights)

        # Return, if no targets
        if len(targets_per_insight) == 0:
            return targets_per_insight  # same as return []

        ## Aggregate several targets per symbol to only one target per symbol
        # Note: Immediate Execution model fills a PortfolioTargetCollection dict(k=Symbol,v=PortfolioTarget) using AddRange, commented as "If a target for the same symbol already exists it will be overwritten."
        # So we have to ensure only one target per symbol is returned here.
        targets_per_symbol = defaultdict(int)
        for x in targets_per_insight:
            # Determine long_short_factor
            long_short_factor = self.long_factor if x.Quantity > 0 else self.short_factor
            # Apply leverage and the long_short_factor and aggregate
            adjusted_quantity = x.Quantity * algorithm.Securities[x.Symbol].Leverage * long_short_factor
            targets_per_symbol[x.Symbol] += adjusted_quantity

        ## Limit the quantity to the max quantity per security
        # Create new PortfolioTargets with aggregated quantities
        if not self.max_percentage_per_position:
            # Create new PortfolioTargets without limited quantities
            targets = [PortfolioTarget(symbol, quantity) for symbol, quantity in targets_per_symbol.items()]
        else:
            # Create new PortfolioTargets with quantities limited by max percentage
            total_portfolio_value = algorithm.Portfolio.TotalPortfolioValue
            max_value = total_portfolio_value * self.max_percentage_per_position
            targets = [PortfolioTarget(symbol, 0) if algorithm.Securities[symbol].Price == 0
                       else PortfolioTarget(symbol, np.sign(quantity) * int(min(abs(quantity), max_value / algorithm.Securities[symbol].Price)))
                       for symbol, quantity in targets_per_symbol.items()]
        return targets

    def GetTargetInsights(self) -> List[Insight]:
        """
        Gets the last generated active insight for each symbol
        """
        # Get all insights from the algorithm that haven't expired yet, for each symbol that is still in the universe
        activeInsights = self.algorithm.Insights.GetActiveInsights(self.algorithm.UtcTime)

        if self.use_multi_alpha_insights:
            ## GetTargetInsights by symbol and model
            # Group insights by symbol and alpha model using a nested defaultdict keyed by symbol and then source model; value = latest insight
            last_insights_per_symbol_model = defaultdict(lambda: defaultdict(lambda: None))

            # Iterate over each active insight and store it, if the insight is more recent than the currently stored one for its symbol and source model
            for insight in activeInsights:
                if insight.CloseTimeUtc >= self.algorithm.UtcTime:  # only consider insights that are not outdated
                    current_stored_insight = last_insights_per_symbol_model[insight.Symbol][insight.SourceModel]
                    # Check if we already have a stored insight for this symbol and model, and if the new one is more recent
                    if current_stored_insight is None or insight.GeneratedTimeUtc > current_stored_insight.GeneratedTimeUtc:
                        last_insights_per_symbol_model[insight.Symbol][insight.SourceModel] = insight

            # Flatten the nested dictionary to get a list of the latest active insights from each model for each symbol
            self.insights = [insight for symbol_insights in last_insights_per_symbol_model.values() for insight in symbol_insights.values()]
        else:
            ## GetTargetInsights by symbol only
            # Group insights by symbol and get the last generated insight for each symbol
            last_insights_per_symbol = defaultdict(list)
            for insight in activeInsights:
                last_insights_per_symbol[insight.Symbol].append(insight)

            # Select the last generated active insight for each symbol
            self.insights = [sorted(insights, key=lambda x: x.GeneratedTimeUtc)[-1] for insights in last_insights_per_symbol.values()]

        return self.insights

    def DetermineTargetPercent(self, activeInsights: List[Insight]) -> Dict[Insight, float]:
        """
        Determines the target percentage allocation for each active insight based on the selected weighting method.

        The process considers various factors such as the portfolio bias, the direction of insights, and whether direction averaging or equal weighting is applied.
        The final output is a dictionary mapping each active insight to its corresponding portfolio target percentage.

        Parameters:
            activeInsights : List[Insight]
                A list of active insights that have not expired and are generated by various Alpha Models.

        Returns:
            A dictionary where each key is an active insight and the value is the target portfolio percentage allocated to that insight.

        Implementation Notes:
            The method calculates the percentage allocation for each insight considering the number of active insights and their respective directions.
            The resulting portfolio allocation respects the constraints imposed by the portfolio bias and maximum position size.
            The portfolio target percentage can be positive (long), negative (short), or zero (flat), depending on the calculated insights and the portfolio's overall strategy.
        """
        # Define the threshold for the expiry date comparison (4 days)
        expiry_threshold = timedelta(days=4)

        if self.use_direction_averaged_weighting == False:
            ## 'Equal Weighting' of each insight
            # Same as EqualWeighting https://github.com/QuantConnect/Lean/blob/master/Algorithm.Framework/Portfolio/EqualWeightingPortfolioConstructionModel.cs#L118
            insights_count = sum(1 for insight in activeInsights if insight.Direction != InsightDirection.Flat and self.RespectPortfolioBias(insight))  # we count all non-flat insights that respect the portfolio bias
            pct_by_insight = {insight: 1. / insights_count if insights_count > 0 else 0 for insight in activeInsights if self.RespectPortfolioBias(insight)}  # we allocate 100% equally weighted to each insight
        else:
            ## 'Direction Averaged Weighting' per source Alpha model of each insight
            insights_count = 0
            symbol_insight_count = defaultdict(int)
            symbol_insight_dir_sum = defaultdict(int)
            for insight in activeInsights:
                insights_count += 1  # we count all insights
                symbol_insight_count[insight.Symbol] += 1  # we count all insights per symbol
                symbol_insight_dir_sum[insight.Symbol] += insight.Direction  # we add up all insight directions per symbol
            symbols_count = len(symbol_insight_count)

            # Arthur, please provide access to the volatility here
            # Step 1: we allocate 100% EQUALLY weighted to each symbol to get the symbol share using (1. / symbols_count)
            # Step 2: we multiply the symbol share with the average direction of this symbol (value from -1 .. +1) using (direction_sum / symbol_insight_count)
            # Step 3: as targetPercent is indexed by insight, we may have several insights per symbol and therefore need to distribute the result per symbol to each insight of this symbol using (1. / symbol_insight_count)
            pct_by_symbol = {symbol: (1./symbols_count) * (direction_sum / symbol_insight_count[symbol]) * (1./symbol_insight_count[symbol]) if symbol_insight_count[symbol] > 0 else 0
                             for symbol, direction_sum in symbol_insight_dir_sum.items()}

        # Fill the target percent dict with the calculated percents for each insight
        targetPercent = {}
        for insight in activeInsights:
            if self.use_direction_averaged_weighting == False:
                ## 'Equal Weighting' of each insight
                # We apply percents indexed by insight
                percent = pct_by_insight.get(insight, 0)
                targetPercent[insight] = percent
            else:
                ## 'Direction Averaged Weighting' per source Alpha model of each insight
                # We apply percents indexed by symbol
                percent = pct_by_symbol.get(insight.Symbol, 0)
                # We need to switch the sign of the weight, if the signs of insight direction and weight are not the same
                if percent * insight.Direction < 0:
                    percent = -percent
                # If the portfolio bias and the sign of the weight are not the same, we filter by setting the weight to zero
                # We do this 'late' in the process, so an adverse direction counts in the averaging differently than 'Flat', even if we never enter in that direction
                # This has to be conceptually balanced with the Alpha Models (a) only emitting insights in case of entry (b) constantly emitting insights also in case of flat
                if self.portfolioBias != PortfolioBias.LongShort and percent * self.portfolioBias < 0:
                    percent = 0
                targetPercent[insight] = percent
        return targetPercent


#----------------------------------------------------------------------------------------
#
# Minimum Changed Quantity ExecutionModel
#
class MultiAlphaMinQuantityChangeExecutionModel(ExecutionModel):
    """
    An execution model that immediately submits market orders to achieve the desired portfolio targets, if the change in quantity
    is significant enough based on a specified threshold. This helps avoid executing insignificant trades.
    Based on ImmediateExecutionModel, added:
        AboveMinimumQuantityChange to check if the quantity alters the current holdings by at least minimumOrderQuantityChangePercentage of the currently held quantity
        'minimumOrderQuantityChangePercentage': The minimum percentage change in quantity required to execute an order, relative to the currently held quantity

    Usage:
        self.SetExecution(MultiAlphaMinQuantityChangeExecutionModel(minimumOrderQuantityChangePercentage=0.10))
    """

    def __init__(self, minimumOrderQuantityChangePercentage=0.10):
        # Initializes a new instance of the MultiAlphaMinQuantityChangeExecutionModel class
        self.minimumOrderQuantityChangePercentage = minimumOrderQuantityChangePercentage
        self.targetsCollection = PortfolioTargetCollection()

    def Execute(self, algorithm, targets):
        """
        Immediately submits orders for the specified portfolio targets

        Implementation:
            The method first adds the incoming targets to the internal `targetsCollection`.
            It then iterates over the targets, checking if the quantity to be ordered meets both the minimum order margin and the minimum quantity change criteria.
            If both criteria are met, a market order is submitted for the target quantity.
            After execution, fulfilled targets are removed from the collection.
        """
        # for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
        self.targetsCollection.AddRange(targets)
        if not self.targetsCollection.IsEmpty:
            for target in self.targetsCollection.OrderByMarginImpact(algorithm):
                security = algorithm.Securities[target.Symbol]
                # calculate remaining quantity to be ordered
                quantity = OrderSizing.GetUnorderedQuantity(algorithm, target, security)
                if quantity != 0:
                    aboveMinimumPortfolio = BuyingPowerModelExtensions.AboveMinimumOrderMarginPortfolioPercentage(security.BuyingPowerModel, security, quantity, algorithm.Portfolio, algorithm.Settings.MinimumOrderMarginPortfolioPercentage)
                    aboveMinimumQuantityChange = self.AboveMinimumQuantityChange(security, quantity, algorithm, self.minimumOrderQuantityChangePercentage)
                    #if aboveMinimumPortfolio:
                    if aboveMinimumPortfolio and aboveMinimumQuantityChange:
                        algorithm.MarketOrder(security, quantity)
                    elif not PortfolioTarget.MinimumOrderMarginPercentageWarningSent:
                        # will trigger the warning if it has not already been sent
                        PortfolioTarget.MinimumOrderMarginPercentageWarningSent = False

            self.targetsCollection.ClearFulfilled(algorithm)

    def AboveMinimumQuantityChange(self, security, quantity, algorithm, minimumOrderQuantityChangePercentage=0.1):
        """
        Returns
            True, if the calculated percentage change in quantity is greater than or equal to the specified minimum percentage
            False, if the quantity does not alter the current holdings by at least minimumOrderQuantityChangePercentage
        """
        # Calculate the percentage change in quantity relative to current holdings
        currentHoldings = security.Holdings.Quantity
        if currentHoldings == 0:
            # If there are no current holdings, any quantity is significant
            return True
        # Calculate the percentage change
        percentage_change = abs(quantity) / abs(currentHoldings)
        # Check if the change is above the minimum threshold
        return percentage_change >= minimumOrderQuantityChangePercentage
# region imports
from AlgorithmImports import *
# endregion


class IbkrSecurityInitializer(BrokerageModelSecurityInitializer):

    def __init__(self, algorithm: QCAlgorithm, brokerage_model: IBrokerageModel, security_seeder: ISecuritySeeder) -> None:
        self.algorithm = algorithm
        super().__init__(brokerage_model, security_seeder)

    def initialize(self, security: Security) -> None:
        super().initialize(security)
        security.set_shortable_provider(InteractiveBrokersShortableProvider())
        if not self.algorithm.config.costs_enabled:
            #security.set_slippage_model(NullSlippageModel())
            #security.set_slippage_model(HalvedSpreadSlippageModel())
            security.set_slippage_model(FullSpreadSlippageModel())
            security.set_fee_model(ConstantFeeModel(0))


class HalvedSpreadSlippageModel:
    def GetSlippageApproximation(self, asset: Security, order: Order) -> float:
        slippage = 0
        if order.type is OrderType.MARKET:
            # Arthur, I changed the sign here, since slippage should make the PnL worse (it was getting better)
            slippage = +0.5 * max(0, (asset.ask_price - asset.bid_price))
        return slippage


class FullSpreadSlippageModel:
    def GetSlippageApproximation(self, asset: Security, order: Order) -> float:
        slippage = 0
        if order.type is OrderType.MARKET:
            slippage = +1.0 * max(0, (asset.ask_price - asset.bid_price))
        return slippage


'''
class ZeroSlippageFillModel(FillModel):

    def market_fill(self, security: Security, order: Order) -> OrderEvent:
        fill = super().market_fill(security, order)
        fill_price = security.bid_price if order.quantity > 0 else security.ask_price
        fill.fill_price = fill_price
        return fill

    def combo_market_fill(self, order: Order, parameters: FillModelParameters) -> List[OrderEvent]:
        fills = super().combo_market_fill(order, parameters)
        for kvp, fill in zip(sorted(parameters.securities_for_orders, key=lambda x: x.Key.Id), fills):
            _security = kvp.value
            fill_price = _security.bid_price if fill.fill_quantity > 0 else _security.ask_price
            fill.fill_price = fill_price
        return fills

    def stop_market_fill(self, security: Security, order: StopMarketOrder) -> OrderEvent:
        fill = super().stop_market_fill(security, order)
        fill_price = security.bid_price if order.quantity > 0 else security.ask_price
        fill.fill_price = fill_price
        return fill
'''
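The sign convention noted in HalvedSpreadSlippageModel can be checked in isolation. The following is a minimal sketch in plain Python, assuming the fill logic adds the returned slippage to the price of a buy and subtracts it from the price of a sell, so that a positive slippage value always worsens the fill. The helper names `spread_slippage` and `apply_slippage` are hypothetical and not part of the LEAN API.

```python
def spread_slippage(bid: float, ask: float, fraction: float = 0.5) -> float:
    """Slippage as a fraction of the quoted spread; never negative."""
    # A crossed or missing quote (ask < bid) yields zero slippage
    return fraction * max(0.0, ask - bid)


def apply_slippage(price: float, quantity: float, slippage: float) -> float:
    """Assumed convention: buys fill higher, sells fill lower."""
    if quantity > 0:
        return price + slippage
    return price - slippage


if __name__ == "__main__":
    slip = spread_slippage(bid=100.0, ask=100.5, fraction=0.5)
    print(apply_slippage(100.5, quantity=10, slippage=slip))   # buy fills above the ask
    print(apply_slippage(100.0, quantity=-10, slippage=slip))  # sell fills below the bid
```

Under this assumption a negative return value would improve every fill, which matches the observation in the comment that the PnL got better before the sign was changed.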
# region imports
from AlgorithmImports import *
from pydantic import BaseModel, ConfigDict
from algo_config import AlgorithmConfig
# endregion


def read_config(algorithm: QCAlgorithm) -> AlgorithmConfig:
    params = {param.key.lower(): param.value for param in algorithm.get_parameters()}
    algo_config = AlgorithmConfig(**params)
    algorithm.config = algo_config
    QCAlgorithm.config = algo_config
    return algo_config


class ExtendedBaseModel(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)


class TimeFrameHelper:
    def __init__(self, security: Security, resolution: Resolution):
        bars_per_day = max(1, security.exchange.hours.regular_market_duration.total_seconds() / Extensions.to_time_span(resolution).total_seconds())
        self.year = int(round(bars_per_day * security.exchange.trading_days_per_year, 0))
        self.half = int(round(self.year/2, 0))
        self.quarter = int(round(self.year/4, 0))
        self.twomonths = int(round(self.year/6, 0))
        self.month = int(round(self.year/12, 0))
        self.week = int(round(self.year/52, 0))
        self.day = int(round(bars_per_day, 0))
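The bar-count arithmetic in TimeFrameHelper can be reproduced without LEAN objects. The sketch below is an illustration only: the function name `bars_per_period` is hypothetical, and the 6.5 hour session and 252 trading days per year are assumed values standing in for what `security.exchange` would report.

```python
from datetime import timedelta


def bars_per_period(session: timedelta, bar: timedelta, trading_days_per_year: int = 252) -> dict:
    """Number of bars of length `bar` in a day, week, month and year of regular sessions."""
    # At least one bar per day, so daily resolution still counts one bar per session
    bars_per_day = max(1, session.total_seconds() / bar.total_seconds())
    year = int(round(bars_per_day * trading_days_per_year))
    return {
        "day": int(round(bars_per_day)),
        "week": int(round(year / 52)),
        "month": int(round(year / 12)),
        "year": year,
    }


if __name__ == "__main__":
    session = timedelta(hours=6.5)  # assumed regular US equity session length
    print(bars_per_period(session, timedelta(minutes=1)))
    print(bars_per_period(session, timedelta(days=1)))
```

Note that Python's `round` rounds halves to the nearest even integer, so resolutions that produce a fractional bar count ending in .5 (for example hourly bars in a 6.5 hour session) round down to 6 for the daily value.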