book
Checkout our new book! Hands on AI Trading with Python, QuantConnect, and AWS Learn More arrow

Indicators

Custom Indicators

Introduction

LEAN supports over 100 pre-built indicators, but a custom indicator is an indicator you define. It receives input, performs some calculation, and sets its output value. Custom indicators are helpful when you want to achieve any of the following results:

  • Use an indicator that LEAN doesn't currently implement
  • Combine built-in indicators beyond the logic of Indicator Extensions
  • Create your own unique indicator for trading

Define Indicators

Custom indicators must implement the PythonIndicator class. The indicator must have an update method and name, time, and value attributes. The update method must accept an IndicatorDataPoint, QuoteBar, or TradeBar and return a boolean that represents if the indicator is ready. The time attribute represents the last time you updated the indicator and the value attribute represents the current indicator value. The following definition provides an example of a custom simple moving average indicator.

Select Language:
class CustomSimpleMovingAverage(PythonIndicator):
    def __init__(self, name, period):
        self.name = name
        self.warm_up_period = period
        self.time = datetime.min
        self.value = 0
        self.queue = deque(maxlen=period)

    def update(self, input: BaseData) -> bool:
        self.queue.appendleft(input.value)
        count = len(self.queue)
        self.time = input.time
        self.value = sum(self.queue) / count
        return count == self.queue.maxlen

The following definition provides an example of a custom money flow index indicator.

Select Language:
class CustomMoneyFlowIndex(PythonIndicator):
    def __init__(self, name, period):
        super().__init__()
        self.name = name
        self.value = 0
        self.previous_typical_price = 0
        self.negative_money_flow = deque(maxlen=period)
        self.positive_money_flow = deque(maxlen=period)
    
    def update(self, input):
        if not isinstance(input, TradeBar):
            raise TypeError('CustomMoneyFlowIndex.update: input must be a TradeBar')
    
        typical_price = (input.high + input.low + input.close) / 3
        money_flow = typical_price * input.volume
            
        # We need to avoid double rounding errors
        if abs(self.previous_typical_price / typical_price - 1) < 1e-10:
            self.previous_typical_price = typical_price
            
        self.negative_money_flow.appendleft(money_flow if typical_price < self.previous_typical_price else 0)
        self.positive_money_flow.appendleft(money_flow if typical_price > self.previous_typical_price else 0)
        self.previous_typical_price = typical_price
    
        positive_money_flow_sum = sum(self.positive_money_flow)        
        total_money_flow = positive_money_flow_sum + sum(self.negative_money_flow)
    
        self.value = 100
        if total_money_flow != 0:
            self.value *= positive_money_flow_sum / total_money_flow
    
        return len(self.positive_money_flow) == self.positive_money_flow.maxlen

Create Indicators

You must define a custom indicator before you can create an instance of it.

To create a custom indicator, call the indicator constructor.

Select Language:
self.custom_sma = CustomSimpleMovingAverage("My SMA", 10)

Updates

The process to update custom indicators is the same process you use to update manual indicators. For more information about updating manual indicators, see Manual Updates or Automatic Updates.

Warm Up Indicators

The process to warm up custom indicators is the same process you use to warm up manual indicators.

Examples

The following examples demonstrate some common practices for implementing custom indicators.

Example 1: Custom Money Flow Index

The following algorithm implements a custom Money Flow Index indicator. We estimate the supply-demand balance of SPY and trade using the average money flow direction.

Select Language:
from collections import deque 

class CustomIndicatorsAlgorithm(QCAlgorithm):
    def initialize(self) -> None:
        self.set_start_date(2022, 1, 1)
        self.set_end_date(2022, 6, 1)

        # Request daily SPY data to feed the indicators to generate trade signals and trade.
        self.spy = self.add_equity("SPY").symbol

        # Create a custom money flow index to generate a trade signal.
        self.custom_mfi = CustomMoneyFlowIndex(20)

        # Warm up for immediate usage of indicators.
        self.set_warm_up(20, Resolution.DAILY)

    def on_data(self, slice: Slice) -> None:
        bar = slice.bars.get(self.spy)
        if bar:
            # Update the custom MFI with the updated trade bar to obtain the updated trade signal.
            self.custom_mfi.update(bar)

            # Buy if the positive money flow is above negative, indicating demand is greater than supply, driving up the price.
            if self.custom_mfi.current.value > 50:
                self.set_holdings(self.spy, 1)
            # Sell if the positive money flow is below negative, indicating demand is less than supply, driving down the price.
            else:
                self.set_holdings(self.spy, -1)

