Overall Statistics
Total Trades
1768
Average Win
0.60%
Average Loss
-0.63%
Compounding Annual Return
1.375%
Drawdown
8.000%
Expectancy
0.048
Net Profit
25.724%
Sharpe Ratio
0.452
Probabilistic Sharpe Ratio
0.283%
Loss Rate
46%
Win Rate
54%
Profit-Loss Ratio
0.95
Alpha
0
Beta
0
Annual Standard Deviation
0.021
Annual Variance
0
Information Ratio
0.452
Tracking Error
0.021
Treynor Ratio
0
Total Fees
$27721.41
Estimated Strategy Capacity
$11000.00
Lowest Capacity Asset
ETG SVQ3U3IGJZ39
from datetime import timedelta
from typing import NamedTuple
import itertools
from numpy import mean
from AlgorithmImports import *
from collections import namedtuple

CcgTreshold = namedtuple('CcgTreshold', ['long_entry','long_exit','short_entry','short_exit'])

class CcgPair:
    """ All the config for trading a single pair.
    """
    def __init__(self, tickers, tresholds, lookback_period) -> None:
        self.tickers = tickers
        self.tresholds = tresholds
        self.lookback_period = lookback_period
        self.pair_identifier = tickers[0] + '-' + tickers[1]
    
    def other_ticker(self, ticker):
        for tick in self.tickers:
            if tick != ticker:
                return tick 

class CcgPairsTradingConfig:
    pairs = []

    @classmethod
    def tickers(cls):
        return list(set(itertools.chain(*[p.tickers for p in cls.pairs])))

    @classmethod
    def initalize(cls, pairs):
        cls.pairs = pairs


class PairsData:
    def __init__(self, pair, algo) -> None:
        self.pair = pair
        self.ratios = RollingWindow[float](2)
        self.ups = RollingWindow[float](self.pair.lookback_period)
        self.downs = RollingWindow[float](self.pair.lookback_period)
        self.algo = algo
        self.pair_pnl = 0
        self.prev_upa = None
        self.prev_dna = None
        self.p1 = None
        self.p2 = None

    def up(self):
        """Note: RollingWindow is always reversed."""
        return max(self.ratios[0] - self.ratios[1], 0) if self.ratios.IsReady else 0
    
    def down(self):
        """Note: RollingWindow is always reversed."""
        return max(self.ratios[1] - self.ratios[0], 0) if self.ratios.IsReady else 0
        
    def update(self, ratio, p1=0, p2=0):
        self.ratios.Add(ratio)
        self.ups.Add(self.up())
        self.downs.Add(self.down())
        
        if self.ups.IsReady and self.prev_upa is None:
            self.prev_upa = mean([float(r) for r in self.ups])
        elif self.prev_upa is not None:
            self.prev_upa = (self.ups[0] + 2 * self.prev_upa) / 3
            
        if self.downs.IsReady and self.prev_dna is None:
            self.prev_dna = mean([float(r) for r in self.downs])
        elif self.prev_dna is not None:
            self.prev_dna = (self.downs[0] + 2 * self.prev_dna) / 3
            
        self.p1 = p1
        self.p2 = p2
            
    def upa(self):
        return 0 if self.prev_upa is None else self.prev_upa
    
    def dna(self):
        return 0 if self.prev_upa is None else self.prev_dna

    def rs(self):
        return self.upa()/self.dna() if self.upa() and self.dna() else 0
    
    def rsi(self):
        return 100 - 100/(1 + self.rs())
    
    def long_entry_signal(self):
        return 1 if self.rsi() and self.rsi() < self.pair.tresholds.long_entry else 0

    def long_exit_signal(self):
        return 1 if self.rsi() > self.pair.tresholds.long_exit else 0

    def short_entry_signal(self):
        return 1 if self.rsi() > self.pair.tresholds.short_entry else 0

    def short_exit_signal(self):
        return 1 if self.rsi() < self.pair.tresholds.short_exit else 0
    
    def get_insights(self):
        insights = []
        # Entries
        if self.long_entry_signal():
            insights.append(Insight.Price(self.pair.tickers[0], timedelta(days = 1), InsightDirection.Up))
            second_leg_insight = Insight.Price(self.pair.tickers[1], timedelta(days = 1), InsightDirection.Down)
            second_leg_insight.Weight = self.ratios[0]
            insights.append(second_leg_insight)
        if self.short_entry_signal():
            insights.append(Insight.Price(self.pair.tickers[0], timedelta(days = 1), InsightDirection.Down))
            second_leg_insight = Insight.Price(self.pair.tickers[1], timedelta(days = 1), InsightDirection.Up)
            second_leg_insight.Weight = self.ratios[0]
            insights.append(second_leg_insight)
        
        # Exits
        holding = self.algo.Portfolio.get(self.pair.tickers[0])
        
        holding_is_long = holding and holding.Invested and holding.IsLong
        holding_is_short = holding and holding.Invested and holding.IsShort
        
        if holding_is_long and self.long_exit_signal():
            insights.append(Insight.Price(self.pair.tickers[0], timedelta(days = 1), InsightDirection.Down))
            insights.append(Insight.Price(self.pair.tickers[1], timedelta(days = 1), InsightDirection.Up))
            
        if holding_is_short and self.short_exit_signal():
            insights.append(Insight.Price(self.pair.tickers[0], timedelta(days = 1), InsightDirection.Up))
            insights.append(Insight.Price(self.pair.tickers[1], timedelta(days = 1), InsightDirection.Down))
        
        # log_string = (
        #     f'{self.p1:.5f} '
        #     f'{self.p2:.5f} '
        #     f'{self.ratios[0]:.5f} '
        #     f'{self.ups[0]:.5f} '
        #     f'{self.downs[0]:.5f} '
        #     f'{self.upa():.5f} '
        #     f'{self.dna():.5f} '
        #     f'{self.rs():.5f} '
        #     f'{self.rsi():.5f} '
        #     f'{self.long_entry_signal()} '
        #     f'{self.long_exit_signal()} '
        #     f'{self.short_entry_signal()} '
        #     f'{self.short_exit_signal()} '
        # )
        
        # self.algo.Debug(log_string)
        
        return insights

