Overall Statistics
Total Trades
21
Average Win
119.95%
Average Loss
-9.55%
Compounding Annual Return
281.711%
Drawdown
50.300%
Expectancy
7.133
Net Profit
4375.455%
Sharpe Ratio
3.58
Probabilistic Sharpe Ratio
98.227%
Loss Rate
40%
Win Rate
60%
Profit-Loss Ratio
12.56
Alpha
1.087
Beta
0.596
Annual Standard Deviation
0.53
Annual Variance
0.281
Information Ratio
1.191
Tracking Error
0.453
Treynor Ratio
3.18
Total Fees
$4066.07
Estimated Strategy Capacity
$12000000.00
Lowest Capacity Asset
BTCUSD E3
###################################################
#
#  Smart Rolling window
#  ========================
#  Convenience object to build on RollingWindow functionality
#
#  Methods:
#  -------------------------
#  mySmartWindow.IsRising()
#  mySmartWindow.IsFalling()
#  mySmartWindow.crossedAboveValue(value)
#  mySmartWindow.crossedBelowValue(value)
#  mySmartWindow.crossedAbove(otherWindow)
#  mySmartWindow.crossedBelow(otherWindow)
#  mySmartWindow.IsFlat(decimalPrecision)
#  mySmartWindow.hasAtLeastThisMany(value)
#
#
#  Author:ekz
###################################################

class SmartRollingWindow():
    
    def __init__(self, windowType, windowLength):
        self.window    = None
        self.winLength = windowLength

        if (windowType is "int"):self.window = RollingWindow[int](windowLength)
        elif (windowType is "bool"):self.window = RollingWindow[bool](windowLength)
        elif (windowType is "float"):self.window = RollingWindow[float](windowLength)
        elif (windowType is "TradeBar"):self.window = RollingWindow[TradeBar](windowLength)

    def crossedAboveValue(self, value):return (self.window[1] <= value < self.window[0])
    def crossedBelowValue(self, value): return (self.window[1] >= value > self.window[0])

    def crossedAbove(self, series): return (self.window[1] <= series[1] and self.window[0] > series[0])
    def crossedBelow(self, series): return (self.window[1] >= series[1] and self.window[0] < series[0])

    def isFlat(self):    return (self.window[1] == self.window[0])
    def isFalling(self): return (self.window[1] > self.window[0])
    def isRising(self):  return (self.window[1] < self.window[0])

    def Add(self,value): 
        self.window.Add(value)

    def IsReady(self):
        return (self.window is not None) and \
               (self.window.Count >= self.winLength) ## TODO: just use rw.IsReady?
    
    def __getitem__(self, index):
        return self.window[index]
################################################################################
# KalmanFilterIndicator
#
# Core logic from @vladimir's KalmanFilter implementation:
# https://www.quantconnect.com/forum/discussion/12741/kalman-filter-for-bitcoin/p1
#
################################################################################
from pykalman import KalmanFilter
        
class KalmanFilterIndicator(PythonIndicator):  
    def __init__(self,name, period, transition_matrices = [1], 
                 observation_matrices = [1], initial_state_mean = 0, 
                 initial_state_covariance = 1, observation_covariance=1, 
                 transition_covariance=.01):
                     
        self.Name   = name
        self.period = period
        self.Value  = 0

        self.transition_matrices      = transition_matrices
        self.observation_matrices     = observation_matrices
        self.initial_state_mean       = initial_state_mean 
        self.initial_state_covariance = initial_state_covariance
        self.observation_covariance   = observation_covariance
        self.transition_covariance    = transition_covariance
        
        self.rollingWindow = RollingWindow[TradeBar](self.period)
    
    
    # ---------------------------------    
    def Update(self, input):

        self.rollingWindow.Add(input)         

        if(not self.rollingWindow.IsReady):
            return False
        else:

            C = np.flipud(np.array([self.rollingWindow[i].Close for i in range(self.period)]))
            
            self.kf = KalmanFilter(transition_matrices = self.transition_matrices,
                     observation_matrices     = self.observation_matrices,
                     initial_state_mean       = self.initial_state_mean,
                     initial_state_covariance = self.initial_state_covariance,
                     observation_covariance   = self.observation_covariance,
                     transition_covariance    = self.transition_covariance)
            
            kf,_ = self.kf.filter(C)
            currKalman = kf[-1]

            self.Value = float(currKalman)
            return True
        
