Overall Statistics |
Total Orders 1498 Average Win 0.06% Average Loss -0.01% Compounding Annual Return 22.663% Drawdown 19.300% Expectancy 5.688 Start Equity 100000 End Equity 184631.42 Net Profit 84.631% Sharpe Ratio 0.948 Sortino Ratio 0.974 Probabilistic Sharpe Ratio 49.514% Loss Rate 18% Win Rate 82% Profit-Loss Ratio 7.14 Alpha 0.087 Beta 0.614 Annual Standard Deviation 0.154 Annual Variance 0.024 Information Ratio 0.406 Tracking Error 0.124 Treynor Ratio 0.238 Total Fees $1498.00 Estimated Strategy Capacity $15000000.00 Lowest Capacity Asset AAPL R735QTJ8XC9X Portfolio Turnover 0.08% |
#region imports
from AlgorithmImports import *
from datetime import timedelta
from utils import get_position_size
from futures import categories
#endregion
import pandas as pd


class FastTrendFollowingLongAndShortWithTrendStrenthAlphaModel(AlphaModel):
    """Emit long/short insights for continuous futures from a capped EWMAC forecast.

    For every tracked continuous future the model estimates the annualized
    standard deviation of percentage returns, sizes a risk-targeted position,
    scales the fast-minus-slow EMA difference into a capped forecast, and
    stores ``forecast`` and ``position`` on the currently mapped contract so
    that the portfolio construction model can read them.
    """

    _BUSINESS_DAYS_IN_YEAR = 256
    # Given by author on https://gitfront.io/r/user-4000052/iTvUZwEUN2Ta/AFTS-CODE/blob/chapter7.py
    _FORECAST_SCALAR_BY_SPAN = {64: 1.91, 32: 2.79, 16: 4.1, 8: 5.95, 4: 8.53, 2: 12.1}

    def __init__(self, algorithm, slow_ema_span, abs_forecast_cap, sigma_span, target_risk, blend_years):
        """Store the model parameters.

        Args:
            algorithm: Algorithm instance; used by the consolidation handler
                to look securities up by symbol.
            slow_ema_span: Span of the slow EMA. The fast span is a quarter of
                it and must be a key of ``_FORECAST_SCALAR_BY_SPAN``.
            abs_forecast_cap: Absolute cap applied to the scaled forecast.
            sigma_span: Span (and minimum number of periods) of the
                exponentially weighted standard deviation of returns.
            target_risk: Risk target used in the position sizing formula.
            blend_years: Years of history kept for the sigma estimate.

        Raises:
            KeyError: If ``int(slow_ema_span / 4)`` has no forecast scalar.
        """
        self._algorithm = algorithm
        # Per-instance list: a class-level list would be shared (and keep
        # growing) across every instance of the model.
        self._futures = []
        self._slow_ema_span = slow_ema_span
        # "Any ratio between the two moving average lengths of two and six gives statistically indistinguishable results." (p.165)
        self._fast_ema_span = int(self._slow_ema_span / 4)
        self._annulaization_factor = self._BUSINESS_DAYS_IN_YEAR ** 0.5
        self._abs_forecast_cap = abs_forecast_cap
        self._sigma_span = sigma_span
        self._target_risk = target_risk
        self._blend_years = blend_years
        # Instrument Diversification Multiplier.
        # Hardcoded in https://gitfront.io/r/user-4000052/iTvUZwEUN2Ta/AFTS-CODE/blob/chapter8.py
        self._idm = 1.5
        self._forecast_scalar = self._FORECAST_SCALAR_BY_SPAN[self._fast_ema_span]
        self._categories = categories
        self._total_lookback = timedelta(365 * self._blend_years + self._slow_ema_span)
        self._day = -1

    def update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
        """Return at most one batch of insights per calendar day.

        Args:
            algorithm: The running algorithm.
            data: The current data slice.

        Returns:
            One price insight per future whose forecast and position are both
            non-zero, or an empty list when nothing is emitted.
        """
        # Record the new contract in the continuous series
        if data.quote_bars.count:
            for future in self._futures:
                future.latest_mapped = future.mapped

        # If warming up and still > 7 days before start date, don't do anything
        # We use a 7-day buffer so that the algorithm has active insights when warm-up ends
        if algorithm.start_date - algorithm.time > timedelta(7):
            return []

        if self._day == data.time.day or data.bars.count == 0:
            return []

        # Estimate the standard deviation of % daily returns for each future
        sigma_pct_by_future = {}
        for future in self._futures:
            sigma_pct = self._estimate_std_of_pct_returns(future.raw_history, future.adjusted_history)
            if sigma_pct is None:
                continue
            sigma_pct_by_future[future] = sigma_pct

        # Create insights
        insights = []
        weight_by_symbol = get_position_size(
            {future.symbol: self._categories[future.symbol] for future in sigma_pct_by_future}
        )
        for symbol, instrument_weight in weight_by_symbol.items():
            future = algorithm.securities[symbol]
            current_contract = algorithm.securities[future.mapped]
            # "The price should be for the expiry date we currently hold (not the back-adjusted price)" (p.55)
            daily_risk_price_terms = (
                sigma_pct_by_future[future] / self._annulaization_factor * current_contract.price
            )
            if daily_risk_price_terms == 0:
                # No price on the mapped contract yet (or zero volatility):
                # both divisions below would raise ZeroDivisionError.
                continue

            # Calculate target position
            position = (
                algorithm.portfolio.total_portfolio_value * self._idm * instrument_weight * self._target_risk
            ) / (
                future.symbol_properties.contract_multiplier * daily_risk_price_terms * self._annulaization_factor
            )

            # Adjust target position based on forecast
            risk_adjusted_ewmac = future.ewmac.current.value / daily_risk_price_terms
            scaled_forecast_for_ewmac = risk_adjusted_ewmac * self._forecast_scalar
            forecast = max(min(scaled_forecast_for_ewmac, self._abs_forecast_cap), -self._abs_forecast_cap)

            if forecast * position == 0:
                continue

            # Save some data for the PCM
            current_contract.forecast = forecast
            current_contract.position = position

            # Create the insights; each one expires a second before the next market open
            local_time = Extensions.convert_to(algorithm.time, algorithm.time_zone, future.exchange.time_zone)
            expiry = future.exchange.hours.get_next_market_open(local_time, False) - timedelta(seconds=1)
            direction = InsightDirection.UP if forecast * position > 0 else InsightDirection.DOWN
            insights.append(Insight.price(future.mapped, expiry, direction))

        if insights:
            self._day = data.time.day

        return insights

    def _estimate_std_of_pct_returns(self, raw_history, adjusted_history):
        """Estimate the annualized standard deviation of percentage returns.

        Args:
            raw_history: Series of raw closes of the mapped contract.
            adjusted_history: Series of adjusted closes of the continuous contract.

        Returns:
            The blended sigma estimate (30% long-run mean, 70% latest value),
            or ``None`` when there is not enough history.
        """
        # Align history of raw and adjusted prices
        idx = sorted(set(adjusted_history.index).intersection(raw_history.index))
        adjusted_history_aligned = adjusted_history.loc[idx]
        raw_history_aligned = raw_history.loc[idx]

        # Calculate exponentially weighted standard deviation of returns
        returns = adjusted_history_aligned.diff().dropna() / raw_history_aligned.shift(1).dropna()
        rolling_ewmstd_pct_returns = returns.ewm(span=self._sigma_span, min_periods=self._sigma_span).std().dropna()
        if rolling_ewmstd_pct_returns.empty:  # Not enough history
            return None

        # Annualize sigma estimate
        annulized_rolling_ewmstd_pct_returns = rolling_ewmstd_pct_returns * self._annulaization_factor

        # Blend the sigma estimate (p.80)
        blended_estimate = (
            0.3 * annulized_rolling_ewmstd_pct_returns.mean()
            + 0.7 * annulized_rolling_ewmstd_pct_returns.iloc[-1]
        )
        return blended_estimate

    def _consolidation_handler(self, sender: object, consolidated_bar: TradeBar) -> None:
        """Append a consolidated close to the matching history and trim it.

        Bars of a canonical symbol update ``adjusted_history``; bars of the
        contract recorded as ``latest_mapped`` update the ``raw_history`` of
        its canonical security. Entries older than the total lookback are
        dropped.
        """
        security = self._algorithm.securities[consolidated_bar.symbol]
        end_date = consolidated_bar.end_time.date()
        oldest_kept = end_date - self._total_lookback
        if security.symbol.is_canonical():
            # Update adjusted history
            security.adjusted_history.loc[end_date] = consolidated_bar.close
            security.adjusted_history = security.adjusted_history[security.adjusted_history.index >= oldest_kept]
        else:
            # Update raw history
            continuous_contract = self._algorithm.securities[security.symbol.canonical]
            if consolidated_bar.symbol == continuous_contract.latest_mapped:
                continuous_contract.raw_history.loc[end_date] = consolidated_bar.close
                continuous_contract.raw_history = continuous_contract.raw_history[
                    continuous_contract.raw_history.index >= oldest_kept
                ]

    def on_securities_changed(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        """Attach or detach consolidators, histories and indicators.

        Args:
            algorithm: The running algorithm.
            changes: The securities added to and removed from the universe.
        """
        for security in changes.added_securities:
            symbol = security.symbol

            # Create a consolidator to update the history
            security.consolidator = TradeBarConsolidator(timedelta(1))
            security.consolidator.data_consolidated += self._consolidation_handler
            algorithm.subscription_manager.add_consolidator(symbol, security.consolidator)

            if symbol.is_canonical():
                # Add some members to track price history. An explicit float
                # dtype avoids the version-dependent default of an empty Series.
                security.adjusted_history = pd.Series(dtype=float)
                security.raw_history = pd.Series(dtype=float)

                # Create indicators for the continuous contract
                security.fast_ema = algorithm.EMA(symbol, self._fast_ema_span, Resolution.DAILY)
                security.slow_ema = algorithm.EMA(symbol, self._slow_ema_span, Resolution.DAILY)
                security.ewmac = IndicatorExtensions.minus(security.fast_ema, security.slow_ema)
                security.automatic_indicators = [security.fast_ema, security.slow_ema]

                self._futures.append(security)

        for security in changes.removed_securities:
            # Remove consolidator + indicators
            algorithm.subscription_manager.remove_consolidator(security.symbol, security.consolidator)
            if security.symbol.is_canonical():
                for indicator in security.automatic_indicators:
                    algorithm.deregister_indicator(indicator)
                # Stop tracking the future: its indicators no longer update,
                # and a later re-add would otherwise duplicate the entry.
                self._futures = [
                    future for future in self._futures if future.symbol != security.symbol
                ]
# region imports
from AlgorithmImports import *
# endregion


def _future_symbol(ticker, market):
    """Build the canonical future Symbol for ``ticker`` on ``market``."""
    return Symbol.create(ticker, SecurityType.FUTURE, market)


# Maps each traded continuous future to its (category, subcategory) pair.
categories = {
    _future_symbol(Futures.Financials.Y_10_TREASURY_NOTE, Market.CBOT): ("Fixed Income", "Bonds"),
    _future_symbol(Futures.Indices.SP_500_E_MINI, Market.CME): ("Equity", "US"),
}
# region imports
from datetime import timedelta
from AlgorithmImports import *
from QuantConnect.DataSource import *
import math
#from futures import future_datas
from universe import AdvancedFuturesUniverseSelectionModel
from alpha import FastTrendFollowingLongAndShortWithTrendStrenthAlphaModel
from portfolio import BufferedPortfolioConstructionModel
# endregion
from QuantConnect.DataSource import *


class USEquityDataAlgorithm(QCAlgorithm):
    """Trade AAPL on the log-ratio of the best bid size to the best ask size."""

    position: int = 0  # Current position, in contract units
    buy_qty: int = 0  # Number of long contract sides traded
    sell_qty: int = 0  # Number of short contract sides traded
    real_total_buy_px: float = 0  # Total realized buy price
    real_total_sell_px: float = 0  # Total realized sell price
    theo_total_buy_px: float = 0  # Total buy price to liquidate current position
    theo_total_sell_px: float = 0  # Total sell price to liquidate current position

    POS_ALPHA_THRESHOLD = 1
    NEG_ALPHA_THRESHOLD = POS_ALPHA_THRESHOLD * 2
    POSITION_MAX = 1000

    def initialize(self) -> None:
        """Configure the backtest window, starting cash and the AAPL subscription."""
        self.set_start_date(2018, 1, 1)
        self.set_end_date(2021, 1, 1)
        self.set_cash(100000)
        self.universe_settings.resolution = Resolution.TICK
        symbols = [Symbol.create("AAPL", SecurityType.EQUITY, Market.USA)]
        self.add_universe_selection(ManualUniverseSelectionModel(symbols))
        self.aapl = self.add_equity("AAPL", Resolution.MINUTE).symbol

    def on_data(self, slice: Slice) -> None:
        """Send a market order when the bid/ask size skew crosses a threshold.

        The skew is ``log10(bid_size) - log10(ask_size)``. Above
        ``POS_ALPHA_THRESHOLD`` the rounded skew is bought; below
        ``-NEG_ALPHA_THRESHOLD`` the rounded (negative) skew is sold. In both
        cases the order is only sent while the tracked position is inside
        ``POSITION_MAX``.
        """
        if self.aapl not in slice.quote_bars:
            return

        quote = slice.quote_bars[self.aapl]
        try:
            skew = math.log10(quote.last_bid_size) - math.log10(quote.last_ask_size)
        except (ValueError, TypeError):
            # log10 rejects zero or negative sizes (ValueError) and missing
            # sizes (TypeError); there is no usable signal on such a quote.
            return

        # Buy/sell only when the skew signal is large
        wants_to_buy = skew > self.POS_ALPHA_THRESHOLD and self.position < self.POSITION_MAX
        wants_to_sell = skew < -self.NEG_ALPHA_THRESHOLD and self.position > -self.POSITION_MAX
        if not (wants_to_buy or wants_to_sell):
            return

        quantity = round(skew)
        self.position += quantity
        self.market_order(self.aapl, quantity)

    def BookPressure(self):
        """Placeholder; not implemented."""
        pass
#region imports
from AlgorithmImports import *
#endregion


class BufferedPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
    """Portfolio construction model that only trades when holdings leave a buffer zone."""

    def __init__(self, rebalance, buffer_scaler):
        """Forward ``rebalance`` to the base model and keep the buffer scaler."""
        super().__init__(rebalance)
        self._buffer_scaler = buffer_scaler

    def create_targets(self, algorithm: QCAlgorithm, insights: List[Insight]) -> List[PortfolioTarget]:
        """Build targets that move holdings to the nearest edge of the buffer.

        Reads the ``forecast`` and ``position`` attributes stored on each
        insight's security. Holdings already inside the buffer produce no
        target. Zero-quantity targets from the base model are appended last.
        """
        base_targets = super().create_targets(algorithm, insights)

        buffered_targets = []
        for insight in insights:
            contract = algorithm.securities[insight.symbol]
            # NOTE(review): the divisor 10 is presumably the forecast's
            # average absolute value - confirm against the alpha model.
            optimal_position = contract.forecast * contract.position / 10

            # Buffer zone around the optimal position to reduce churn
            half_width = self._buffer_scaler * abs(contract.position)
            ceiling = round(optimal_position + half_width)
            floor = round(optimal_position - half_width)

            # Trade only as far as the nearest edge of the buffer zone
            held = contract.holdings.quantity
            if held < floor:
                buffered_targets.append(PortfolioTarget(insight.symbol, floor))
            elif held > ceiling:
                buffered_targets.append(PortfolioTarget(insight.symbol, ceiling))

        # Liquidate contracts that have an expired insight
        buffered_targets.extend(target for target in base_targets if target.quantity == 0)
        return buffered_targets
#region imports
from AlgorithmImports import *
#endregion
# 08/29/2023: - Adjusted insight expiry so all insights end at the same time each day
#             https://www.quantconnect.com/terminal/processCache?request=embedded_backtest_e1c8af207b1a4da945a4696f7db3ef9a.html
#
# 08/31/2023: - Adjusted universe filter to ensure the Mapped contract is always in the universe
#             - Updated the Alpha model to rely on warm-up rather than history requests
#             - Reduced the `blend_years` parameter to 3 to avoid any data issues from far in the past
#             https://www.quantconnect.com/terminal/processCache?request=embedded_backtest_ecb85ecf7a6ea332088f4b369017fa09.html
#
# 04/15/2024: - Updated to PEP8 style
#             https://www.quantconnect.com/terminal/processCache?request=embedded_backtest_f8e01739e5624ee03aa3a6e2ac5c5108.html
# region imports
from AlgorithmImports import *
from pandas import Timedelta as timedelta
from datetime import datetime
from Selection.FutureUniverseSelectionModel import FutureUniverseSelectionModel
from futures import categories
# endregion


class AdvancedFuturesUniverseSelectionModel(FutureUniverseSelectionModel):
    """Futures universe made of the symbols listed in ``futures.categories``."""

    def __init__(self) -> None:
        """Set a one-day refresh interval and collect the future symbols."""
        # ``timedelta`` is bound to ``pandas.Timedelta`` in this module, whose
        # bare integer argument is in nanoseconds. Passing ``days=`` gives one
        # day whichever ``timedelta`` is in scope.
        super().__init__(timedelta(days=1), self.select_future_chain_symbols)
        self.symbols = list(categories)

    def select_future_chain_symbols(self, utc_time: datetime) -> List[Symbol]:
        """Return the same fixed list of future symbols regardless of ``utc_time``."""
        return self.symbols

    def filter(self, filter: FutureFilterUniverse) -> FutureFilterUniverse:
        """Apply ``expiration(0, 365)`` to the contract filter universe."""
        return filter.expiration(0, 365)
#region imports
try:
    from AlgorithmImports import *
except ImportError:
    # Nothing below uses the LEAN API; tolerating its absence keeps this
    # pure helper importable (and unit-testable) outside the engine.
    pass
#endregion
from collections import Counter


def get_position_size(group):
    """Split a unit weight evenly across categories, subcategories and members.

    Each category gets an equal share of 1. Within a category each
    subcategory gets an equal share, and within a subcategory each symbol
    gets an equal share.

    Args:
        group: Mapping of symbol -> ``(category, subcategory)`` pair.

    Returns:
        Dict of symbol -> weight, in the iteration order of ``group``. The
        weights sum to 1 for a non-empty ``group``; an empty ``group`` gives
        an empty dict.

    Raises:
        ValueError: If a value of ``group`` does not unpack into two items.
    """
    # Number of symbols in every (category, subcategory) pair. Pairs are
    # rebuilt as tuples so that list-valued entries stay supported.
    members_per_pair = Counter(
        (category, subcategory) for category, subcategory in group.values()
    )
    # Number of distinct subcategories inside every category.
    subcategories_per_category = Counter(category for category, _ in members_per_pair)
    category_count = len(subcategories_per_category)

    weights = {}
    for symbol, (category, subcategory) in group.items():
        weights[symbol] = (
            1
            / category_count
            / subcategories_per_category[category]
            / members_per_pair[(category, subcategory)]
        )
    return weights