Overall Statistics |
Total Orders 6673 Average Win 0.20% Average Loss -0.16% Compounding Annual Return 19.956% Drawdown 20.000% Expectancy 0.242 Start Equity 1000000 End Equity 2335989.10 Net Profit 133.599% Sharpe Ratio 0.89 Sortino Ratio 1.012 Probabilistic Sharpe Ratio 50.563% Loss Rate 45% Win Rate 55% Profit-Loss Ratio 1.24 Alpha 0.08 Beta 0.434 Annual Standard Deviation 0.134 Annual Variance 0.018 Information Ratio 0.197 Tracking Error 0.149 Treynor Ratio 0.274 Total Fees $27051.17 Estimated Strategy Capacity $13000000.00 Lowest Capacity Asset CHTR UPXX4G43SIN9 Portfolio Turnover 11.43% |
# region imports
from AlgorithmImports import *
# endregion


class EmpiricalCumulativeDensityFunction(PythonIndicator):
    """Empirical tail frequency of the latest rate of change.

    The indicator keeps a rolling window of `lookback_period` rate-of-change
    values. For each new rate of change it reports the share of same-signed
    window values that are at least as extreme as the new one.
    """

    def __init__(self, roc_period, lookback_period):
        """Set up the indicator.

        Args:
            roc_period: Period of the internal RateOfChange indicator.
            lookback_period: Number of rate-of-change values held in the
                rolling window before the indicator produces values.
        """
        self.name = f'ECDF_{roc_period}_{lookback_period}'
        self.warm_up_period = lookback_period
        self.time = datetime.min
        self.value = 0
        self.current = IndicatorDataPoint(self.time, self.value)
        # Internal rate of change; it is exposed so callers can read it too.
        self.roc = RateOfChange(roc_period)
        self.roc.name = 'ECDF_ROC'
        # Rolling window of past rate-of-change values.
        self._returns = np.array([])

    def update(self, t, price):
        """Feed a new price observed at time `t` into the indicator.

        Readiness is reported through `is_ready2`, which is set on every
        call. The method itself returns nothing.
        """
        self.time = t

        # Guard 1: the internal rate of change reported a falsy update.
        roc_updated = self.roc.update(t, price)
        if not roc_updated:
            self.is_ready2 = False
            return

        latest = self.roc.current.value
        window = self._returns

        # Guard 2: still filling the rolling window.
        if len(window) < self.warm_up_period:
            self._returns = np.append(window, latest)
            self.is_ready2 = False
            return

        # Compare the new value only against window values on the same side
        # of zero (zero itself counts for both sides).
        if latest > 0:
            same_side = window >= 0
            at_least_as_extreme = window >= latest
        else:
            same_side = window <= 0
            at_least_as_extreme = window <= latest

        denominator = np.count_nonzero(same_side)
        if denominator:
            self.value = np.count_nonzero(at_least_as_extreme) / denominator
        else:
            self.value = 0

        # Slide the window: append the new value and drop the oldest one.
        self._returns = np.append(window, latest)[1:]
        self.is_ready2 = True
        self.current = IndicatorDataPoint(t, self.value)
# region imports
from AlgorithmImports import *
from collections import deque
# endregion


