Overall Statistics |
Total Orders 112 Average Win 6.26% Average Loss -4.60% Compounding Annual Return 41.824% Drawdown 44.600% Expectancy 0.881 Start Equity 5000 End Equity 29215.40 Net Profit 484.308% Sharpe Ratio 0.945 Sortino Ratio 1.037 Probabilistic Sharpe Ratio 37.206% Loss Rate 20% Win Rate 80% Profit-Loss Ratio 1.36 Alpha 0 Beta 0 Annual Standard Deviation 0.34 Annual Variance 0.116 Information Ratio 1.024 Tracking Error 0.34 Treynor Ratio 0 Total Fees $116.44 Estimated Strategy Capacity $3700000.00 Lowest Capacity Asset BRKB R735QTJ8XC9X Portfolio Turnover 2.63% |
# region imports
from AlgorithmImports import *
# endregion


class MarketCapFactor:
    """Factor exposing a security's market capitalization."""

    def __init__(self, security):
        # Keep the security itself so fundamentals are read fresh on each access.
        self._security = security

    @property
    def value(self):
        """Return ``fundamentals.market_cap`` of the wrapped security."""
        return self._security.fundamentals.market_cap


class SortinoFactor:
    """Factor backed by the indicator that ``algorithm.sortino`` returns."""

    def __init__(self, algorithm, symbol, lookback):
        # The indicator is created on daily data with the given lookback period.
        self._sortino = algorithm.sortino(symbol, lookback, resolution=Resolution.DAILY)

    @property
    def value(self):
        """Return the indicator's current value."""
        return self._sortino.current.value


class KERFactor:
    """Factor backed by the indicator that ``algorithm.ker`` returns."""

    def __init__(self, algorithm, symbol, lookback):
        # The indicator is created on daily data with the given lookback period.
        self._ker = algorithm.ker(symbol, lookback, resolution=Resolution.DAILY)

    @property
    def value(self):
        """Return the indicator's current value."""
        return self._ker.current.value


class FCFYieldFactor:
    """Free Cash Flow Yield factor."""

    def __init__(self, security):
        self._security = security

    @property
    def value(self):
        """Return ``valuation_ratios.FCFYield``, or NaN when it cannot be read."""
        try:
            return self._security.fundamentals.valuation_ratios.FCFYield
        except Exception:
            # Best effort: a failed fundamentals lookup yields NaN instead of
            # raising. ``Exception`` (rather than a bare ``except``) lets
            # KeyboardInterrupt / SystemExit propagate.
            return np.nan


class CorrFactor:
    """Factor that rewards low absolute Pearson correlation with a reference."""

    def __init__(self, algorithm, symbol, reference, lookback):
        self._c = algorithm.c(
            symbol,
            reference,
            lookback,
            correlation_type=CorrelationType.Pearson,
            resolution=Resolution.DAILY,
        )

    @property
    def value(self):
        """Return ``1 - |correlation|`` from the indicator's current value."""
        return 1 - abs(self._c.current.value)


class ROCFactor:
    """Factor backed by the indicator that ``algorithm.roc`` returns."""

    def __init__(self, algorithm, symbol, lookback):
        # The indicator is created on daily data with the given lookback period.
        self._roc = algorithm.roc(symbol, lookback, resolution=Resolution.DAILY)

    @property
    def value(self):
        """Return the indicator's current value."""
        return self._roc.current.value
# region imports
from AlgorithmImports import *
from scipy import optimize
from scipy.optimize import Bounds
from factors import *
# Explicit stdlib import so ``math.floor`` below does not depend on what the
# star imports happen to re-export.
import math
# endregion


