Overall Statistics
Total Trades
92
Average Win
0.38%
Average Loss
-0.29%
Compounding Annual Return
7.974%
Drawdown
3.000%
Expectancy
0.519
Net Profit
1.974%
Sharpe Ratio
1.021
Probabilistic Sharpe Ratio
49.847%
Loss Rate
33%
Win Rate
67%
Profit-Loss Ratio
1.28
Alpha
0
Beta
0
Annual Standard Deviation
0.055
Annual Variance
0.003
Information Ratio
1.021
Tracking Error
0.055
Treynor Ratio
0
Total Fees
$891.10
Estimated Strategy Capacity
$1200000.00
Lowest Capacity Asset
MRKT VRITCWQ0X55X
from AlgorithmImports import *
from settings import *
from collections import namedtuple
import operator
import functools



class ExpansionBreakoutStrategy(QCAlgorithm):

    def Initialize(self):
        
        settings = self.GetSettings()
        
        self.SetStartDate(settings.startDate)  
        self.SetEndDate(settings.endDate)
        self.SetCash(settings.startCash)  
        
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.DefaultOrderProperties = InteractiveBrokersOrderProperties()
        self.DefaultOrderProperties.TimeInForce = TimeInForce.GoodTilCanceled
        
        self.selectionDataDict = {}
        self.UpSymbols = self.DownSymbols = []
        self.day = self.month = None
        self.coarseSymbols = []
        self.symbolDataDict = {}
        self.HasOpenLongPositions = self.HasOpenShortPositions = False
        self.NumberOfLongPositions = self.NumberOfShortPositions = 0
        self.maxLongPositions = settings.numberOfLongPositions
        self.maxShortPositions = settings.numberOfShortPositions
        self.numberOfSymbolsCoarse = settings.numberOfStocksToScan
        self.maximumPricePerShare = settings.maximumPricePerShare
        self.minimumPricePerShare = settings.minimumPricePerShare
        self.trailingStopDistPct = 0
        self.StopActivateProfit = 0
        
        if self.GetParameters().ContainsKey("StopActivateProfit"):
            param = self.GetParameter("StopActivateProfit")
            if param is not None and (isinstance(param, float) or isinstance(param, int) or isinstance(param, str)):
                self.StopActivateProfit = float(param)
        if self.StopActivateProfit == 0:
            self.StopActivateProfit = settings.StopActivateProfit                

        if self.GetParameters().ContainsKey("trailingStopDistPct"):
            param = self.GetParameter("trailingStopDistPct")
            if param is not None and (isinstance(param, float) or isinstance(param, int) or isinstance(param, str)):
                self.trailingStopDistPct = float(param)
        if self.trailingStopDistPct == 0:
            self.trailingStopDistPct = settings.trailingStopDistPct
            
        self.initialStopDistancePoints = settings.initialStopDistPoints
        self.weight = min(settings.portfolioWeightPerStock, 1/(self.maxLongPositions + self.maxShortPositions))
        self.UniverseSettings.FillForward = True
        self.AddUniverse(self.SelectCoarse)
        self.tickerToPlot = settings.tickerToPlot
        
    
    def GetSettings(self):
        paramNames = ['startDate', 'endDate', 'startCash', 'numberOfStocksToScan', 'minimumPricePerShare', 'maximumPricePerShare', 'numberOfLongPositions', 'numberOfShortPositions', 
                        'initialStopDistPoints', 'trailingStopDistPct', 'portfolioWeightPerStock', 'StopActivateProfit', 'tickerToPlot']
        paramValues = [datetime(*startDate), datetime(*endDate), startCash, numberOfStocksToScan, minimumPricePerShare, maximumPricePerShare, numberOfLongPositions, numberOfShortPositions, 
                        initialStopDistPoints, trailingStopDistPct, portfolioWeightPerStock, StopActivateProfit, tickerToPlot]
        paramSettings = namedtuple('Settings', paramNames)
        settings = paramSettings(*[*map(operator.itemgetter(1), zip(paramNames, paramValues))])
        return settings
        



    def OnData(self, data):
        
        for symbol, symbolData in self.symbolDataDict.items():
            if symbol not in data.QuoteBars:
                continue
            if symbolData.Holdings.Invested:
                if symbolData.ExitConditionMet:
                    self.Liquidate(symbol)
                else:
                    if not symbolData.TrailingStopFlag:
                        if symbolData.Holdings.UnrealizedProfitPercent < self.StopActivateProfit:
                            symbolData.trailingStop.Reset()
                        else:
                            symbolData.TrailingStopFlag = True
                    continue
            else:
                if symbolData.Flag and symbolData.Signal != 0:
                    direction = sign(symbolData.Signal)
                    if (direction > 0 and self.NumberOfLongPositions >= self.maxLongPositions) or (direction < 0 and self.NumberOfShortPositions >= self.maxShortPositions): 
                        continue
                    # we have a signal for this symbol, let's place a stop entry order
                    orderQuantity = self.CalculateOrderQuantity(symbol, direction * self.weight)
                    if orderQuantity == 0:
                        continue
                    stopEntryPrice = symbolData.CurrentHigh.Current.Value if direction > 0 else symbolData.CurrentLow.Current.Value
                    stopEntryPrice += direction*1/8
                    orderProperties = OrderProperties()
                    orderProperties.TimeInForce = TimeInForce.GoodTilDate(self.Time + timedelta(days=1))
                    self.LimitOrder(symbol, orderQuantity, stopEntryPrice, orderProperties = orderProperties)
                    symbolData.Flag = False
                    
                else:
                    continue
          
          
          

    def OnOrderEvent(self, orderEvent):
        if orderEvent.Status == OrderStatus.Filled:
            symbol = orderEvent.Symbol
            holdings = self.symbolDataDict[symbol].Holdings
            
            if orderEvent.FillQuantity * holdings.Quantity > 0:
                # we have entered a position, let's place the stop market order
                openOrders = self.Transactions.GetOpenOrders(symbol)
                if len(openOrders) == 0:
                    direction = sign(orderEvent.FillQuantity)
                    orderProperties = OrderProperties()
                    orderProperties.TimeInForce = TimeInForce.GoodTilCanceled
                    cmp = operator.gt if direction > 0 else operator.lt
                    if cmp(orderEvent.FillPrice, self.symbolDataDict[symbol].PreviousClose):
                        stopPrice = self.symbolDataDict[symbol].StopPrice
                        self.StopMarketOrder(symbol, -orderEvent.FillQuantity, stopPrice, orderProperties = orderProperties)
                    self.symbolDataDict[symbol].trailingStop.Reset()
                    
            if holdings.Quantity == 0:
                # liquidated, let's cancel all open orders for this symbol
                self.Transactions.CancelOpenOrders(symbol)
                
            self.UpdatePortfolioState()
            
            
    def UpdatePortfolioState(self):
        numberOfLongPositions = numberOfShortPositions = 0
        for symbol, holding in self.Portfolio.items():
            if holding.IsLong:
                numberOfLongPositions += 1
            elif holding.IsShort:
                numberOfShortPositions += 1
            else:
                continue
            
        self.NumberOfLongPositions  = numberOfLongPositions
        self.NumberOfShortPositions = numberOfShortPositions
        self.HasOpenLongPositions   = numberOfLongPositions  > 0
        self.HasOpenShortPositions  = numberOfShortPositions > 0
            
                
    
    @property
    def CurrentHoldings(self):
        return [(symbol.Value, holding.Quantity) for symbol,holding in self.Portfolio.items() if holding.Invested]
        
    
    
    def OnSecuritiesChanged(self, changes):
        for security in changes.RemovedSecurities:
            symbol = security.Symbol
            symbolData = self.symbolDataDict.pop(symbol, None)
            if symbolData is not None:
                symbolData.Dispose(self)
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            self.symbolDataDict[symbol] = SymbolData(self, security, self.selectionDataDict[symbol], self.trailingStopDistPct, self.initialStopDistancePoints, self.tickerToPlot)
            
    


    def SelectCoarse(self, coarse):
        selection = []
        upSignals = []
        downSignals = []
        symbols = self.coarseSymbols
        if self.month != self.Time.month:
            filteredCoarse = [c for c in coarse if ( self.minimumPricePerShare < c.Price < self.maximumPricePerShare 
                                                        and c.Volume > 1e5 and c.HasFundamentalData 
                                                        and (self.Time - c.Symbol.ID.Date).days > 42)]
            symbols = [c.Symbol for c in sorted(filteredCoarse, key=lambda c: c.DollarVolume, reverse=True)[:self.numberOfSymbolsCoarse]]
            self.coarseSymbols = symbols
            self.month = self.Time.month
        
        lastBar = self.History(symbols, 1, Resolution.Daily)
        
        for symbol in symbols:
            if symbol.ID.ToString() not in lastBar.index.levels[0]:
                continue
            if symbol not in self.selectionDataDict:
                self.selectionDataDict[symbol] = SelectionData(self, symbol)
            bar = lastBar.loc[symbol]
            if bar.shape != (1,5) or bar.isnull().any().any():
                continue
            tradeBar = TradeBar(bar.index[0], symbol, bar.open[0], bar.high[0], bar.low[0], bar.close[0], bar.volume[0])
            selectionData = self.selectionDataDict[symbol]
            selectionData.Update(tradeBar)
            if selectionData.Signal == 1:
                upSignals.append(symbol)
            elif selectionData.Signal == -1:
                downSignals.append(symbol)
        
        if len(upSignals) > 0 and self.NumberOfLongPositions < self.maxLongPositions:
            self.UpSymbols = sorted(upSignals, key = lambda x: self.selectionDataDict[x].relativeVolume.Current.Value)[:self.maxLongPositions - self.NumberOfLongPositions]
            selection.extend(self.UpSymbols)
        
        if len(downSignals) > 0 and self.NumberOfShortPositions < self.maxShortPositions:
            self.DownSymbols = sorted(downSignals, key = lambda x: self.selectionDataDict[x].relativeVolume.Current.Value)[:self.maxShortPositions - self.NumberOfShortPositions]
            selection.extend(self.DownSymbols)
        
        symbolsWithOpenOrders = [symbol for symbol, symbolData in self.symbolDataDict.items() if len(self.Transactions.GetOpenOrders(symbol)) > 0]
        currentHoldings = [symbol for symbol,holding in self.Portfolio.items() if holding.Invested]
        selection.extend(symbolsWithOpenOrders)
        selection.extend(currentHoldings)
        return np.unique(selection).tolist()
            
            
            
