Overall Statistics |
Total Orders 1113 Average Win 0.78% Average Loss -0.40% Compounding Annual Return 16.240% Drawdown 16.700% Expectancy 0.246 Start Equity 1000000 End Equity 1570420.32 Net Profit 57.042% Sharpe Ratio 0.653 Sortino Ratio 0.677 Probabilistic Sharpe Ratio 27.105% Loss Rate 58% Win Rate 42% Profit-Loss Ratio 1.96 Alpha 0 Beta 0 Annual Standard Deviation 0.169 Annual Variance 0.028 Information Ratio 0.746 Tracking Error 0.169 Treynor Ratio 0 Total Fees $5701.79 Estimated Strategy Capacity $0 Lowest Capacity Asset GC Y9O6T2ED3VRX Portfolio Turnover 7.57% |
#region imports
from AlgorithmImports import *

from utils import GetPositionSize
from futures import categories
#endregion


class CarryAndTrendAlphaModel(AlphaModel):
    """Alpha model that blends EMA-crossover trend forecasts with carry forecasts.

    For every continuous future the model keeps a raw and a back-adjusted
    daily close history (filled by a daily consolidator), estimates the
    annualized volatility of percentage returns, and once per day emits one
    price insight per tradable contract.  The capped combined forecast and
    the risk-targeted position are stored on the contract's security object
    as ``forecast`` and ``position``.

    Page references in the comments are the ones given by the original
    author of this file.
    """

    # Class-level default kept for backward compatibility only; __init__
    # replaces it with a per-instance list so instances do not share state.
    futures = []
    BUSINESS_DAYS_IN_YEAR = 256
    TREND_FORECAST_SCALAR_BY_SPAN = {64: 1.91, 32: 2.79, 16: 4.1, 8: 5.95, 4: 8.53, 2: 12.1}  # Table 29 on page 177
    CARRY_FORECAST_SCALAR = 30  # Provided on p.216
    FDM_BY_RULE_COUNT = {  # Table 52 on page 234
        1: 1.0,
        2: 1.02,
        3: 1.03,
        4: 1.23,
        5: 1.25,
        6: 1.27,
        7: 1.29,
        8: 1.32,
        9: 1.34,
    }

    def __init__(self, algorithm, emac_filters, abs_forecast_cap, sigma_span, target_risk, blend_years):
        """Store the model parameters and derive the EMA spans.

        Args:
            algorithm: The owning algorithm; used by the consolidation handler
                to look up securities.
            emac_filters: Largest power of two used as a fast EMA span; fast
                spans are ``2**4 .. 2**emac_filters``.
            abs_forecast_cap: Absolute cap applied to every forecast.
            sigma_span: Span (and minimum number of periods) of the
                exponentially weighted standard deviation of returns.
            target_risk: Annualized risk target used when sizing positions.
            blend_years: Years of history retained for blending the
                volatility estimate.
        """
        self.algorithm = algorithm
        # Per-instance list of continuous futures (shadows the class default).
        self.futures = []

        self.emac_spans = [2**x for x in range(4, emac_filters+1)]
        self.fast_ema_spans = self.emac_spans
        # "Any ratio between the two moving average lengths of two and six
        # gives statistically indistinguishable results." (p.165)
        self.slow_ema_spans = [fast_span * 4 for fast_span in self.emac_spans]
        self.all_ema_spans = sorted(set(self.fast_ema_spans + self.slow_ema_spans))

        self.carry_spans = [5, 20, 60, 120]

        # Attribute name (including its spelling) is kept as-is for compatibility.
        self.annulaization_factor = self.BUSINESS_DAYS_IN_YEAR ** 0.5

        self.abs_forecast_cap = abs_forecast_cap
        self.sigma_span = sigma_span
        self.target_risk = target_risk
        self.blend_years = blend_years

        # Instrument Diversification Multiplier.
        # Hardcoded in https://gitfront.io/r/user-4000052/iTvUZwEUN2Ta/AFTS-CODE/blob/chapter8.py
        self.idm = 1.5

        self.categories = categories
        # Number of calendar days of history to retain.
        # NOTE(review): the 7/5 factor presumably converts business days to
        # calendar days -- confirm.
        self.total_lookback = timedelta(sigma_span*(7/5) + blend_years*365)

        self.day = -1

    def update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
        """Refresh carry data and, at most once per day, emit insights.

        Returns:
            A list of price insights, one per contract with a non-zero
            forecast and position; an empty list when no quote bars are
            present, when the day was already processed, or while more than
            7 days of warm-up remain.
        """
        if data.quote_bars.count:
            for future in self.futures:
                future.latest_mapped = future.mapped

        # Rebalance daily
        if self.day == data.time.day or data.quote_bars.count == 0:
            return []

        # Update annualized carry data
        self._update_carry_history(algorithm.securities)

        # If warming up and still > 7 days before start date, don't do anything
        # We use a 7-day buffer so that the algorithm has active insights when warm-up ends
        if algorithm.start_date - algorithm.time > timedelta(7):
            self.day = data.time.day
            return []

        # Estimate the standard deviation of % daily returns for each future
        sigma_pcts_by_future = {}
        for future in self.futures:
            sigma_pcts = self.estimate_std_of_pct_returns(future.raw_history, future.adjusted_history)
            # Check if there is sufficient history
            if sigma_pcts is None:
                continue
            sigma_pcts_by_future[future] = sigma_pcts

        # Create insights
        insights = []
        weight_by_symbol = GetPositionSize(
            {future.symbol: self.categories[future.symbol].classification for future in sigma_pcts_by_future}
        )
        for symbol, instrument_weight in weight_by_symbol.items():
            future = algorithm.securities[symbol]

            # The near/further contracts are only attached once two contracts
            # of the chain have been seen; without them there is nothing to trade.
            if not (hasattr(future, "near_contract") and hasattr(future, "further_contract")):
                continue
            target_contract = [future.near_contract, future.further_contract][self.categories[future.symbol].contract_offset]

            sigma_pct = sigma_pcts_by_future[future]
            # "The price should be for the expiry date we currently hold (not the back-adjusted price)" (p.55)
            daily_risk_price_terms = sigma_pct / (self.annulaization_factor) * target_contract.price

            # A zero, negative or NaN risk estimate (e.g. a contract that has
            # no price yet) would turn the divisions below into inf/NaN.
            if not daily_risk_price_terms > 0:
                continue

            # Calculate target position
            position = (algorithm.portfolio.total_portfolio_value * self.idm * instrument_weight * self.target_risk) \
                / (future.symbol_properties.contract_multiplier * daily_risk_price_terms * (self.annulaization_factor))

            # Calculate forecast type 1: EMAC
            trend_forecasts = self.calculate_emac_forecasts(future.ewmac_by_span, daily_risk_price_terms)
            if not trend_forecasts:
                continue
            emac_combined_forecasts = sum(trend_forecasts) / len(trend_forecasts)  # Aggregate EMAC factors -- equal-weight

            # Calculate factor type 2: Carry
            carry_forecasts = self.calculate_carry_forecasts(future.annualized_raw_carry_history, daily_risk_price_terms)
            if not carry_forecasts:
                continue
            carry_combined_forecasts = sum(carry_forecasts) / len(carry_forecasts)  # Aggregate Carry factors -- equal-weight

            # Aggregate factors -- 60% for trend, 40% for carry
            raw_combined_forecast = 0.6 * emac_combined_forecasts + 0.4 * carry_combined_forecasts
            # Apply a forecast diversification multiplier to keep the average forecast at 10 (p 193-194)
            scaled_combined_forecast = raw_combined_forecast * self.FDM_BY_RULE_COUNT[len(trend_forecasts) + len(carry_forecasts)]
            capped_combined_forecast = self._cap_forecast(scaled_combined_forecast)

            if capped_combined_forecast * position == 0:
                continue
            # Stored on the contract so the portfolio construction model can read them.
            target_contract.forecast = capped_combined_forecast
            target_contract.position = position

            # The insight expires one second before the next market open.
            local_time = Extensions.convert_to(algorithm.time, algorithm.time_zone, future.exchange.time_zone)
            expiry = future.exchange.hours.get_next_market_open(local_time, False) - timedelta(seconds=1)
            direction = InsightDirection.UP if capped_combined_forecast * position > 0 else InsightDirection.DOWN
            insights.append(Insight.price(target_contract.symbol, expiry, direction))

        if insights:
            self.day = data.time.day

        return insights

    def _update_carry_history(self, securities):
        """Append today's annualized raw carry to each future's carry history."""
        for future in self.futures:
            # Get the near and far contracts
            contracts = self.get_near_and_further_contracts(securities, future.mapped)
            if contracts is None:
                continue
            near_contract, further_contract = contracts

            # Save near and further contract for later
            future.near_contract = near_contract
            future.further_contract = further_contract

            # Check if the daily consolidator has provided a bar for these contracts yet
            if (not hasattr(near_contract, "raw_history")
                    or not hasattr(further_contract, "raw_history")
                    or near_contract.raw_history.empty
                    or further_contract.raw_history.empty):
                continue

            # Update annualized raw carry history
            raw_carry = near_contract.raw_history.iloc[0] - further_contract.raw_history.iloc[0]
            months_between_contracts = round((further_contract.expiry - near_contract.expiry).days / 30)
            if months_between_contracts == 0:
                # Expiries less than ~15 days apart round to zero months; the
                # annualisation would divide by zero and write inf/NaN into
                # the carry history, so skip this observation.
                continue
            expiry_difference_in_years = abs(months_between_contracts) / 12
            annualized_raw_carry = raw_carry / expiry_difference_in_years
            future.annualized_raw_carry_history.loc[near_contract.raw_history.index[0]] = annualized_raw_carry

    def _cap_forecast(self, forecast):
        """Clamp ``forecast`` to ``[-abs_forecast_cap, +abs_forecast_cap]``."""
        return max(min(forecast, self.abs_forecast_cap), -self.abs_forecast_cap)

    def align_history(self, a, b):
        """Return ``a`` and ``b`` restricted to their common index, sorted."""
        idx = sorted(set(a.index).intersection(b.index))
        return a.loc[idx], b.loc[idx]

    def calculate_emac_forecasts(self, ewmac_by_span, daily_risk_price_terms):
        """Return one capped trend forecast per fast EMA span.

        Each crossover value is divided by the daily price risk, multiplied
        by the span's forecast scalar and then capped.
        """
        forecasts = []
        for span in self.emac_spans:
            risk_adjusted_ewmac = ewmac_by_span[span].current.value / daily_risk_price_terms
            scaled_forecast_for_ewmac = risk_adjusted_ewmac * self.TREND_FORECAST_SCALAR_BY_SPAN[span]
            forecasts.append(self._cap_forecast(scaled_forecast_for_ewmac))
        return forecasts

    def calculate_carry_forecasts(self, annualized_raw_carry, daily_risk_price_terms):
        """Return one capped carry forecast per smoothing span.

        Spans for which the carry history is still shorter than the span
        contribute no forecast, so the returned list may be shorter than
        ``carry_spans`` or empty.
        """
        carry_forecast = annualized_raw_carry / daily_risk_price_terms

        forecasts = []
        for span in self.carry_spans:
            ## Smooth out carry forecast
            smoothed_carry_forecast = carry_forecast.ewm(span=span, min_periods=span).mean().dropna()
            if smoothed_carry_forecast.empty:
                continue
            smoothed_carry_forecast = smoothed_carry_forecast.iloc[-1]
            ## Apply forecast scalar (p. 264)
            scaled_carry_forecast = smoothed_carry_forecast * self.CARRY_FORECAST_SCALAR
            ## Cap forecast
            forecasts.append(self._cap_forecast(scaled_carry_forecast))

        return forecasts

    def get_near_and_further_contracts(self, securities, mapped_symbol):
        """Return the two nearest contracts at or beyond the mapped contract.

        Returns:
            ``(near_contract, further_contract)`` sorted by expiry, or
            ``None`` when the continuous contract is not mapped yet or fewer
            than two matching contracts are present in ``securities``.
        """
        # The continuous contract may not be mapped to a contract yet.
        if mapped_symbol is None:
            return None

        ## Gather and align history of near/further contracts
        contracts_sorted_by_expiry = sorted(
            [
                kvp.Value for kvp in securities
                if not kvp.key.is_canonical()
                and kvp.key.canonical == mapped_symbol.canonical
                and kvp.Value.Expiry >= securities[mapped_symbol].Expiry
            ],
            key=lambda contract: contract.expiry
        )
        if len(contracts_sorted_by_expiry) < 2:
            return None
        near_contract = contracts_sorted_by_expiry[0]
        further_contract = contracts_sorted_by_expiry[1]
        return near_contract, further_contract

    def estimate_std_of_pct_returns(self, raw_history, adjusted_history):
        """Estimate the annualized standard deviation of percentage returns.

        Returns are back-adjusted price changes divided by the previous raw
        price.  The result blends 30% of the mean of the retained
        exponentially weighted estimates with 70% of the latest one.

        Returns:
            The blended annualized estimate, or ``None`` when fewer than
            ``sigma_span`` returns are available.
        """
        # Align history of raw and adjusted prices
        raw_history_aligned, adjusted_history_aligned = self.align_history(raw_history, adjusted_history)

        # Calculate exponentially weighted standard deviation of returns
        returns = adjusted_history_aligned.diff().dropna() / raw_history_aligned.shift(1).dropna()
        rolling_ewmstd_pct_returns = returns.ewm(span=self.sigma_span, min_periods=self.sigma_span).std().dropna()
        if rolling_ewmstd_pct_returns.empty:  # Not enough history
            return None
        # Annualize sigma estimate
        annulized_rolling_ewmstd_pct_returns = rolling_ewmstd_pct_returns * (self.annulaization_factor)
        # Blend the sigma estimate (p.80)
        blended_estimate = 0.3*annulized_rolling_ewmstd_pct_returns.mean() + 0.7*annulized_rolling_ewmstd_pct_returns.iloc[-1]
        return blended_estimate

    def consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None:
        """Record the daily close of a consolidated bar in the price histories.

        Canonical (continuous) symbols update their back-adjusted history.
        Individual contracts update the continuous contract's raw history
        when they are its latest mapped contract, and always keep their own
        most recent close for the carry calculation.
        """
        security = self.algorithm.securities[consolidated_bar.symbol]
        end_date = consolidated_bar.end_time.date()
        if security.symbol.is_canonical():
            # Update adjusted history
            security.adjusted_history.loc[end_date] = consolidated_bar.close
            security.adjusted_history = security.adjusted_history[security.adjusted_history.index >= end_date - self.total_lookback]
        else:
            # Update raw history
            continuous_contract = self.algorithm.securities[security.symbol.canonical]
            if hasattr(continuous_contract, "latest_mapped") and consolidated_bar.symbol == continuous_contract.latest_mapped:
                continuous_contract.raw_history.loc[end_date] = consolidated_bar.close
                continuous_contract.raw_history = continuous_contract.raw_history[continuous_contract.raw_history.index >= end_date - self.total_lookback]

            # Update raw carry history (only the latest close is kept)
            security.raw_history.loc[end_date] = consolidated_bar.close
            security.raw_history = security.raw_history.iloc[-1:]

    def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        """Attach or detach consolidators, histories and indicators."""
        for security in changes.added_securities:
            symbol = security.symbol

            # Create a consolidator to update the history
            security.consolidator = TradeBarConsolidator(timedelta(1))
            security.consolidator.data_consolidated += self.consolidation_handler
            algorithm.subscription_manager.add_consolidator(symbol, security.consolidator)

            # Get raw and adjusted history.  The dtype is explicit because the
            # default dtype of an empty Series is deprecated/version-dependent.
            security.raw_history = pd.Series(dtype=float)

            if symbol.is_canonical():
                security.adjusted_history = pd.Series(dtype=float)
                security.annualized_raw_carry_history = pd.Series(dtype=float)

                # Create indicators for the continuous contract
                ema_by_span = {span: algorithm.EMA(symbol, span, Resolution.DAILY) for span in self.all_ema_spans}
                security.ewmac_by_span = {}
                for fast_span, slow_span in zip(self.emac_spans, self.slow_ema_spans):
                    security.ewmac_by_span[fast_span] = IndicatorExtensions.minus(ema_by_span[fast_span], ema_by_span[slow_span])

                security.automatic_indicators = ema_by_span.values()

                self.futures.append(security)

        for security in changes.removed_securities:
            # Remove consolidator + indicators
            algorithm.subscription_manager.remove_consolidator(security.symbol, security.consolidator)
            if security.symbol.is_canonical():
                for indicator in security.automatic_indicators:
                    algorithm.deregister_indicator(indicator)
                # Stop processing a continuous future that left the universe;
                # it is appended again (with fresh state) if it is re-added.
                if security in self.futures:
                    self.futures.remove(security)
