Overall Statistics
Total Trades
41
Average Win
4.96%
Average Loss
-1.90%
Compounding Annual Return
37.716%
Drawdown
12.900%
Expectancy
0.803
Net Profit
40.522%
Sharpe Ratio
1.372
Probabilistic Sharpe Ratio
60.382%
Loss Rate
50%
Win Rate
50%
Profit-Loss Ratio
2.61
Alpha
0.27
Beta
-0.04
Annual Standard Deviation
0.205
Annual Variance
0.042
Information Ratio
0.952
Tracking Error
0.593
Treynor Ratio
-7.033
Total Fees
₮0.00
Estimated Strategy Capacity
₮2000000.00
Lowest Capacity Asset
ETHUSDT 18N
# region imports
from AlgorithmImports import *
from datetime import timedelta
from QuantConnect import Resolution, AccountType
from QuantConnect.Algorithm import QCAlgorithm
from QuantConnect.Brokerages import BrokerageName
from QuantConnect.Data.Market import TradeBar
# endregion


# Strategy parameters.
# These are defaults: PF_EMA_MAMA.UpdateParameters() overwrites each of them with the
# same-named algorithm parameter when one is supplied.
USE_EMA = True                      # require the EMA signal to agree with MaMA for entries; also enables the EMA-based exit
USE_EMA_CROSSOVER = False          # True: enter only on the bar where the EMAs cross; False: enter whenever the EMA condition holds
EMA1_LENGTH = 7                     # period of EMA1
EMA2_LENGTH = 11                    # period of EMA2
MAMA_LENGTH = 55                    # period of the final MaMA moving average
MAMA_EXP = True                     # True: the MaMA average is an EMA; False: an SMA
MAMA_M_LENGTH = 34                  # period of the momentum indicator (and of the momentum-of-momentum "acceleration")
MAMA_ACCEL_FACTOR = True            # on/off flag: add half of the acceleration to the momentum term of the MaMA source
MAMA_P_LENGTH = 13                  # SMA period of the up-bar indicator ("probability")

# Trailing stop
USE_TS = False                      # attach a TrailingStopRiskManagementModel
TS_DISTANCE = 10.0                  # trailing stop distance in percent (divided by 100 before use)

# backtesting options
TICKER = "ETHUSDT"
MIN_POS = 10                        # minimum order quantity; holdings below this are treated as "no position"
                                    # because sometimes after closing position there are leftovers remaining on balance
TIMEFRAME = timedelta(days=1)       # bar length as a datetime.timedelta (seconds, minutes, hours, days, weeks)
RESOLUTION = Resolution.Minute      # backtesting resolution, Second/Minute/Hour/Daily, should be lower than timeframe
CASH = 3_000_000                      # starting balance, in CURRENCY
POSITION_SIZE_PCT = 33.0            # position size, percent of balance
CURRENCY = "USDT"                   # should match ticker's second currency
START_DATE = (2022, 1, 1)           # backtesting start date, (YYYY, MM, DD)
END_DATE = None              # backtesting end date (YYYY, MM, DD) or None to run till the end
FEES = False                         # True: BinanceFeeModel; False: constant zero fee


