Overall Statistics |
Total Trades 604 Average Win 0.92% Average Loss -0.41% Compounding Annual Return 5.612% Drawdown 36.700% Expectancy 0.128 Net Profit 17.792% Sharpe Ratio 0.297 Probabilistic Sharpe Ratio 7.043% Loss Rate 65% Win Rate 35% Profit-Loss Ratio 2.24 Alpha 0 Beta 0 Annual Standard Deviation 0.199 Annual Variance 0.04 Information Ratio 0.297 Tracking Error 0.199 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $6700000000.00 Lowest Capacity Asset ES Y9CDFY0C6TXD Portfolio Turnover 19.19% |
#region imports from AlgorithmImports import * from utils import GetPositionSize from futures import categories #endregion class FastTrendFollowingLongAndShortWithTrendStrenthAlphaModel(AlphaModel): futures = [] BUSINESS_DAYS_IN_YEAR = 256 FORECAST_SCALAR_BY_SPAN = {64: 1.91, 32: 2.79, 16: 4.1, 8: 5.95, 4: 8.53, 2: 12.1} # Given by author on https://gitfront.io/r/user-4000052/iTvUZwEUN2Ta/AFTS-CODE/blob/chapter7.py def __init__(self, algorithm, slow_ema_span, abs_forecast_cap, sigma_span, target_risk, blend_years): self.algorithm = algorithm self.slow_ema_span = slow_ema_span self.fast_ema_span = int(self.slow_ema_span / 4) # "Any ratio between the two moving average lengths of two and six gives statistically indistinguishable results." (p.165) self.annulaization_factor = self.BUSINESS_DAYS_IN_YEAR ** 0.5 self.abs_forecast_cap = abs_forecast_cap self.sigma_span = sigma_span self.target_risk = target_risk self.blend_years = blend_years self.idm = 1.5 # Instrument Diversification Multiplier. Hardcoded in https://gitfront.io/r/user-4000052/iTvUZwEUN2Ta/AFTS-CODE/blob/chapter8.py self.forecast_scalar = self.FORECAST_SCALAR_BY_SPAN[self.fast_ema_span] self.categories = categories self.total_lookback = timedelta(365*self.blend_years+self.slow_ema_span) self.day = -1 def Update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]: # Record the new contract in the continuous series if data.QuoteBars.Count: for future in self.futures: future.latest_mapped = future.Mapped # If warming up and still > 7 days before start date, don't do anything # We use a 7-day buffer so that the algorithm has active insights when warm-up ends if algorithm.StartDate - algorithm.Time > timedelta(7): return [] if self.day == data.Time.day or data.Bars.Count == 0: return [] # Estimate the standard deviation of % daily returns for each future sigma_pct_by_future = {} for future in self.futures: # Estimate the standard deviation of % daily returns sigma_pct = self.estimate_std_of_pct_returns(future.raw_history, future.adjusted_history) if sigma_pct is None: continue sigma_pct_by_future[future] = sigma_pct # Create insights insights = [] weight_by_symbol = GetPositionSize({future.Symbol: self.categories[future.Symbol] for future in sigma_pct_by_future.keys()}) for symbol, instrument_weight in weight_by_symbol.items(): future = algorithm.Securities[symbol] current_contract = algorithm.Securities[future.Mapped] daily_risk_price_terms = sigma_pct_by_future[future] / (self.annulaization_factor) * current_contract.Price # "The price should be for the expiry date we currently hold (not the back-adjusted price)" (p.55) # Calculate target position position = (algorithm.Portfolio.TotalPortfolioValue * self.idm * instrument_weight * self.target_risk) \ /(future.SymbolProperties.ContractMultiplier * daily_risk_price_terms * (self.annulaization_factor)) # Adjust target position based on forecast risk_adjusted_ewmac = future.ewmac.Current.Value / daily_risk_price_terms scaled_forecast_for_ewmac = risk_adjusted_ewmac * self.forecast_scalar forecast = max(min(scaled_forecast_for_ewmac, self.abs_forecast_cap), -self.abs_forecast_cap) if forecast * position == 0: continue # Save some data for the PCM current_contract.forecast = forecast current_contract.position = position # Create the insights local_time = Extensions.ConvertTo(algorithm.Time, algorithm.TimeZone, future.Exchange.TimeZone) expiry = future.Exchange.Hours.GetNextMarketOpen(local_time, False) - timedelta(seconds=1) insights.append(Insight.Price(future.Mapped, expiry, InsightDirection.Up if forecast * position > 0 else InsightDirection.Down)) if insights: self.day = data.Time.day return insights def estimate_std_of_pct_returns(self, raw_history, adjusted_history): # Align history of raw and adjusted prices idx = sorted(list(set(adjusted_history.index).intersection(set(raw_history.index)))) adjusted_history_aligned = adjusted_history.loc[idx] raw_history_aligned = raw_history.loc[idx] # Calculate exponentially weighted standard deviation of returns returns = adjusted_history_aligned.diff().dropna() / raw_history_aligned.shift(1).dropna() rolling_ewmstd_pct_returns = returns.ewm(span=self.sigma_span, min_periods=self.sigma_span).std().dropna() if rolling_ewmstd_pct_returns.empty: # Not enough history return None # Annualize sigma estimate annulized_rolling_ewmstd_pct_returns = rolling_ewmstd_pct_returns * (self.annulaization_factor) # Blend the sigma estimate (p.80) blended_estimate = 0.3*annulized_rolling_ewmstd_pct_returns.mean() + 0.7*annulized_rolling_ewmstd_pct_returns.iloc[-1] return blended_estimate def consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None: security = self.algorithm.Securities[consolidated_bar.Symbol] end_date = consolidated_bar.EndTime.date() if security.Symbol.IsCanonical(): # Update adjusted history security.adjusted_history.loc[end_date] = consolidated_bar.Close security.adjusted_history = security.adjusted_history[security.adjusted_history.index >= end_date - self.total_lookback] else: # Update raw history continuous_contract = self.algorithm.Securities[security.Symbol.Canonical] if consolidated_bar.Symbol == continuous_contract.latest_mapped: continuous_contract.raw_history.loc[end_date] = consolidated_bar.Close continuous_contract.raw_history = continuous_contract.raw_history[continuous_contract.raw_history.index >= end_date - self.total_lookback] def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None: for security in changes.AddedSecurities: symbol = security.Symbol # Create a consolidator to update the history security.consolidator = TradeBarConsolidator(timedelta(1)) security.consolidator.DataConsolidated += self.consolidation_handler algorithm.SubscriptionManager.AddConsolidator(symbol, security.consolidator) if security.Symbol.IsCanonical(): # Add some members to track price history security.adjusted_history = pd.Series() security.raw_history = pd.Series() # Create indicators for the continuous contract security.fast_ema = algorithm.EMA(security.Symbol, self.fast_ema_span, Resolution.Daily) security.slow_ema = algorithm.EMA(security.Symbol, self.slow_ema_span, Resolution.Daily) security.ewmac = IndicatorExtensions.Minus(security.fast_ema, security.slow_ema) security.automatic_indicators = [security.fast_ema, security.slow_ema] self.futures.append(security) for security in changes.RemovedSecurities: # Remove consolidator + indicators algorithm.SubscriptionManager.RemoveConsolidator(security.Symbol, security.consolidator) if security.Symbol.IsCanonical(): for indicator in security.automatic_indicators: algorithm.DeregisterIndicator(indicator)
# region imports from AlgorithmImports import * # endregion categories = { Symbol.Create(Futures.Financials.Y10TreasuryNote, SecurityType.Future, Market.CBOT): ("Fixed Income", "Bonds"), Symbol.Create(Futures.Indices.SP500EMini, SecurityType.Future, Market.CME): ("Equity", "US") }
# region imports from AlgorithmImports import * #from futures import future_datas from universe import AdvancedFuturesUniverseSelectionModel from alpha import FastTrendFollowingLongAndShortWithTrendStrenthAlphaModel from portfolio import BufferedPortfolioConstructionModel # endregion class FuturesFastTrendFollowingLongAndShortWithTrendStrenthAlgorithm(QCAlgorithm): undesired_symbols_from_previous_deployment = [] checked_symbols_from_previous_deployment = False futures = [] def Initialize(self): self.SetStartDate(2020, 7, 1) self.SetEndDate(2023, 7, 1) self.SetCash(1_000_000) self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin) self.SetSecurityInitializer(MySecurityInitializer(self.BrokerageModel, FuncSecuritySeeder(self.GetLastKnownPrices))) self.Settings.MinimumOrderMarginPortfolioPercentage = 0 self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.BackwardsPanamaCanal self.UniverseSettings.DataMappingMode = DataMappingMode.OpenInterest self.AddUniverseSelection(AdvancedFuturesUniverseSelectionModel()) slow_ema_span = 2 ** self.GetParameter("slow_ema_span_exponent", 6) # Should be >= 5. "It's convenient to stick to a series of parameter values that are powers of two" (p.131) blend_years = self.GetParameter("blend_years", 3) # Number of years to use when blending sigma estimates self.AddAlpha(FastTrendFollowingLongAndShortWithTrendStrenthAlphaModel( self, slow_ema_span, self.GetParameter("abs_forecast_cap", 20), # Hardcoded on p.173 self.GetParameter("sigma_span", 32), # Hardcoded to 32 on p.604 self.GetParameter("target_risk", 0.2), # Recommend value is 0.2 on p.75 blend_years )) self.Settings.RebalancePortfolioOnSecurityChanges = False self.Settings.RebalancePortfolioOnInsightChanges = False self.total_count = 0 self.day = -1 self.SetPortfolioConstruction(BufferedPortfolioConstructionModel( self.rebalance_func, self.GetParameter("buffer_scaler", 0.1) # Hardcoded on p.167 & p.173 )) self.AddRiskManagement(NullRiskManagementModel()) self.SetExecution(ImmediateExecutionModel()) self.SetWarmUp(timedelta(365*blend_years + slow_ema_span + 7)) def rebalance_func(self, time): if (self.total_count != self.Insights.TotalCount or self.day != self.Time.day) and not self.IsWarmingUp and self.CurrentSlice.QuoteBars.Count > 0: self.total_count = self.Insights.TotalCount self.day = self.Time.day return time return None def OnData(self, data): # Exit positions that aren't backed by existing insights. # If you don't want this behavior, delete this method definition. if not self.IsWarmingUp and not self.checked_symbols_from_previous_deployment: for security_holding in self.Portfolio.Values: if not security_holding.Invested: continue symbol = security_holding.Symbol if not self.Insights.HasActiveInsights(symbol, self.UtcTime): self.undesired_symbols_from_previous_deployment.append(symbol) self.checked_symbols_from_previous_deployment = True for symbol in self.undesired_symbols_from_previous_deployment[:]: if self.IsMarketOpen(symbol): self.Liquidate(symbol, tag="Holding from previous deployment that's no longer desired") self.undesired_symbols_from_previous_deployment.remove(symbol) class MySecurityInitializer(BrokerageModelSecurityInitializer): def __init__(self, brokerage_model: IBrokerageModel, security_seeder: ISecuritySeeder) -> None: super().__init__(brokerage_model, security_seeder) def Initialize(self, security: Security) -> None: # First, call the superclass definition # This method sets the reality models of each security using the default reality models of the brokerage model super().Initialize(security) # Next, overwrite some of the reality models security.SetFeeModel(ConstantFeeModel(0, "USD"))
#region imports from AlgorithmImports import * #endregion class BufferedPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel): def __init__(self, rebalance, buffer_scaler): super().__init__(rebalance) self.buffer_scaler = buffer_scaler def CreateTargets(self, algorithm: QCAlgorithm, insights: List[Insight]) -> List[PortfolioTarget]: targets = super().CreateTargets(algorithm, insights) adj_targets = [] for insight in insights: future_contract = algorithm.Securities[insight.Symbol] optimal_position = future_contract.forecast * future_contract.position / 10 # Create buffer zone to reduce churn buffer_width = self.buffer_scaler * abs(future_contract.position) upper_buffer = round(optimal_position + buffer_width) lower_buffer = round(optimal_position - buffer_width) # Determine quantity to put holdings into buffer zone current_holdings = future_contract.Holdings.Quantity if lower_buffer <= current_holdings <= upper_buffer: continue quantity = lower_buffer if current_holdings < lower_buffer else upper_buffer # Place trades adj_targets.append(PortfolioTarget(insight.Symbol, quantity)) # Liquidate contracts that have an expired insight for target in targets: if target.Quantity == 0: adj_targets.append(target) return adj_targets
# region imports from AlgorithmImports import * from Selection.FutureUniverseSelectionModel import FutureUniverseSelectionModel from futures import categories # endregion class AdvancedFuturesUniverseSelectionModel(FutureUniverseSelectionModel): def __init__(self) -> None: super().__init__(timedelta(1), self.select_future_chain_symbols) self.symbols = list(categories.keys()) def select_future_chain_symbols(self, utc_time: datetime) -> List[Symbol]: return self.symbols def Filter(self, filter: FutureFilterUniverse) -> FutureFilterUniverse: return filter.Expiration(0, 365)
#region imports from AlgorithmImports import * #endregion def GetPositionSize(group): subcategories = {} for category, subcategory in group.values(): if category not in subcategories: subcategories[category] = {subcategory: 0} elif subcategory not in subcategories[category]: subcategories[category][subcategory] = 0 subcategories[category][subcategory] += 1 category_count = len(subcategories.keys()) subcategory_count = {category: len(subcategory.keys()) for category, subcategory in subcategories.items()} weights = {} for symbol in group: category, subcategory = group[symbol] weight = 1 / category_count / subcategory_count[category] / subcategories[category][subcategory] weights[symbol] = weight return weights