class FactorWeightOptimizationAlgorithm(QCAlgorithm):
    """Long-only factor strategy on the highest-weighted SPY ETF constituents.

    On each scheduled rebalance the algorithm z-scores the per-security
    factors, searches for factor weights that maximize the trailing return of
    the factor-weighted scores, and allocates ``leverageset`` times the
    normalized positive scores.
    """

    # Not referenced anywhere in the visible code; kept for compatibility.
    _selection_data_by_symbol = {}

    def initialize(self):
        """Configure the backtest, universe, execution model and schedule."""
        self.set_start_date(2020, 1, 1)
        self.set_cash(5000)
        self.settings.automatic_indicator_warm_up = True
        spy = Symbol.create('SPY', SecurityType.EQUITY, Market.USA)

        # Universe members are added at minute resolution.
        self.universe_settings.resolution = Resolution.Minute
        self.universe_size = self.get_parameter('universe_size', 8)
        self.set_brokerage_model(
            BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN
        )
        self.set_security_initializer(
            BrokerageModelSecurityInitializer(
                self.brokerage_model,
                FuncSecuritySeeder(self.get_last_known_prices),
            )
        )

        # Select the `universe_size` constituents with the largest non-zero
        # ETF weight.
        # Alternative kept for reference:
        # self._universe = self.add_universe(self.universe.etf(spy, self.UniverseSettings, self.ETFConstituentsFilter2))
        self._universe = self.add_universe(
            self.universe.etf(
                spy,
                universe_filter_func=lambda constituents: [
                    c.symbol
                    for c in sorted(
                        [c for c in constituents if c.weight],
                        key=lambda c: c.weight,
                    )[-self.universe_size:]
                ],
            )
        )

        # Lookback (default 21) used for the factors and the trailing return.
        self._lookback = self.get_parameter('lookback', 21)
        # Buy orders are only sent while the spread is at most 0.9% of price.
        self.set_execution(FSLBSpreadExecutionModel(0.009))
        # Multiplier applied to the normalized weights when building targets.
        self.leverageset = 1.9

        # Rebalance at the start of each month, 31 minutes after the open.
        self.schedule.on(
            self.date_rules.month_start(spy),
            self.time_rules.after_market_open(spy, 31),
            self._rebalance,
        )
        # Alternative schedules kept for reference:
        # self.schedule.on(self.date_rules.week_start(spy), self.time_rules.after_market_open(spy, 35), self._rebalance)
        # self.schedule.on(self.date_rules.every([5]), self.time_rules.before_market_close(spy, 1), self._rebalance)
        # self.schedule.on(self.date_rules.every_day(spy), self.TimeRules.Every(TimeSpan.FromHours(24*(self._lookback))), self._rebalance)
        self.set_warm_up(timedelta(self._lookback + 1))

    def ETFConstituentsFilter(self, constituents):
        """Select the top-weighted constituents whose last daily close is below 50.

        Args:
            constituents: Iterable of ETF constituents exposing ``symbol`` and
                ``weight``.

        Returns:
            Symbols of up to ``universe_size`` constituents with the highest
            weight among those that pass the price filter.
        """
        # Materialize once: the collection is traversed twice below.
        constituents = list(constituents)

        # Load one daily bar for every constituent with a non-zero weight.
        symbols = [c.symbol for c in constituents if c.weight]
        history = self.History(symbols, 1, Resolution.Daily)

        # Record the last known close per symbol (None when unavailable).
        prices = {}
        for symbol in symbols:
            if not history.empty and symbol in history.index.levels[0]:
                prices[symbol] = history.loc[symbol].iloc[-1]['close']
            else:
                prices[symbol] = None

        # Keep constituents that have a weight and a known price below 50.
        filtered_constituents = []
        for c in constituents:
            price = prices.get(c.symbol)
            if c.weight and price is not None and price < 50:
                filtered_constituents.append(c)

        ranked = sorted(filtered_constituents, key=lambda c: c.weight)
        return [c.symbol for c in ranked[-self.universe_size:]]

    def ETFConstituentsFilter2(self, constituents):
        """Select the ``universe_size`` constituents with the highest weight.

        Unlike the filter used in ``initialize``, this keeps constituents whose
        weight is zero and only drops those whose weight is ``None``.
        """
        filtered_constituents = [c for c in constituents if c.weight is not None]
        ranked = sorted(filtered_constituents, key=lambda c: c.weight)
        return [c.symbol for c in ranked[-self.universe_size:]]

    def on_securities_changed(self, changes):
        """Attach factor objects to every security that enters the universe."""
        for security in changes.added_securities:
            # Alternative factor set kept for reference:
            # security.factors = [MarketCapFactor(security), SortinoFactor(self, security.symbol, self._lookback)]
            security.factors = [
                FCFYieldFactor(security),
                KERFactor(self, security.symbol, self._lookback),
            ]

    def _rebalance1(self):
        """Legacy rebalance without validation; not scheduled by ``initialize``."""
        if not self.IsMarketOpen("SPY"):
            return

        # Get raw factor values of the universe constituents.
        factors_df = pd.DataFrame()
        for symbol in self._universe.selected:
            for i, factors in enumerate(self.securities[symbol].factors):
                factors_df.loc[symbol, i] = factors.value

        # Calculate the factor z-scores.
        factor_zscores = (factors_df - factors_df.mean()) / factors_df.std()

        # Run an optimization to find optimal factor weights.
        # Objective: maximize trailing return. Initial guess: equal-weighted.
        trailing_return = (
            self.history(list(self._universe.selected), self._lookback, Resolution.DAILY)
            .close.unstack(0)
            .pct_change(self._lookback - 1)
            .iloc[-1]
        )
        num_factors = factors_df.shape[1]
        factor_weights = optimize.minimize(
            lambda weights: -(np.dot(factor_zscores, weights) * trailing_return).sum(),
            x0=np.array([1.0 / num_factors] * num_factors),
            method='Nelder-Mead',
            bounds=Bounds([0] * num_factors, [1] * num_factors),
            options={'maxiter': 10},
        ).x

        # Calculate the portfolio weights, keep only the positive ones, scale
        # them to `leverageset` total exposure and rebalance.
        portfolio_weights = (factor_zscores * factor_weights).sum(axis=1)
        portfolio_weights = portfolio_weights[portfolio_weights > 0]
        self.set_holdings(
            [
                PortfolioTarget(symbol, self.leverageset * weight / portfolio_weights.sum())
                for symbol, weight in portfolio_weights.items()
            ],
            True,
        )

    def _rebalance(self):
        """Optimize factor weights and rebalance into the positive-score names.

        The rebalance is skipped (existing holdings untouched) when the market
        is closed, no factor values exist, close prices are missing, the
        optimizer reports failure, or no security has a positive score.
        """
        # Only trade while the market is open.
        if not self.IsMarketOpen("SPY"):
            return

        # 1. Build the factor table: one row per symbol, one column per factor.
        factors_df = pd.DataFrame()
        for symbol in self._universe.selected:
            if symbol in self.securities:
                for i, factors in enumerate(self.securities[symbol].factors):
                    factors_df.loc[symbol, i] = factors.value

        # Nothing to score.
        if factors_df.empty:
            self.Debug("Factors DataFrame is empty. Skipping rebalance.")
            return

        # Cross-sectional z-scores per factor.
        factor_zscores = (factors_df - factors_df.mean()) / factors_df.std()

        # 2. Trailing returns over the lookback window.
        history_data = self.history(
            list(self._universe.selected), self._lookback, Resolution.DAILY
        )
        if 'close' not in history_data.columns.get_level_values(-1):
            self.Debug("Close prices not available in historical data.")
            return

        if isinstance(history_data.columns, pd.MultiIndex):
            # Pull the 'close' level out of multi-level columns.
            close_prices = history_data.xs('close', axis=1, level=-1).unstack(level=0)
        else:
            close_prices = history_data['close'].unstack(level=0)

        trailing_return = close_prices.pct_change(self._lookback - 1).iloc[-1]

        # 3. Optimize the factor weights, starting from equal weights.
        num_factors = factors_df.shape[1]
        initial_guess = np.array([1.0 / num_factors] * num_factors)

        def objective_function(weights):
            # NOTE(review): np.dot returns a plain array, so the product with
            # `trailing_return` is positional - this assumes both follow the
            # same symbol order and length; confirm against the history index.
            portfolio_return = np.dot(factor_zscores, weights) * trailing_return
            # Negated because the optimizer minimizes.
            return -portfolio_return.sum()

        result = optimize.minimize(
            objective_function,
            x0=initial_guess,
            method='Nelder-Mead',
            bounds=Bounds([0] * num_factors, [1] * num_factors),
            options={'maxiter': 10},
        )

        if not result.success:
            self.Debug(f"Optimization failed: {result.message}")
            return

        factor_weights = result.x

        # 4. Long-only weights, normalized to sum to one.
        portfolio_weights = (factor_zscores @ factor_weights).clip(0)
        total_weight = portfolio_weights.sum()
        if not total_weight > 0:
            # A zero/NaN total would turn every weight into NaN, leave the
            # target list empty and make SetHoldings(..., True) liquidate the
            # whole portfolio. Treat it like the other data problems instead.
            self.Debug("No positive portfolio weights. Skipping rebalance.")
            return
        portfolio_weights /= total_weight

        # 5. Rebalance, liquidating holdings that are not targeted.
        targets = [
            PortfolioTarget(symbol, self.leverageset * weight)
            for symbol, weight in portfolio_weights.items()
            if weight > 0
        ]
        self.SetHoldings(targets, True)