################################################################################
#
# LaguerreFilterIndicator
# ==============================
# Laguerre Filter as defined by John F. Ehlers in `Cybernetic Analysis for 
# Stock and Futures`, 2004, published by Wiley. `ISBN: 978-0-471-46307-8
# https://www.mt5users.com/wp-content/uploads/2020/01/timewarp.pdf
#
# Copied from @vladimir's implementation
# https://www.quantconnect.com/forum/discussion/11788/another-digital-filter-laguerre-filter/p1/comment-34897
# 
################################################################################
        
class LaguerreFilterIndicator(PythonIndicator):  
    def __init__(self, name, gamma ):
        self.Name = name
        self.gamma = gamma
        self.prices = np.array([])
        self.Value = 0
        self.L0 = 0.0; self.L1 = 0.0; self.L2 = 0.0; self.L3 = 0.0
        
    
    def Update(self, input):
        mp = (input.High + input.Low)/2
        self.prices = np.append(self.prices, mp)[-4:]
        if len(self.prices) <= 1:
            self.L0 = mp; self.L1 = mp; self.L2 = mp; self.L3 = mp;
        
        if len(self.prices) != 4 : return
    
        L01 = self.L0; L11 = self.L1; L21 = self.L2; L31 = self.L3;
        g = self.gamma  
        
        self.L0 = (1 - g)*mp + g*L01
        self.L1 = L01 - g*self.L0 + g*L11
        self.L2 = L11 - g*self.L1 + g*L21
        self.L3 = L21 - g*self.L2 + g*L31
        
        if len(self.prices) != 4 :
            self.Value = mp
            return False
        
        self.Value = (self.L0 + (2*self.L1) + 2*(self.L2) + self.L3) / 6
        return True        
        
##########################################################################################
## Kalman Crossovers
## ----------------------------------------
## An exploration of Kalman Filter for trend entry signals, in combination with EMAs and 
## Laguerre filters, taking positions when crossovers occur. 
##
## Inspired by @vladimir's KalmanFilter and Laguerre implementations:
## https://www.quantconnect.com/forum/discussion/12741/kalman-filter-for-bitcoin/p1
## https://www.quantconnect.com/forum/discussion/11788/another-digital-filter-laguerre-filter/p1/comment-34897
##
## Author:ekz
##########################################################################################

from FilterIndicators import *
from SmartRollingWindow import *
from pykalman import KalmanFilter
import numpy as np