class SymbolData:
    
    def __init__(self, algorithm, security, selectionData, trailingStopDistPct, initialStopDistPoints, tickerToPlot):
        self.algorithm = algorithm
        self.Security = security
        self.Symbol = security.Symbol
        self.Holdings = security.Holdings
        self.selectionData = selectionData
        self.Signal = selectionData.Signal
        previousClose = selectionData.expansionBreakoutIndicator.Close
        self.StopPrice = previousClose - self.Signal*initialStopDistPoints
        self.PreviousClose = previousClose
        previousHigh = selectionData.expansionBreakoutIndicator.High
        previousLow = selectionData.expansionBreakoutIndicator.Low
        self.CurrentHigh = algorithm.Identity(self.Symbol, Resolution.Daily, Field.High)
        self.CurrentLow = algorithm.Identity(self.Symbol, Resolution.Daily, Field.Low)
        self.PreviousHigh = IndicatorExtensions.Of(Delay(1), self.CurrentHigh)
        self.PreviousLow = IndicatorExtensions.Of(Delay(1), self.CurrentLow)
        self.Consolidators = [algorithm.ResolveConsolidator(self.Symbol, resolution) for resolution in [Resolution.Minute, Resolution.Daily]]
        self.Flag = True
        self.day = None
        self.CurrentHigh.Update(algorithm.Time, previousHigh)
        self.CurrentLow.Update(algorithm.Time, previousLow)
        self.TrailingStopFlag = False
        self.trailingStop = TrailingStop(trailingStopDistPct, self.Signal)
        algorithm.RegisterIndicator(self.Symbol, self.trailingStop, Resolution.Minute)
        algorithm.WarmUpIndicator(self.Symbol, self.trailingStop, Resolution.Minute)
        self.scheduledEvents = []
        if tickerToPlot == self.Symbol.Value:
            self.InitCharts()
            self.scheduledEvents.append(algorithm.Schedule.On(algorithm.DateRules.EveryDay(self.Symbol), algorithm.TimeRules.Every(timedelta(minutes=5)), self.UpdateCharts))
           
            
        
      
    @property
    def IsReady(self):
        return self.trailingStop.IsReady and self.CurrentHigh.IsReady and self.CurrentLow.IsReady
    
    
    @property
    def ExitConditionMet(self):
        if self.trailingStop.Triggered:
            return True
        return False
        
        
    def Dispose(self, algorithm):
        for consolidator in self.Consolidators:
            algorithm.SubscriptionManager.RemoveConsolidator(self.Symbol, consolidator)
        if len(self.scheduledEvents) > 0:
            for scheduledEvent in self.scheduledEvents:
                scheduledEvent = None
    
   
    def InitCharts(self):
        chart = Chart(self.Symbol.Value, ChartType.Stacked)
        chart.AddSeries(Series('Price', SeriesType.Scatter, 0, "$"))
        # chart.AddSeries(Series('Price (Low)', SeriesType.Line, 0, "$"))
        chart.AddSeries(Series('Trailing Stop', SeriesType.Scatter, 0, "$"))
        chart.AddSeries(Series('Portfolio Exposure', SeriesType.Scatter, 1, "%"))
        self.algorithm.AddChart(chart)
        
    
    
    def UpdateCharts(self):
        if self.Symbol not in self.algorithm.CurrentSlice.Bars:
            return
        if not self.algorithm.IsMarketOpen(self.Symbol):
            return
        tradeBar = self.algorithm.CurrentSlice.Bars[self.Symbol]
        self.algorithm.Plot(self.Symbol.Value, 'Price', tradeBar.Close)
        self.algorithm.Plot(self.Symbol.Value, 'Portfolio Exposure', self.algorithm.Portfolio[self.Symbol].HoldingsValue/self.algorithm.Portfolio.TotalPortfolioValue*100)
        if not self.IsReady or self.trailingStop.Value == 0:
            return
        self.algorithm.Plot(self.Symbol.Value, 'Trailing Stop', self.trailingStop.Value)
        
        


