Overall Statistics
Total Orders: 2905
Average Win: 0.06%
Average Loss: -0.05%
Compounding Annual Return: 0.364%
Drawdown: 5.900%
Expectancy: 0.010
Start Equity: 100000
End Equity: 100516.13
Net Profit: 0.516%
Sharpe Ratio: -0.658
Sortino Ratio: -0.829
Probabilistic Sharpe Ratio: 9.569%
Loss Rate: 53%
Win Rate: 47%
Profit-Loss Ratio: 1.13
Alpha: -0.031
Beta: -0.113
Annual Standard Deviation: 0.037
Annual Variance: 0.001
Information Ratio: 0.167
Tracking Error: 0.205
Treynor Ratio: 0.216
Total Fees: $2913.05
Estimated Strategy Capacity: $25000000.00
Lowest Capacity Asset: ALL R735QTJ8XC9X
Portfolio Turnover: 10.51%
from AlgorithmImports import *
from technical_pattern import TechnicalPattern
from scipy.stats import norm, uniform

class HeadAndShouldersPattern(TechnicalPattern):
    def __init__(self, sequence, window_size, max_lookback, step_size):
        self.sequence = np.array(sequence)
        self.window_size = window_size
        self.max_lookback = max_lookback
        self.step_size = step_size
        
        # Create pattern references
        if not hasattr(HeadAndShouldersPattern, "ref"):
            np.random.seed(1)
            ref_count = 100
            v1 = np.array([0] * ref_count) + 0.02 * norm.rvs(size=(ref_count, ))
            p1 = np.array([1] * ref_count) + 0.2 * norm.rvs(size=(ref_count, ))
            v2 = v1 + 0.2 * norm.rvs(size=(ref_count, ))
            v3 = v1 + 0.2 * norm.rvs(size=(ref_count, ))
            p3 = p1 + 0.02 * norm.rvs(size=(ref_count, ))
            p2 = 1.5 * np.maximum(p1, p3) + abs(uniform.rvs(size=(ref_count, )))
            v4 = v1 + 0.02 * norm.rvs(size=(ref_count, ))
            ref = np.array([
                v1, 
                (v1*.75+p1*.25) + 0.2 * norm.rvs(size=(ref_count, )), 
                (v1+p1)/2 + 0.2 * norm.rvs(size=(ref_count, )), 
                (v1*.25+p1*.75) + 0.2 * norm.rvs(size=(ref_count, )), 
                p1, 
                (v2*.25+p1*.75) + 0.2 * norm.rvs(size=(ref_count, )), 
                (v2+p1)/2 + 0.2 * norm.rvs(size=(ref_count, )), 
                (v2*.75+p1*.25) + 0.2 * norm.rvs(size=(ref_count, )), 
                v2, 
                (v2*.75+p2*.25) + 0.2 * norm.rvs(size=(ref_count, )), 
                (v2+p2)/2 + 0.2 * norm.rvs(size=(ref_count, )), 
                (v2*.25+p2*.75) + 0.2 * norm.rvs(size=(ref_count, )), 
                p2, 
                (v3*.25+p2*.75) + 0.2 * norm.rvs(size=(ref_count, )), 
                (v3+p2)/2 + 0.2 * norm.rvs(size=(ref_count, )), 
                (v3*.75+p2*.25) + 0.2 * norm.rvs(size=(ref_count, )), 
                v3, 
                (v3*.75+p3*.25) + 0.2 * norm.rvs(size=(ref_count, )), 
                (v3+p3)/2 + 0.2 * norm.rvs(size=(ref_count, )), 
                (v3*.25+p3*.75) + 0.2 * norm.rvs(size=(ref_count, )), 
                p3, 
                (v4*.25+p3*.75) + 0.2 * norm.rvs(size=(ref_count, )), 
                (v4+p3)/2 + 0.2 * norm.rvs(size=(ref_count, )), 
                (v4*.75+p3*.25) + 0.2 * norm.rvs(size=(ref_count, )), 
                v4
            ])
            HeadAndShouldersPattern.ref = ((ref - ref.mean(axis=1, keepdims=True)) / ref.std(axis=1, keepdims=True)).T

        # Warm up the factor values
        self.rows = HeadAndShouldersPattern.ref.shape[0]
        self.scan()

    
    def update(self, price):
        # Update the trailing window
        self.sequence = np.append(self.sequence, price)[-self.max_lookback:]
        
        # Update the factor values
        self.scan()

    def scan(self):
        self.corr = 0
        self.similarity_score = 0
        
        # Select varying lengths of trailing windows
        for i in range(self.window_size, self.max_lookback, self.step_size):
            # Check if there is enough history to fill the trailing window
            if len(self.sequence[-i:]) != i:
                break 

            # Select the trailing data and downsample it to window_size data points
            sub_sequence = self.downsample(self.sequence[-i:], self.window_size)
            
            # Normalize the data in the trailing window
            sub_sequence_std = np.std(sub_sequence)
            if sub_sequence_std == 0:
                continue
            norm_sub_sequence = (sub_sequence - np.mean(sub_sequence)) / sub_sequence_std
            
            # Evaluate the pattern presence
            # Calculate correlation and similarity scores for each reference pattern
            corr_scores = np.ndarray(shape=(self.rows))
            similarity_scores = np.ndarray(shape=(self.rows))
            for j in range(self.rows):
                score, similarity = self.matching(norm_sub_sequence, HeadAndShouldersPattern.ref[j, :])
                corr_scores[j] = score
                similarity_scores[j] = similarity

            # Aggregate the results to produce a single value for each factor
            self.corr = max(self.corr, np.mean(corr_scores))
            self.similarity_score = max(self.similarity_score, np.mean(similarity_scores))

    def downsample(self, data, target_length):
        factor = len(data) // target_length
        return [np.mean(data[i*factor:(i+1)*factor]) for i in range(target_length)]

    def matching(self, sub_sequence, ref_pattern):
        correlation = np.corrcoef(sub_sequence, ref_pattern)[0, 1]
        similarity = np.sum((sub_sequence - ref_pattern) ** 2)
        return correlation, similarity
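
The detector above can be exercised on its own, outside of an algorithm. Below is a minimal standalone sketch (not part of the strategy files) that builds a made-up head-and-shoulders shaped price series, instantiates HeadAndShouldersPattern with the same default parameter values used later in main.py, and reads back the two factor values it exposes, corr and similarity_score.

# Minimal standalone usage sketch; the prices are synthetic and purely illustrative.
import numpy as np
from head_and_shoulders import HeadAndShouldersPattern

np.random.seed(0)
# A rough head-and-shoulders shape: left shoulder, higher head, right shoulder, plus noise.
shape = np.concatenate([
    np.linspace(100, 105, 10),   # left shoulder rises
    np.linspace(105, 101, 5),    # pullback
    np.linspace(101, 110, 10),   # head rises
    np.linspace(110, 101, 10),   # head falls
    np.linspace(101, 105, 5),    # right shoulder rises
    np.linspace(105, 100, 10),   # right shoulder falls
])
prices = shape + np.random.normal(0, 0.2, size=shape.shape)

pattern = HeadAndShouldersPattern(prices, window_size=25, max_lookback=50, step_size=10)
print(pattern.corr, pattern.similarity_score)

# New closing prices can be streamed in one at a time; both factors refresh on each call.
pattern.update(99.5)
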
# region imports
from AlgorithmImports import *

from head_and_shoulders import HeadAndShouldersPattern
# endregion

class HeadAndShouldersPatternDetectionAlgorithm(QCAlgorithm):

    trades = []
    new_symbols = []
    pattern_by_symbol = {}

    def initialize(self):
        self.set_start_date(2022, 1, 1)
        self.set_end_date(2023, 6, 1)
        self.set_cash(100000)

        # Pattern detection settings
        self.window_size = self.get_parameter("window_size", 25)
        self.max_lookback = self.get_parameter("max_lookback", 50)
        self.step_size = self.get_parameter("step_size", 10) # lookback grows from window_size in steps of step_size, up to (but not including) max_lookback
        
        # Universe settings
        self.coarse_size = self.get_parameter("coarse_size", 500)
        self.universe_size = self.get_parameter("universe_size", 5)

        # Trade settings
        self.hold_duration = timedelta(self.get_parameter("hold_duration", 3))

        # Define universe
        self.universe_settings.resolution = Resolution.DAILY
        self.add_universe(self.coarse_filter_function)


    def coarse_filter_function(self, coarse: List[CoarseFundamental]) -> List[Symbol]:
        coarse = sorted(coarse, key=lambda x: x.dollar_volume, reverse=True)[:self.coarse_size]

        # Create pattern detection objects for new securities
        coarse_symbols = [c.symbol for c in coarse]
        new_symbols = [symbol for symbol in coarse_symbols if symbol not in self.pattern_by_symbol]
        if new_symbols:
            history = self.history(new_symbols, self.max_lookback, Resolution.DAILY)
            for symbol in new_symbols:
                self.pattern_by_symbol[symbol] = HeadAndShouldersPattern(
                    history.loc[symbol]['close'].values if symbol in history.index else np.array([]), 
                    self.window_size, 
                    self.max_lookback, 
                    self.step_size
                )
        
        # Remove pattern detection objects for delisted securities
        delisted_symbols = [symbol for symbol in self.pattern_by_symbol.keys() if symbol not in coarse_symbols]
        for symbol in delisted_symbols:
            self.pattern_by_symbol.pop(symbol)

        # Scan for new patterns
        for c in coarse:
            if c.symbol not in new_symbols:
                self.pattern_by_symbol[c.symbol].update(c.adjusted_price)

        # Select symbols whose recent price action best matches the reference patterns:
        # high correlation and a low similarity score (sum of squared differences).
        # A small standalone illustration of this ranking appears after this file.
        #  Step 1: Sort symbols by correlation in descending order
        reverse_sorted_by_corr = [symbol for symbol, _ in sorted(self.pattern_by_symbol.items(), key=lambda x: x[1].corr, reverse=True)]
        #  Step 2: Sort symbols by similarity score (distance) in ascending order
        sorted_by_dtw = [symbol for symbol, _ in sorted(self.pattern_by_symbol.items(), key=lambda x: x[1].similarity_score, reverse=False)]
        #  Step 3: Add the ranks of each factor together
        rank_by_symbol = {symbol: reverse_sorted_by_corr.index(symbol)+sorted_by_dtw.index(symbol) for symbol in self.pattern_by_symbol.keys()}
        #  Step 4: Select the symbols with the best combined rank across both factors
        return [ symbol for symbol, _ in sorted(rank_by_symbol.items(), key=lambda x: x[1])[:self.universe_size] ]



    def on_data(self, data: Slice):
        # Short every stock when it first enters the universe (when we first detect the pattern)
        for symbol in self.new_symbols:
            self.trades.append(Trade(self, symbol, self.hold_duration))
        self.new_symbols = []
        
        # Scan for exits
        closed_trades = []
        for i, trade in enumerate(self.trades):
            trade.scan(self)
            if trade.closed:
                closed_trades.append(i)
        
        # Delete closed trades
        for i in closed_trades[::-1]:
            del self.trades[i]

    def on_securities_changed(self, changes):
        for security in changes.added_securities:
            if security.symbol not in self.new_symbols:
                self.new_symbols.append(security.symbol)


class Trade:
    def __init__(self, algorithm, symbol, hold_duration):
        self.symbol = symbol
        
        # Determine position size
        self.quantity = -int(2_000 / algorithm.securities[symbol].price)
        if self.quantity == 0:
            self.closed = True
            return
        
        # Enter trade
        algorithm.market_order(symbol, self.quantity)
        self.closed = False

        # Set variable for exit logic
        self.exit_date = algorithm.time + hold_duration

    def scan(self, algorithm):
        # Simple time-based exit
        if not self.closed and self.exit_date <= algorithm.time:
            algorithm.market_order(self.symbol, -self.quantity)    
            self.closed = True 
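
The combined-rank selection in coarse_filter_function (Steps 1 through 4 above) can be illustrated in isolation. Here is a small sketch using made-up symbols and factor values, with no dependencies beyond the Python standard library:

# Standalone illustration of the combined-rank selection; symbols and values are made up.
corr = {"AAA": 0.9, "BBB": 0.7, "CCC": 0.8}          # correlation factor: higher is better
similarity = {"AAA": 5.0, "BBB": 2.0, "CCC": 9.0}    # similarity score (distance): lower is better

by_corr = sorted(corr, key=lambda s: corr[s], reverse=True)      # ['AAA', 'CCC', 'BBB']
by_similarity = sorted(similarity, key=lambda s: similarity[s])  # ['BBB', 'AAA', 'CCC']
combined_rank = {s: by_corr.index(s) + by_similarity.index(s) for s in corr}
# AAA: 0 + 1 = 1, BBB: 2 + 0 = 2, CCC: 1 + 2 = 3
selected = [s for s, _ in sorted(combined_rank.items(), key=lambda x: x[1])][:2]
print(selected)  # ['AAA', 'BBB'] -- the two best combined ranks
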
from AlgorithmImports import *

from scipy.signal import detrend, savgol_filter
from tslearn.metrics import dtw_path
from scipy.interpolate import interp1d


class TechnicalPattern:

    def matching(self, series: np.array, ref: np.array) -> np.array:
        # Smooth the series to suppress noise;
        # the Savitzky-Golay filter reduces noise while preserving sharp changes
        series_ = savgol_filter(series, 3, 2)
        # Normalize so the distance measure is scale-independent
        series_ = (series_ - series_.mean()) / series_.std()
        # Compute the DTW path and obtain similarity score (DTW distance)
        path, similarity = dtw_path(ref, series_)
        # Align both series along the DTW warping path
        series_ = np.array([series_[x[1]] for x in path])
        ref = np.array([ref[x[0]] for x in path])
        # Measure how similar the aligned series are; we use the correlation coefficient here.
        # Alternatives include the SWZ algorithm (Savin et al., 2007), Fourier-transform component matching, etc.
        score = np.corrcoef(series_, ref)[0, 1]
        return score, similarity

    def downsample(self, values, num_points):
        if num_points == len(values):
            return values

        adj_values = []
        duplicates = int(2 * len(values) / num_points)
        if duplicates > 0:
            for x in values:
                for i in range(duplicates):
                    adj_values.append(x)
        else:
            adj_values = values
        
        num_steps = num_points - 2
        step_size = int(len(adj_values) / num_steps)

        smoothed_data = [adj_values[0]]
        for i in range(num_steps):
            start_idx = i * step_size
            end_idx = len(adj_values) - 1 if i == num_steps-1 else (i+1)*step_size - 1
            segment = np.array(adj_values[start_idx:end_idx+1])

            avg = sum(segment) / len(segment)
            smoothed_data.append(avg)
            
        smoothed_data.append(adj_values[-1])

        return smoothed_data
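
For completeness, the DTW-based matching in the TechnicalPattern base class can also be exercised directly. Below is a minimal sketch, assuming tslearn and scipy are installed and using a fabricated noisy copy of an arbitrary reference shape, showing how matching pairs a correlation score with a DTW distance.

# Minimal sketch of calling the base-class matcher directly; the series are fabricated for illustration.
import numpy as np
from technical_pattern import TechnicalPattern

np.random.seed(0)
ref = np.sin(np.linspace(0, 2 * np.pi, 25))               # arbitrary reference shape
ref = (ref - ref.mean()) / ref.std()                      # normalized, as the class expects for its references
series = ref + np.random.normal(0, 0.1, size=ref.shape)   # noisy copy of the reference

matcher = TechnicalPattern()
score, distance = matcher.matching(series, ref)
# score: correlation of the DTW-aligned series (close to 1 for a good match)
# distance: DTW distance (small for a good match)
print(score, distance)
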