Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 0 Tracking Error 0 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset |
from SimpleLinearRegressionChannel import SimpleLinearRegressionChannel import collections #import sys as sys class SimpleLinearRegressionChannelAlgorithm(QCAlgorithm): def Initialize(self): self.SetStartDate(2021, 12, 9) self.SetEndDate(2021, 12, 9) self.SetCash(100000) self.pair = self.AddCfd("US30USD", Resolution.Minute, Market.Oanda).Symbol self.slrc = SimpleLinearRegressionChannel(self, 30, 30, 2.0) self.slrcB = SimpleLinearRegressionChannel(self, 60, 60, 2.0) self.linRegWindow = collections.deque(maxlen=60) self.Consolidate(self.pair, timedelta(minutes=60), self.OnDataHour) self.SetWarmUp(self.slrc.WarmUpPeriod, Resolution.Minute) def OnDataHour(self, data): if len(self.linRegWindow) == 60: for i in self.linRegWindow: self.slrcB.Update(i) def OnData(self, data): if data.ContainsKey(self.pair) and data[self.pair] is not None: self.linRegWindow.append(data[self.pair]) else: return if not self.slrcB.IsReady: return # Since our indicator is ready, we can use it to compare the current bar with the simple linear regression projection. (low, mid, high) = self.slrcB.GetProjection() bar = data[self.pair] #self.Debug(f"first: {self.slrc._base_window[-1]} latest:{self.slrc._base_window[0]}") #self.Debug(sys.version) #Plot points self.Plot("Pricing", "Price", bar.Close) self.Plot("Pricing", "LowerChannel", low) self.Plot("Pricing", "LinearReg-Extension", mid) self.Plot("Pricing", "HigherChannel", high) self.Plot("CE", "CorrelationCoefficient", self.slrcB.GetCorrelationCoefficient()) #self.Debug(f"({low}, {mid}, {high})")
from collections import deque from statistics import stdev class SimpleLinearRegressionChannel(PythonIndicator): def __init__(self, algorithm: QCAlgorithm, base_period: int, projection_period: int, channel_width: float): super().__init__() assert base_period > 0, f"{self.__init__.__qualname__}: base_period must be greater than 0." assert projection_period > 0, f"{self.__init__.__qualname__}: projection_period must be greater than 0." assert channel_width >= 0.0, f"{self.__init__.__qualname__}: channel_width must be greater than or equal to 0.0." if base_period < 10: algorithm.Log(f"Warning - {self.__init__.__qualname__}: base_period is less than 10. This is very few data points to compute a simple linear regression.") self._algorithm = algorithm self._base_period = base_period self._x = list(range(1, base_period + 1)) self._x_sum = sum(self._x) self._x_mean = self._x_sum / base_period self._diffs_x_mean = [(x_i - self._x_mean) for x_i in self._x] self._B1_den = sum(pow(x_i, 2) for x_i in self._diffs_x_mean) self._projection_period = projection_period self._channel_width = channel_width self._stdev = None self.Value = None self._base_window = deque(maxlen=base_period) self._B0 = None self._B1 = None self._projection_window = deque(maxlen=projection_period) self._R_den_x = (base_period * sum(pow(x_i, 2) for x_i in self._x)) - pow(self._x_sum, 2) self.WarmUpPeriod = base_period @property def IsReady(self): return (len(self._base_window) == self._base_window.maxlen) and (len(self._projection_window) >= 1) def Update(self, _input): if len(self._base_window) != self._base_window.maxlen: self._base_window.append(_input.Close) else: projection_size = len(self._projection_window) if projection_size == 0: self._simple_linreg() if projection_size != self._projection_window.maxlen: self._projection_window.append(_input.Close) else: self.Reset(_input) def _simple_linreg(self): y_mean = sum(self._base_window) / self._base_period B1_num = sum((x_j * y_j) for x_j, y_j in zip(self._diffs_x_mean, [(y_i - y_mean) for y_i in self._base_window])) self._B1 = B1_num / self._B1_den self._B0 = y_mean - (self._B1 * self._x_mean) self._stdev = stdev(self._base_window) def Reset(self, _input = None): #self._algorithm.Log(f"first: {self._base_window[0]} latest: {self._base_window[-1]}") #displays close 2x behind (first @ 4:00 = 3:00) #self._algorithm.Log(f"data: {self._base_window} END") self._base_window.clear() if self._base_period > self._projection_period: self._base_window.extend(self._projection_window) if _input is not None: self._base_window.append(_input.Close) self._projection_window.clear() else: while (len(self._projection_window) > 0) and (len(self._base_window) != self._base_window.maxlen): self._base_window.append(self._projection_window.popleft()) if len(self._base_window) == self._base_window.maxlen: self._simple_linreg() if _input is not None: self._projection_window.append(_input.Close) else: if _input is not None: self._base_window.append(_input.Close) def HardReset(self, _input = None): self._base_window.clear() if _input is not None: self._base_window.append(_input.Close) self._projection_window.clear() def GetSlope(self): if self.IsReady: return self._B1 else: return None def GetProjection(self): if self.IsReady: x = self._base_period + len(self._projection_window) y = self._B0 + (self._B1 * x) channel = self._channel_width * self._stdev return (y - channel, y, y + channel) else: return (None, None, None) def GetCorrelationCoefficient(self): if self.IsReady: num = (self._base_period * sum((x_i * y_i) for x_i, y_i in zip(self._x, self._base_window))) - (self._x_sum * sum(self._base_window)) den = math.sqrt(self._R_den_x * ((self._base_period * sum(pow(y_i, 2) for y_i in self._base_window)) - pow(sum(self._base_window), 2))) return num / den else: return None