class FSLBSpreadExecutionModel(ExecutionModel):
    """Execution model that submits orders while the current spread is tight.

    Sell orders are sent first and unconditionally; buy orders are sent only
    when the spread is favorable and enough margin remains.

    Note this execution model will not work using Resolution.DAILY since
    Exchange.exchange_open will be false, suggested resolution is Minute.
    """

    def __init__(self, accepting_spread_percent=0.005):
        """Initialize the model.

        Args:
            accepting_spread_percent: Maximum accepted bid/ask spread as a
                fraction of the current price; its absolute value is stored.
        """
        self.targets_collection = PortfolioTargetCollection()
        # Maximum spread, relative to the current price, at which buys are sent.
        self.accepting_spread_percent = Math.abs(accepting_spread_percent)

    def execute(self, algorithm, targets):
        """Send market orders for the outstanding targets, sells before buys.

        Args:
            algorithm: The algorithm instance.
            targets: The new portfolio targets to merge into the collection.
        """
        # Update the complete set of portfolio targets with the new targets.
        self.targets_collection.add_range(targets)

        # Cheap emptiness check first: the work below is expensive.
        if self.targets_collection.is_empty:
            return

        # 1. Split the targets by the sign of their unordered quantity.
        sell_targets = []
        buy_targets = []
        for target in self.targets_collection:
            security = algorithm.securities[target.symbol]
            quantity = OrderSizing.get_unordered_quantity(algorithm, target, security, True)
            if quantity < 0:
                sell_targets.append(target)
            elif quantity > 0:
                buy_targets.append(target)

        # 2. Process sell orders first; they bypass the spread check.
        for target in sell_targets:
            symbol = target.symbol
            security = algorithm.securities[symbol]
            unordered_quantity = OrderSizing.get_unordered_quantity(
                algorithm, target, security, True
            )
            # Disabled alternative kept for reference:
            # if margin_remaining <= 0:
            #     algorithm.liquidate(symbol)
            #     self.targets_collection.remove(target)
            #     continue
            if unordered_quantity != 0:
                algorithm.market_order(symbol, unordered_quantity)

        # 3. Process buy orders only while some margin is left.
        if algorithm.portfolio.margin_remaining > 0:
            for target in buy_targets:
                symbol = target.symbol
                security = algorithm.securities[symbol]
                unordered_quantity = OrderSizing.get_unordered_quantity(
                    algorithm, target, security, True
                )
                if not self.spread_is_favorable(security):
                    continue

                margin_remaining = algorithm.portfolio.margin_remaining
                ask_price = security.ask_price  # > 0 per spread_is_favorable

                # Shrink the order when its cost, with a 20% buffer, exceeds
                # the remaining margin.
                if (
                    1.2 * ask_price * unordered_quantity > margin_remaining
                    and unordered_quantity != 0
                ):
                    unordered_quantity = math.floor(margin_remaining / ask_price)

                if unordered_quantity <= 0:
                    continue

                # Submit only if the initial margin requirement for this
                # quantity fits into the remaining margin.
                initial_margin = security.BuyingPowerModel.GetInitialMarginRequirement(
                    InitialMarginParameters(security, unordered_quantity)
                ).Value
                if margin_remaining - initial_margin >= 0:
                    algorithm.market_order(symbol, unordered_quantity)

        self.targets_collection.clear_fulfilled(algorithm)

    def spread_is_favorable(self, security):
        """Determine if the spread is in desirable range.

        Prices must be positive to avoid a zero division or a negative spread
        percentage, and the exchange must be open to avoid the extreme spreads
        seen outside regular hours.
        """
        return security.exchange.exchange_open \
            and security.price > 0 and security.ask_price > 0 and security.bid_price > 0 \
            and (security.ask_price - security.bid_price) / security.price <= self.accepting_spread_percent