class HurstExponent(PythonIndicator):
    """Rolling Hurst-exponent estimate computed from the last `period` prices.

    For each lag in 2..19 the standard deviation of the lagged price
    differences is measured. A straight line is then fitted to
    log(std) against log(lag), and the indicator value is derived from the
    slope of that line.
    """

    def __init__(self, period):
        """Set up the indicator.

        Args:
            period: Number of prices kept in the rolling window. No value is
                produced until the window holds this many prices.
        """
        self.name = 'hurst'
        self.warm_up_period = period
        self.time = datetime.min
        self.value = 0
        self.current = IndicatorDataPoint(self.time, self.value)
        # Defined up front so readers never depend on `hasattr` checks.
        self.is_ready2 = False
        self._queue = deque(maxlen=period)

    def update(self, t, price):
        """Feed a new price observed at time `t` into the indicator.

        Readiness is reported through `is_ready2`. The method itself returns
        nothing.
        """
        self._queue.append(price)
        if len(self._queue) < self.warm_up_period:
            self.is_ready2 = False
            return

        prices = np.array(self._queue)
        lags = range(2, 20)

        # Calculate differences for each lag: price[i + lag] - price[i].
        # `np.diff(prices, lag)` must NOT be used here: its second argument
        # is the difference ORDER (how many times to difference), not a lag.
        tau = [np.std(prices[lag:] - prices[:-lag]) for lag in lags]

        # Perform regression to find Hurst exponent.
        reg = np.polyfit(np.log(lags), np.log(tau), 1)
        # NOTE(review): with `tau` measured as a standard deviation the slope
        # itself is the usual Hurst estimate; halving matches the
        # variance-based formulation. Kept as in the original -- confirm.
        self.value = reg[0] / 2
        self.is_ready2 = True
        self.current = IndicatorDataPoint(t, self.value)
# region imports
from AlgorithmImports import *
from symbol_data import SymbolData
from trade import Trade
import xgboost as xgb
from sklearn.preprocessing import StandardScaler
# endregion

# Sources:
#  - https://www.quantitativo.com/p/a-mean-reversion-strategy-from-first
#  - https://www.quantitativo.com/p/machine-learning-and-the-probability
#  - https://www.quantitativo.com/p/long-and-short-mean-reversion-machine


