Overall Statistics
Total Trades
49
Average Win
38.85%
Average Loss
-4.04%
Compounding Annual Return
299.561%
Drawdown
30.900%
Expectancy
3.419
Net Profit
1180.055%
Sharpe Ratio
3.864
Probabilistic Sharpe Ratio
97.167%
Loss Rate
58%
Win Rate
42%
Profit-Loss Ratio
9.61
Alpha
1.026
Beta
0.535
Annual Standard Deviation
0.506
Annual Variance
0.256
Information Ratio
0.455
Tracking Error
0.478
Treynor Ratio
3.657
Total Fees
$47377.96
Estimated Strategy Capacity
$4400000.00
Lowest Capacity Asset
BTCUSD E3
###################################################
#
#  Smart Rolling window
#  ========================
#  Convenience object to build on RollingWindow functionality
#
#  Methods:
#  -------------------------
#  mySmartWindow.Add(value)
#  mySmartWindow.IsReady()
#  mySmartWindow.isRising()
#  mySmartWindow.isFalling()
#  mySmartWindow.isFlat()
#  mySmartWindow.crossedAboveValue(value)
#  mySmartWindow.crossedBelowValue(value)
#  mySmartWindow.crossedAbove(otherWindow)
#  mySmartWindow.crossedBelow(otherWindow)
#  mySmartWindow[index]
#
#
#  Author:ekz
###################################################

class SmartRollingWindow():
    """Thin wrapper around a ``RollingWindow`` with comparison helpers.

    Every helper compares ``self.window[0]`` against ``self.window[1]``
    (and, for the crossover helpers, the same two indices of another
    series), so at least two values must have been added before calling
    them.
    """

    def __init__(self, windowType, windowLength):
        """Create the underlying window.

        windowType   -- one of the strings "int", "bool", "float" or
                        "TradeBar"; any other value leaves ``self.window``
                        as None, in which case IsReady() returns False.
        windowLength -- size passed to ``RollingWindow`` and the count
                        IsReady() waits for.
        """
        self.window    = None
        self.winLength = windowLength

        # Compare by value (==), not identity (is): two equal strings are
        # not guaranteed to be the same object, so `is` could silently
        # skip every branch and leave the window unset.
        if windowType == "int":
            self.window = RollingWindow[int](windowLength)
        elif windowType == "bool":
            self.window = RollingWindow[bool](windowLength)
        elif windowType == "float":
            self.window = RollingWindow[float](windowLength)
        elif windowType == "TradeBar":
            self.window = RollingWindow[TradeBar](windowLength)

    def crossedAboveValue(self, value):
        """True when window[1] <= value and window[0] is strictly above it."""
        return (self.window[1] <= value < self.window[0])

    def crossedBelowValue(self, value):
        """True when window[1] >= value and window[0] is strictly below it."""
        return (self.window[1] >= value > self.window[0])

    def crossedAbove(self, series):
        """True when this window was at/below `series` at index 1 and is above it at index 0."""
        return (self.window[1] <= series[1] and self.window[0] > series[0])

    def crossedBelow(self, series):
        """True when this window was at/above `series` at index 1 and is below it at index 0."""
        return (self.window[1] >= series[1] and self.window[0] < series[0])

    def isFlat(self):
        """True when window[0] equals window[1]."""
        return (self.window[1] == self.window[0])

    def isFalling(self):
        """True when window[0] is lower than window[1]."""
        return (self.window[1] > self.window[0])

    def isRising(self):
        """True when window[0] is higher than window[1]."""
        return (self.window[1] < self.window[0])

    def Add(self,value):
        """Forward `value` to the underlying window."""
        self.window.Add(value)

    def IsReady(self):
        """True once a window exists and holds at least `winLength` items."""
        return (self.window is not None) and \
               (self.window.Count >= self.winLength) ## TODO: just use rw.IsReady?

    def __getitem__(self, index):
        """Index straight into the underlying window."""
        return self.window[index]
################################################################################
# KalmanFilterIndicator
#
# Core logic from @vladimir's KalmanFilter implementation:
# https://www.quantconnect.com/forum/discussion/12741/kalman-filter-for-bitcoin/p1
#
################################################################################
from pykalman import KalmanFilter
        