class PF_EMA_MAMA(QCAlgorithm):
    """Single-pair crypto strategy combining two EMAs with a MaMA filter.

    The algorithm only performs set-up: it configures the backtest, adds
    TICKER from the Binance market and delegates indicator handling and
    order logic to a SymbolData instance.
    """

    def Initialize(self) -> None:
        """Configure the backtest, add the traded pair and build its indicators."""
        self.SetTimeZone("UTC")
        self.UniverseSettings.Resolution = RESOLUTION
        # Account currency and starting cash both come from CURRENCY so the
        # two settings cannot drift apart when the constant is changed.
        self.SetAccountCurrency(CURRENCY)
        self.SetCash(CURRENCY, CASH)
        self.SetStartDate(*START_DATE)
        if END_DATE:
            self.SetEndDate(*END_DATE)

        # NOTE(review): the brokerage model is Bitfinex while the data feed and
        # the optional fee model below are Binance -- confirm this mix is intended.
        self.SetBrokerageModel(BrokerageName.Bitfinex, AccountType.Margin)

        # Must run before SymbolData is created: its indicators are sized from
        # the module-level settings that UpdateParameters() overwrites.
        self.UpdateParameters()

        security = self.AddCrypto(TICKER, RESOLUTION, Market.Binance)

        if FEES:
            security.FeeModel = BinanceFeeModel()
        else:
            security.FeeModel = ConstantFeeModel(0.0)

        self.symbolData = SymbolData(self, security.Symbol)

        # Warm up for five times the longest look-back, expressed in TIMEFRAME bars.
        longest = max(EMA1_LENGTH, EMA2_LENGTH, MAMA_LENGTH, MAMA_M_LENGTH * 2)
        self.SetWarmUp(TIMEFRAME * (longest * 5))

        if USE_TS:
            self.SetRiskManagement(TrailingStopRiskManagementModel(TS_DISTANCE / 100.0))

    def UpdateParameters(self):
        """Overwrite the module-level strategy settings with algorithm parameters.

        Each setting keeps its current module-level value when the
        corresponding parameter is not supplied.
        """
        global USE_EMA, USE_EMA_CROSSOVER, EMA1_LENGTH, EMA2_LENGTH, MAMA_LENGTH, MAMA_EXP, \
                MAMA_M_LENGTH, MAMA_ACCEL_FACTOR, MAMA_P_LENGTH, USE_TS, TS_DISTANCE

        USE_EMA = self.convert_param(bool, "USE_EMA", USE_EMA)
        USE_EMA_CROSSOVER = self.convert_param(bool, "USE_EMA_CROSSOVER", USE_EMA_CROSSOVER)
        EMA1_LENGTH = self.convert_param(int, "EMA1_LENGTH", EMA1_LENGTH)
        EMA2_LENGTH = self.convert_param(int, "EMA2_LENGTH", EMA2_LENGTH)
        MAMA_LENGTH = self.convert_param(int, "MAMA_LENGTH", MAMA_LENGTH)
        MAMA_EXP = self.convert_param(bool, "MAMA_EXP", MAMA_EXP)
        MAMA_M_LENGTH = self.convert_param(int, "MAMA_M_LENGTH", MAMA_M_LENGTH)
        # On/off flag (default True, only ever tested for truthiness), so it is
        # parsed like the other flags instead of as a float.
        MAMA_ACCEL_FACTOR = self.convert_param(bool, "MAMA_ACCEL_FACTOR", MAMA_ACCEL_FACTOR)
        MAMA_P_LENGTH = self.convert_param(int, "MAMA_P_LENGTH", MAMA_P_LENGTH)
        USE_TS = self.convert_param(bool, "USE_TS", USE_TS)
        TS_DISTANCE = self.convert_param(float, "TS_DISTANCE", TS_DISTANCE)

    def convert_param(self, t, name, default):
        """Read algorithm parameter ``name`` and convert it to type ``t``.

        t:       one of ``bool``, ``int`` or ``float``.
        name:    parameter name passed to ``GetParameter``.
        default: value used when the parameter is missing or empty.

        Booleans accept numeric text ("0", "1", "1.0") as well as the words
        true/false, yes/no and on/off (case-insensitive). Integers are parsed
        via float so that "7.0" is accepted.

        Raises TypeError for an unsupported ``t`` and ValueError when the text
        cannot be converted.
        """
        value = self.GetParameter(name) or default

        if t is bool:
            if isinstance(value, str):
                word = value.strip().lower()
                if word in ("true", "yes", "on"):
                    return True
                if word in ("false", "no", "off"):
                    return False
            return bool(float(value))
        if t is int:
            return int(float(value))
        if t is float:
            return float(value)
        # Fail loudly instead of silently handing back None.
        raise TypeError(f"convert_param: unsupported parameter type {t!r} for {name!r}")