class CustomMoneyFlowIndex(PythonIndicator):
    def __init__(self, period: int) -> None:
        super().__init__()
        self.value = 0
        self.previous_typical_price = 0
        self.negative_money_flow = deque(maxlen=period)
        self.positive_money_flow = deque(maxlen=period)
    
    def update(self, input: BaseData) -> bool:
        if not isinstance(input, TradeBar):
            raise TypeError('CustomMoneyFlowIndex.update: input must be a TradeBar')
    
        # Estimate the money flow by averaging the price multiplied by volume.
        typical_price = (input.high + input.low + input.close) / 3
        money_flow = typical_price * input.volume
            
        # We need to avoid double-rounding errors.
        if abs(self.previous_typical_price / typical_price - 1) < 1e-10:
            self.previous_typical_price = typical_price
        
        # Add the period money flow to calculate the aggregated money flow.
        self.negative_money_flow.appendleft(money_flow if typical_price < self.previous_typical_price else 0)
        self.positive_money_flow.appendleft(money_flow if typical_price > self.previous_typical_price else 0)
        self.previous_typical_price = typical_price
    
        positive_money_flow_sum = sum(self.positive_money_flow)        
        total_money_flow = positive_money_flow_sum + sum(self.negative_money_flow)
    
        # Set the value to be the positive money flow ratio.
        self.value = 100
        if total_money_flow != 0:
            self.value *= positive_money_flow_sum / total_money_flow
    
        # Set the is_ready property to receive the required bars to fill all windows.
        return len(self.positive_money_flow) == self.positive_money_flow.maxlen

Example 2: Multiple Symbol Custom Indicator

The following algorithm implements a custom Cointegration price divergence indicator involving 2 symbols: GOOGL & GOOG. It trades the arbitrage between 2 cointegrated stocks on their price convergence after deviating more than 2 standard deviations.

Select Language:
from sklearn.linear_model import LinearRegression
from statsmodels.tsa.stattools import adfuller

class CustomIndicatorsAlgorithm(QCAlgorithm):
    def initialize(self) -> None:
        self.set_start_date(2019, 1, 1)
        self.set_end_date(2023, 1, 1)

        # Request 2 classes of Google stock data to feed the indicators to generate trade signals and trade.
        goog1 = self.add_equity("GOOGL").symbol
        goog2 = self.add_equity("GOOG").symbol

        # Create a custom money flow index to generate a trade signal.
        self.cointegration = Cointegration(self, goog1, goog2, 252, self.date_rules.month_start())
        # Add a handler to trade on updates.
        self.cointegration.updated += self.on_updated
        # Register the indicator to update automatically with daily data.
        self.register_indicator(goog1, self.cointegration, Resolution.DAILY)
        self.register_indicator(goog2, self.cointegration, Resolution.DAILY)

        # Warm up for immediate usage of indicators.
        self.warm_up_indicator(goog1, self.cointegration, Resolution.DAILY)
        self.warm_up_indicator(goog2, self.cointegration, Resolution.DAILY)

    def on_updated(self, sender: object, point: IndicatorDataPoint) -> None:
        if sender.is_ready:
            holding = self.portfolio[self.cointegration.symbol1]

            # If the residual is lower than -2x SD, it means class A price is much higher than what it should be compared to class C.
            # We sell class A and buy class C to bet on their price convergence.
            if point.value < -2 and not holding.is_short:
                self.set_holdings(self.cointegration.symbol1, -0.5)
                self.set_holdings(self.cointegration.symbol2, 0.5 * self.cointegration.ratio)
            # If the residual is higher than the threshold, it means class A price is much lower than what it should be compared to class C.
            # We buy class A and sell class C to bet on their price convergence.
            elif point.value > 2 and not holding.is_long:
                self.set_holdings(self.cointegration.symbol1, 0.5)
                self.set_holdings(self.cointegration.symbol2, -0.5 * self.cointegration.ratio)
            # Close positions of the price are converged.
            elif (holding.is_short and point.value > 0) or (holding.is_long and point.value < 0):
                self.liquidate()