# region imports
from AlgorithmImports import *
# endregion


class FutureData:
    """Static per-future settings used by the alpha model.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, classification, contract_offset):
        # (category, subcategory) pair used to split portfolio weight.
        self.classification = classification
        # Index into [near contract, further contract] selecting the contract to trade.
        self.contract_offset = contract_offset


# Continuous-future symbol -> FutureData for every future in the universe.
categories = {
    Symbol.create(ticker, SecurityType.FUTURE, market): FutureData(classification, contract_offset)
    for ticker, market, classification, contract_offset in (
        (Futures.Indices.SP_500_E_MINI, Market.CME, ("Equity", "US"), 0),
        (Futures.Indices.NASDAQ_100_E_MINI, Market.CME, ("Equity", "US"), 0),
        (Futures.Indices.RUSSELL_2000_E_MINI, Market.CME, ("Equity", "US"), 0),
        (Futures.Indices.VIX, Market.CFE, ("Volatility", "US"), 0),
        (Futures.Energies.NATURAL_GAS, Market.NYMEX, ("Energies", "Gas"), 1),
        (Futures.Energies.CRUDE_OIL_WTI, Market.NYMEX, ("Energies", "Oil"), 0),
        (Futures.Grains.CORN, Market.CBOT, ("Agricultural", "Grain"), 0),
        (Futures.Metals.COPPER, Market.COMEX, ("Metals", "Industrial"), 0),
        (Futures.Metals.GOLD, Market.COMEX, ("Metals", "Precious"), 0),
        (Futures.Metals.SILVER, Market.COMEX, ("Metals", "Precious"), 0),
    )
}
# region imports
from AlgorithmImports import *

#from futures import future_datas
from universe import AdvancedFuturesUniverseSelectionModel
from alpha import CarryAndTrendAlphaModel
from portfolio import BufferedPortfolioConstructionModel
from utils import GetPositionSize
# endregion


class FuturesCombinedCarryAndTrendAlgorithm(QCAlgorithm):
    """Framework algorithm wiring together the universe, alpha, portfolio
    construction and execution models of the carry-and-trend strategy."""

    # Holdings found at start-up that no active insight supports.
    undesired_symbols_from_previous_deployment = []
    # Set once the start-up holdings have been inspected.
    checked_symbols_from_previous_deployment = False

    def initialize(self):
        """Configure dates, cash, brokerage, framework models and warm-up."""
        self.set_start_date(2020, 7, 1)
        self.set_end_date(2023, 7, 1)
        self.set_cash(1_000_000)

        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)
        seeder = FuncSecuritySeeder(self.get_last_known_prices)
        self.set_security_initializer(BrokerageModelSecurityInitializer(self.brokerage_model, seeder))
        self.settings.minimum_order_margin_portfolio_percentage = 0

        self.universe_settings.data_normalization_mode = DataNormalizationMode.BACKWARDS_PANAMA_CANAL
        self.universe_settings.data_mapping_mode = DataMappingMode.LAST_TRADING_DAY
        self.add_universe_selection(AdvancedFuturesUniverseSelectionModel())

        alpha_model = CarryAndTrendAlphaModel(
            self,
            self.get_parameter("emac_filters", 6),
            self.get_parameter("abs_forecast_cap", 20),  # Hardcoded on p.173
            self.get_parameter("sigma_span", 32),
            self.get_parameter("target_risk", 0.2),      # Recommend value is 0.2 on p.75
            self.get_parameter("blend_years", 3),        # Number of years to use when blending sigma estimates
        )
        self.add_alpha(alpha_model)

        # Rebalancing is driven exclusively by rebalance_func.
        self.settings.rebalance_portfolio_on_security_changes = False
        self.settings.rebalance_portfolio_on_insight_changes = False
        self.total_count = 0
        self.day = -1
        portfolio_model = BufferedPortfolioConstructionModel(
            self.rebalance_func,
            self.get_parameter("buffer_scaler", 0.1),    # Hardcoded on p.167 & p.173
        )
        self.set_portfolio_construction(portfolio_model)

        self.add_risk_management(NullRiskManagementModel())

        self.set_execution(ImmediateExecutionModel())

        # We need several years of data to warm-up. Data before 2014 can have issues.
        self.set_warm_up(self.start_date - datetime(2014, 1, 1))

    def rebalance_func(self, time):
        """Return ``time`` when the portfolio should rebalance now, else ``None``.

        A rebalance is requested when the insight count or the day of month
        changed since the last rebalance, the algorithm is not warming up and
        the current slice contains quote bars.
        """
        something_changed = self.total_count != self.insights.total_count or self.day != self.time.day
        if not something_changed:
            return None
        if self.is_warming_up:
            return None
        if not self.current_slice.quote_bars.count > 0:
            return None

        self.total_count = self.insights.total_count
        self.day = self.time.day
        return time

    def on_data(self, data):
        """Liquidate start-up holdings that no active insight supports."""
        # Exit positions that aren't backed by existing insights.
        # If you don't want this behavior, delete this method definition.
        if not (self.is_warming_up or self.checked_symbols_from_previous_deployment):
            for holding in self.portfolio.Values:
                if not holding.invested:
                    continue
                held_symbol = holding.symbol
                if not self.insights.has_active_insights(held_symbol, self.utc_time):
                    self.undesired_symbols_from_previous_deployment.append(held_symbol)
            self.checked_symbols_from_previous_deployment = True

        # Iterate over a copy because entries are removed while looping.
        for symbol in list(self.undesired_symbols_from_previous_deployment):
            if not self.is_market_open(symbol):
                continue
            self.liquidate(symbol, tag="Holding from previous deployment that's no longer desired")
            self.undesired_symbols_from_previous_deployment.remove(symbol)
#region imports
from AlgorithmImports import *
#endregion


class BufferedPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
    """Portfolio construction model that only trades when holdings leave a
    buffer zone around the optimal position."""

    def __init__(self, rebalance, buffer_scaler):
        """
        Args:
            rebalance: Rebalancing argument forwarded to the base model.
            buffer_scaler: Half-width of the buffer zone as a fraction of the
                absolute position stored on the contract.
        """
        super().__init__(rebalance)
        self.buffer_scaler = buffer_scaler

    def create_targets(self, algorithm: QCAlgorithm, insights: List[Insight]) -> List[PortfolioTarget]:
        """Create targets that move holdings to the nearest buffer edge.

        Contracts whose current holdings already lie inside the buffer zone
        get no target.  Zero-quantity targets produced by the base model are
        appended after the buffered ones.
        """
        base_targets = super().create_targets(algorithm, insights)
        buffered_targets = []

        for insight in insights:
            contract = algorithm.securities[insight.symbol]
            # ``forecast`` and ``position`` are read from the security object.
            ideal = contract.forecast * contract.position / 10

            # Create buffer zone to reduce churn
            half_width = self.buffer_scaler * abs(contract.position)
            ceiling = round(ideal + half_width)
            floor = round(ideal - half_width)

            # Determine quantity to put holdings into buffer zone
            held = contract.holdings.quantity
            if floor <= held <= ceiling:
                continue
            if held < floor:
                quantity = floor
            else:
                quantity = ceiling

            # Place trades
            buffered_targets.append(PortfolioTarget(insight.symbol, quantity))

        # Liquidate contracts that have an expired insight
        buffered_targets.extend(target for target in base_targets if target.quantity == 0)

        return buffered_targets
# region imports
from AlgorithmImports import *
from Selection.FutureUniverseSelectionModel import FutureUniverseSelectionModel
from futures import categories
# endregion


class AdvancedFuturesUniverseSelectionModel(FutureUniverseSelectionModel):
    """Universe selection model covering every future listed in ``categories``."""

    def __init__(self) -> None:
        """Register the chain selector with a one-day interval and cache the symbols."""
        refresh_interval = timedelta(1)
        super().__init__(refresh_interval, self.select_future_chain_symbols)
        self.symbols = [*categories.keys()]

    def select_future_chain_symbols(self, utc_time: datetime) -> List[Symbol]:
        """Return the fixed list of future symbols, regardless of ``utc_time``."""
        return self.symbols

    def filter(self, filter: FutureFilterUniverse) -> FutureFilterUniverse:
        """Restrict the contract filter to ``expiration(0, 365)``."""
        expiring_within_a_year = filter.expiration(0, 365)
        return expiring_within_a_year
#region imports
from AlgorithmImports import *
#endregion


def GetPositionSize(group):
    """Split a total weight of 1 across symbols by category and subcategory.

    The weight is divided equally among categories, then equally among the
    subcategories of each category, then equally among the symbols sharing a
    subcategory.

    Args:
        group: Mapping of symbol -> ``(category, subcategory)``.

    Returns:
        Mapping of symbol -> weight, in the iteration order of ``group``.
    """
    # category -> {subcategory -> number of symbols in it}
    members_by_category = {}
    for category, subcategory in group.values():
        counts = members_by_category.setdefault(category, {})
        counts[subcategory] = counts.get(subcategory, 0) + 1

    n_categories = len(members_by_category)

    weights = {}
    for symbol, (category, subcategory) in group.items():
        counts = members_by_category[category]
        weights[symbol] = 1 / n_categories / len(counts) / counts[subcategory]
    return weights