class SelectionData:
    
    def __init__(self, algorithm, symbol):
        self.algorithm = algorithm
        self.Symbol = symbol
        self.expansionBreakoutIndicator = ExpansionBreakoutIndicator()
        algorithm.WarmUpIndicator(self.Symbol, self.expansionBreakoutIndicator, Resolution.Daily)
        self.relativeVolume = RelativeDailyVolume()
        algorithm.WarmUpIndicator(self.Symbol, self.relativeVolume, Resolution.Daily)
        
      
    
    def Update(self, tradeBar):
        self.expansionBreakoutIndicator.Update(tradeBar)
        self.relativeVolume.Update(tradeBar)
        
        
        
    @property
    def Signal(self):
        if not self.IsReady:
            return 0
        return self.expansionBreakoutIndicator.Signal
    
    @property
    def Volume(self):
        return self.expansionBreakoutIndicator.Volume


    @property
    def IsReady(self):
        return self.expansionBreakoutIndicator.IsReady




class ExpansionBreakoutIndicator(PythonIndicator):
    
    def __init__(self):
        self.Time = datetime.min
        self.Value = 0
        self.Close = 0
        self.High = 0
        self.Low = 0
        self.Volume = 0
        self.rollingHigh = Maximum(42)
        self.rollingLow = Minimum(42)
        self.dailyRange = 0
        self.largestDailyRange = Maximum(9)
        self.WarmUpPeriod = 42
        self.Signal = 0
        self.previousHigh = 0
        self.previousLow = 0
        self.previousClose = 0
        
    
    def Update(self, data):
        self.Signal = 0
        self.Time = data.Time
        self.previousClose = self.Close
        self.Close = data.Close
        self.High = data.High
        self.Low = data.Low
        self.Volume = data.Volume
        self.dailyRange = data.High - data.Low
        
        if self.dailyRange >= self.largestDailyRange.Current.Value:
            if self.Close > self.rollingHigh.Current.Value:
                self.Signal = 1
            elif self.Close < self.rollingLow.Current.Value:
                self.Signal = -1
        
        self.previousHigh = self.rollingHigh.Current.Value
        self.previousLow = self.rollingLow.Current.Value
        self.rollingHigh.Update(data.Time, data.High)
        self.rollingLow.Update(data.Time, data.Low)
        self.largestDailyRange.Update(data.Time, self.dailyRange)
    
    
    @property
    def IsReady(self):
        return self.rollingHigh.IsReady and self.Time > datetime.min
        



