Overall Statistics
------------------
Total Orders                    2489
Average Win                     1.10%
Average Loss                    -0.51%
Compounding Annual Return       20.463%
Drawdown                        16.100%
Expectancy                      0.293
Start Equity                    100000
End Equity                      602713.25
Net Profit                      502.713%
Sharpe Ratio                    1.415
Sortino Ratio                   1.415
Probabilistic Sharpe Ratio      89.472%
Loss Rate                       59%
Win Rate                        41%
Profit-Loss Ratio               2.13
Alpha                           0.12
Beta                            0.228
Annual Standard Deviation       0.101
Annual Variance                 0.01
Information Ratio               0.297
Tracking Error                  0.149
Treynor Ratio                   0.628
Total Fees                      $0.00
Estimated Strategy Capacity     $21000000.00
Lowest Capacity Asset           QQQ RIWIV7K5Z9LX
Portfolio Turnover              99.79%
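As a quick cross-check of the figures above: under the usual definition expectancy = win_rate * profit_loss_ratio - loss_rate (an assumption here, the report does not state its formula), the reported values are consistent with each other.

# Consistency check of the reported statistics (assumed formulas, rounded inputs).
win_rate, loss_rate = 0.41, 0.59
average_win, average_loss = 0.0110, -0.0051           # 1.10% / -0.51%
profit_loss_ratio = average_win / abs(average_loss)   # ~2.16 (report: 2.13, before rounding)
expectancy = win_rate * profit_loss_ratio - loss_rate
print(f"profit/loss ratio ~{profit_loss_ratio:.2f}, expectancy ~{expectancy:.3f}")  # ~0.29 (report: 0.293)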
# region imports
from AlgorithmImports import *
# endregion

'''
1st iteration
-------------
Many thanks again, that was extremely helpful and well structured! For one or two of the code
snippets it would help to walk through them together again so that I can add comments.

2nd iteration - green light for these points:

Entry logic
-----------
The points from last time. Re-entry only after at least x minutes - from my point of view this is
not needed after all, since it can be controlled via the tolerances.

Task
----
Entry only between a start and an end time - please also move the end time into config.json and
explain to me how that works ;)

Entry - modularization of the prior-day data
--------------------------------------------
Modularization task (approx. 2 h)
---------------------------------
Please add a mechanism that lets me define filters which I can select via an integer parameter.
I have attached my GlobalSignals class; it contains a dictionary-based logic that returns True or
False. Feel free to use something leaner. See in the code:
# Arthur, these are filters that are reused again and again - how can this be modularized, e.g. with a logic similar to the Global Signals?

Task (approx. 2 h)
------------------
Please make the following data points accessible, ideally in a reusable module or in one
"monster" indicator (these are security-specific data points):
EMA(close, daily, n days)
EMA(close, 4 hours, n periods)
EMA(volume, daily, n days)
ATR(daily, n days)
The value_area_high and value_area_low properties from the previous day's Volume Profile
(https://www.quantconnect.com/docs/v2/writing-algorithms/indicators/supported-indicators/volume-profile)
Pre-market high and low. For this you will probably have to switch to extended market hours.
O, H, L, C of the last 5 sessions, ideally indexed, e.g. closeday[0] is yesterday's close.
Would a logic like this work, for example, or do you have a better idea?
self.close = self.algorithm.SMA(self.symbol, 1)
self.high = self.algorithm.MAX(self.symbol, 1)
self.low = self.algorithm.MIN(self.symbol, 1)
self.close.Window.Size = 5
self.high.Window.Size = 5
self.low.Window.Size = 5

Exit logic
----------
The points from last time.

Task (approx. 1.5 h)
--------------------
Indicator: please extend the tolerances to x * ATR(daily).
Architecture: please add a module in which I can then add different exit criteria. See in the code:
# Arthur, this is only an example - how can the SL logic be modularized, e.g. as an indicator?

PCM & execution
---------------
Since you had used the EqualWeighting PCM, I plugged in the MultiAlpha PCM, because it also builds
on the EqualWeighting PCM. The results are identical. I am actually quite happy with the
modularization so far.

Tasks (approx. 1.5 h)
---------------------
OnData: Can this also be done without OnData, or how can it be called in a better way? Could you
add a code example here or implement it directly?
Vola sizing: Could you please make the volatility (the daily ATR?) available? I could then use it
to add a switchable volatility sizing.
Could you also store a leverage factor in the security initializer, for example? In the PCM I have
the following code:
adjusted_quantity = x.Quantity * algorithm.Securities[x.Symbol].Leverage * long_short_factor

GlobalSignals
-------------
Happy to discuss this in the next call; here I am also looking for help with the modularization.

Tasks (approx. 1 h)
-------------------
Architecture: How can the global variables be made available without adding self.algorithm
everywhere? Could you add a code example here or implement it directly?
Indicator: Could you please add the SMA of the VIX in a lean way? See the code:
self.vix_sma = 0 # Arthur, can you please add this here? I had used a mean over a deque, but surely there is a leaner way?

3rd iteration - happy to discuss this in the next call.
"Live ready" topics - which to-dos do you still see here?
CFD
Options
Further alpha models
'''
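The prior-day / higher-timeframe data points requested above can be bundled in one reusable, security-specific helper. The sketch below is only a suggestion and is not part of the project yet: the class and attribute names (PriorDayData, closeday, ema_days, atr_days) are made up, the EMA/ATR/history calls follow the documented LEAN helpers but should be verified against the project's API version (in particular Field.VOLUME as a selector and the generic history[TradeBar] request), the Volume Profile value areas are omitted, and the pre-market high/low is sketched separately as an indicator further below in indicators.py.

# region imports
from AlgorithmImports import *
# endregion


class PriorDayData:
    """Bundles prior-day / higher-timeframe data points for one security (sketch only)."""

    def __init__(self, algorithm: QCAlgorithm, symbol: Symbol, ema_days: int = 20, atr_days: int = 14) -> None:
        self.algorithm = algorithm
        self.symbol = symbol

        # Daily EMAs on close and volume, and a daily ATR; the algorithm helpers register the
        # indicators so they update automatically with daily bars.
        self.ema_close_daily = algorithm.EMA(symbol, ema_days, resolution=Resolution.DAILY)
        self.ema_volume_daily = algorithm.EMA(symbol, ema_days, resolution=Resolution.DAILY, selector=Field.VOLUME)
        self.atr_daily = algorithm.ATR(symbol, atr_days, resolution=Resolution.DAILY)

        # 4-hour EMA on close via a consolidator registered for the symbol.
        self.ema_close_4h = ExponentialMovingAverage(ema_days)
        four_hour_consolidator = TradeBarConsolidator(timedelta(hours=4))
        algorithm.register_indicator(symbol, self.ema_close_4h, four_hour_consolidator)

        # O/H/L/C of the last 5 completed sessions, indexed so that self.days[0].close is
        # yesterday's close; a daily consolidator keeps the window filled going forward.
        self.days = RollingWindow[TradeBar](5)
        algorithm.consolidate(symbol, Resolution.DAILY, lambda bar: self.days.add(bar))

        # Warm the rolling window from history so the values are usable immediately
        # (adjust if the project prefers another warm-up mechanism).
        for bar in algorithm.history[TradeBar](symbol, 5, Resolution.DAILY):
            self.days.add(bar)

    @property
    def is_ready(self) -> bool:
        return (self.ema_close_daily.is_ready and self.ema_volume_daily.is_ready
                and self.atr_daily.is_ready and self.days.is_ready)

    def closeday(self, index: int) -> float:
        """Close of the session `index` days back (0 = yesterday)."""
        return self.days[index].close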
# region imports from typing_extensions import Annotated from AlgorithmImports import * from pydantic import BaseModel, Field, field_serializer import dateutil from datetime import datetime, timedelta, timezone, date, time # endregion class AlgorithmConfig(BaseModel): def model_post_init(self, __context) -> None: if isinstance(self.start_date, str): self.start_date = dateutil.parser.parse(self.start_date).date() if isinstance(self.end_date, str): self.end_date = dateutil.parser.parse(self.end_date).date() if isinstance(self.tickers, str): self.tickers = [*map(str.strip, self.tickers.split(','))] @field_serializer('start_date', 'end_date') def serialize_dates(self, dt: date, _info) -> str: pass def to_string(self): self.model_dump_json() # CFD doku https://www.quantconnect.com/announcements/16792/interactive-brokers-live-cfd-support/p1 """ -4051578501 SPY SPDR S&P 500 ETF Trust North America $563,753.00 18.70% 50,833,240 $560.62 0.34% IVV iShares Core S&P 500 ETF North America $515,162.00 18.81% 4,411,436 $563.96 0.39% VOO Vanguard S&P 500 ETF North America $502,881.00 18.77% 4,912,354 $515.30 0.35% VTI Vanguard Total Stock Market ETF North America $425,315.00 17.36% 2,976,359 $276.46 0.50% QQQ Invesco QQQ Trust Series I North America $290,430.00 18.16% 35,649,760 $482.50 0.47% VUG Vanguard Growth ETF North America $136,742.00 22.25% 1,039,311 $379.07 0.47% VTV Vanguard Value ETF North America $123,006.00 14.43% 1,752,614 $168.94 0.38% AGG iShares Core U.S. Aggregate Bond ETF North America $116,585.00 3.66% 6,740,469 $100.72 0.19% BND Vanguard Total Bond Market ETF North America $114,136.00 3.73% 5,596,866 $74.72 0.21% IWF iShares Russell 1000 Growth ETF North America $96,784.60 22.13% 1,350,995 $369.34 0.42% IJH iShares Core S&P Mid-Cap ETF North America $87,292.80 10.66% 7,440,881 $61.01 1.21% IJR iShares Core S&P Small-Cap ETF North America $83,813.20 5.95% 3,545,806 $113.93 1.15% VIG Vanguard Dividend Appreciation ETF North America $83,166.30 14.08% 746,291 $192.63 0.49% VGT Vanguard Information Technology ETF North America $76,063.10 20.70% 500,006 $582.04 0.57% XLK Technology Select Sector SPDR Fund North America $70,888.20 17.59% 5,437,528 $225.58 0.59% IWM iShares Russell 2000 ETF North America $67,623.60 7.92% 31,859,480 $215.46 1.30% VO Vanguard Mid-Cap ETF North America $66,594.90 10.14% 523,689 $254.24 0.95% TLT iShares 20+ Year Treasury Bond ETF North America $62,883.00 2.18% 37,098,212 $98.73 0.06% ITOT iShares Core S&P Total U.S. Stock Market ETF North America $60,175.40 17.27% 1,296,591 $122.72 0.50% RSP Invesco S&P 500® Equal Weight ETF North America $59,787.90 9.83% 6,234,414 $172.60 0.70% SCHD Schwab US Dividend Equity ETF North America $58,693.00 10.46% 2,932,697 $82.57 0.43% IWD iShares Russell 1000 Value ETF North America $58,080.70 12.23% 1,528,567 $183.92 0.53% VB Vanguard Small Cap ETF North America $57,414.90 8.27% 597,130 $229.39 1.23% VYM Vanguard High Dividend Yield Index ETF North America $56,584.50 13.10% 788,634 $124.48 0.46% IVW iShares S&P 500 Growth ETF North America $53,311.30 25. """ tickers: str | list[str] = ["QQQ"] # 'QQQ', 'SPY', 'DIA', 'IWF', 'IWM', 'XLU', 'META', 'VTI' leverage: float = 1.5 freePortfolioValuePercentage: float = 0.01 # Default Value: 0.0025 directional_bias: int = 0 # -1=short only, +1=long only # Define long and short multipliers which are used in the PCM. For testing, set to 1. long_factor: float = 1. 
short_factor: float = 0.75 start_date: str | date = "2015-01-01" end_date: str | date = "2024-12-31" wfo: int initial_capital: Annotated[int, Field(strict=False, gt=0)] = 100_000 costs_enabled: bool = False trading_start_time: time = time(hour=9, minute=45) trading_end_time: time = time(hour=15, minute=59) eod_exit: bool = False # Min order margin portfolio percentage to ignore bad orders and orders with small sizes in PCM. For testing, set to 0. minimumOrderMarginPortfolioPercentage: float = 0. #minimumOrderMarginPortfolioPercentage: float = 0.003 # 0.003 using $300 for a $100_000 portfolio # Min order quantity change percentage to ignore bad orders and orders with small sizes. For testing, set to 0. #minimumOrderQuantityChangePercentage: float = 0. minimumOrderQuantityChangePercentage: float = 0.1 # Maximum spread compare to current price in percentage. For testing, set to a large percentage suach as 1. acceptingSpreadPercentRTH: float = 1. acceptingSpreadPercentETH: float = 1. #acceptingSpreadPercentRTH: float = 0.003 #acceptingSpreadPercentETH: float = 0.006 # Max percentage of portfolio of one security per position. For testing, set to 1. #max_percentage_per_position: float = 1. max_percentage_per_position: float = 3.0 # see leverage # Benchmark myBenchmark: str = 'SPY' # Global Signals global_case_filter_condition: int = 1 # always True # Noise Area Indicator scaling_factor: float = 0.95 # reduces the noise area, as breakouts already happen for smaller noise areas than the average gap_stretch_factor: float = 1.90 # increases the noise area asymmetrically to the gap side period: int = 15 * 5 #self.noise_area_exit_tol = -0.15 # Exclude extreme gaps gap_min_long: float = -0.035 gap_max_short: float = +0.035 # MA trailing_ema_period: int = 90 trailing_ema_exit_tol: float = -0.30 #trailing_max_period: int = 30 #trailing_max_exit_tol: float = -0.50 #trailing_min_period: int = 30 #trailing_min_exit_tol: float = -0.50 eod_ma_period: int = 120 # SL versus average entry price long_atr_stop: float = 0.40 short_atr_stop: float = 0.40 #long_atr_tp: float = 4.0 #short_atr_tp: float = 4.0 # VWAP vwap_entry_tol: float = 0.080 vwap_exit_tol: float = -0.025
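Regarding the open question of how the trading end time gets into config.json: trading_end_time above is a datetime.time field, and pydantic v2 coerces an ISO-formatted time string into it automatically, so an entry in the JSON file that read_config loads is sufficient. A minimal sketch follows; the exact key layout that read_config expects is an assumption, and "wfo" is included only because it has no default in AlgorithmConfig.

# Hypothetical excerpt of config.json (keys beyond these depend on what read_config expects):
#   { "tickers": "QQQ", "wfo": 0, "trading_start_time": "09:45:00", "trading_end_time": "15:30:00" }
import json
from datetime import time

raw = json.loads('{"tickers": "QQQ", "wfo": 0, "trading_start_time": "09:45:00", "trading_end_time": "15:30:00"}')
config = AlgorithmConfig(**raw)               # AlgorithmConfig as defined above
assert config.trading_end_time == time(15, 30)  # used by can_emit_insight to block late entries
assert config.tickers == ["QQQ"]                 # model_post_init splits the ticker string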
# region imports from AlgorithmImports import * from analytics import SecurityAnalytics # endregion class CustomAlphaModel(AlphaModel): def __init__(self): self.name = self.__class__.__name__ self.securities = [] def update(self, algorithm: QCAlgorithm, data: Slice) -> list[Insight]: insights = [] for security in self.securities: insight = security.analytics.create_insight(algorithm, data) if insight: insights.append(insight) return insights def on_securities_changed(self, algorithm, changes): for security in changes.added_securities: if security.type is not SecurityType.EQUITY: continue if security in self.securities: continue security.analytics = SecurityAnalytics(algorithm, security) self.securities.append(security) for security in changes.removed_securities: if security not in self.securities: continue self.securities.remove(security) security.analytics.reset()
# region imports from AlgorithmImports import * from indicators import NoiseAreaIndicator, NoiseAreaIndicatorEMA, IntradayVWAP from toolbox import TimeFrameHelper # endregion class SecurityAnalytics: def __init__(self, algorithm: QCAlgorithm, security: Security) -> None: self.algorithm = algorithm self.security = security self.symbol = security.symbol tf_helper = TimeFrameHelper(security, Resolution.MINUTE) self.insight = Insight.price(symbol=self.symbol, period=timedelta(1), direction=InsightDirection.FLAT) # NoiseAreaIndicator #scaling_factor reduces the noise area, as breakouts already happen for smaller noise areas than the average #gap_stretch_factor increases the noise area asymmetrically to the gap side #self.noise_area_exit_tol is currently unused #self.noise_area = NoiseAreaIndicator(tf_helper, self.algorithm.config.period, self.algorithm.config.scaling_factor, self.algorithm.config.gap_stretch_factor, use_ema_weighted=True) self.noise_area = NoiseAreaIndicatorEMA(tf_helper, self.algorithm.config.period, self.algorithm.config.scaling_factor, self.algorithm.config.gap_stretch_factor) algorithm.warm_up_indicator(security.symbol, self.noise_area, Resolution.MINUTE) algorithm.register_indicator(security.symbol, self.noise_area, Resolution.MINUTE) #--------------------------------- # Entry-Logik # Arthur, das sind Prior Day Filter, die man immer wieder benutzen kann - wie kann man das modularisieren, zB mit einer Logik ähnlich wie in den Global Signals? # Exclude extreme gaps self.gap_min_long = self.algorithm.config.gap_min_long self.gap_max_short = self.algorithm.config.gap_max_short # ATR self.atr_day = AverageTrueRange(63) algorithm.warm_up_indicator(security.symbol, self.atr_day, Resolution.DAILY) algorithm.register_indicator(security.symbol, self.atr_day, Resolution.DAILY) # VWAP self.vwap_entry_tol = self.algorithm.config.vwap_entry_tol # Arthur, müsste ein ATR Faktor sein self.vwap_exit_tol = self.algorithm.config.vwap_exit_tol # Arthur, müsste ein ATR Faktor sein. Vorschicht, ich habe die Logik der Vorzeichen verändert! 
self.vwap = IntradayVWAP() algorithm.warm_up_indicator(security.symbol, self.vwap, Resolution.MINUTE) algorithm.register_indicator(security.symbol, self.vwap, Resolution.MINUTE) # Regime Min, Max, MA regime_max_period = 3 self.regime_max = Maximum(period=regime_max_period) #algorithm.warm_up_indicator(security.symbol, self.regime_max, Resolution.DAILY, Field.CLOSE) #algorithm.register_indicator(security.symbol, self.regime_max, Resolution.DAILY, Field.CLOSE) #algorithm.warm_up_indicator(security.symbol, self.regime_max, Resolution.DAILY, Field.HIGH) #algorithm.register_indicator(security.symbol, self.regime_max, Resolution.DAILY, Field.HIGH) algorithm.warm_up_indicator(security.symbol, self.regime_max, Resolution.DAILY, Field.LOW) algorithm.register_indicator(security.symbol, self.regime_max, Resolution.DAILY, Field.LOW) #regime_min_period = 3 #self.regime_min = Maximum(period=regime_min_period) #algorithm.warm_up_indicator(security.symbol, self.regime_min, Resolution.DAILY, Field.CLOSE) #algorithm.register_indicator(security.symbol, self.regime_min, Resolution.DAILY, Field.CLOSE) #algorithm.warm_up_indicator(security.symbol, self.regime_min, Resolution.DAILY, Field.HIGH) #algorithm.register_indicator(security.symbol, self.regime_min, Resolution.DAILY, Field.HIGH) #algorithm.warm_up_indicator(security.symbol, self.regime_min, Resolution.DAILY, Field.LOW) #algorithm.register_indicator(security.symbol, self.regime_min, Resolution.DAILY, Field.LOW) #regime_ma_period = 5 #self.regime_ma = SimpleMovingAverage(period=regime_ma_period) #algorithm.warm_up_indicator(security.symbol, self.regime_ma, Resolution.DAILY, Field.CLOSE) #algorithm.register_indicator(security.symbol, self.regime_ma, Resolution.DAILY, Field.CLOSE) #--------------------------------- # Exit-Logik # Arthur, das ist nur exemplarisch - wie kann man die SL Logic modularisieren, zB einen Indicator? 
# Trailing EMA stop loss trailing_ema_period = self.algorithm.config.trailing_ema_period self.trailing_ema_exit_tol = self.algorithm.config.trailing_ema_exit_tol self.trailing_ema = SimpleMovingAverage(period=trailing_ema_period) algorithm.warm_up_indicator(security.symbol, self.trailing_ema, Resolution.MINUTE, Field.CLOSE) algorithm.register_indicator(security.symbol, self.trailing_ema, Resolution.MINUTE, Field.CLOSE) # Trailing MAX stop loss #trailing_max_period = self.algorithm.config.trailing_max_period #trailing_max_exit_tol = self.algorithm.config.trailing_max_exit_tol #self.trailing_max = SimpleMovingAverage(period=trailing_max_period) #algorithm.warm_up_indicator(security.symbol, self.trailing_max, Resolution.MINUTE, Field.CLOSE) #algorithm.register_indicator(security.symbol, self.trailing_max, Resolution.MINUTE, Field.CLOSE) # Trailing MIN stop loss #trailing_min_period = self.algorithm.config.trailing_min_period #trailing_min_exit_tol = self.algorithm.config.trailing_min_exit_tol #self.trailing_min = SimpleMovingAverage(period=trailing_min_period) #algorithm.warm_up_indicator(security.symbol, self.trailing_min, Resolution.MINUTE, Field.CLOSE) #algorithm.register_indicator(security.symbol, self.trailing_min, Resolution.MINUTE, Field.CLOSE) # Time SMA to avoid spikes to trigger stop loss time_sma_period = 3 self.time_sma = SimpleMovingAverage(period=time_sma_period) algorithm.warm_up_indicator(security.symbol, self.time_sma, Resolution.MINUTE, Field.CLOSE) algorithm.register_indicator(security.symbol, self.time_sma, Resolution.MINUTE, Field.CLOSE) # EoD MA to allow overnight holdings in case we are on the safe side of the moving average eod_ma_period = self.algorithm.config.eod_ma_period self.eod_ma = SimpleMovingAverage(period=eod_ma_period) algorithm.warm_up_indicator(security.symbol, self.eod_ma, Resolution.DAILY, Field.CLOSE) algorithm.register_indicator(security.symbol, self.eod_ma, Resolution.DAILY, Field.CLOSE) # SL versus average entry price self.long_atr_stop = self.algorithm.config.long_atr_stop self.short_atr_stop = self.algorithm.config.short_atr_stop #--------------------------------- def create_insight(self, algorithm: QCAlgorithm, data: Slice) -> Insight | None: if self.noise_area.is_ready: algorithm.plot("Noise Area", "Upper Bound", self.noise_area.upper_bound) algorithm.plot("Noise Area", "Lower Bound", self.noise_area.lower_bound) algorithm.plot("Noise Area", "Price", self.security.price) if self.vwap.is_ready: algorithm.plot("Noise Area", "VWAP", self.vwap.value) if not self.can_emit_insight: return if self.insight.direction is not InsightDirection.FLAT: # exit if self.exit_conditions_met: self.insight = Insight.price(symbol=self.symbol, period=timedelta(1), direction=InsightDirection.FLAT) return self.insight # exit-and-reverse if not self.algorithm.config.eod_exit: if self.insight.direction is InsightDirection.DOWN and self.long_entry_conditions_met: self.insight = Insight.price(symbol=self.symbol, period=timedelta(1), direction=InsightDirection.UP) return self.insight if self.insight.direction is InsightDirection.UP and self.short_entry_conditions_met: self.insight = Insight.price(symbol=self.symbol, period=timedelta(1), direction=InsightDirection.DOWN) return self.insight else: # long and short entry if self.long_entry_conditions_met: self.insight = Insight.price(symbol=self.symbol, period=timedelta(1), direction=InsightDirection.UP) return self.insight if self.short_entry_conditions_met: self.insight = Insight.price(symbol=self.symbol, period=timedelta(1), 
direction=InsightDirection.DOWN) return self.insight return @property def long_entry_conditions_met(self) -> bool: gap = (self.noise_area.day_open - self.noise_area.previous_day_close) / self.noise_area.previous_day_close if self.noise_area.previous_day_close != 0 else -1. #--------------------------------- # Arthur, das sind Prior Day Filter, die man immer wieder benutzen kann - wie kann man das modularisieren, zB mit einer Logik ähnlich wie in den Global Signals? prior_days_condition = ( gap > self.gap_min_long and #self.security.price < self.regime_max.Current.Value and #self.security.price < self.regime_ma.Current.Value and #self.security.price < self.regime_min.Current.Value and #self.security.price > self.regime_max.Current.Value and #self.security.price > self.regime_ma.Current.Value and #self.security.price > self.regime_min.Current.Value and True) #--------------------------------- current_day_condition = ( self.security.price > self.noise_area.upper_bound and #self.time_sma.Current.Value > self.noise_area.upper_bound and self.security.price > self.vwap.value + self.vwap_entry_tol * self.atr_day.Current.Value and #self.time_sma.Current.Value > self.vwap.value + self.vwap_entry_tol * self.atr_day.Current.Value and True) exit_preventing_condition = ( #self.security.price > self.trailing_ema.Current.Value - self.trailing_ema_exit_tol * self.atr_day.Current.Value and self.security.price > self.trailing_ema.Current.Value and #self.security.price > self.trailing_ema.Current.Value + self.trailing_ema_exit_tol * self.atr_day.Current.Value and #self.security.price > self.trailing_max.Current.Value + self.trailing_max_exit_tol * self.atr_day.Current.Value and True) # Arthur, wie kann ich hier auf den Case Filter zugreifen? case_filter_condition = True #case_filter_condition = self.algorithm.global_case_filter.check_condition(self.algorithm.config.global_case_filter_condition) if prior_days_condition and current_day_condition and exit_preventing_condition and case_filter_condition and self.algorithm.config.directional_bias >= 0: return True return False @property def short_entry_conditions_met(self) -> bool: gap = (self.noise_area.day_open - self.noise_area.previous_day_close) / self.noise_area.previous_day_close if self.noise_area.previous_day_close != 0 else +1. 
prior_days_condition = ( gap < self.gap_max_short and #self.security.price < self.regime_max.Current.Value and #self.security.price < self.regime_ma.Current.Value and # gutes ergebnis #self.security.price < self.regime_min.Current.Value and self.security.price > self.regime_max.Current.Value and #self.security.price > self.regime_ma.Current.Value and # gleichmässig #self.security.price > self.regime_min.Current.Value and True) current_day_condition = ( self.security.price < self.noise_area.lower_bound and #self.time_sma.Current.Value < self.noise_area.lower_bound and self.security.price < self.vwap.value - self.vwap_entry_tol * self.atr_day.Current.Value and #self.time_sma.Current.Value < self.vwap.value - self.vwap_entry_tol * self.atr_day.Current.Value and True) exit_preventing_condition = ( #self.security.price < self.trailing_ema.Current.Value + self.trailing_ema_exit_tol * self.atr_day.Current.Value and self.security.price < self.trailing_ema.Current.Value and #self.security.price < self.trailing_ema.Current.Value - self.trailing_ema_exit_tol * self.atr_day.Current.Value and #self.security.price < self.trailing_min.Current.Value - self.trailing_min_exit_tol * self.atr_day.Current.Value and True) case_filter_condition = True #case_filter_condition = self.algorithm.global_case_filter.check_condition(self.algorithm.config.global_case_filter_condition) if prior_days_condition and current_day_condition and exit_preventing_condition and case_filter_condition and self.algorithm.config.directional_bias <= 0: return True return False @property def exit_conditions_met(self) -> bool: if self.insight.direction is InsightDirection.UP: exit_standard_condition = ( (self.security.price < self.trailing_ema.Current.Value + self.trailing_ema_exit_tol * self.atr_day.Current.Value) or #(self.security.price < self.trailing_max.Current.Value + self.trailing_max_exit_tol * self.atr_day.Current.Value) or #(self.security.price < self.trailing_ema.Current.Value + self.trailing_ema_exit_tol * self.atr_day.Current.Value) and (self.time_sma.Current.Value < self.trailing_ema.Current.Value + self.trailing_ema_exit_tol * self.atr_day.Current.Value) or (self.security.price - self.algorithm.portfolio[self.security.symbol].AveragePrice) / self.atr_day.Current.Value < -self.algorithm.config.long_atr_stop or #(self.security.price - self.algorithm.portfolio[self.security.symbol].AveragePrice) / self.atr_day.Current.Value > +self.algorithm.config.long_atr_tp or False) exit_alpha_condition = ( #self.security.price < self.noise_area.upper_bound * (1 + self.noise_area_exit_tol) or #self.security.price < self.vwap.value + self.vwap_exit_tol * self.atr_day.Current.Value or (self.security.price < self.vwap.value + self.vwap_exit_tol * self.atr_day.Current.Value and self.time_sma.Current.Value < self.vwap.value + self.vwap_exit_tol * self.atr_day.Current.Value) or False) if exit_standard_condition or exit_alpha_condition: return True if self.insight.direction is InsightDirection.DOWN: exit_standard_condition = ( (self.security.price > self.trailing_ema.Current.Value - self.trailing_ema_exit_tol * self.atr_day.Current.Value) or #(self.security.price > self.trailing_min.Current.Value - self.trailing_min_exit_tol * self.atr_day.Current.Value) or #(self.security.price > self.trailing_ema.Current.Value - self.trailing_ema_exit_tol * self.atr_day.Current.Value) and (self.time_sma.Current.Value > self.trailing_ema.Current.Value - self.trailing_ema_exit_tol * self.atr_day.Current.Value) or (self.security.price - 
self.algorithm.portfolio[self.security.symbol].AveragePrice) / self.atr_day.Current.Value > +self.algorithm.config.short_atr_stop or #(self.security.price - self.algorithm.portfolio[self.security.symbol].AveragePrice) / self.atr_day.Current.Value > -self.algorithm.config.short_atr_tp or False) exit_alpha_condition = ( #self.security.price > self.noise_area.lower_bound - self.noise_area_exit_tol * self.atr_day.Current.Value or #self.security.price > self.vwap.value - self.vwap_exit_tol * self.atr_day.Current.Value or (self.security.price > self.vwap.value - self.vwap_exit_tol * self.atr_day.Current.Value and self.time_sma.Current.Value > self.vwap.value - self.vwap_exit_tol * self.atr_day.Current.Value) or False) if exit_standard_condition or exit_alpha_condition: return True if self.security.exchange.is_closing_soon(minutes_to_close=1): # Frank: minutes to close größer machen exit_eod_condition = ( self.algorithm.config.eod_exit or self.insight.direction is InsightDirection.UP and self.security.price < self.eod_ma.Current.Value or self.insight.direction is InsightDirection.DOWN and self.security.price > self.eod_ma.Current.Value or False) if exit_eod_condition: return True return False @property def can_emit_insight(self) -> bool: if not self.security.is_tradable: return False if not self.security.has_data: return False if not self.security.exchange.exchange_open: return False # no insights if not exchange open if self.algorithm.time.time() < self.algorithm.config.trading_start_time: return False # no entries and no exits prior start time if self.insight.direction is InsightDirection.FLAT: if self.algorithm.time.time() < self.algorithm.config.trading_start_time: return False # no entries prior start time if self.algorithm.time.time() >= self.algorithm.config.trading_end_time: return False # no entries after end time if self.security.exchange.is_closing_soon(minutes_to_close=1): return False # no entries prior EoD return True
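One way to answer the two "how can this be modularized" comments in SecurityAnalytics (the reusable prior-day entry filters and the stop-loss/exit logic) is a small registry of named condition functions selectable by an integer from the config, in the same spirit as GlobalCaseFilter. The sketch below is only a suggestion: the class and function names are made up, and each condition receives the SecurityAnalytics instance so it can reuse the indicators that are already registered there.

# region imports
from AlgorithmImports import *
# endregion


class ConditionRegistry:
    """Maps integer ids to reusable boolean conditions on a SecurityAnalytics instance (sketch)."""

    def __init__(self, conditions: dict) -> None:
        # conditions: {int: callable(analytics) -> bool}
        self.conditions = conditions

    def check(self, condition_id: int, analytics) -> bool:
        condition = self.conditions.get(condition_id)
        return condition(analytics) if condition is not None else False


# Prior-day entry filters, selectable e.g. via config.global_case_filter_condition.
def _always(analytics) -> bool:
    return True

def _gap_within_long_limit(analytics) -> bool:
    close = analytics.noise_area.previous_day_close
    gap = (analytics.noise_area.day_open - close) / close if close != 0 else -1.0
    return gap > analytics.gap_min_long

def _above_regime_max(analytics) -> bool:
    return analytics.security.price > analytics.regime_max.Current.Value

entry_filters = ConditionRegistry({
    1: _always,
    2: _gap_within_long_limit,
    3: _above_regime_max,
})


# Exit criteria for a long position, written the same way; an "exit if any is true" loop in
# exit_conditions_met would replace the hand-written boolean chain.
def _below_trailing_ema(analytics) -> bool:
    tol = analytics.trailing_ema_exit_tol * analytics.atr_day.Current.Value
    return analytics.security.price < analytics.trailing_ema.Current.Value + tol

def _long_atr_stop_hit(analytics) -> bool:
    avg_price = analytics.algorithm.portfolio[analytics.security.symbol].AveragePrice
    move_in_atr = (analytics.security.price - avg_price) / analytics.atr_day.Current.Value
    return move_in_atr < -analytics.long_atr_stop

long_exit_criteria = [_below_trailing_ema, _long_atr_stop_hit]

# Usage inside SecurityAnalytics (sketch):
#   prior_days_condition = entry_filters.check(self.algorithm.config.global_case_filter_condition, self)
#   if any(criterion(self) for criterion in long_exit_criteria): ...exit...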
#region imports
from AlgorithmImports import *
#endregion


class GlobalCaseFilter:
    """
    Applies a boolean filter based on the input variable using a dictionary of conditions.
    Condition 1 always returns True, condition 0 always returns False.

    Usage:
    def initialize(algorithm):
        global_case_filter = GlobalCaseFilter(algorithm)
        result = global_case_filter.check_condition(3)
    """

    def __init__(self, algorithm):
        self.algorithm = algorithm

    # Placeholder condition methods
    def condition_1(self):
        return True

    def condition_2(self):
        return False

    def check_condition(self, input_var: int) -> bool:
        conditions = {
            # Direct boolean values
            0: False,  # always False as a benchmark and for up:True / down:not False
            1: True,   # always True as a benchmark and for up:True / down:not False
            # Existing variables
            2: self.algorithm.Vix_less_SMA,
            # Method references
            98: self.condition_1,
            99: self.condition_2,
        }
        # We can (a) call the condition method reference or (b) evaluate the direct boolean condition
        if input_var in conditions:
            condition = conditions[input_var]
            return condition() if callable(condition) else condition
        else:
            return False


class GlobalSignals:
    """
    Creates global indicators and manages their update.

    Usage:
    def initialize(algorithm):
        global_signals = GlobalSignals(algorithm)
    def OnData(algorithm, data: Slice):
        algorithm.global_signals.OnData(data)
    """

    def __init__(self, algorithm):
        self.algorithm = algorithm
        # vix
        self.vix = algorithm.AddIndex("VIX").Symbol
        self.vix_sma = 0  # Arthur, can you please add this here? I had used a mean over a deque, but surely there is a leaner way?
        # make results available globally
        self.algorithm.Vix_Value = 0
        self.algorithm.Vix_less_SMA = False

    def OnData(self, slice):
        # vix
        if slice.ContainsKey(self.vix):
            self.algorithm.Vix_Value = slice[self.vix].Close
            self.algorithm.Vix_less_SMA = self.algorithm.Vix_Value <= self.vix_sma
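For the open vix_sma question above, a leaner option is the algorithm-level SMA helper, which creates a self-updating indicator for the VIX subscription, so no deque and no manual averaging are needed. The variant below also shows how the daily comparison could be driven by a consolidator handler instead of the per-minute OnData call from main.py. This is only a sketch, not a drop-in replacement: the class name, the 10-day SMA length and the consolidator approach on the index subscription are assumptions to be verified.

#region imports
from AlgorithmImports import *
#endregion


class GlobalSignalsSketch:
    """GlobalSignals variant with a self-updating VIX SMA and no OnData dependency (sketch)."""

    def __init__(self, algorithm):
        self.algorithm = algorithm
        self.vix = algorithm.AddIndex("VIX").Symbol

        # Self-updating daily SMA on the VIX close; LEAN registers and updates it automatically.
        self.vix_sma = algorithm.SMA(self.vix, 10, Resolution.DAILY)
        algorithm.warm_up_indicator(self.vix, self.vix_sma, Resolution.DAILY)

        # Results published on the algorithm object, as before.
        self.algorithm.Vix_Value = 0
        self.algorithm.Vix_less_SMA = False

        # Fires once per consolidated daily VIX bar, replacing the per-minute OnData update.
        algorithm.consolidate(self.vix, Resolution.DAILY, self._on_daily_vix)

    def _on_daily_vix(self, bar):
        self.algorithm.Vix_Value = bar.close
        if self.vix_sma.is_ready:
            self.algorithm.Vix_less_SMA = self.algorithm.Vix_Value <= self.vix_sma.Current.Value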
# region imports from AlgorithmImports import * from itertools import repeat from toolbox import TimeFrameHelper #from collections import deque import numpy as np # endregion class IntradayVWAP(PythonIndicator): def __init__(self, name='VWAP'): self.name = name self.value = 0 self.time = datetime.min self.sum_of_volume = 0 self.sum_of_dollar_volume = 0 self.count = 0 self.warm_up_period = 1 def update(self, data: TradeBar) -> bool: if data.is_fill_forward: return self.is_ready if data.end_time.day != self.time.day: self.sum_of_volume = 0 self.sum_of_dollar_volume = 0 self.count = 0 avg_price = (data.high + data.low + data.close) / 3 self.sum_of_volume += data.volume self.sum_of_dollar_volume += avg_price * data.volume if self.sum_of_volume == 0: self.value = data.value return self.is_ready self.value = self.sum_of_dollar_volume / self.sum_of_volume self.time = data.end_time self.count += 1 return self.is_ready @property def is_ready(self) -> bool: return self.sum_of_volume > 0 and self.count >= 1 class NoiseAreaIndicator(PythonIndicator): def __init__(self, tf_helper: TimeFrameHelper, period=63, scaling_factor=1.0, gap_stretch_factor=1.0, use_ema_weighted=False): self.time = datetime.min self.value = 0 self.period = period self.scaling_factor = scaling_factor self.gap_stretch_factor = gap_stretch_factor self.use_ema_weighted = use_ema_weighted self.warm_up_period = self.period + 200 self.count = 0 self.first_bar_of_day = TradeBar(time=self.time, symbol=None, open=0, high=0, low=0, close=0, volume=0) self.latest_time_for_reset = time(9,35) # Wert erhöht self.day_open = 0 self.previous_close = 0 self.previous_day_open = 0 self.previous_day_close = 0 self.upper_bound = 0 self.lower_bound = 0 #self.upper_bound_by_time = dict.fromkeys(range(1, tf_helper.day + 1), 0) #self.lower_bound_by_time = dict.fromkeys(range(1, tf_helper.day + 1), 0) self.sigma_by_time = dict(zip(range(1, tf_helper.day + 1), repeat(SimpleMovingAverage(self.period), tf_helper.day))) if not use_ema_weighted else np.zeros(tf_helper.day + 1) def update(self, data: TradeBar) -> bool: if self.first_bar_of_day.time.day != data.end_time.day: if data.end_time.time() > self.latest_time_for_reset: # Arthur, hier sollten wir eine Fehlermeldung ausgeben return #pass self.previous_day_open = self.day_open self.previous_day_close = self.previous_close self.first_bar_of_day = data self.day_open = self.first_bar_of_day.open abs_move = abs(data.close / self.first_bar_of_day.open - 1) if self.first_bar_of_day.open != 0 else 0 # Frank, update minutes_elapsed = int((data.end_time - self.first_bar_of_day.time).total_seconds() // 60) self.sigma_by_time[minutes_elapsed].update(data.end_time, abs_move) if not self.use_ema_weighted else self.ema_weighted(self.sigma_by_time[minutes_elapsed], abs_move, self.period) upper_bound_reference = lower_bound_reference = self.first_bar_of_day.open if self.previous_day_close is not None: #upper_bound_reference = max(upper_bound_reference, self.previous_day_close) #lower_bound_reference = min(lower_bound_reference, self.previous_day_close) upper_bound_reference = upper_bound_reference + max(0, self.previous_day_close-upper_bound_reference) * self.gap_stretch_factor lower_bound_reference = lower_bound_reference - max(0, lower_bound_reference-self.previous_day_close) * self.gap_stretch_factor if self.use_ema_weighted: self.upper_bound = upper_bound_reference * (1 + self.sigma_by_time[minutes_elapsed] * self.scaling_factor) self.lower_bound = lower_bound_reference * (1 - self.sigma_by_time[minutes_elapsed] * 
self.scaling_factor) else: self.upper_bound = upper_bound_reference * (1 + self.sigma_by_time[minutes_elapsed].current.value * self.scaling_factor) self.lower_bound = lower_bound_reference * (1 - self.sigma_by_time[minutes_elapsed].current.value * self.scaling_factor) #self.upper_bound_by_time[minutes_elapsed] = self.upper_bound #self.lower_bound_by_time[minutes_elapsed] = self.lower_bound self.previous_close = data.close self.count += 1 return self.is_ready @property def is_ready(self) -> bool: return self.count > self.warm_up_period def reset(self): self.time = datetime.min self.value = 0 def ema_weighted(self, sigma, abs_move, period=80, i=0): """ Calculate an EMA that is less influenced by recent changes, does not need any arrays, and allows plotting the EMA weight along i sigma: previous EMA value abs_move: absolute price move for the current period period: lookback period for the slow EMA period_fast: lookback period for the fast EMA period_fast2: optional lookback period for the second fast EMA period_even_weight: period over which the EMA is adjusted to control the weight of the recent period i: index for which weight is being calculated for plotting purpose """ period_change = 0.1 period_fast = int(period * (0.65-period_change)) period_fast2 = int(period * (0.80-period_change)) period_even_weight = int(period * (0.65+period_change)) # Warm up if sigma == 0: return abs_move if i==0 else abs_move, 1. # EMA alpha decay factors alpha_slow = 2. / (period + 1.) alpha_fast = 2. / (period_fast + 1.) alpha_fast2 = 2. / (period_fast2 + 1.) # EMA's ema_slow = alpha_slow * abs_move + (1 - alpha_slow) * sigma ema_fast = alpha_fast * abs_move + (1 - alpha_fast) * sigma ema_fast2 = alpha_fast2 * abs_move + (1 - alpha_fast2) * sigma # Pure EMA weight distribution weight_slow = alpha_slow * (1 - alpha_slow) ** i weight_fast = alpha_fast * (1 - alpha_fast) ** i weight_fast2 = alpha_fast2 * (1 - alpha_fast2) ** i # Beta factors for combined EMA's factor_slow = (1 - alpha_slow) ** period_even_weight factor_fast = (1 - alpha_fast) ** period_even_weight factor_fast2 = (1 - alpha_fast2) ** period_even_weight # EMA Standard if period_fast == 0: weight = weight_slow ema = ema_slow return ema if i==0 else ema, weight # EMA with period_fast adjusted less influence by recent changes elif period_fast2 == 0 and period_even_weight == 0: beta = alpha_slow / alpha_fast weight = weight_slow - beta * (weight_fast - weight_slow) ema = ema_slow - beta * (ema_fast - ema_slow) return ema if i==0 else ema, weight # EMA with period_fast adjusted less influence by recent changes and even weight elif period_fast2 == 0: beta = (alpha_slow * factor_slow - alpha_slow) / (alpha_fast * factor_fast - alpha_fast) weight = weight_slow - beta * (weight_fast - weight_slow) ema = ema_slow - beta * (ema_fast - ema_slow) return ema if i==0 else ema, weight # EMA with period_fast and period_fast2 adjusted less influence by recent changes and even weight elif period_even_weight != 0: beta = (alpha_slow * factor_slow - alpha_slow) / (alpha_fast * factor_fast - alpha_fast) beta2 = (alpha_slow * factor_slow - alpha_slow) / (alpha_fast2 * factor_fast2 - alpha_fast2) weight = weight_slow - beta * (weight_fast - weight_slow) - beta2 * (weight_fast2 - weight_slow) ema = ema_slow - beta * (ema_fast - ema_slow) - beta2 * (ema_fast2 - ema_slow) return ema if i==0 else ema, weight return 0. if i==0 else 0., 0. 
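# The task notes also ask for the pre-market high and low. Below is a sketch (not used anywhere in
# the project yet) of how that could be tracked as a small PythonIndicator in the style of
# IntradayVWAP; it assumes the security is subscribed with extended_market_hours=True so that
# pre-market minute bars arrive, and that bars ending at or before the chosen session start count
# as pre-market.
class PreMarketRangeIndicator(PythonIndicator):
    """Tracks the pre-market high and low of the current day (sketch)."""

    def __init__(self, session_start=time(9, 30), name='PreMarketRange'):
        self.name = name
        self.value = 0
        self.time = datetime.min
        self.session_start = session_start
        self.pre_market_high = 0
        self.pre_market_low = 0
        self.warm_up_period = 1

    def update(self, data: TradeBar) -> bool:
        if data.is_fill_forward:
            return self.is_ready
        if data.end_time.day != self.time.day:
            # new day: reset the range
            self.pre_market_high = 0
            self.pre_market_low = 0
        self.time = data.end_time
        if data.end_time.time() <= self.session_start:
            self.pre_market_high = max(self.pre_market_high, data.high) if self.pre_market_high else data.high
            self.pre_market_low = min(self.pre_market_low, data.low) if self.pre_market_low else data.low
        self.value = self.pre_market_high
        return self.is_ready

    @property
    def is_ready(self) -> bool:
        return self.pre_market_low > 0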
class NoiseAreaIndicatorEMA(PythonIndicator): def __init__(self, tf_helper: TimeFrameHelper, period=63, scaling_factor=1.0, gap_stretch_factor = 1.0): self.time = datetime.min self.value = 0 self.period = period self.scaling_factor = scaling_factor self.gap_stretch_factor = gap_stretch_factor self.warm_up_period = self.period + 200 self.count = 0 self.first_bar_of_day = TradeBar(time=self.time, symbol=None, open=0, high=0, low=0, close=0, volume=0) self.latest_time_for_reset = time(9,35) # Wert erhöht self.day_open = 0 self.previous_close = 0 self.previous_day_open = 0 self.previous_day_close = 0 self.upper_bound = 0 self.lower_bound = 0 #self.upper_bound_by_time = dict.fromkeys(range(1, tf_helper.day + 1), 0) #self.lower_bound_by_time = dict.fromkeys(range(1, tf_helper.day + 1), 0) #self.sigma_by_time = dict(zip(range(1, tf_helper.day + 1), repeat(SimpleMovingAverage(self.period), tf_helper.day))) self.sigma_by_time = np.zeros(tf_helper.day + 10) period_change = 0.1 self.period_fast = int(period * (0.65-period_change)) self.period_fast2 = int(period * (0.80-period_change)) self.period_even_weight = int(period * (0.65+period_change)) def update(self, data: TradeBar) -> bool: if self.first_bar_of_day.time.day != data.end_time.day: if data.end_time.time() > self.latest_time_for_reset: # Arthur, hier sollten wir eine Fehlermeldung ausgeben return #pass self.previous_day_open = self.day_open self.previous_day_close = self.previous_close self.first_bar_of_day = data self.day_open = self.first_bar_of_day.open abs_move = abs(data.close / self.first_bar_of_day.open - 1) if self.first_bar_of_day.open != 0 else 0 # Frank, update minutes_elapsed = int((data.end_time - self.first_bar_of_day.time).total_seconds() // 60) #self.sigma_by_time[minutes_elapsed].update(data.end_time, abs_move) self.sigma_by_time[minutes_elapsed], _ = self.ema_weighted(self.sigma_by_time[minutes_elapsed], abs_move, self.period, self.period_fast, self.period_fast2, self.period_even_weight) upper_bound_reference = lower_bound_reference = self.first_bar_of_day.open if self.previous_day_close is not None: #upper_bound_reference = max(upper_bound_reference, self.previous_day_close) #lower_bound_reference = min(lower_bound_reference, self.previous_day_close) upper_bound_reference = upper_bound_reference + max(0, self.previous_day_close-upper_bound_reference) * self.gap_stretch_factor lower_bound_reference = lower_bound_reference - max(0, lower_bound_reference-self.previous_day_close) * self.gap_stretch_factor #self.upper_bound = upper_bound_reference * (1 + self.sigma_by_time[minutes_elapsed].current.value * self.scaling_factor) #self.lower_bound = lower_bound_reference * (1 - self.sigma_by_time[minutes_elapsed].current.value * self.scaling_factor) self.upper_bound = upper_bound_reference * (1 + self.sigma_by_time[minutes_elapsed] * self.scaling_factor) self.lower_bound = lower_bound_reference * (1 - self.sigma_by_time[minutes_elapsed] * self.scaling_factor) #self.upper_bound_by_time[minutes_elapsed] = self.upper_bound #self.lower_bound_by_time[minutes_elapsed] = self.lower_bound self.previous_close = data.close self.count += 1 return self.is_ready @property def is_ready(self) -> bool: return self.count > self.warm_up_period def reset(self): self.time = datetime.min self.value = 0 def ema_weighted(self, sigma, abs_move, period=100, period_fast=20, period_fast2=50, period_even_weight=70, i=0): """ Calculate an EMA that is less influenced by recent changes, does not need any arrays, and allows plotting the EMA weight along i sigma: 
previous EMA value abs_move: absolute price move for the current period period: lookback period for the slow EMA period_fast: lookback period for the fast EMA period_fast2: optional lookback period for the second fast EMA period_even_weight: period over which the EMA is adjusted to control the weight of the recent period i: index for which weight is being calculated for plotting purpose """ # Warm up if sigma == 0: return abs_move, 1. # EMA alpha decay factors alpha_slow = 2. / (period + 1.) alpha_fast = 2. / (period_fast + 1.) alpha_fast2 = 2. / (period_fast2 + 1.) # EMA's ema_slow = alpha_slow * abs_move + (1 - alpha_slow) * sigma ema_fast = alpha_fast * abs_move + (1 - alpha_fast) * sigma ema_fast2 = alpha_fast2 * abs_move + (1 - alpha_fast2) * sigma # Pure EMA weight distribution weight_slow = alpha_slow * (1 - alpha_slow) ** i weight_fast = alpha_fast * (1 - alpha_fast) ** i weight_fast2 = alpha_fast2 * (1 - alpha_fast2) ** i # Beta factors for combined EMA's factor_slow = (1 - alpha_slow) ** period_even_weight factor_fast = (1 - alpha_fast) ** period_even_weight factor_fast2 = (1 - alpha_fast2) ** period_even_weight # EMA Standard if period_fast == 0: weight = weight_slow ema = ema_slow return ema, weight # EMA with period_fast adjusted less influence by recent changes elif period_fast2 == 0 and period_even_weight == 0: beta = alpha_slow / alpha_fast weight = weight_slow - beta * (weight_fast - weight_slow) ema = ema_slow - beta * (ema_fast - ema_slow) return ema, weight # EMA with period_fast adjusted less influence by recent changes and even weight elif period_fast2 == 0: beta = (alpha_slow * factor_slow - alpha_slow) / (alpha_fast * factor_fast - alpha_fast) weight = weight_slow - beta * (weight_fast - weight_slow) ema = ema_slow - beta * (ema_fast - ema_slow) return ema, weight # EMA with period_fast and period_fast2 adjusted less influence by recent changes and even weight elif period_even_weight != 0: beta = (alpha_slow * factor_slow - alpha_slow) / (alpha_fast * factor_fast - alpha_fast) beta2 = (alpha_slow * factor_slow - alpha_slow) / (alpha_fast2 * factor_fast2 - alpha_fast2) weight = weight_slow - beta * (weight_fast - weight_slow) - beta2 * (weight_fast2 - weight_slow) ema = ema_slow - beta * (ema_fast - ema_slow) - beta2 * (ema_fast2 - ema_slow) return ema, weight return 0., 0. """ def ema_weighted_old(self, sigma, abs_move, period=80, period_fast=0): # Provides an EMA that is less influenced by recent changes # EMA weight distribution = (1-alpha)^(k-1) * alpha if sigma == 0: return abs_move elif period_fast == 0: alpha_slow = 2. / (period + 1.) return alpha_slow * abs_move + (1-alpha_slow) * sigma else: alpha_slow = 2. / (period + 1.) alpha_fast = 2. / (period_fast + 1.) beta = alpha_slow / alpha_fast ema_slow = alpha_slow * abs_move + (1 - alpha_slow) * sigma ema_fast = alpha_fast * abs_move + (1 - alpha_fast) * sigma adjusted_ema = ema_slow - beta * (ema_fast - ema_slow) return adjusted_ema def ema_weighted_old2(self, sigma, abs_move, period=80, period_fast=15, period_even_weight=40, i=0): # Warm up if sigma == 0: return abs_move # EMA alpha decay factor alpha_slow = 2. / (period + 1.) alpha_fast = 2. / (period_fast + 1.) 
# EMA weight distribution = (1-alpha)^(k-1) * alpha weight_slow = alpha_slow * (1 - alpha_slow) ** i weight_fast = alpha_fast * (1 - alpha_fast) ** i # Standard EMA without fast adjustment if period_fast == 0: ema = alpha_slow * abs_move + (1 - alpha_slow) * sigma return ema, weight_slow # EMA with less influence by recent changes if period_even_weight == 0: beta = alpha_slow / alpha_fast else: factor_fast = (1 - alpha_fast) ** period_even_weight factor_slow = (1 - alpha_slow) ** period_even_weight beta = (alpha_slow * factor_slow - alpha_slow) / (alpha_fast * factor_fast - alpha_fast) ema_slow = alpha_slow * abs_move + (1 - alpha_slow) * sigma ema_fast = alpha_fast * abs_move + (1 - alpha_fast) * sigma ema = ema_slow - beta * (ema_fast - ema_slow) weight = weight_slow - beta * (weight_fast - weight_slow) return ema, weight """ """ class NoiseAreaIndicator_old(PythonIndicator): def __init__(self, tf_helper: TimeFrameHelper, period=63, scaling_factor=1.0, gap_stretch_factor = 1.0): self.time = datetime.min self.value = 0 self.period = period # tf_helper.quarter self.warm_up_period = self.period + 20 self.count = 0 self.first_bar_of_day = TradeBar(time=self.time, symbol=None, open=0, high=0, low=0, close=0, volume=0) self.day_open = 0 self.previous_close = 0 self.previous_day_open = 0 self.previous_day_close = 0 self.upper_bound_by_time = dict.fromkeys(range(1, tf_helper.day + 1), 0) self.lower_bound_by_time = dict.fromkeys(range(1, tf_helper.day + 1), 0) self.upper_bound = 0 self.lower_bound = 0 self.latest_time_for_reset = time(9,35) # Wert erhöht self.sigma_by_time = dict(zip(range(1, tf_helper.day + 1), repeat(SimpleMovingAverage(self.period), tf_helper.day))) self.scaling_factor = scaling_factor self.gap_stretch_factor = gap_stretch_factor def update(self, data: TradeBar) -> bool: if self.first_bar_of_day.time.day != data.end_time.day: if data.end_time.time() > self.latest_time_for_reset: # Arthur, hier sollten wir eine Fehlermeldung ausgeben #return pass self.previous_day_open = self.day_open self.previous_day_close = self.previous_close self.first_bar_of_day = data self.day_open = self.first_bar_of_day.open abs_move = abs(data.close / self.first_bar_of_day.open - 1) minutes_elapsed = int((data.end_time - self.first_bar_of_day.time).total_seconds() // 60) self.sigma_by_time[minutes_elapsed].update(data.end_time, abs_move) upper_bound_reference = lower_bound_reference = self.first_bar_of_day.open if self.previous_day_close is not None: #upper_bound_reference = max(upper_bound_reference, self.previous_day_close) upper_bound_reference = upper_bound_reference + max(0, self.previous_day_close-upper_bound_reference) * self.gap_stretch_factor #lower_bound_reference = min(lower_bound_reference, self.previous_day_close) lower_bound_reference = lower_bound_reference - max(0, lower_bound_reference-self.previous_day_close) * self.gap_stretch_factor self.upper_bound = upper_bound_reference * (1 + self.sigma_by_time[minutes_elapsed].current.value * self.scaling_factor) # scaling nur auf sigma angewandt self.upper_bound_by_time[minutes_elapsed] = self.upper_bound #lower_bound_reference = self.first_bar_of_day.open self.lower_bound = lower_bound_reference * (1 - self.sigma_by_time[minutes_elapsed].current.value * self.scaling_factor) # scaling nur auf sigma angewandt self.lower_bound_by_time[minutes_elapsed] = self.lower_bound self.previous_close = data.close self.count += 1 return self.is_ready @property def is_ready(self) -> bool: return self.count > self.warm_up_period def reset(self): self.time = 
datetime.min self.value = 0 class NoiseAreaIndicator3(PythonIndicator): def __init__(self, tf_helper: TimeFrameHelper, period=63, scaling_factor=1.0, gap_stretch_factor = 1.0): self.time = datetime.min self.value = 0 self.period = period # tf_helper.quarter self.scaling_factor = scaling_factor self.gap_stretch_factor = gap_stretch_factor self.bars_per_day = tf_helper.day self.warm_up_period = self.period + 20 self.count = 0 self.latest_time_for_reset = time(9,35) # Wert erhöht self.first_bar_of_day = TradeBar(time=self.time, symbol=None, open=0, high=0, low=0, close=0, volume=0) self.day_open = 0 self.previous_close = 0 self.previous_day_open = 0 self.previous_day_close = 0 #self.sigma_by_time = dict(zip(range(1, tf_helper.day + 1), repeat(SimpleMovingAverage(self.period), tf_helper.day))) self.sigma_by_time = np.zeros(self.bars_per_day + 1) self.upper_bound = 0 self.lower_bound = 0 #self.upper_bound_by_time = dict.fromkeys(range(1, tf_helper.day + 1), 0) #self.lower_bound_by_time = dict.fromkeys(range(1, tf_helper.day + 1), 0) def ema_weighted(self, sigma, abs_move, period=63): if sigma == 0: return abs_move else: alpha = 2. / (period + 1.) return alpha * abs_move + (1-alpha) * sigma def update(self, data: TradeBar) -> bool: if self.first_bar_of_day.time.day != data.end_time.day: if data.end_time.time() > self.latest_time_for_reset: # Arthur, hier sollten wir eine Fehlermeldung ausgeben return #pass self.previous_day_open = self.day_open self.previous_day_close = self.previous_close self.first_bar_of_day = data self.day_open = self.first_bar_of_day.open abs_move = abs(data.close / self.first_bar_of_day.open - 1) minutes_elapsed = int((data.end_time - self.first_bar_of_day.time).total_seconds() // 60) #self.sigma_by_time[minutes_elapsed].update(data.end_time, abs_move) self.sigma_by_time[minutes_elapsed] = self.ema_weighted(self.sigma_by_time[minutes_elapsed], abs_move, self.period) upper_bound_reference = lower_bound_reference = self.first_bar_of_day.open if self.previous_day_close is not None: upper_bound_reference = upper_bound_reference + max(0, self.previous_day_close-upper_bound_reference) * self.gap_stretch_factor lower_bound_reference = lower_bound_reference - max(0, lower_bound_reference-self.previous_day_close) * self.gap_stretch_factor #self.upper_bound = upper_bound_reference * (1 + self.sigma_by_time[minutes_elapsed].current.value * self.scaling_factor) #self.lower_bound = lower_bound_reference * (1 - self.sigma_by_time[minutes_elapsed].current.value * self.scaling_factor) self.upper_bound = upper_bound_reference * (1 + self.sigma_by_time[minutes_elapsed] * self.scaling_factor) self.lower_bound = lower_bound_reference * (1 - self.sigma_by_time[minutes_elapsed] * self.scaling_factor) #self.upper_bound_by_time[minutes_elapsed] = self.upper_bound #self.lower_bound_by_time[minutes_elapsed] = self.lower_bound self.previous_close = data.close self.count += 1 return self.is_ready @property def is_ready(self) -> bool: return self.count > self.warm_up_period def reset(self): self.time = datetime.min self.value = 0 """
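To make the behaviour of ema_weighted above more tangible, the blended weights can be printed next to the plain-EMA weights of the same slow period; the blend deliberately assigns less weight to the most recent observations. This is a standalone research-style check, not part of the algorithm: the stub tf_helper only needs a .day attribute, and 390 minutes per regular session is an assumption.

from types import SimpleNamespace
from indicators import NoiseAreaIndicatorEMA

period = 75  # matches config.period = 15 * 5
ind = NoiseAreaIndicatorEMA(SimpleNamespace(day=390), period=period)

alpha_slow = 2.0 / (period + 1.0)
for i in (0, 5, 20, 60):
    # weight assigned to the observation i bars back, with sigma already warmed up (non-zero)
    _, blended_weight = ind.ema_weighted(0.005, 0.004, period, ind.period_fast,
                                         ind.period_fast2, ind.period_even_weight, i=i)
    plain_weight = alpha_slow * (1 - alpha_slow) ** i
    print(f"lag {i:>3}: blended weight {blended_weight:.5f} vs plain EMA weight {plain_weight:.5f}")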
# region imports from AlgorithmImports import * from toolbox import read_config from security_init import IbkrSecurityInitializer from alpha import CustomAlphaModel from global_signals import GlobalSignals, GlobalCaseFilter from pcm_execution import MultiAlphaHelpers, MultiAlphaAveragingDirectionPCM, MultiAlphaMinQuantityChangeSpreadExecutionModel # endregion class ConcretumIntradayMomentumStrategy(QCAlgorithm): def initialize(algorithm): config = read_config(algorithm) #trailing_ema_period = int(algorithm.get_parameter("TRAILING_EMA_PERIOD")) #trailing_ema_exit_tol = float(algorithm.get_parameter("TRAILING_EMA_EXIT_TOL")) # Backtest algorithm.set_start_date(config.start_date + timedelta(days=config.wfo)) algorithm.set_end_date(config.end_date + timedelta(days=config.wfo)) algorithm.set_cash(config.initial_capital) #algorithm.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN) algorithm.set_brokerage_model(BrokerageName.ALPACA, AccountType.MARGIN) algorithm.set_risk_free_interest_rate_model(ConstantRiskFreeRateInterestRateModel(0)) algorithm.settings.free_portfolio_value_percentage = algorithm.config.freePortfolioValuePercentage # Universe algorithm.set_security_initializer(IbkrSecurityInitializer(algorithm, algorithm.brokerage_model, FuncSecuritySeeder(algorithm.get_last_known_price))) for ticker in config.tickers: security = algorithm.add_equity(ticker, resolution=Resolution.MINUTE, fill_forward=True, leverage=algorithm.config.leverage, extended_market_hours=False) # Benchmark algorithm.myBenchmark = algorithm.config.myBenchmark algorithm.SetBenchmark(algorithm.myBenchmark) # Alpha Models algorithm.add_alpha(CustomAlphaModel()) # PCM #algorithm.set_portfolio_construction(EqualWeightingPortfolioConstructionModel(lambda t: None)) algorithm.ma_helpers = MultiAlphaHelpers(algorithm) algorithm.set_portfolio_construction(MultiAlphaAveragingDirectionPCM(algorithm, rebalance=Resolution.Daily, portfolioBias=PortfolioBias.LongShort, long_factor=algorithm.config.long_factor, short_factor=algorithm.config.short_factor, use_multi_alpha_insights=True, use_direction_averaged_weighting=True, max_percentage_per_position=algorithm.config.max_percentage_per_position)) # ExecutionModel #algorithm.set_execution(ImmediateExecutionModel()) algorithm.set_execution(MultiAlphaMinQuantityChangeSpreadExecutionModel( algorithm.config.minimumOrderQuantityChangePercentage, algorithm.config.acceptingSpreadPercentRTH, algorithm.config.acceptingSpreadPercentETH)) # Global Signals algorithm.global_signals = GlobalSignals(algorithm) algorithm.global_case_filter = GlobalCaseFilter(algorithm) # Global warm up for live mode #algorithm.SetWarmup(timedelta(days=21 if algorithm.LiveMode else 1)) def OnData(algorithm, data: Slice): algorithm.ma_helpers.OnData(data) # list stock splits and dividends algorithm.global_signals.OnData(data) # update global signals
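For the switchable volatility sizing mentioned in the notes: the daily ATR is already created per security in SecurityAnalytics (self.atr_day) and is reachable through security.analytics, so the PCM's quantity adjustment can optionally cap positions to a risk budget. The sketch below mirrors the existing adjusted_quantity line from CreateTargets in the PCM file that follows; the helper name and the use_volatility_sizing / risk_per_position parameters are assumptions, not existing config fields.

# region imports
from AlgorithmImports import *
import numpy as np
# endregion

# Sketch only: optional volatility sizing for MultiAlphaAveragingDirectionPCM.CreateTargets.
def adjusted_quantity_for(algorithm, target, long_factor, short_factor,
                          use_volatility_sizing=False, risk_per_position=0.01):
    security = algorithm.Securities[target.Symbol]
    long_short_factor = long_factor if target.Quantity > 0 else short_factor

    # Existing behaviour: scale the base-model quantity by leverage and the long/short factor.
    quantity = target.Quantity * security.Leverage * long_short_factor

    if use_volatility_sizing:
        analytics = getattr(security, "analytics", None)
        if analytics is not None and analytics.atr_day.is_ready and security.Price > 0:
            # Cap the position so that one daily-ATR move costs at most risk_per_position
            # of total portfolio value.
            atr = analytics.atr_day.Current.Value
            max_shares = algorithm.Portfolio.TotalPortfolioValue * risk_per_position / atr
            quantity = np.sign(quantity) * min(abs(quantity), max_shares)
    return quantity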
# region imports from AlgorithmImports import * from collections import defaultdict # endregion #---------------------------------------------------------------------------------------- # # Multi Alpha Model Helpers # class MultiAlphaHelpers: """ Provide OnData and configure the basic settings for MultiAlphaAveragingDirectionPCM and MinQuantityChangeImmediateExecutionModel Usage: def initialize(algorithm): algorithm.ma_helpers = MultiAlphaHelpers(algorithm) def OnData(algorithm, data: Slice): algorithm.ma_helpers.OnData(data) """ def __init__(self, algorithm): self.algorithm = algorithm self.ApplyStandardSettings() def ApplyStandardSettings(self): ## PCM # Enable rebalances when the Alpha model emits insights or when insights expire in PCM. For testing, set to False. self.algorithm.Settings.RebalancePortfolioOnInsightChanges = True # Default = True # Enable rebalances when security changes occur in PCM. For testing, set to False. self.algorithm.Settings.RebalancePortfolioOnSecurityChanges = True # Default = True # Min order margin portfolio percentage to ignore bad orders and orders with small sizes in PCM. For testing, set to 0. self.algorithm.Settings.MinimumOrderMarginPortfolioPercentage = 0.003 # Default = 0.001, better to use a min order margin of $300 for a $100_000 portfolio size # Define long and short multipliers which are used in the PCM. For testing, set to 1.0. self.algorithm.long_factor = 1.0 self.algorithm.short_factor = 1.0 ## Execution # Min order quantity change to ignore bad orders and orders with small sizes in EXECUTION. For testing, set to 0. self.algorithm.minimumOrderQuantityChangePercentage = 0.10 # Custom minimum order quantity change percentage of at least 10% of the currently held quantity def OnData(self, slice: Slice): """ Test data # MLI: Forward stock split 2 for 1 on 23.10.2023 # ADXN: Reverse stock split 1 for 20 on 23.10.2023 # 2023-09-01 00:00:00 2023-09-01 00:00:00 OnSecuritiesChanged received a removal for WGOV R735QTJ8XC9X. # 2023-09-01 00:00:00 2023-09-01 00:00:00 SymbolData disposed a WGOV R735QTJ8XC9X with 1. TODO Delistings etc. in depth testing https://www.quantconnect.com/docs/v2/writing-algorithms/securities/asset-classes/us-equity/corporate-actions TODO If you have indicators in your algorithm, reset and warm-up your indicators with ScaledRaw data when splits occur so that the data in your indicators account for the price adjustments that the splits cause. https://www.quantconnect.com/docs/v2/writing-algorithms/indicators/key-concepts#10-Reset-Indicators For notification in live mode, please check out this doc for reference to implementation as the "# notification action" in the attached backtest. This information is provided by QuantConnect, and is still available even if you choose other brokerages such as IB, as long as you chose QuantConnect data feed which is only available on QuantConnect Cloud. Alternatively, you can subscribe to the Security Master dataset, and use Lean-CLI to update the data every day to get the splits and dividents. 
https://www.quantconnect.com/forum/discussion/12273/will-i-get-split-dividends-events-on-live-if-i-am-using-interactive-brokers-data-feed/p1 """ """ ## Stock splits # TODO check if we have the first candle of the day + test if events come once a day or more often if self.algorithm.Time.hour == 9 and self.algorithm.Time.minute == 31: for kvp in slice.Splits: symbol = kvp.Key #self.algorithm.Debug(f'{self.algorithm.Time} OnData received a split event for {symbol}.') ''' # Handle stock splits for all alpha models with a 'ResetAndWarmUpIndicators' method in their SymbolData # TODO in life mode: refresh all indicators daily to ensure we have most recent historical data? Is a reco of Jared from 2017 for alphaModel in self.algorithm.instantiated_alpha_models: if hasattr(alphaModel, 'symbol_data') and symbol in alphaModel.symbol_data and hasattr(alphaModel.symbol_data[symbol], 'ResetAndWarmUpIndicators'): modelName = getattr(alphaModel, 'Name', type(alphaModel).__name__) #self.algorithm.Debug(f'{self.algorithm.Time} OnData handled a split event for {symbol} in {modelName}.') alphaModel.symbol_data[symbol].ResetAndWarmUpIndicators() ''' ## Dividends # TODO check if we have the first candle of the day + test if events come once a day or more often if self.algorithm.Time.hour == 9 and self.algorithm.Time.minute == 31: for kvp in slice.Dividends: symbol = kvp.Key #self.algorithm.Debug(f'{self.algorithm.Time} OnData received a dividend event for {symbol}.') """ pass #---------------------------------------------------------------------------------------- # # Multi Alpha Averaging Direction PCM # class MultiAlphaAveragingDirectionPCM(PortfolioConstructionModel): """ This PCM is designed to combine active insights from multiple Alpha Models based on the 'insight.Direction' using two methods: (1) Equal weighting of each insight 1 We allocate 100% equally weighted to each active insight (2) Directional averaging of each insight per symbol 1 We allocate 100% equally weighted to each symbol 2 We multiply the symbol share with the average direction from all insights for a symbol (value from -1 .. +1) For further processing, we then distribute this result to all active insights Insight Requirements: Active: Insight must not be expired Latest Insight per Alpha Model: Insight used is the most recent insight from its Alpha Model for a given symbol 'insight.Direction': The direction property is used to caclulate the portfolio share Effects of active insights from several Alpha Models for one symbol: 'insight.Direction' is long: Vote for a bullish portfolio weight. If we have 1 long insight, the weight will be 100%. 'insight.Direction' is short: Vote for a bearish portfolio weight. If we have 1 long and 1 short insight, the weight will be 0%. 'insight.Direction' is not active: Don't vote at all. If we have 2 long insights and a third Alpha Model does not vote, the weight will be 2/2 = 100%. 'insight.Direction' is flat: Vote for a neutral portfolio weight. If we have 2 long and 1 neutral insights, the weight will be 2/3 = 66.7%. !!! Note: This means that insights must be emitted as long as the Alpha Model sees a certain direction, not just once for an entry!!! 
    Parameters and Switches:
        'portfolioBias':                     Insight must align with the portfolio bias
        'long_factor' and 'short_factor':    To adjust the quantity in the portfolio
        'use_multi_alpha_insights':          Switch to activate the grouping of insights by symbol and alpha model
        'use_direction_averaged_weighting':  Switch for (1) equal weighting or (2) directional averaging
        'max_percentage_per_position':       The resulting position size must be within the specified portfolio limits

    Implementation:
        It overrides all common methods of the base class. Changes are made in the GetTargetInsights and the
        DetermineTargetPercent methods, as suggested in the QC documentation:
        https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/portfolio-construction/key-concepts
        GetTargetInsights:      To combine the active insights differently, GetTargetInsights returns all active insights.
        DetermineTargetPercent: Target weights are derived based on the average direction of all active insights from all Alpha Models for a symbol.

    Usage:
        self.SetPortfolioConstruction(MultiAlphaAveragingDirectionPCM(self))
    """

    def __init__(self, algorithm, rebalance=Resolution.Daily, portfolioBias=PortfolioBias.LongShort,
                 long_factor=1., short_factor=0.6,
                 use_multi_alpha_insights=True, use_direction_averaged_weighting=True,
                 max_percentage_per_position=0.1):
        super().__init__()
        self.algorithm = algorithm
        self.portfolioBias = portfolioBias
        self.use_multi_alpha_insights = use_multi_alpha_insights
        self.use_direction_averaged_weighting = use_direction_averaged_weighting
        # Define long and short multipliers
        self.long_factor = long_factor
        self.short_factor = short_factor
        # Define max percentage of the portfolio per position in one security
        self.max_percentage_per_position = max_percentage_per_position

    def CreateTargets(self, algorithm, insights):
        """
        Generates portfolio targets based on active insights from multiple Alpha Models.

        This method aggregates multiple insights per symbol into a single portfolio target, applying leverage and the
        specified long/short factors. The resulting target ensures that the portfolio aligns with the combined directional
        insights provided by different Alpha Models while respecting a maximum percentage allocation per position.
        """
        ## Get targets from insights using the base model
        targets_per_insight = super().CreateTargets(algorithm, insights)
        # Return, if no targets
        if len(targets_per_insight) == 0:
            return targets_per_insight  # same as return []

        ## Aggregate several targets per symbol into only one target per symbol
        # Note: The Immediate Execution model fills a PortfolioTargetCollection dict(k=Symbol, v=PortfolioTarget) using AddRange,
        #       commented as "If a target for the same symbol already exists it will be overwritten."
        #       So we have to ensure only one target per symbol is returned here.
        targets_per_symbol = defaultdict(int)
        for x in targets_per_insight:
            # Determine long_short_factor
            long_short_factor = self.long_factor if x.Quantity > 0 else self.short_factor
            # Apply leverage and the long_short_factor and aggregate
            adjusted_quantity = x.Quantity * algorithm.Securities[x.Symbol].Leverage * long_short_factor
            targets_per_symbol[x.Symbol] += adjusted_quantity

        ## Limit the quantity to the max quantity per security
        if not self.max_percentage_per_position:
            # Create new PortfolioTargets with aggregated, unlimited quantities
            targets = [PortfolioTarget(symbol, quantity) for symbol, quantity in targets_per_symbol.items()]
        else:
            # Create new PortfolioTargets with aggregated quantities limited by the max percentage per position
            total_portfolio_value = algorithm.Portfolio.TotalPortfolioValue
            max_value = total_portfolio_value * self.max_percentage_per_position
            targets = [PortfolioTarget(symbol, 0) if algorithm.Securities[symbol].Price == 0
                       else PortfolioTarget(symbol, np.sign(quantity) * int(min(abs(quantity), max_value / algorithm.Securities[symbol].Price)))
                       for symbol, quantity in targets_per_symbol.items()]
        return targets

    def GetTargetInsights(self) -> List[Insight]:
        """ Gets the last generated active insight for each symbol """
        # Get all insights from the algorithm that haven't expired yet, for each symbol that is still in the universe
        activeInsights = self.algorithm.Insights.GetActiveInsights(self.algorithm.UtcTime)

        if self.use_multi_alpha_insights:
            ## GetTargetInsights by symbol and model
            # Group insights by symbol and alpha model using a nested defaultdict keyed by symbol and then source model; value = latest insight
            last_insights_per_symbol_model = defaultdict(lambda: defaultdict(lambda: None))
            # Iterate over each active insight and store it, if the insight is more recent than the currently stored one for its symbol and source model
            for insight in activeInsights:
                if insight.CloseTimeUtc >= self.algorithm.UtcTime:  # only consider insights that are not outdated
                    current_stored_insight = last_insights_per_symbol_model[insight.Symbol][insight.SourceModel]
                    # Check if we already have a stored insight for this symbol and model, and if the new one is more recent
                    if current_stored_insight is None or insight.GeneratedTimeUtc > current_stored_insight.GeneratedTimeUtc:
                        last_insights_per_symbol_model[insight.Symbol][insight.SourceModel] = insight
            # Flatten the nested dictionary to get a list of the latest active insights from each model for each symbol
            self.insights = [insight for symbol_insights in last_insights_per_symbol_model.values() for insight in symbol_insights.values()]
        else:
            ## GetTargetInsights by symbol only
            # Group insights by symbol and get the last generated insight for each symbol
            last_insights_per_symbol = defaultdict(list)
            for insight in activeInsights:
                last_insights_per_symbol[insight.Symbol].append(insight)
            # Select the last generated active insight for each symbol
            self.insights = [sorted(insights, key=lambda x: x.GeneratedTimeUtc)[-1] for insights in last_insights_per_symbol.values()]

        return self.insights

    def DetermineTargetPercent(self, activeInsights: List[Insight]) -> Dict[Insight, float]:
        """
        Determines the target percentage allocation for each active insight based on the selected weighting method.
        The process considers various factors such as the portfolio bias, the direction of insights, and whether
        direction averaging or equal weighting is applied.
        The final output is a dictionary mapping each active insight to its corresponding portfolio target percentage.

        Parameters:
            activeInsights : List[Insight]
                A list of active insights that have not expired and are generated by various Alpha Models.

        Returns:
            A dictionary where each key is an active insight and the value is the target portfolio percentage allocated to that insight.

        Implementation Notes:
            The method calculates the percentage allocation for each insight considering the number of active insights and
            their respective directions. The resulting portfolio allocation respects the constraints imposed by the portfolio
            bias and the maximum position size. The portfolio target percentage can be positive (long), negative (short), or
            zero (flat), depending on the calculated insights and the portfolio's overall strategy.
        """
        # Define the threshold for the expiry date comparison (4 days)
        expiry_threshold = timedelta(days=4)

        if self.use_direction_averaged_weighting == False:
            ## 'Equal Weighting' of each insight
            # Same as EqualWeighting https://github.com/QuantConnect/Lean/blob/master/Algorithm.Framework/Portfolio/EqualWeightingPortfolioConstructionModel.cs#L118
            insights_count = sum(1 for insight in activeInsights if insight.Direction != InsightDirection.Flat and self.RespectPortfolioBias(insight))  # we count all insights
            pct_by_insight = {insight: 1. / insights_count if insights_count > 0 else 0
                              for insight in activeInsights if self.RespectPortfolioBias(insight)}  # we allocate 100% equally weighted to each insight
        else:
            ## 'Direction Averaged Weighting' per source Alpha model of each insight
            insights_count = 0
            symbol_insight_count = defaultdict(int)
            symbol_insight_dir_sum = defaultdict(int)
            for insight in activeInsights:
                insights_count += 1                                          # we count all insights
                symbol_insight_count[insight.Symbol] += 1                    # we count all insights per symbol
                symbol_insight_dir_sum[insight.Symbol] += insight.Direction  # we add up all insight directions per symbol
            symbols_count = len(symbol_insight_count)

            # Arthur, please provide access to the vola here
            # Step 1: we allocate 100% EQUALLY weighted to each symbol to get the symbol share using (1. / symbols_count)
            # Step 2: we multiply the symbol share with the average direction of this symbol (value from -1 .. +1) using (direction_sum / symbol_insight_count)
            # Step 3: as targetPercent is indexed by insight, we may have several insights per symbol and therefore need to distribute the result per symbol to each insight of this symbol using (1. / symbol_insight_count)
            pct_by_symbol = {symbol: (1./symbols_count) * (direction_sum / symbol_insight_count[symbol]) * (1./symbol_insight_count[symbol])
                             if symbol_insight_count[symbol] > 0 else 0
                             for symbol, direction_sum in symbol_insight_dir_sum.items()}

        # Fill the target percent dict with the calculated percents for each insight
        targetPercent = {}
        for insight in activeInsights:
            if self.use_direction_averaged_weighting == False:
                ## 'Equal Weighting' of each insight
                # We apply percents indexed by insight
                percent = pct_by_insight.get(insight, 0)
                targetPercent[insight] = percent
            else:
                ## 'Direction Averaged Weighting' per source Alpha model of each insight
                # We apply percents indexed by symbol
                percent = pct_by_symbol.get(insight.Symbol, 0)
                # We need to switch the sign of the weight, if the signs of insight direction and weight are not the same
                if percent * insight.Direction < 0:
                    percent = -percent
                # If the portfolio bias and the sign of the weight are not the same, we need to filter by neglecting the weight.
                # We do this 'late' in the process, so an adverse direction is still used in the averaging differently than 'Flat', even if we never enter in that direction.
                # This has to be conceptually balanced with the Alpha Models (a) only emitting insights in case of entry (b) constantly emitting insights also in case of flat
                if self.portfolioBias != PortfolioBias.LongShort and percent * self.portfolioBias < 0:
                    percent = 0
                targetPercent[insight] = percent

        return targetPercent


#----------------------------------------------------------------------------------------
#
# Minimum Changed Quantity and Spread ExecutionModel
#
class MultiAlphaMinQuantityChangeSpreadExecutionModel(ExecutionModel):
    """
    An execution model that submits market orders to achieve the desired portfolio targets, if
    - the change in quantity is significant enough, based on a specified threshold, to avoid executing insignificant trades
    - the current spread is tight

    Note: this execution model will not work using Resolution.DAILY, since Exchange.exchange_open will be false; the suggested resolution is Minute.

    Based on ImmediateExecutionModel, added:
        AboveMinimumQuantityChange to check if the quantity alters the current holdings by at least minimumOrderQuantityChangePercentage of the currently held quantity
        SpreadExecutionModel from QC

    'minimumOrderQuantityChangePercentage': The minimum percentage change in quantity required to execute an order, relative to the currently held quantity
    'accepting_spread_percent_rth':         The maximum percentage of accepted spread in regular trading hours
    'accepting_spread_percent_eth':         The maximum percentage of accepted spread in extended trading hours

    Usage:
        self.SetExecution(MultiAlphaMinQuantityChangeSpreadExecutionModel(minimumOrderQuantityChangePercentage=0.10,
                                                                          accepting_spread_percent_rth=0.003,
                                                                          accepting_spread_percent_eth=0.006))
    """

    def __init__(self, minimumOrderQuantityChangePercentage=0.10, accepting_spread_percent_rth=0.003, accepting_spread_percent_eth=0.006):
        # Initializes a new instance of the MultiAlphaMinQuantityChangeSpreadExecutionModel class
        self.targetsCollection = PortfolioTargetCollection()
        self.minimumOrderQuantityChangePercentage = minimumOrderQuantityChangePercentage
        self.accepting_spread_percent_rth = abs(accepting_spread_percent_rth)
        self.accepting_spread_percent_eth = abs(accepting_spread_percent_eth)

    def Execute(self, algorithm, targets):
        """
        Immediately submits orders for the specified portfolio targets.

        Implementation: The method first adds the incoming targets to the internal
        `targetsCollection`. It then iterates over the targets, checking if the quantity to be ordered meets both the minimum
        order margin and the minimum quantity change criteria. If both criteria are met, a market order is submitted for the
        target quantity. After execution, fulfilled targets are removed from the collection.
        """
        # update the complete set of portfolio targets with the new targets
        self.targetsCollection.AddRange(targets)

        # for performance we check the count value; OrderByMarginImpact and ClearFulfilled are expensive to call
        if not self.targetsCollection.IsEmpty:
            for target in self.targetsCollection.OrderByMarginImpact(algorithm):
                security = algorithm.Securities[target.Symbol]
                # calculate the remaining quantity to be ordered
                quantity = OrderSizing.GetUnorderedQuantity(algorithm, target, security)
                # check order entry conditions
                if quantity != 0:
                    aboveMinimumPortfolio = BuyingPowerModelExtensions.AboveMinimumOrderMarginPortfolioPercentage(security.BuyingPowerModel, security, quantity, algorithm.Portfolio, algorithm.Settings.MinimumOrderMarginPortfolioPercentage)
                    aboveMinimumQuantityChange = self.AboveMinimumQuantityChange(security, quantity, algorithm, self.minimumOrderQuantityChangePercentage)
                    #if aboveMinimumPortfolio:
                    if aboveMinimumPortfolio and aboveMinimumQuantityChange:
                        if self.spread_is_favorable(security):
                            #########################################
                            algorithm.MarketOrder(security, quantity)
                            #########################################
                    elif not PortfolioTarget.MinimumOrderMarginPercentageWarningSent:
                        # will trigger the warning if it has not already been sent
                        PortfolioTarget.MinimumOrderMarginPercentageWarningSent = False

            self.targetsCollection.ClearFulfilled(algorithm)

    def AboveMinimumQuantityChange(self, security, quantity, algorithm, minimumOrderQuantityChangePercentage=0.1):
        """
        Returns True, if the calculated percentage change in quantity is greater than or equal to the specified minimum percentage.
        Returns False, if the quantity does not alter the current holdings by at least minimumOrderQuantityChangePercentage.
        """
        # Calculate the percentage change in quantity relative to current holdings
        currentHoldings = security.Holdings.Quantity
        if currentHoldings == 0:
            # If there are no current holdings, any quantity is significant
            return True
        # Calculate the percentage change
        percentage_change = abs(quantity) / abs(currentHoldings)
        # Check if the change is above the minimum threshold
        return percentage_change >= minimumOrderQuantityChangePercentage

    def spread_is_favorable(self, security):
        '''Determines if the spread is in a desirable range.'''
        # The price has to be larger than zero to avoid a zero-division error, and a negative price would wrongly make the spread percentage < 0
        if security.exchange.exchange_open:
            # During regular exchange opening hours
            return security.price > 0 and security.ask_price > 0 and security.bid_price > 0 \
                and (security.ask_price - security.bid_price) / security.price <= self.accepting_spread_percent_rth
        else:
            # Outside regular exchange opening hours
            return security.price > 0 and security.ask_price > 0 and security.bid_price > 0 \
                and (security.ask_price - security.bid_price) / security.price <= self.accepting_spread_percent_eth
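
#----------------------------------------------------------------------------------------
#
# Sketch: making the daily volatility (ATR) accessible to the PCM
#
# This is only a minimal sketch addressing the "please provide access to the vola here" comment in
# DetermineTargetPercent above. The names 'VolatilityStore' and 'algorithm.vola' are assumptions (they do not
# exist elsewhere in this project); the QCAlgorithm.ATR helper used below is standard LEAN. How the ATR value
# is then folded into the symbol share (e.g. inverse-volatility scaling) is a design choice still to be agreed on.
#
class VolatilityStore:
    """Creates and caches one daily ATR indicator per symbol, e.g. called from OnSecuritiesChanged."""

    def __init__(self, algorithm, period=14):
        self.algorithm = algorithm
        self.period = period
        self.atr_by_symbol = {}

    def add(self, symbol):
        # Register a Wilder ATR on daily resolution; the engine keeps it updated automatically
        if symbol not in self.atr_by_symbol:
            self.atr_by_symbol[symbol] = self.algorithm.ATR(symbol, self.period, MovingAverageType.Wilders, Resolution.Daily)

    def value(self, symbol):
        # Return the current daily ATR, or None if the indicator is not ready yet
        atr = self.atr_by_symbol.get(symbol)
        return atr.Current.Value if atr is not None and atr.IsReady else None

# Possible usage inside DetermineTargetPercent (at the step marked by the comment), assuming
# 'self.algorithm.vola' holds a VolatilityStore instance:
#
#     atr = self.algorithm.vola.value(symbol)
#     if atr is not None and atr > 0:
#         pct_by_symbol[symbol] *= some_scaling(atr)   # hypothetical scaling function, e.g. inverse volatility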
# region imports
from AlgorithmImports import *
# endregion


class IbkrSecurityInitializer(BrokerageModelSecurityInitializer):

    def __init__(self, algorithm: QCAlgorithm, brokerage_model: IBrokerageModel, security_seeder: ISecuritySeeder) -> None:
        self.algorithm = algorithm
        super().__init__(brokerage_model, security_seeder)

    def initialize(self, security: Security) -> None:
        super().initialize(security)
        security.set_shortable_provider(InteractiveBrokersShortableProvider())
        if not self.algorithm.config.costs_enabled:
            #security.set_slippage_model(NullSlippageModel())
            security.set_slippage_model(HalvedSpreadSlippageModel())
            #security.set_slippage_model(FullSpreadSlippageModel())
            security.set_fee_model(ConstantFeeModel(0))


class HalvedSpreadSlippageModel:

    def GetSlippageApproximation(self, asset: Security, order: Order) -> float:
        slippage = 0
        if order.type is OrderType.MARKET:
            # Arthur, I changed the sign here, since slippage should make the PnL worse (it was getting better)
            slippage = +0.5 * max(0, (asset.ask_price - asset.bid_price))
        return slippage


class FullSpreadSlippageModel:

    def GetSlippageApproximation(self, asset: Security, order: Order) -> float:
        slippage = 0
        if order.type is OrderType.MARKET:
            slippage = +1.0 * max(0, (asset.ask_price - asset.bid_price))
        return slippage


'''
class ZeroSlippageFillModel(FillModel):

    def market_fill(self, security: Security, order: Order) -> OrderEvent:
        fill = super().market_fill(security, order)
        fill_price = security.bid_price if order.quantity > 0 else security.ask_price
        fill.fill_price = fill_price
        return fill

    def combo_market_fill(self, order: Order, parameters: FillModelParameters) -> List[OrderEvent]:
        fills = super().combo_market_fill(order, parameters)
        for kvp, fill in zip(sorted(parameters.securities_for_orders, key=lambda x: x.Key.Id), fills):
            _security = kvp.value
            fill_price = _security.bid_price if fill.fill_quantity > 0 else _security.ask_price
            fill.fill_price = fill_price
        return fills

    def stop_market_fill(self, security: Security, order: StopMarketOrder) -> OrderEvent:
        fill = super().stop_market_fill(security, order)
        fill_price = security.bid_price if order.quantity > 0 else security.ask_price
        fill.fill_price = fill_price
        return fill
'''
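
#----------------------------------------------------------------------------------------
#
# Sketch: applying a per-security leverage factor in the security initializer
#
# Minimal sketch only, not wired in anywhere: the PCM multiplies each target by Securities[symbol].Leverage,
# so one way to control that factor is to set it here. The config field 'leverage_factor' is an assumption
# (it is not part of the existing AlgorithmConfig); Security.set_leverage is the standard LEAN call for equities.
#
class IbkrSecurityInitializerWithLeverage(IbkrSecurityInitializer):

    def initialize(self, security: Security) -> None:
        super().initialize(security)
        # Apply a leverage factor per security, if one is configured (equities only; not valid for options/futures)
        leverage_factor = getattr(self.algorithm.config, 'leverage_factor', None)  # hypothetical config field
        if leverage_factor:
            security.set_leverage(leverage_factor)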
# region imports
from AlgorithmImports import *
from pydantic import BaseModel, ConfigDict
from algo_config import AlgorithmConfig
# endregion


def read_config(algorithm: QCAlgorithm) -> AlgorithmConfig:
    # Read all project parameters, validate them via the AlgorithmConfig model and attach the result to the algorithm
    params = {param.key.lower(): param.value for param in algorithm.get_parameters()}
    algo_config = AlgorithmConfig(**params)
    algorithm.config = algo_config
    QCAlgorithm.config = algo_config
    return algo_config


class ExtendedBaseModel(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)


class TimeFrameHelper:
    # Translates calendar time frames into bar counts for the given security and resolution
    def __init__(self, security: Security, resolution: Resolution):
        bars_per_day = max(1, security.exchange.hours.regular_market_duration.total_seconds() / Extensions.to_time_span(resolution).total_seconds())
        self.year      = int(round(bars_per_day * security.exchange.trading_days_per_year, 0))
        self.half      = int(round(self.year/2, 0))
        self.quarter   = int(round(self.year/4, 0))
        self.twomonths = int(round(self.year/6, 0))
        self.month     = int(round(self.year/12, 0))
        self.week      = int(round(self.year/52, 0))
        self.day       = int(round(bars_per_day, 0))
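
#----------------------------------------------------------------------------------------
#
# Sketch: possible usage of read_config and TimeFrameHelper inside an algorithm
#
# Minimal sketch only: 'SampleAlgorithm', the 'QQQ' symbol and the EMA lookbacks are illustrative, and the fields
# actually required by AlgorithmConfig depend on algo_config.py (not shown here), so read_config needs the matching
# project parameters to be set.
#
class SampleAlgorithm(QCAlgorithm):

    def initialize(self):
        self.set_start_date(2023, 1, 1)
        self.set_cash(100_000)

        # Parse the project parameters into a validated config object (also attached as self.config)
        read_config(self)

        equity = self.add_equity("QQQ", Resolution.MINUTE)

        # Convert calendar windows into bar counts for this security / resolution
        tf = TimeFrameHelper(equity, Resolution.MINUTE)
        self.ema_week = self.ema(equity.symbol, tf.week)    # EMA over roughly one trading week of minute bars
        self.ema_day = self.ema(equity.symbol, tf.day)      # EMA over roughly one trading day of minute bars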