class SymbolData:
    """Indicator state and order logic for one symbol, driven by consolidated bars.

    Holds EMA1/EMA2 (plus one-bar-delayed copies), the MaMA chain (momentum,
    acceleration, one-bar change, "probability" SMA and the final average)
    and a calendar consolidator of length TIMEFRAME whose bars trigger OnBar.
    """

    def __init__(self, algo: PF_EMA_MAMA, symbol: Symbol):
        """Build the indicators and wire them to a TIMEFRAME consolidator.

        algo:   owning algorithm; used for subscriptions, plotting, portfolio
                look-ups and order placement.
        symbol: symbol whose data is consolidated and traded.
        """
        self.algo = algo
        self.symbol = symbol

        # EMA1/EMA2 and their values delayed by one update (for crossover checks).
        self.ema1 = ExponentialMovingAverage(EMA1_LENGTH)
        self.ema2 = ExponentialMovingAverage(EMA2_LENGTH)
        self.ema1_1 = IndicatorExtensions.Of(Delay(1), self.ema1)
        self.ema2_1 = IndicatorExtensions.Of(Delay(1), self.ema2)

        # MaMA building blocks. Acceleration is a momentum fed by self.momentum;
        # probability and mama are updated by hand in OnBar.
        self.momentum = Momentum(MAMA_M_LENGTH)
        self.acceleration = IndicatorExtensions.Of(Momentum(MAMA_M_LENGTH), self.momentum)
        self.change = Momentum(1)
        self.probability = SimpleMovingAverage(MAMA_P_LENGTH)
        average_type = ExponentialMovingAverage if MAMA_EXP else SimpleMovingAverage
        self.mama = average_type(MAMA_LENGTH)

        # NOTE(review): OnBar is subscribed before the indicators are registered
        # on the same consolidator. If handlers run in subscription order, the
        # registered indicators still hold the previous bar's values inside
        # OnBar -- confirm this one-bar lag is intended.
        bars = TradeBarConsolidator(self.CustomConsolidator)
        bars.DataConsolidated += self.OnBar
        algo.SubscriptionManager.AddConsolidator(symbol, bars)

        # Indicators that are updated automatically from each consolidated bar.
        for price_fed in (self.ema1, self.ema2, self.momentum, self.change):
            algo.RegisterIndicator(symbol, price_fed, bars)

    def CustomConsolidator(self, dt):
        """Return the CalendarInfo (start, period) of the bar containing ``dt``.

        Periods of one day or longer start at 00:00:00 of ``dt``'s date;
        shorter periods start at ``dt`` itself.
        """
        if TIMEFRAME >= timedelta(days=1):
            return CalendarInfo(dt.replace(hour=0, minute=0, second=0), TIMEFRAME)
        return CalendarInfo(dt, TIMEFRAME)

    def OnBar(self, _sender, bar: TradeBar):
        """Handle a consolidated bar: update MaMA, plot, then enter or exit.

        _sender: unused.
        bar:     the consolidated bar.
        """
        # "Probability" averages 1 for bars with a positive one-bar change, else 0.
        went_up = 1 if self.change.Current.Value > 0 else 0
        self.probability.Update(bar.EndTime, went_up)

        # MaMA source = close + (momentum [+ half the acceleration]) * probability.
        drift = self.momentum.Current.Value
        if MAMA_ACCEL_FACTOR:
            drift = drift + .5 * self.acceleration.Current.Value
        self.mama.Update(bar.EndTime, bar.Close + drift * self.probability.Current.Value)

        # Indicators keep updating during warm-up, but nothing is plotted or traded.
        if self.algo.IsWarmingUp:
            return

        series = (
            ("Price", bar.Close),
            ("MaMA", self.mama.Current.Value),
            ("EMA1", self.ema1.Current.Value),
            ("EMA2", self.ema2.Current.Value),
        )
        for label, value in series:
            self.algo.Plot("Indicators", label, value)

        # A holding of at least MIN_POS counts as an open position: only look for an exit.
        if self.algo.Portfolio[self.symbol].AbsoluteQuantity >= MIN_POS:
            self.CheckExit(bar.Close)
            return

        # Flat: open a position in the signalled direction if it is large enough.
        direction = self.GetSignal(bar.Close)
        if direction == 0:
            return
        target = direction * POSITION_SIZE_PCT / 100.0
        qty = int(self.algo.CalculateOrderQuantity(self.symbol, target))
        if abs(qty) >= MIN_POS:
            self.algo.MarketOrder(self.symbol, qty)

    def _EmaDirection(self):
        """Return 1, -1 or 0 for a bullish, bearish or absent EMA signal."""
        # Nothing is signalled until both delayed EMAs have data.
        if not (self.ema2_1.IsReady and self.ema1_1.IsReady):
            return 0

        ema1_now = self.ema1.Current.Value
        ema2_now = self.ema2.Current.Value

        if USE_EMA_CROSSOVER:
            # Signal only on the bar where EMA1 moves across EMA2.
            ema1_before = self.ema1_1.Current.Value
            ema2_before = self.ema2_1.Current.Value
            if ema1_before <= ema2_before and ema1_now > ema2_now:
                return 1
            if ema1_before >= ema2_before and ema1_now < ema2_now:
                return -1
            return 0

        if ema1_now > ema2_now:
            return 1
        if ema1_now < ema2_now:
            return -1
        return 0

    def GetSignal(self, price):
        """Return the entry direction for ``price``: 1 long, -1 short, 0 none.

        The MaMA side is long when ``price`` is above MaMA and short otherwise.
        With USE_EMA enabled the EMA signal must point the same way; with it
        disabled the MaMA side alone decides. Returns 0 while MaMA is not ready.
        """
        if not self.mama.IsReady:
            return 0

        mama_side = 1 if price > self.mama.Current.Value else -1
        if not USE_EMA:
            return mama_side
        return mama_side if self._EmaDirection() == mama_side else 0

    def CheckExit(self, price):
        """Liquidate when the EMAs contradict the MaMA side of ``price``.

        Does nothing while MaMA is not ready or when USE_EMA is disabled.
        """
        if not self.mama.IsReady or not USE_EMA:
            return

        ema1_now = self.ema1.Current.Value
        ema2_now = self.ema2.Current.Value
        if price > self.mama.Current.Value:
            contradicts = ema1_now < ema2_now
        else:
            contradicts = ema1_now > ema2_now

        if contradicts:
            self.algo.Liquidate(tag="Exit")