class KalmanFilterIndicator(PythonIndicator):
    """Custom indicator whose Value is the last Kalman-filtered observation.

    On every update the most recent `period` observations are run through
    a freshly configured pykalman ``KalmanFilter`` and the final filtered
    state mean becomes ``self.Value``.
    """

    def __init__(self,name, period, selector=Field.Close,
                 transition_matrices = None, observation_matrices = None,
                 initial_state_mean = 0, initial_state_covariance = 1,
                 observation_covariance=1, transition_covariance=.01):
        """Store the filter configuration and allocate the observation window.

        name     -- indicator name, stored as ``self.Name``
        period   -- number of observations filtered on each update
        selector -- callable applied to each incoming bar to obtain the
                    observed value
        transition_matrices, observation_matrices
                 -- passed to ``KalmanFilter``; None (the default) means [1]
        The remaining keyword arguments are forwarded to ``KalmanFilter``
        unchanged.
        """
        self.Name     = name
        self.period   = period
        self.Value    = 0
        self.barCalc  = selector

        # A None sentinel instead of a literal [1] default, so that each
        # instance owns its own list rather than sharing one list object
        # created once at function-definition time.
        self.transition_matrices      = [1] if transition_matrices is None else transition_matrices
        self.observation_matrices     = [1] if observation_matrices is None else observation_matrices
        self.initial_state_mean       = initial_state_mean
        self.initial_state_covariance = initial_state_covariance
        self.observation_covariance   = observation_covariance
        self.transition_covariance    = transition_covariance

        self.rollingWindow = RollingWindow[float](self.period)


    # ---------------------------------
    def Update(self, inputBar):
        """Add one bar; return True once `self.Value` holds a filtered value.

        Returns False (leaving `self.Value` untouched) until the rolling
        window reports that it is ready.
        """
        effectiveBarValue = self.barCalc(inputBar)
        self.rollingWindow.Add(effectiveBarValue)

        if not self.rollingWindow.IsReady:
            return False

        # Read the window from the highest index down to 0, i.e. in the
        # reverse of its own indexing order.
        # NOTE(review): assumes RollingWindow index 0 is the newest item,
        # so this yields oldest-first observations -- confirm.
        observations = np.array(
            [self.rollingWindow[i] for i in reversed(range(self.period))])

        # The filter is rebuilt from the stored attributes on every update,
        # so changes made to those attributes after construction take effect.
        self.kf = KalmanFilter( transition_matrices      = self.transition_matrices,
                                observation_matrices     = self.observation_matrices,
                                initial_state_mean       = self.initial_state_mean,
                                initial_state_covariance = self.initial_state_covariance,
                                observation_covariance   = self.observation_covariance,
                                transition_covariance    = self.transition_covariance)

        filteredMeans, _ = self.kf.filter(observations)

        # Only the estimate for the last observation in the sequence is kept.
        self.Value = float(filteredMeans[-1])
        return True
        
################################################################################
#
# LaguerreFilterIndicator
# ==============================
# Laguerre Filter as defined by John F. Ehlers in `Cybernetic Analysis for
# Stocks and Futures`, 2004, published by Wiley. ISBN: 978-0-471-46307-8
# https://www.mt5users.com/wp-content/uploads/2020/01/timewarp.pdf
#
# Copied from @vladimir's implementation
# https://www.quantconnect.com/forum/discussion/11788/another-digital-filter-laguerre-filter/p1/comment-34897
# 
################################################################################
        
class LaguerreFilterIndicator(PythonIndicator):
    """Four-stage Laguerre filter applied to the bar midpoint (High+Low)/2.

    `Value` stays at its initial 0 until four bars have been seen; from
    the fourth bar on it is the (1, 2, 2, 1)/6 weighted blend of the four
    filter stages L0..L3.
    """

    def __init__(self, name, gamma ):
        """Store the name and damping factor and reset the filter state.

        name  -- indicator name, stored as ``self.Name``
        gamma -- coefficient used in every stage of the recursion
        """
        self.Name = name
        self.gamma = gamma
        self.prices = np.array([])
        self.Value = 0
        self.L0 = 0.0; self.L1 = 0.0; self.L2 = 0.0; self.L3 = 0.0


    def Update(self, input):
        """Feed one bar; return True when `self.Value` was recomputed.

        `input` must expose `High` and `Low`. Returns False for the first
        three bars, while the price buffer is still filling.
        """
        mp = (input.High + input.Low)/2

        # Keep at most the last four midpoints; the buffer length is only
        # used to tell how many bars have been seen so far.
        self.prices = np.append(self.prices, mp)[-4:]

        # First bar: seed every stage with the first midpoint.
        if len(self.prices) <= 1:
            self.L0 = mp; self.L1 = mp; self.L2 = mp; self.L3 = mp

        # Still warming up. Return an explicit False rather than falling
        # out with None, so that every path returns a boolean.
        if len(self.prices) != 4:
            return False

        # Snapshot the previous stage values before overwriting them.
        L01 = self.L0; L11 = self.L1; L21 = self.L2; L31 = self.L3
        g = self.gamma

        self.L0 = (1 - g)*mp + g*L01
        self.L1 = L01 - g*self.L0 + g*L11
        self.L2 = L11 - g*self.L1 + g*L21
        self.L3 = L21 - g*self.L2 + g*L31

        self.Value = (self.L0 + (2*self.L1) + 2*(self.L2) + self.L3) / 6
        return True
        
