Overall Statistics
Total Trades
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Net Profit
0%
Sharpe Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
-46.106
Tracking Error
0.221
Treynor Ratio
0
Total Fees
$0.00
import math

res = Resolution.Minute
asset_class = 'stocks'
pairs_list = {
    'stocks': [
        ('CVX', 'XOM'),
        ('EGBN', 'FMBI'),
        ('SPY', 'IWM')
    ],
    'forex': [
        ('EURUSD', 'AUDUSD')
    ],
    'crypto': [
        ('BTCUSD', 'ETHUSD')
    ]
}
symbol1, symbol2 = pairs_list[asset_class][2]
infinity = float('inf')
NoneType = type(None)

class MeanReversionResearch(QCAlgorithm):
    
    def Initialize(self):
        self.SetStartDate(2020, 11, 4)
        self.SetEndDate(2020, 11, 6)
        self.SetCash(100_000)
        
        self.ZScoreSeries = Series("Z-Score", SeriesType.Line, 0)
        chart = Chart("Z-Score")
        chart.AddSeries(self.ZScoreSeries)
        self.AddChart(chart)
        
        if asset_class == 'stocks':
            self.AddEquity(symbol1, res)
            self.AddEquity(symbol2, res)
        elif asset_class == 'forex':
            self.AddForex(symbol1, res)
            self.AddForex(symbol2, res)
        elif asset_class == 'crypto':
            self.AddCrypto(symbol1, res)
            self.AddCrypto(symbol2, res)
        
        self.last_price1 = None
        self.last_price2 = None
        self.max_delta = 0
        self.min_delta = 0
        self.lowest_zscore = infinity
        
        self.Spread = SpreadRatio(f'{symbol1}_{symbol2}')


    def OnData(self, data):
        if res == Resolution.Tick:
            if data.ContainsKey(symbol1) and data[symbol1] != None:
                self.last_price1 = data[symbol1][-1].Price
            if data.ContainsKey(symbol2) and data[symbol2] != None:
                self.last_price2 = data[symbol2][-1].Price
        else:
            if data.ContainsKey(symbol1) and data[symbol1] != None:
                self.last_price1 = data[symbol1].Close
            if data.ContainsKey(symbol2) and data[symbol2] != None:
                self.last_price2 = data[symbol2].Close
            
        if self.last_price1 != None and self.last_price2 != None:
            price_delta = self.last_price1 - self.last_price2
            
            zscore = self.Spread.GetZScore(price_delta)
            self.ZScoreSeries.AddPoint(self.Time, zscore)
            
            if abs(zscore) < self.lowest_zscore:
                self.min_delta = price_delta
                self.lowest_zscore = zscore
            
            if price_delta > self.max_delta:
                self.max_delta = price_delta
                self.Debug(f'max delta: {self.max_delta - self.min_delta}')
            
            self.last_price1 = None
            self.last_price2 = None


class SpreadRatio:

    def __init__(self, label):
        self.label = label
        self.squared_differences = 0
        self.mean = 0
        self.var = 0
        self.N = 1

    def GetZScore(self, ratio):
        # Incrementally update sample estimates via Welford (1962)
        new_mean = self.mean + (ratio - self.mean) / self.N
        self.squared_differences += (ratio - self.mean) * (ratio - new_mean)
        self.var = self.squared_differences / self.N
        
        self.N += 1
        self.mean = new_mean
        
        sqrt = math.sqrt(self.var)
        
        if sqrt == 0.0:
            return ratio - self.mean
        else:
            return (ratio - self.mean) / sqrt