class ProbabilityOfBouncingBackAlgorithm(QCAlgorithm):
    """Long-only mean-reversion strategy on QQQ constituents.

    Each constituent is tracked with a `SymbolData` object. Assets whose
    short-term return is negative and unusually low (by the 3-day ECDF) are
    scored by an XGBoost classifier, and those scoring above a probability
    threshold become trade candidates. Position size depends on a VIX-based
    market regime.
    """

    def initialize(self):
        """Configure the backtest, universes, schedules and parameters."""
        self.set_start_date(2020, 1, 1)
        self.set_end_date(2024, 8, 29)
        self.set_cash(1_000_000)

        # State read by the selection callbacks. Defined here so that neither
        # callback can hit a missing attribute, whatever order they fire in.
        self._fundamental_by_symbol = {}
        self._model = None

        self.universe_settings.resolution = Resolution.HOUR
        self.add_universe(self._get_asset_prices)
        etf = Symbol.create('QQQ', SecurityType.EQUITY, Market.USA)
        self._universe = self.add_universe(self.universe.etf(etf, universe_filter_func=self._select_assets))

        self._vix = self.add_data(CBOE, "VIX", Resolution.DAILY)
        self._vix.sma = self.sma(self._vix.symbol, 15)
        self._vix_sma_factor = self.get_parameter('vix_sma_factor', 0.15)
        self._long_exposure_by_regime = [0.1, 1.1]  # bear, bull

        self.train(self.date_rules.year_start(), self.time_rules.midnight, self._train_model)
        self.schedule.on(self.date_rules.every_day(etf), self.time_rules.after_market_open(etf, 30), self._trade)
        self.set_warm_up(self.start_date - datetime(2015, 1, 1))  # Min start date for QQQ

        self._symbol_data_by_symbol = {}
        self._trade_by_symbol = {}
        self._sorted_by_probability = []
        self._scaler = StandardScaler()
        self._max_universe_size = self.get_parameter('max_universe_size', 20)
        self._ecdf_threshold = self.get_parameter('ecdf_threshold', 0.15)
        self._probability_threshold = self.get_parameter('probability_threshold', 0.55)
        self._stop_loss_factor = self.get_parameter('stop_loss_factor', 0.95)
        self._max_hold_duration = timedelta(self.get_parameter('max_hold_duration', 6))

    def _get_asset_prices(self, fundamentals):
        """Cache the latest fundamental objects; select nothing.

        Returns:
            An empty list. This universe exists only to capture data.
        """
        # Save the current price of the assets so we can update the
        # factors in _select_assets.
        self._fundamental_by_symbol = {f.symbol: f for f in fundamentals}
        return []

    def _select_assets(self, constituents):
        """Update factors for tracked assets and pick the trade candidates.

        Args:
            constituents: Current ETF constituents; each exposes `.symbol`.

        Returns:
            Up to `_max_universe_size` symbols, ordered from highest to
            lowest predicted probability. An empty list is returned during
            warm-up or while no model has been trained.
        """
        constituents = list(constituents)
        etf_symbols = [c.symbol for c in constituents]
        etf_symbol_set = set(etf_symbols)

        # Create SymbolData objects for assets that just entered the ETF.
        new_symbols = []
        for symbol in etf_symbols:
            if symbol not in self._symbol_data_by_symbol:
                new_symbols.append(symbol)
                self._symbol_data_by_symbol[symbol] = SymbolData(self._max_hold_duration.days - 1)
        new_symbol_set = set(new_symbols)

        # Warm up the factors for assets that just entered the ETF (or we
        # haven't seen yet). The history is materialised once so it is not
        # enumerated a second time just to learn its length.
        if new_symbols:
            history = list(self.history[TradeBar](new_symbols, 2*252, Resolution.DAILY))
            last_index = len(history) - 1
            for i, bars in enumerate(history):
                for symbol, bar in bars.items():
                    # Only the most recent bar is flagged as `in_etf`.
                    self._symbol_data_by_symbol[symbol].update(
                        bar.end_time, bar.close, bar.close * bar.volume, i == last_index
                    )

        # Update the factors for the rest of the assets we're tracking.
        for symbol, symbol_data in self._symbol_data_by_symbol.items():
            if symbol in new_symbol_set:
                continue
            f = self._fundamental_by_symbol.get(symbol)
            if f is None:
                continue
            symbol_data.update(self.time, f.price, f.dollar_volume, symbol in etf_symbol_set)

        # No selection during warm-up, or before a model exists to score with.
        if self.is_warming_up or self._model is None:
            return []

        # Select a subset of the current ETF constituents.
        probability_by_symbol = {}
        for c in constituents:
            # Filter 1: price >= $1
            fundamental = self._fundamental_by_symbol.get(c.symbol)
            if fundamental is None or not fundamental.price >= 1:
                continue
            symbol_data = self._symbol_data_by_symbol[c.symbol]
            # Filter 2: Factor values are ready.
            if not symbol_data.is_ready:
                continue
            # Filter 3: ROC(3) < 0 and 3-day ECDF < self._ecdf_threshold.
            if not (symbol_data.ecdf.roc.current.value < 0 and symbol_data.ecdf.value < self._ecdf_threshold):
                continue
            # Filter 4: P(bouncing back) > self._probability_threshold
            raw_factors = symbol_data.factor_history.iloc[-1].drop(['in_etf', 'ECDF_ROC']).values.reshape(1, -1)
            p = self._model.predict(xgb.DMatrix(self._scaler.transform(raw_factors)))[0]
            if p > self._probability_threshold:
                probability_by_symbol[c.symbol] = p
        self.plot('Universe', 'Size', len(probability_by_symbol))

        # Return <= self._max_universe_size assets with the greatest
        # P(bouncing back), best first, so that _trade fills any limited
        # slots with the strongest candidates.
        ranked = sorted(probability_by_symbol.items(), key=lambda item: item[1], reverse=True)
        self._sorted_by_probability = [symbol for symbol, _ in ranked[:self._max_universe_size]]
        return self._sorted_by_probability

    def on_warmup_finished(self):
        """Train the first model as soon as warm-up completes."""
        self._train_model()

    def _train_model(self):
        """Fit the scaler and the XGBoost classifier on the collected samples.

        Does nothing during warm-up. If no training samples are available,
        the previous model (if any) is kept.
        """
        if self.is_warming_up:
            return

        # Get training samples.
        factors = []
        labels = []
        for symbol, symbol_data in self._symbol_data_by_symbol.items():
            if not symbol_data.is_ready:
                continue
            # Select samples that have `in_etf`, `ECDF_3_252` < self._ecdf_threshold, and ECDF_ROC < 0.
            factor_history = symbol_data.factor_history[
                (symbol_data.factor_history['in_etf']) &
                (symbol_data.factor_history['ECDF_3_252'] < self._ecdf_threshold) &
                (symbol_data.factor_history['ECDF_ROC'] < 0)
            ].dropna().drop(['in_etf', 'ECDF_ROC'], axis=1)

            # Align this asset's factor and labels.
            label_history = symbol_data.label_history
            idx = sorted(set(label_history.index).intersection(factor_history.index))
            factor_history = factor_history.loc[idx]
            label_history = label_history.loc[idx]

            # Append this asset's factors and labels to the total set of
            # factors/labels.
            if not (factor_history.empty or label_history.empty):
                factors.extend(factor_history.values.tolist())
                labels.extend(label_history.values.tolist())

        # Without samples the scaler cannot be fitted; keep the old model.
        if not factors:
            self.log('No training samples available; skipping model training.')
            return

        factors = np.array(factors)
        labels = np.array(labels)

        # Apply pre-processing to the factors.
        factors = self._scaler.fit_transform(factors)

        # Train the model.
        self._model = xgb.train(
            {'objective': 'binary:logistic'},
            xgb.DMatrix(factors, label=labels),
            num_boost_round=2
        )

    def _trade(self):
        """Close finished trades and open new ones from the candidate list."""
        # Manage existing trades.
        for symbol, trade in list(self._trade_by_symbol.items()):
            if trade.closed(self.time):
                self.liquidate(symbol)
                del self._trade_by_symbol[symbol]

        # Get the market regime (1 = bull; 0 = bear).
        regime = int(self._vix.price <= (1+self._vix_sma_factor) * self._vix.sma.current.value)
        weight = self._long_exposure_by_regime[regime] / self._max_universe_size

        # Open new trades if we haven't hit the maximum yet.
        for symbol in self._sorted_by_probability:
            if len(self._trade_by_symbol) >= self._max_universe_size:
                break
            if symbol in self._trade_by_symbol:
                continue
            quantity = self.calculate_order_quantity(symbol, weight)
            if quantity:
                self._trade_by_symbol[symbol] = Trade(
                    self, symbol, quantity, self._stop_loss_factor, self._max_hold_duration
                )
# region imports
from AlgorithmImports import *
from ecdf import EmpiricalCumulativeDensityFunction
from hurst import HurstExponent
# endregion


class SymbolData:
    """Per-asset factor (feature) and label history.

    `factor_history` is a DataFrame with one row per update time and one
    column per recorded factor, plus an `in_etf` flag. `label_history` is a
    0/1 Series: 1 when the price `hold_duration` updates later is above the
    price at that row.
    """

    def __init__(self, hold_duration):
        """Create the indicators.

        Args:
            hold_duration: Number of updates to look ahead when labelling.
        """
        self._hold_duration = hold_duration

        # Define features.
        self.ecdf = EmpiricalCumulativeDensityFunction(3, 252)
        ecdf_indicators = [EmpiricalCumulativeDensityFunction(3, 21*i) for i in [3, 6, 9]]
        self.roc = RateOfChange(21)
        roc_indicators = [RateOfChange(21*i) for i in [1, 3, 6, 9, 12]]
        self.rsi = RelativeStrengthIndex(14)
        rsi_indicators = [RelativeStrengthIndex(21*i) for i in [3, 6, 9, 12]]
        self._sma = SimpleMovingAverage(200)
        self._price = Identity('price')
        self.sma_distance = IndicatorExtensions.over(self._sma, self._price)
        self.dollar_volume = Identity('dollar_volume')
        self.relative_dollar_volume = IndicatorExtensions.of(RateOfChange(21), self.dollar_volume)
        self.hurst_exponent = HurstExponent(252)

        self.factor_history = pd.DataFrame()

        # Indicators fed with the price on every update.
        self.factors_to_update = (
            [self.ecdf, self.roc, self.rsi, self._sma, self._price, self.hurst_exponent]
            + ecdf_indicators
            + roc_indicators
            + rsi_indicators
        )
        # Indicators whose values are written to `factor_history`.
        self.factors_to_record = (
            [
                self.ecdf, self.ecdf.roc, self.roc, self.rsi, self.sma_distance,
                self.hurst_exponent, self.dollar_volume, self.relative_dollar_volume,
            ]
            + ecdf_indicators
            + roc_indicators
            + rsi_indicators
        )

        self._prices = pd.Series()

    @staticmethod
    def _factor_ready(factor):
        """Return a truthy value if `factor` reports ready via either flag."""
        return factor.is_ready or getattr(factor, 'is_ready2', False)

    def update(self, t, price, dollar_volume, in_etf):
        """Record a new observation.

        Args:
            t: Timestamp used as the row index.
            price: Latest price, fed to every price-based indicator.
            dollar_volume: Latest dollar volume.
            in_etf: Whether the asset is flagged as an ETF member at `t`.
        """
        self.factor_history.loc[t, 'in_etf'] = in_etf

        # Calculate the latest factor values given the new price.
        for indicator in self.factors_to_update:
            indicator.update(t, price)
        self.dollar_volume.update(t, dollar_volume)

        # Store the value of every indicator that is ready.
        for indicator in self.factors_to_record:
            if self._factor_ready(indicator):
                self.factor_history.loc[t, indicator.name] = indicator.current.value

        # Calculate the latest label values given the new price.
        self._prices.loc[t] = price
        horizon = self._hold_duration
        future_prices = self._prices.shift(-horizon)
        rose = future_prices > self._prices
        # The last `horizon` rows have no future price yet, so drop them.
        self.label_history = rose.iloc[:-horizon].astype(int)

    @property
    def is_ready(self):
        """True once every indicator in `factors_to_update` is ready."""
        return all(self._factor_ready(indicator) for indicator in self.factors_to_update)