class Cointegration(PythonIndicator):
    # The standard deviation of the residuals is such that the returned indicator value is relative to the SD.
    _residual_sd = 1e10
    # Store the coefficient and intercept of the cointegrated series for calculating the spread of a new data point.
    _coefficients = [0, 0]
    _windows = {}

    @property
    def ratio(self) -> float:
        return self._coefficients[0]

    @property
    def is_ready(self) -> bool:
        return all(window.is_ready for window in self._windows.values()) and all(x != 0 for x in self._coefficients)

    def __init__(self, algorithm: QCAlgorithm, symbol1: Symbol, symbol2: Symbol, period: int, recalibrating_date_rule: IDateRule) -> None:
        super().__init__("Cointegration")
        self.symbol1 = symbol1
        self.symbol2 = symbol2
        self.value = 0

        # Use rolling windows to save the price data for cointegration analysis.
        self._windows[symbol1] = RollingWindow[IndicatorDataPoint](period)
        self._windows[symbol2] = RollingWindow[IndicatorDataPoint](period)

        # Adjust the cointegration factor between the 2 classes' price series.
        algorithm.schedule.on(
            recalibrating_date_rule,
            algorithm.time_rules.at(23, 59), 
            self.calculate_cointegration
        )

        self.warm_up_period = period
    
    def update(self, input: BaseData) -> bool:
        # Update the rolling windows for cointegration analysis.
        window = self._windows.get(input.symbol)
        if not window:
            raise Exception(f"{input.Symbol} is not part of the Cointegration relation.")
        window.add(IndicatorDataPoint(input.symbol, input.end_time, input.value))

        if not all(x == 0 for x in self._coefficients):
            # Calculate the updated cointegrated series spread only if all symbol data points are updated.
            if all(window[0].end_time == input.end_time for window in self._windows.values()):
                self.value = (self._coefficients[0] * self._windows[self.symbol2][0].value + self._coefficients[1] - self._windows[self.symbol1][0].value) / self._residual_sd

        return self.is_ready

    def calculate_cointegration(self) -> None:
        # Lag direction is unimportant; it is just a sign flip in the linear regression, so we don't need to flip the window order.
        y = np.array([x.value for x in self._windows[self.symbol1]]).reshape(-1, 1)
        x = np.array([x.value for x in self._windows[self.symbol2]]).reshape(-1, 1)

        # Perform Linear Regression on both price series to investigate their relationship.
        lr = LinearRegression().fit(x, y)
        slope = float(lr.coef_[0])
        intercept = float(lr.intercept_)

        # Calculate the residuals series to check if it is stationary, meaning if the 2 price series move together.
        residuals = y - (intercept + slope * x)

        # Check if the residuals are stationary using the augmented Dickey-Fuller test.
        # This means no unit root exists for the difference series, and the residuals are stationary.
        critical = -1.941 + -0.2686 / self.warm_up_period + -3.365 / self.warm_up_period**2 + 31.223 / self.warm_up_period**3
        adf_reject = adfuller(residuals)[0] <= -3.45
        if adf_reject:
            # If cointegrated, update the positional sizing ratio and the spread threshold of the trade trigger.
            self._coefficients = [slope, intercept]
            self._residual_sd = float(np.std(residuals))
        else:
            self._coefficients = [0, 0]
            self._residual_sd = 100000000          # An arbitrarily large number that the class A price will never reach.

Other Examples

For more examples, see the following algorithms:

Demonstration Algorithms
CustomIndicatorAlgorithm.py Python

You can also see our Videos. You can also get in touch with us via Discord.

Did you find this page helpful?

Contribute to the documentation: