Overall Statistics
Total Trades
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Net Profit
0%
Sharpe Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
0
Tracking Error
0
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
from SimpleLinearRegressionChannel import SimpleLinearRegressionChannel
import collections 
#import sys as sys

class SimpleLinearRegressionChannelAlgorithm(QCAlgorithm):
    """Plot a linear-regression channel for the US30USD CFD on minute data.

    Minute bars are buffered in a 60-element deque. Every time a 60-minute
    consolidated bar is delivered, the buffered bars are replayed into the
    60/60 channel indicator (``slrcB``); ``OnData`` then plots the latest
    close against the projected channel whenever the indicator is ready.
    """

    def Initialize(self):
        """Set the backtest window, subscribe to data and build the indicators."""
        self.SetStartDate(2021, 12, 9)
        self.SetEndDate(2021, 12, 9)
        self.SetCash(100000)

        cfd = self.AddCfd("US30USD", Resolution.Minute, Market.Oanda)
        self.pair = cfd.Symbol

        # NOTE(review): `slrc` is never updated in this class; it only
        # supplies the warm-up length below. `slrcB` is the indicator that
        # is actually fed and plotted - confirm this is intended.
        self.slrc = SimpleLinearRegressionChannel(self, 30, 30, 2.0)
        self.slrcB = SimpleLinearRegressionChannel(self, 60, 60, 2.0)

        # Rolling buffer holding the most recent 60 minute bars.
        self.linRegWindow = collections.deque(maxlen=60)
        self.Consolidate(self.pair, timedelta(minutes=60), self.OnDataHour)

        self.SetWarmUp(self.slrc.WarmUpPeriod, Resolution.Minute)

    def OnDataHour(self, data):
        """Replay the buffered minute bars into ``slrcB``.

        Registered in ``Initialize`` as the handler for the 60-minute
        consolidator. The consolidated bar passed in as ``data`` is not used;
        nothing happens until the buffer holds 60 bars.
        """
        buffered = self.linRegWindow
        if len(buffered) != 60:
            return

        for minute_bar in buffered:
            self.slrcB.Update(minute_bar)

    def OnData(self, data):
        """Buffer the newest bar and plot it against the projected channel."""
        has_bar = data.ContainsKey(self.pair) and data[self.pair] is not None
        if not has_bar:
            return
        self.linRegWindow.append(data[self.pair])

        channel = self.slrcB
        if not channel.IsReady:
            return

        # The indicator is ready, so the current bar can be compared with
        # the simple linear regression projection.
        lower, middle, upper = channel.GetProjection()
        latest = data[self.pair]

        pricing_series = (
            ("Price", latest.Close),
            ("LowerChannel", lower),
            ("LinearReg-Extension", middle),
            ("HigherChannel", upper),
        )
        for series_name, value in pricing_series:
            self.Plot("Pricing", series_name, value)

        self.Plot("CE", "CorrelationCoefficient", channel.GetCorrelationCoefficient())
from collections import deque
from statistics import stdev

class SimpleLinearRegressionChannel(PythonIndicator):
    """Linear-regression channel built from closing prices.

    The indicator works in two phases:

    1. *Base phase* - the first ``base_period`` closes fill ``_base_window``.
    2. *Projection phase* - once the base window is full, a least-squares
       line is fitted to it (x = 1..base_period) and each following close is
       stored in ``_projection_window``. ``GetProjection`` extrapolates the
       fitted line to the newest projected bar and wraps it in a channel of
       ``channel_width`` standard deviations of the base closes.

    When the projection window is full, ``Reset`` rolls the projected closes
    over into a new base window and the cycle starts again.

    Args:
        algorithm: Algorithm instance, used only for logging.
        base_period: Number of closes the regression is fitted on. Must be
            greater than 0; note that a value of 1 cannot be fitted (the
            x-variance is zero and ``statistics.stdev`` needs two points).
        projection_period: Number of closes to project before rolling over.
        channel_width: Half-width of the channel in standard deviations.

    Raises:
        AssertionError: If a period is not positive or ``channel_width`` is
            negative.
    """

    def __init__(self, algorithm: QCAlgorithm, base_period: int, projection_period: int, channel_width: float):
        super().__init__()
        assert base_period > 0, f"{self.__init__.__qualname__}: base_period must be greater than 0."
        assert projection_period > 0, f"{self.__init__.__qualname__}: projection_period must be greater than 0."
        assert channel_width >= 0.0, f"{self.__init__.__qualname__}: channel_width must be greater than or equal to 0.0."
        if base_period < 10:
            algorithm.Log(f"Warning - {self.__init__.__qualname__}: base_period is less than 10. This is very few data points to compute a simple linear regression.")

        self._algorithm = algorithm
        self._base_period = base_period
        self._projection_period = projection_period
        self._channel_width = channel_width

        # Everything that depends only on x (1..base_period) is computed once
        # here, because the x-axis never changes between regressions.
        self._x = list(range(1, base_period + 1))
        self._x_sum = sum(self._x)
        self._x_mean = self._x_sum / base_period
        self._diffs_x_mean = [(x_i - self._x_mean) for x_i in self._x]
        self._B1_den = sum(pow(dx, 2) for dx in self._diffs_x_mean)
        # Kept for backward compatibility with code that may read it; the
        # correlation coefficient no longer uses this raw-sum form.
        self._R_den_x = (base_period * sum(pow(x_i, 2) for x_i in self._x)) - pow(self._x_sum, 2)

        # Regression state; populated by _simple_linreg().
        self._B0 = None      # intercept
        self._B1 = None      # slope
        self._stdev = None   # sample standard deviation of the base closes
        self.Value = None

        self._base_window = deque(maxlen=base_period)
        self._projection_window = deque(maxlen=projection_period)

        self.WarmUpPeriod = base_period

    @property
    def IsReady(self):
        """True once the base window is full and at least one close is projected."""
        base_full = len(self._base_window) == self._base_window.maxlen
        return base_full and (len(self._projection_window) >= 1)

    def Update(self, _input) -> bool:
        """Feed one bar into the indicator.

        Args:
            _input: Any object exposing a ``Close`` attribute.

        Returns:
            ``True`` when the indicator is ready after this update. Earlier
            versions returned ``None``; callers that ignore the return value
            are unaffected.
        """
        if len(self._base_window) != self._base_window.maxlen:
            # Still collecting the closes the regression will be fitted on.
            self._base_window.append(_input.Close)
            return self.IsReady

        projection_size = len(self._projection_window)
        if projection_size == 0:
            # First bar after the base window filled up: fit the line now.
            self._simple_linreg()

        if projection_size != self._projection_window.maxlen:
            self._projection_window.append(_input.Close)
        else:
            # Projection exhausted: roll over into a new base window.
            self.Reset(_input)
        return self.IsReady

    def _simple_linreg(self):
        """Fit y = B0 + B1 * x to the base window and refresh the channel stdev."""
        y_mean = sum(self._base_window) / self._base_period

        B1_num = sum(dx * (y_i - y_mean) for dx, y_i in zip(self._diffs_x_mean, self._base_window))
        self._B1 = B1_num / self._B1_den
        self._B0 = y_mean - (self._B1 * self._x_mean)

        self._stdev = stdev(self._base_window)

    def Reset(self, _input=None):
        """Roll the projected closes over into a fresh base window.

        Args:
            _input: Optional bar whose ``Close`` is carried into the new cycle.
        """
        self._base_window.clear()

        if self._base_period > self._projection_period:
            # The projection alone cannot fill the base window: move all of
            # it over, add the new close, and keep collecting base data.
            self._base_window.extend(self._projection_window)
            if _input is not None:
                self._base_window.append(_input.Close)
            self._projection_window.clear()
            return

        # The projection holds at least a full base window: transfer the
        # oldest closes and leave any surplus in the projection window.
        while (len(self._projection_window) > 0) and (len(self._base_window) != self._base_window.maxlen):
            self._base_window.append(self._projection_window.popleft())

        if len(self._base_window) == self._base_window.maxlen:
            self._simple_linreg()
            if _input is not None:
                self._projection_window.append(_input.Close)
        elif _input is not None:
            self._base_window.append(_input.Close)

    def HardReset(self, _input=None):
        """Discard all stored closes, optionally seeding the base window with ``_input``."""
        self._base_window.clear()
        if _input is not None:
            self._base_window.append(_input.Close)
        self._projection_window.clear()

    def GetSlope(self):
        """Return the fitted slope, or ``None`` while the indicator is not ready."""
        return self._B1 if self.IsReady else None

    def GetProjection(self):
        """Return ``(lower, mid, upper)`` for the newest projected bar.

        Returns ``(None, None, None)`` while the indicator is not ready.
        """
        if not self.IsReady:
            return (None, None, None)

        # The base window occupies x = 1..base_period, so the k-th projected
        # close sits at x = base_period + k.
        x = self._base_period + len(self._projection_window)
        y = self._B0 + (self._B1 * x)
        channel = self._channel_width * self._stdev
        return (y - channel, y, y + channel)

    def GetCorrelationCoefficient(self):
        """Return Pearson's r between x = 1..base_period and the base closes.

        Returns ``None`` while the indicator is not ready, and ``0.0`` when
        every close in the base window is identical (r is undefined there;
        the previous implementation raised ``ZeroDivisionError``).
        """
        # Imported here so the method does not depend on a module-level
        # ``math`` name being in scope.
        from math import sqrt

        if not self.IsReady:
            return None

        # Mean-centred form: algebraically equal to the raw-sum formula but
        # free of the cancellation that can push the radicand below zero for
        # large price levels.
        y_mean = sum(self._base_window) / self._base_period
        y_diffs = [y_i - y_mean for y_i in self._base_window]
        s_yy = sum(dy * dy for dy in y_diffs)
        if s_yy <= 0.0:
            return 0.0

        s_xy = sum(dx * dy for dx, dy in zip(self._diffs_x_mean, y_diffs))
        return s_xy / sqrt(self._B1_den * s_yy)