# region imports
from AlgorithmImports import *
# endregion


class Trade:
    """A single position with a stop-market exit and a time-based exit."""

    def __init__(self, algorithm, symbol, quantity, stop_price_factor, max_hold_duration):
        """Enter the trade and place its stop order.

        Args:
            algorithm: Algorithm instance used to place orders and read
                prices and the current time.
            symbol: Asset to trade.
            quantity: Order quantity for the entry; the stop uses -quantity.
            stop_price_factor: Factor used to derive the stop price.
            max_hold_duration: Added to the current algorithm time to get the
                time-based exit.
        """
        # Enter trade.
        algorithm.market_order(symbol, quantity)

        # Set the stop loss order.
        # NOTE(review): the stop is placed at (1 - stop_price_factor) * price,
        # so a factor of 0.95 puts it at 5% of the current price. Confirm
        # this is intended rather than stop_price_factor * price.
        reference_price = algorithm.securities[symbol].price
        stop_price = (1-stop_price_factor) * reference_price
        self._ticket = algorithm.stop_market_order(symbol, -quantity, stop_price)

        # Set time-based exit.
        self._exit_time = algorithm.time + max_hold_duration

    def closed(self, time):
        """Return True if the stop order filled or the exit time was reached."""
        if self._ticket.status == OrderStatus.FILLED:
            return True
        return time >= self._exit_time