def sign(x):
    if x == 0:
        return x
    if x > 0:
        return 1
    if x < 0:
        return -1
        
        
        


    
        
class TrailingStop(PythonIndicator):
    
    def __init__(self, pct_dist = 0.1, direction = 1):
        self.pctDist = pct_dist
        self.direction = direction
        self.prevClose = 0
        self.Flag = False
        self.Value = 0
        self.Time = datetime.min
        self.WarmUpPeriod = 1
        
    
    def Update(self, data):
        self.Time = data.Time
        if self.Value*self.prevClose == 0:
            self.prevClose = data.Close
            self.Value = self.prevClose*(1 - self.direction*self.pctDist)
            return
        
        if self.direction == 1:
            if data.Close > self.prevClose:
                self.Value = data.Close*(1 - self.pctDist)
                self.prevClose = data.Close
            if data.Close < self.Value:
                self.Flag = True
                
        elif self.direction == -1:
            if data.Close < self.prevClose:
                self.Value = data.Close*(1 + self.pctDist)
                self.prevClose = data.Close
            if data.Close > self.Value:
                self.Flag = True
            
    
    @property
    def IsReady(self):
        return self.prevClose * self.Value != 0
        
    
    def Reset(self):
        self.Value = 0
        self.Time = datetime.min
        self.prevClose = 0
        self.Flag = False

        
    @property
    def Triggered(self):
        return self.Flag