##########################################################################################
## Kalman Crossovers
## ----------------------------------------
## An exploration of Kalman Filter for trend entry signals, in combination with EMAs and 
## Laguerre filters, taking positions when crossovers occur. 
##
## Inspired by @vladimir's KalmanFilter and Laguerre implementations:
## https://www.quantconnect.com/forum/discussion/12741/kalman-filter-for-bitcoin/p1
## https://www.quantconnect.com/forum/discussion/11788/another-digital-filter-laguerre-filter/p1/comment-34897
##
## Author:ekz
##########################################################################################

from FilterIndicators import *
from SmartRollingWindow import *
from pykalman import KalmanFilter
import numpy as np

class KalmanCrossovers(QCAlgorithm):
    """Long-only BTCUSD strategy driven by crossovers against a Kalman filter.

    On every daily consolidated bar the latest Kalman, EMA, Laguerre and
    price values are pushed into two-element rolling windows. Depending on
    the `entryMethod` / `exitMethod` parameters, a position is opened or
    liquidated when the selected series crosses (or sits above/below) the
    Kalman value.
    """

    # Initialize params, assets, indicators
    # -------------------------------------
    def Initialize(self):
        self.InitAlgoParams()
        self.InitBacktestParams()
        self.InitAssets()
        self.InitIndicators()


    # Set key system parameters. Called from Initialize().
    # -----------------------------------------------------
    def InitAlgoParams(self):
        self.ticker       = "BTCUSD"

        # Filter settings, read from the algorithm parameters.
        self.lgrGamma     = float(self.GetParameter("lgrGamma"))
        self.emaPeriod    = int(self.GetParameter("emaPeriod"))
        self.kalPeriod    = int(self.GetParameter("kalPeriod"))

        # Which of the numbered entry / exit rules (1-4) is active.
        self.entryMethod  = int(self.GetParameter("entryMethod"))
        self.exitMethod   = int(self.GetParameter("exitMethod"))

        # Order tags, filled in by EntrySignalFired / ExitSignalFired.
        self.entrySignalMessage = ""
        self.exitSignalMessage  = ""


    # Set backtest params: dates, cash, etc. Called from Initialize().
    # ------------------------------------------------------------------
    def InitBacktestParams(self):
        self.initCash = 100000           # todo: use this to track buy+hold
        self.SetCash(self.initCash)
        self.SetStartDate(2020, 1, 1)
        self.SetEndDate(2021, 11, 1)


    # Initialize assets: Symbol, broker, ticker, etc. Called from Initialize().
    # -------------------------------------------------------------------------
    def InitAssets(self):
        self.SetBrokerageModel(BrokerageName.Bitfinex, AccountType.Margin)
        self.crypto = self.AddCrypto(self.ticker, Resolution.Daily).Symbol
        self.SetBenchmark(self.ticker)


    # Initialize indicators. Called from Initialize().
    # ------------------------------------------------------
    def InitIndicators(self):

        ## TODO: find out why such a long warmup is needed...
        self.SetWarmUp(5*self.kalPeriod, Resolution.Daily)

        self.ema        = self.EMA(self.crypto, self.emaPeriod, Resolution.Daily)
        self.kalFilter  = KalmanFilterIndicator(name='Kalman',period=self.kalPeriod, selector=Field.Low)
        self.lgrFilter  = LaguerreFilterIndicator('Laguerre', self.lgrGamma)

        self.RegisterIndicator(self.crypto, self.kalFilter,  Resolution.Daily)
        self.RegisterIndicator(self.crypto, self.lgrFilter,  Resolution.Daily)

        ## Using a consolidator to schedule entries/exits, in case
        ## we want to use a custom time frame (otherwise we'd use OnData)
        self.Consolidate(self.crypto, Resolution.Daily, self.OnConsolidatedBarClose)

        ## Initialize rolling windows. We'll use these to track filter values
        self.kalWindow   = SmartRollingWindow("float", 2)
        self.emaWindow   = SmartRollingWindow("float", 2)
        self.lgrWindow   = SmartRollingWindow("float", 2)
        self.priceWindow = SmartRollingWindow("float", 2)


    # Called after every bar close on our primary time frame (eg 30M, 1H, 4H, 1D).
    # ----------------------------------------------------------------------------
    def OnConsolidatedBarClose(self, bar):

        ## Update rolling windows, we'll use these to check for crossovers
        self.UpdateRollingWindows()

        ## If we're done warming up and indicators are ready,
        ## check for entry/exit signals and act accordingly
        if (not self.IsWarmingUp and self.IndicatorsAreReady()):

            if self.EntrySignalFired():
                self.SetHoldings(self.crypto, 1, tag=self.entrySignalMessage)
                self.entrySignalMessage = ""

            elif self.ExitSignalFired():
                self.Liquidate(tag=self.exitSignalMessage)
                self.exitSignalMessage = ""

        self.PlotCharts()


    # If entry criteria is met, then set self.entrySignalMessage and return True.
    # Only the rule selected by self.entryMethod is evaluated. Returns False
    # when already invested or when the selected rule did not fire.
    # ---------------------------------------------------------------------------
    def EntrySignalFired(self):

        if self.Portfolio.Invested:
            return False

        ## Entry 1: EMA crossed above Kalman
        if (self.entryMethod == 1) and (self.emaWindow.crossedAbove(self.kalWindow)):
            self.entrySignalMessage = "ENTRY: EMA x-above Kalman"
            return True

        ## Entry 2: Laguerre crossed above Kalman
        elif (self.entryMethod == 2) and (self.lgrWindow.crossedAbove(self.kalWindow)):
            self.entrySignalMessage = "ENTRY: Laguerre x-above Kalman"
            return True

        ## Entry 3: Price crossed above Kalman
        elif (self.entryMethod == 3) and (self.priceWindow.crossedAbove(self.kalWindow)):
            self.entrySignalMessage = "ENTRY: Price x-above Kalman"
            return True

        ## Entry 4: Price is above Kalman
        elif (self.entryMethod == 4) and (self.priceWindow[0] > self.kalWindow[0]):
            self.entrySignalMessage = "ENTRY: Price is above Kalman"
            return True

        return False


    # If exit criteria is met, then set self.exitSignalMessage and return True.
    # Only the rule selected by self.exitMethod is evaluated. Returns False
    # when not invested or when the selected rule did not fire.
    # -------------------------------------------------------------------------
    def ExitSignalFired(self):

        if not self.Portfolio.Invested:
            return False

        ## Build the "<profit>% WIN|LOSS" suffix used in the exit order tag.
        ## NOTE(review): UnrealizedProfitPercent may be a ratio (0.05 == 5%)
        ## rather than a percentage; confirm before trusting the "%" label.
        profitpct = round(self.Securities[self.crypto].Holdings.UnrealizedProfitPercent,4)
        winlossStr = 'WIN' if (profitpct > 0) else 'LOSS'
        winlossStr = str(profitpct) +"% " + winlossStr

        ## Exit 1: EMA crossed under Kalman
        if (self.exitMethod == 1) and (self.emaWindow.crossedBelow(self.kalWindow)):
            self.exitSignalMessage = f"EXIT: Ema x-under Kalman @ {winlossStr}"
            return True

        ## Exit 2: Laguerre crossed under Kalman
        elif (self.exitMethod == 2) and (self.lgrWindow.crossedBelow(self.kalWindow)):
            self.exitSignalMessage = f"EXIT: Laguerre x-under Kal @ {winlossStr}"
            return True

        ## Exit 3: Price crossed under Kalman
        elif (self.exitMethod == 3) and (self.priceWindow.crossedBelow(self.kalWindow)):
            self.exitSignalMessage = f"EXIT: Price x-under Kalman @ {winlossStr}"
            return True

        ## Exit 4: Price is below Kalman
        elif (self.exitMethod == 4) and (self.priceWindow[0] < self.kalWindow[0]):
            self.exitSignalMessage = f"EXIT: Price is under Kalman @ {winlossStr}"
            return True

        ## Explicit False (instead of falling off the end with None) so this
        ## method has the same return contract as EntrySignalFired.
        return False


    # Update rolling windows (will need to check for crossovers etc)
    # --------------------------------------------------------------
    def UpdateRollingWindows(self):
        self.kalWindow.Add(self.kalFilter.Value)
        self.emaWindow.Add(self.ema.Current.Value)
        self.lgrWindow.Add(self.lgrFilter.Value)
        self.priceWindow.Add(self.Securities[self.crypto].Price)

    # Check if indicators are ready
    # ----------------------------------------
    def IndicatorsAreReady(self):
        return self.kalFilter.IsReady and self.ema.IsReady and \
               self.lgrFilter.IsReady

    # Plot Charts
    # ----------------------------------------
    def PlotCharts(self):
        self.Plot("charts", "Price", self.Securities[self.crypto].Price)
        self.Plot("charts", "Kalman", self.kalFilter.Value)
        self.Plot("charts", "EMA", self.ema.Current.Value)
        self.Plot("charts", "Laguerre", float(self.lgrFilter.Value))