Overall Statistics |
Total Orders 2905 Average Win 0.06% Average Loss -0.05% Compounding Annual Return 0.364% Drawdown 5.900% Expectancy 0.010 Start Equity 100000 End Equity 100516.13 Net Profit 0.516% Sharpe Ratio -0.658 Sortino Ratio -0.829 Probabilistic Sharpe Ratio 9.569% Loss Rate 53% Win Rate 47% Profit-Loss Ratio 1.13 Alpha -0.031 Beta -0.113 Annual Standard Deviation 0.037 Annual Variance 0.001 Information Ratio 0.167 Tracking Error 0.205 Treynor Ratio 0.216 Total Fees $2913.05 Estimated Strategy Capacity $25000000.00 Lowest Capacity Asset ALL R735QTJ8XC9X Portfolio Turnover 10.51% |
from AlgorithmImports import *
from technical_pattern import TechnicalPattern
from scipy.stats import norm, uniform


class HeadAndShouldersPattern(TechnicalPattern):
    """Score how closely a trailing price window resembles a head-and-shoulders shape.

    A bank of noisy synthetic reference patterns is generated once and cached
    on the class as ``HeadAndShouldersPattern.ref`` (one pattern per row).
    Every call to :meth:`scan` compares several trailing windows of
    ``sequence`` against all references and publishes two factors:

    * ``corr`` -- the highest mean Pearson correlation found over the scanned
      windows, floored at 0.
    * ``similarity_score`` -- the largest mean squared-Euclidean distance found
      over the scanned windows, or ``inf`` when no window could be scored.

    Args:
        sequence: Initial price history, oldest first. May be empty.
        window_size: Number of points each window is downsampled to. It must
            equal the reference pattern length (25) for ``matching`` to work.
        max_lookback: Maximum history retained; also the exclusive upper bound
            of the window lengths that are scanned.
        step_size: Increment between successive window lengths.
    """

    def __init__(self, sequence, window_size, max_lookback, step_size):
        self.sequence = np.array(sequence)
        self.window_size = window_size
        self.max_lookback = max_lookback
        self.step_size = step_size

        # Create pattern references once; they are shared by every instance.
        if not hasattr(HeadAndShouldersPattern, "ref"):
            HeadAndShouldersPattern.ref = self._build_reference_patterns()

        # Warm up the factor values
        self.rows = HeadAndShouldersPattern.ref.shape[0]
        self.scan()

    @staticmethod
    def _build_reference_patterns(ref_count=100, seed=1):
        """Return a ``(ref_count, 25)`` array of standardised reference patterns.

        Each pattern walks through seven anchors (valley, shoulder, valley,
        head, valley, shoulder, valley) with three noisy interpolated points
        between consecutive anchors.
        """
        # A private generator yields the same draws as ``np.random.seed(seed)``
        # followed by the default scipy calls, without reseeding the global RNG
        # that other code may rely on.
        rng = np.random.RandomState(seed)

        def noise(scale):
            return scale * norm.rvs(size=(ref_count,), random_state=rng)

        # NOTE: the order of the random draws below determines the patterns.
        v1 = noise(0.02)
        p1 = 1 + noise(0.2)
        v2 = v1 + noise(0.2)
        v3 = v1 + noise(0.2)
        p3 = p1 + noise(0.02)
        p2 = 1.5 * np.maximum(p1, p3) + abs(uniform.rvs(size=(ref_count,), random_state=rng))
        v4 = v1 + noise(0.02)

        anchors = [v1, p1, v2, p2, v3, p3, v4]
        points = []
        for start, end in zip(anchors[:-1], anchors[1:]):
            points.append(start)
            for weight in (0.25, 0.5, 0.75):
                points.append(start * (1 - weight) + end * weight + noise(0.2))
        points.append(anchors[-1])

        # Shape (25 time steps, ref_count patterns).
        ref = np.array(points)

        # Standardise every pattern over its own time axis (axis 0), exactly as
        # scan() standardises a price window. Normalising along axis 1 would
        # z-score each time step across the patterns instead, which subtracts
        # the common head-and-shoulders shape and leaves only noise.
        ref = (ref - ref.mean(axis=0, keepdims=True)) / ref.std(axis=0, keepdims=True)
        return ref.T

    def update(self, price):
        """Append ``price`` to the trailing window and refresh both factors."""
        # Update the trailing window
        self.sequence = np.append(self.sequence, price)[-self.max_lookback:]

        # Update the factor values
        self.scan()

    def scan(self):
        """Recompute ``corr`` and ``similarity_score`` from the current history."""
        best_corr = 0
        largest_distance = None  # stays None until a window has been scored

        # Select varying lengths of trailing windows (max_lookback is exclusive)
        for i in range(self.window_size, self.max_lookback, self.step_size):
            window = self.sequence[-i:]

            # Longer windows cannot be filled either, so stop scanning.
            if len(window) != i:
                break

            # Select the trailing data and downsample it to window_size points
            sub_sequence = np.asarray(self.downsample(window, self.window_size))

            # Normalize the data in the trailing window; a flat window has no
            # shape to compare.
            sub_sequence_std = np.std(sub_sequence)
            if sub_sequence_std == 0:
                continue
            norm_sub_sequence = (sub_sequence - np.mean(sub_sequence)) / sub_sequence_std

            # Calculate correlation and distance for each reference pattern
            corr_scores = np.empty(self.rows)
            similarity_scores = np.empty(self.rows)
            for j in range(self.rows):
                corr_scores[j], similarity_scores[j] = self.matching(
                    norm_sub_sequence, HeadAndShouldersPattern.ref[j, :]
                )

            # Aggregate the results to produce a single value for each factor
            best_corr = max(best_corr, np.mean(corr_scores))
            mean_distance = np.mean(similarity_scores)
            if largest_distance is None or mean_distance > largest_distance:
                largest_distance = mean_distance

        self.corr = best_corr
        # Without a scored window there is no evidence of a match. Report an
        # infinite distance so that callers ranking by ascending distance do
        # not treat symbols lacking history as perfect matches (a default of 0
        # would be the best possible distance).
        # NOTE(review): the largest distance across windows is kept, as before;
        # confirm whether the smallest distance (best window) was intended.
        self.similarity_score = float("inf") if largest_distance is None else largest_distance

    def downsample(self, data, target_length):
        """Average ``data`` into ``target_length`` contiguous buckets.

        The buckets cover the whole input, so the most recent observations are
        always represented. When ``len(data)`` is an exact multiple of
        ``target_length`` all buckets have equal size.

        Raises:
            ValueError: If ``target_length`` is not positive or ``data`` holds
                fewer than ``target_length`` points.
        """
        size = len(data)
        if target_length <= 0 or size < target_length:
            raise ValueError(
                f"cannot downsample {size} points to {target_length} points"
            )
        # Integer bucket edges; every bucket is non-empty because
        # size >= target_length.
        edges = [(k * size) // target_length for k in range(target_length + 1)]
        return [np.mean(data[start:end]) for start, end in zip(edges[:-1], edges[1:])]

    def matching(self, sub_sequence, ref_pattern):
        """Compare a standardised window with one reference pattern.

        Returns:
            A ``(correlation, similarity)`` tuple: the Pearson correlation
            coefficient and the sum of squared point-wise differences (lower
            means closer).
        """
        correlation = np.corrcoef(sub_sequence, ref_pattern)[0, 1]
        similarity = np.sum((sub_sequence - ref_pattern) ** 2)
        return correlation, similarity
# region imports
from AlgorithmImports import *
from head_and_shoulders import HeadAndShouldersPattern
# endregion


class HeadAndShouldersPatternDetectionAlgorithm(QCAlgorithm):
    """Short the stocks whose recent prices best match the reference patterns.

    The universe filter keeps one ``HeadAndShouldersPattern`` per liquid
    symbol, ranks the symbols by the detector's ``corr`` (higher is better)
    and ``similarity_score`` (lower is better), and selects the best combined
    ranks. Every security that enters the universe is shorted and the position
    is closed after ``hold_duration``.
    """

    def initialize(self):
        """Configure the backtest window, parameters, state and universe."""
        self.set_start_date(2022, 1, 1)
        self.set_end_date(2023, 6, 1)
        self.set_cash(100000)

        # Per-instance state. Created here rather than as class attributes so
        # that the mutable containers are never shared between instances.
        self.trades = []
        self.new_symbols = []
        self.pattern_by_symbol = {}

        # Pattern detection settings
        self.window_size = self.get_parameter("window_size", 25)
        self.max_lookback = self.get_parameter("max_lookback", 50)
        # Increment between the successive lookback lengths that are scanned
        self.step_size = self.get_parameter("step_size", 10)

        # Universe settings
        self.coarse_size = self.get_parameter("coarse_size", 500)
        self.universe_size = self.get_parameter("universe_size", 5)

        # Trade settings (holding period in days)
        self.hold_duration = timedelta(self.get_parameter("hold_duration", 3))

        # Define universe
        self.universe_settings.resolution = Resolution.DAILY
        self.add_universe(self.coarse_filter_function)

    def coarse_filter_function(self, coarse: List[CoarseFundamental]) -> List[Symbol]:
        """Update the pattern detectors and return the best-ranked symbols.

        Args:
            coarse: Coarse fundamental objects for the current day.

        Returns:
            Up to ``universe_size`` symbols with the lowest combined rank.
        """
        # Keep only the most liquid securities
        coarse = sorted(coarse, key=lambda x: x.dollar_volume, reverse=True)[:self.coarse_size]
        coarse_symbols = {c.symbol for c in coarse}

        # Create pattern detection objects for new securities
        new_symbols = [c.symbol for c in coarse if c.symbol not in self.pattern_by_symbol]
        if new_symbols:
            history = self.history(new_symbols, self.max_lookback, Resolution.DAILY)
            for symbol in new_symbols:
                # Symbols without history start from an empty window.
                closes = history.loc[symbol]['close'].values if symbol in history.index else np.array([])
                self.pattern_by_symbol[symbol] = HeadAndShouldersPattern(
                    closes, self.window_size, self.max_lookback, self.step_size
                )
        just_created = set(new_symbols)

        # Remove pattern detection objects for securities that are no longer
        # among the selected coarse symbols
        stale_symbols = [symbol for symbol in self.pattern_by_symbol if symbol not in coarse_symbols]
        for symbol in stale_symbols:
            del self.pattern_by_symbol[symbol]

        # Scan for new patterns; detectors created above were already warmed
        # up from history, so they are not updated again today.
        for c in coarse:
            if c.symbol not in just_created:
                self.pattern_by_symbol[c.symbol].update(c.adjusted_price)

        # Select symbols with high correlation and low distance to the reference patterns
        # Step 1: Rank symbols by correlation in descending order
        by_corr = sorted(self.pattern_by_symbol.items(), key=lambda x: x[1].corr, reverse=True)
        corr_rank = {symbol: rank for rank, (symbol, _) in enumerate(by_corr)}
        # Step 2: Rank symbols by distance in ascending order
        by_distance = sorted(self.pattern_by_symbol.items(), key=lambda x: x[1].similarity_score)
        distance_rank = {symbol: rank for rank, (symbol, _) in enumerate(by_distance)}
        # Step 3: Add the ranks of each factor together
        rank_by_symbol = {
            symbol: corr_rank[symbol] + distance_rank[symbol]
            for symbol in self.pattern_by_symbol
        }
        # Step 4: Select the symbols with the best combined rank across both factors
        best = sorted(rank_by_symbol.items(), key=lambda x: x[1])[:self.universe_size]
        return [symbol for symbol, _ in best]

    def on_data(self, data: Slice):
        """Open trades for newly added securities and close expired ones."""
        # Short every stock when it first enters the universe (when we first detect the pattern)
        for symbol in self.new_symbols:
            self.trades.append(Trade(self, symbol, self.hold_duration))
        self.new_symbols = []

        # Scan for exits
        for trade in self.trades:
            trade.scan(self)

        # Delete closed trades
        self.trades = [trade for trade in self.trades if not trade.closed]

    def on_securities_changed(self, changes):
        """Queue every newly added security for entry on the next ``on_data``."""
        for security in changes.added_securities:
            if security.symbol not in self.new_symbols:
                self.new_symbols.append(security.symbol)


class Trade:
    """A single short position with a time-based exit.

    Args:
        algorithm: The algorithm used to read prices and place orders.
        symbol: Security to short.
        hold_duration: ``timedelta`` after which the position is closed.
        notional: Target dollar size of the position (default 2,000).

    Attributes:
        quantity: Signed share count (negative for a short, 0 if no order was
            placed).
        closed: ``True`` once the position is closed or if it was never opened.
        exit_date: Time at which the position is due to close, or ``None`` if
            no order was placed.
    """

    def __init__(self, algorithm, symbol, hold_duration, notional=2_000):
        self.symbol = symbol
        self.exit_date = None

        # Determine position size. A security without a valid price yet would
        # otherwise raise ZeroDivisionError; it is skipped instead.
        price = algorithm.securities[symbol].price
        self.quantity = -int(notional / price) if price > 0 else 0
        if self.quantity == 0:
            self.closed = True
            return

        # Enter trade
        algorithm.market_order(symbol, self.quantity)
        self.closed = False

        # Set variable for exit logic
        self.exit_date = algorithm.time + hold_duration

    def scan(self, algorithm):
        """Close the position once the holding period has elapsed."""
        # Simple time-based exit
        if not self.closed and self.exit_date <= algorithm.time:
            algorithm.market_order(self.symbol, -self.quantity)
            self.closed = True
from AlgorithmImports import *
from scipy.signal import detrend, savgol_filter
from tslearn.metrics import dtw_path
from scipy.interpolate import interp1d


class TechnicalPattern:
    """Base helpers for matching a price series against a reference pattern."""

    def matching(self, series: np.array, ref: np.array) -> tuple:
        """Align ``series`` to ``ref`` with DTW and score the match.

        Args:
            series: Observed values to test.
            ref: Reference pattern to compare against.

        Returns:
            A ``(score, similarity)`` tuple: the correlation coefficient of
            the two DTW-aligned sequences, and the DTW similarity value
            returned by ``dtw_path``.
        """
        # Smoothen the series from noise,
        # Savgol filter is a method that smoothen the noise but preserve sharp changes
        # NOTE(review): a degree-2 polynomial fitted to a 3-point window passes
        # through all three points, so this call looks like a no-op -- confirm
        # whether a wider window was intended.
        series_ = savgol_filter(series, 3, 2)

        # normalize for distance-efficient match
        # NOTE(review): a constant series has zero standard deviation here and
        # is not guarded against.
        series_ = (series_ - series_.mean()) / series_.std()

        # Compute the DTW path and obtain similarity score (DTW distance)
        path, similarity = dtw_path(ref, series_)

        # Match the series by DTW path: the first index of each pair refers to
        # ``ref``, the second to ``series_``.
        series_ = np.array([series_[pair[1]] for pair in path])
        ref = np.array([ref[pair[0]] for pair in path])

        # Calculate if the series are similar, we use correlation coefficient here
        # You may also use SWZ algorithm (Savin et al, 2007), fourier transform component-matching, etc.
        score = np.corrcoef(series_, ref)[0, 1]

        return score, similarity

    def downsample(self, values, num_points):
        """Resample ``values`` to exactly ``num_points`` points.

        The first and last observations are kept as the end points; the
        interior points are averages of consecutive segments of the input
        (each input value is repeated first so that segments divide evenly
        enough).

        Args:
            values: Sequence of observations.
            num_points: Desired output length.

        Returns:
            ``values`` itself when it already has ``num_points`` elements,
            otherwise a list of ``num_points`` values.

        Raises:
            ValueError: If ``num_points`` is below 3 (the end points plus at
                least one averaged point are required) or ``values`` is too
                short to fill every segment.
        """
        if num_points == len(values):
            return values

        if num_points < 3:
            raise ValueError(f"num_points must be at least 3, got {num_points}")

        # Repeat every value so the data can be cut into whole segments.
        duplicates = int(2 * len(values) / num_points)
        if duplicates > 0:
            adj_values = [x for x in values for _ in range(duplicates)]
        else:
            adj_values = values

        num_steps = num_points - 2
        step_size = int(len(adj_values) / num_steps)
        if step_size == 0:
            # An empty segment cannot be averaged.
            raise ValueError(
                f"cannot resample {len(values)} values to {num_points} points"
            )

        last_index = len(adj_values) - 1
        smoothed_data = [adj_values[0]]
        for i in range(num_steps):
            start_idx = i * step_size
            # The final segment absorbs whatever remains after the division.
            end_idx = last_index if i == num_steps - 1 else (i + 1) * step_size - 1
            segment = np.array(adj_values[start_idx:end_idx + 1])
            smoothed_data.append(sum(segment) / len(segment))
        smoothed_data.append(adj_values[-1])

        return smoothed_data