class PairsTradingAlpha(AlphaModel):
    def __init__(self, algorithm) -> None:
        self.pairs_data = {}
        self.ticker_data = {}
        self.algo = algorithm

    def Update(self, algorithm, data):
        insights = []
        if not self.algo.can_trade:
            return []
        for pair in self.algo.pairs_config.pairs:
            close_first = algorithm.Securities[pair.tickers[0]].Price
            close_second = algorithm.Securities[pair.tickers[1]].Price
            ratio = close_first/close_second
            pair_data = self.pairs_data[pair.tickers[0]]
            pair_data.update(ratio, close_first, close_second)
            insights.extend(pair_data.get_insights())

        return insights

    def OnSecuritiesChanged(self, algorithm, changes) -> None:
        for added in changes.AddedSecurities:
            ticker = added.Symbol
            ticker_pair_data = self.pairs_data.get(ticker)
            if ticker_pair_data is None:
                for pair in self.algo.pairs_config.pairs:
                    if ticker in pair.tickers and self.pairs_data.get(pair.other_ticker(ticker)) is None:
                        pair_data = PairsData(pair, self.algo)
                        self.pairs_data[ticker] = pair_data
                        self.pairs_data[pair.other_ticker(ticker)] = pair_data

            
        return super().OnSecuritiesChanged(algorithm, changes)



class PairsTradingPortfolioConstructionModel(PortfolioConstructionModel):

    def __init__(self, algo):
        self.algo = algo
        self.portfolioUsage = 0.5
        self.weight = round(1/len(self.algo.pairs_config.pairs), 2) * self.portfolioUsage
        super().__init__()

    def CreateTargets(self, algorithm, insights):
        targets = {}
        for insight in insights:
            holding = algorithm.Portfolio[insight.Symbol]
            if holding.Invested:
                if (
                    holding.IsShort or insight.Direction != InsightDirection.Up
                ) and (
                    holding.IsLong or insight.Direction != InsightDirection.Down
                ):
                    targets[str(insight.Symbol)] = PortfolioTarget(insight.Symbol, -holding.Quantity)
                continue
            targets[str(insight.Symbol)] = PortfolioTarget.Percent(algorithm, insight.Symbol, insight.Direction * self.weight * (insight.Weight or 1))
        return list(targets.values())

class PairsTradingExecution(ExecutionModel):
    # Fill the supplied portfolio targets efficiently
    def Execute(self, algorithm, targets):
        for target in targets:
            algorithm.MarketOnCloseOrder(target.Symbol, target.Quantity)


class PairsTrading(QCAlgorithm):
    def Initialize(self):
        self.SetStartDate(2004, 1, 29)  # Set Start Date
        self.SetEndDate(2020, 10, 30)  # Set End Date
        self.SetCash(100000)  # Set Strategy Cash
        self.run_hour = 15
        self.run_minute = 42
        self.can_trade = False
        self.SetWarmUp(timedelta(3))

        pairs = [
            #pairs             #treshold of entry/exit                                # lookback_period
            [('ETG', 'EVT'), CcgTreshold(long_entry=20, long_exit=30, short_entry=80, short_exit=70), 3],
        ]
        ccg_pairs = []
        for pair_init in pairs:
            pair = CcgPair(*pair_init)
            ccg_pairs.append(pair)
        
        CcgPairsTradingConfig.initalize(ccg_pairs)

        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        # Universe 
        self.UniverseSettings.Resolution = Resolution.Minute
        self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw

        symbols = []
        for ticker in CcgPairsTradingConfig.tickers():
            symbol = Symbol.Create(ticker, SecurityType.Equity, Market.USA)
            symbols.append(symbol)

        self.AddUniverseSelection(ManualUniverseSelectionModel(symbols))

        self.pairs_config = CcgPairsTradingConfig

        # Alphas
        self.AddAlpha(PairsTradingAlpha(self))

        # Portfolio Construction
        self.SetPortfolioConstruction( PairsTradingPortfolioConstructionModel(self))

        # Order Execution
        self.SetExecution( PairsTradingExecution() ) 



    def OnData(self, data):
        """OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
            Arguments:
                data: Slice object keyed by symbol containing the stock data
        """
        self.can_trade = (self.Time.hour == self.run_hour and self.Time.minute == self.run_minute)