# NOTE(review): the classes below repeat names that are also star-imported
# from `factors`; being defined later in this module, these definitions are the
# ones the module-level names resolve to. Confirm which copy is canonical.
class MarketCapFactor:
    """Factor exposing a security's market capitalization."""

    def __init__(self, security):
        self._security = security

    @property
    def value(self):
        """Return ``fundamentals.market_cap`` of the wrapped security."""
        return self._security.fundamentals.market_cap


class SortinoFactor:
    """Factor backed by the indicator that ``algorithm.sortino`` returns."""

    def __init__(self, algorithm, symbol, lookback):
        self._sortino = algorithm.sortino(symbol, lookback, resolution=Resolution.DAILY)

    @property
    def value(self):
        """Return the indicator's current value."""
        return self._sortino.current.value


class KERFactor:
    """Factor backed by the indicator that ``algorithm.ker`` returns."""

    def __init__(self, algorithm, symbol, lookback):
        self._ker = algorithm.ker(symbol, lookback, resolution=Resolution.DAILY)

    @property
    def value(self):
        """Return the indicator's current value."""
        return self._ker.current.value


class FCFYieldFactor:
    """Free Cash Flow Yield factor."""

    def __init__(self, security):
        self._security = security

    @property
    def value(self):
        """Return ``valuation_ratios.FCFYield``, or NaN when it cannot be read."""
        try:
            return self._security.fundamentals.valuation_ratios.FCFYield
        except Exception:
            # Best effort: a failed lookup yields NaN; ``Exception`` instead of
            # a bare ``except`` lets KeyboardInterrupt / SystemExit propagate.
            return np.nan


class CorrFactor:
    """Factor that rewards low absolute Pearson correlation with a reference."""

    def __init__(self, algorithm, symbol, reference, lookback):
        self._c = algorithm.c(
            symbol,
            reference,
            lookback,
            correlation_type=CorrelationType.Pearson,
            resolution=Resolution.DAILY,
        )

    @property
    def value(self):
        """Return ``1 - |correlation|`` from the indicator's current value."""
        return 1 - abs(self._c.current.value)


class ROCFactor:
    """Factor backed by the indicator that ``algorithm.roc`` returns."""

    def __init__(self, algorithm, symbol, lookback):
        self._roc = algorithm.roc(symbol, lookback, resolution=Resolution.DAILY)

    @property
    def value(self):
        """Return the indicator's current value."""
        return self._roc.current.value