class KalmanCrossovers(QCAlgorithm):

    # Initialize params, assets, indicators
    # ------------------------------------=
    def Initialize(self):
        self.InitAlgoParams()
        self.InitBacktestParams()
        self.InitAssets()
        self.InitIndicators()


    # Set key system parameters. Called from Initialize(). 
    # -----------------------------------------------------
    def InitAlgoParams(self):
        self.ticker    = "BTCUSD" 

        self.lgrGamma  = float(self.GetParameter("lgrGamma"))
        self.emaPeriod = int(self.GetParameter("emaPeriod"))
        self.kalPeriod = int(self.GetParameter("kalPeriod"))

        self.entrySignalMessage = ""
        self.exitSignalMessage  = ""
        
        
    # Set backtest params: dates, cash, etc. Called from Initialize().
    # ------------------------------------------------------------------
    def InitBacktestParams(self):
        self.initCash = 10000           # todo: use this to track buy+hold     
        self.SetCash(self.initCash)  
        self.SetStartDate(2019, 1, 1)
        self.SetEndDate(2021, 10, 31)
        
        
    # Initialize assets: Symbol, broker, ticker, etc. Called from Initialize(). 
    # ------------------------------------------------------------------------------=
    def InitAssets(self):
        self.SetBrokerageModel(BrokerageName.Bitfinex, AccountType.Margin)
        self.crypto = self.AddCrypto(self.ticker, Resolution.Daily).Symbol   
        self.SetBenchmark(self.ticker)         


    # Initialize indicators. Called from Initialize(). 
    # ------------------------------------------------------
    def InitIndicators(self):

        ## Inquire as to why such a long warmup...
        self.SetWarmUp(5*self.kalPeriod, Resolution.Daily)  

        self.ema       = self.EMA(self.crypto, self.emaPeriod, Resolution.Daily)
        self.kalFilter = KalmanFilterIndicator('Kalman',self.kalPeriod )
        self.lgrFilter = LaguerreFilterIndicator('Laguerre', self.lgrGamma)
        
        self.RegisterIndicator(self.crypto, self.kalFilter, Resolution.Daily)
        self.RegisterIndicator(self.crypto, self.lgrFilter, Resolution.Daily)

        ## Using a consolidator to schedule entries/exists, in case 
        ## we want to use a custom time frame (otherwise we'd use onData)
        self.Consolidate(self.crypto, Resolution.Daily, self.OnConsolidatedBarClose)
        
        ## Initialize rolling windows. We'll use these to track filter values
        self.kalWindow   = SmartRollingWindow("float", 2)
        self.emaWindow   = SmartRollingWindow("float", 2)
        self.lgrWindow   = SmartRollingWindow("float", 2)
        self.priceWindow = SmartRollingWindow("float", 2)
        


    # Called after every bar close on our primary time frame (eg 30M, 1H, 4H, 1D).
    # ----------------------------------------------------------------------------
    def OnConsolidatedBarClose(self, bar):

        ## Update rolling windows, we'lll use these to check for crossovers
        self.UpdateRollingWindows()
        
        ## If we're done warming up and indicators are ready,
        ## check for entry/exit signals and act accordingly
        if not self.IsWarmingUp and self.IndicatorsAreReady():

            if(self.EntrySignalFired()):
                self.SetHoldings(self.crypto, 1, tag=self.entrySignalMessage)
                self.entrySignalMessage = ""
                
            elif(self.ExitSignalFired()):
                self.Liquidate(tag=self.exitSignalMessage)
                self.exitSignalMessage = ""

            self.PlotCharts()

                
    # If exit criteria is met, then set self.entrySignalMessage and return True
    # -------------------------------------------------------------------------
    def EntrySignalFired(self):
    
        if( not self.Portfolio.Invested):
    
            ## Entry 1: Laguerre crossed above Kalman
            if( self.lgrWindow.crossedAbove(self.kalWindow) ):
                self.entrySignalMessage = "ENTRY: Laguerre x-above Kalman"
                return True
    
            ## Entry 2: EMA crossed above Kalman
            ## if( self.emaWindow.crossedAbove(self.kalWindow) ):
            ##     self.entrySignalMessage = "ENTRY: EMA x-above Kalman"
            ##     return True

        return False        
                
                
    # If exit criteria is met, then set self.exitSignalMessage and return True
    # ------------------------------------------------------------------------
    def ExitSignalFired(self):
        if( self.Portfolio.Invested): 
            
            profitpct = round(self.Securities[self.crypto].Holdings.UnrealizedProfitPercent,4)
            winlossStr = 'WIN' if (profitpct > 0) else 'LOSS'
            winlossStr = str(profitpct) +"% " + winlossStr             
            
            ## Exit 1: EMA crossed under Kamal
            if( self.emaWindow.crossedBelow(self.kalWindow) ):
                self.exitSignalMessage = f"EXIT: Ema x-under Kalman @ {winlossStr}" 
                return True
            
            ## Exit 2: Laguerre crossed under Kamal
            ## if( self.lgrWindow.crossedBelow(self.kalWindow) ):
            ##     self.exitSignalMessage = f"EXIT: Laguerre x-under Kal @ {winlossStr}" 
            ##     return True
    

    # Update rolling windows (willl need to check for crossovers etc)
    # ---------------------------------------------------------------
    def UpdateRollingWindows(self):
        self.kalWindow.Add(self.kalFilter.Value)
        self.emaWindow.Add(self.ema.Current.Value)
        self.lgrWindow.Add(self.lgrFilter.Value)
        self.priceWindow.Add(self.Securities[self.crypto].Price)

    # Check if indicators are ready
    # ----------------------------------------
    def IndicatorsAreReady(self):
        return self.kalFilter.IsReady and self.ema.IsReady and \
               self.lgrFilter.IsReady 
    
    # Plot Charts
    # ----------------------------------------
    def PlotCharts(self):    
        self.Plot("charts", "Price", self.Securities[self.crypto].Price)        
        self.Plot("charts", "Kalman", self.kalFilter.Value)
        self.Plot("charts", "EMA", self.ema.Current.Value)
        self.Plot("charts", "Laguerre", float(self.lgrFilter.Value))