'''


----------
startDate
----------
The start date of the backtest in the format (YYYY, MM, DD).


--------
endDate
--------
The end date of the backtest in the format (YYYY, MM, DD).
You can also set the endDate as a date in the future. 
By doing so the backtest will run to the most recent day which is usually yesterday. 


---------------------
numberOfStocksToScan
---------------------
The number of stocks you want to scan for signals. 
Let you control the speed of the backtest. 
A number significantly larger than 1,000 will slow down the backtest dramatically without significant impact. 
I recommend to choose a number between 100 and 1,000. 


---------------------
minimumPricePerShare
---------------------
The minimum price per share for the universe selection method. 
Example: minimumPricePerShare = 10 will ignore all stocks with a share price below 10 on that day. 


---------------------
maximumPricePerShare
---------------------
The maximum price per share for the universe selection method. 
See also minimumPricePerShare above. 


---------------------
numberOfLongPositions
---------------------
The maximum number of long positions held at the same time.


----------------------
numberOfShortPositions
----------------------
Analogous to above, the maximum number of short positions held at the same time. 


---------------------
initialStopDistPoints
---------------------
The distance for the initial stop in points/dollars. Does not affect the trailing stop. 


-------------------
trailingStopDistPct
-------------------
The percentage distance of the trailing stop. 


------------------------
portfolioWeightPerStock
------------------------
Controls the position size as a percentage of the current total portfolio value. 
Example: portfolioWeightPerStock = 0.5 will allocate 50% of cash per security per signal. 



------------
tickerToPlot
------------
The Ticker you'd like to choose for plotting. 
This will plot the price, trailing stop and portfolio weight/exposure every minute. 



























'''
################################################################################
# ------------------------------------------------------------------------------
# The settings for your trading algorithm
# ------------------------------------------------------------------------------
################################################################################


# ------------------------------------------------------------------------------
# General settings
# ------------------------------------------------------------------------------
startDate  = (2021, 11, 15)
endDate    = (2022, 2, 15)
startCash  = 1000000


# ------------------------------------------------------------------------------
# Selection
# ------------------------------------------------------------------------------
numberOfStocksToScan   = 500
minimumPricePerShare   = 15
maximumPricePerShare   = 130
numberOfLongPositions  = 4
numberOfShortPositions = 4


# ------------------------------------------------------------------------------
# Exit conditions, Risk Management & Position sizing
# ------------------------------------------------------------------------------
initialStopDistPoints   = 1
trailingStopDistPct     = 1
portfolioWeightPerStock = 0.1
StopActivateProfit = 0.04


# ---------
# Charting
# ---------
tickerToPlot = 'NARI'