Overall Statistics
Total Orders
22
Average Win
4.43%
Average Loss
-13.10%
Compounding Annual Return
-65.748%
Drawdown
74.200%
Expectancy
-0.554
Start Equity
1000
End Equity
337.53
Net Profit
-66.247%
Sharpe Ratio
-0.454
Sortino Ratio
-0.099
Probabilistic Sharpe Ratio
1.475%
Loss Rate
67%
Win Rate
33%
Profit-Loss Ratio
0.34
Alpha
-0.356
Beta
0.192
Annual Standard Deviation
0.676
Annual Variance
0.457
Information Ratio
-0.799
Tracking Error
0.708
Treynor Ratio
-1.599
Total Fees
$23.08
Estimated Strategy Capacity
$230000.00
Lowest Capacity Asset
SNDL X6N3VNEHX84L
Portfolio Turnover
2.12%
# region imports
from AlgorithmImports import *
import numpy as np
# endregion


class Fallingknives(QCAlgorithm):
        
    def Initialize(self):
        self.logging = True

        self.min_recent_volume_increase = 8  # Minimum recent volume increase multiplier
        self.min_recent_volume_decrease = 8
        self.min_price_spike = 1.5  # Minimum price spike percentage
        self.min_price_decline = 0.2  # Minimum price decline percentage after spike
        self.stop_loss_pct = .1  # Stop-loss percentage for short positions
        self.take_profit_pct = 2  # Take-profit percentage for short positions
        self.decline_window = 3 * 60 # hours to check for rapid decline
        self.min_hold_time = 1  # days to hold a short position
        self.max_hold_time = 2  # days to hold a short position
        self.min_decline_window_size = 3 * 60 # number of hours over which to check for rapid decline
        self.rebalanceTime = datetime.min
        self.stay_in_cadidate_list = 7  # days to stay in candidate list
        self.split_ratio = .25
        self.volumeBarsDaysSize = 10 * 60
        #self.volumeBarsDaysSize = self.volumeBarsDaysSize * 24

        self.SetStartDate(2020, 3, 6)  # Set start date
        self.SetEndDate(2021, 3, 10)  # Set end date (replace with desired end date)
        self.SetCash(1000)  # Set initial capital
        self.final_universe_size = 100  # Number of stocks in final universe
        self.activeStocks = set()
        self.AddUniverse(self.CoarseFilter, self.FineFilter)
        self.UniverseSettings.Resolution = Resolution.Minute
        self.TickerToAnalyze = "ADIL"

        self.volumes = {}
        self.rolling_prices = {}
        self.shortCandidates = set()
        self.volumePassed = set()
        self.days_in_candidate_list = {}
        self.trailing_stop_thresholds = {}
        self.hold_times = {}
        self.bollingers = {}
        self.trailing_stop_percents = [(100, 100), (-.15, -.1), (-.25, -.16), (-.35, -.26), (-.45, -.36), (-.55, -.46), (-.65, -.56), (-.75, -.66), (-.85, -.76), (-.95, -.86)]

        self.schedule.on(self.date_rules.every(DayOfWeek.MONDAY, DayOfWeek.TUESDAY, DayOfWeek.WEDNESDAY, DayOfWeek.THURSDAY, DayOfWeek.FRIDAY),
                 self.time_rules.at(15, 30),
                 lambda: self.UpdateCandidateList())
        
    def UpdateCandidateList(self):
        # remove stocks that have been in the candidate list for too long
        toremove = []
        for symbol in self.shortCandidates:
            if symbol in self.days_in_candidate_list:
                self.days_in_candidate_list[symbol] += 1
                if self.days_in_candidate_list[symbol] > self.stay_in_cadidate_list:
                    self.Log("Removing stock: " + str(symbol.value) + " from candidate list after " + str(self.stay_in_cadidate_list) + " days.")
                    toremove.append(symbol)
            else:
                self.days_in_candidate_list[symbol] = 1
        for symbol in toremove:
            self.shortCandidates.remove(symbol)
            self.days_in_candidate_list.pop(symbol)

        
        # go through ever symbol in portfolio and update hold times
        for symbol in self.Portfolio.Keys:
            if self.Portfolio[symbol].Invested:
                if symbol in self.hold_times:
                    self.hold_times[symbol] += 1
                else:
                    self.hold_times[symbol] = 1
                if self.hold_times[symbol] > self.max_hold_time:
                    self.Log("Max hold time reached for stock: " + str(symbol.value) + " at time/date: " + str(self.Time))
                    self.SetHoldings(symbol, 0)
                    self.hold_times.pop(symbol)
                    self.bollingers.pop(symbol)
                    self.volumes.pop(symbol)
                    self.rolling_prices.pop(symbol)
                    self.activeStocks.remove(symbol)

    def CoarseFilter(self, coarse):
        # Rebalancing weekly
        
        if self.Time <= self.rebalanceTime:
            return self.Universe.Unchanged
        self.rebalanceTime = self.Time + timedelta(7)
        #sortedByPE = sorted(coarse, key=lambda x: x.MarketCap)
        #final = [x.Symbol for x in sortedByPE if x.MarketCap > 0][:200]
        #return final
        
        #competition coarse
        sortedCoarse = sorted(coarse, key=lambda c:c.DollarVolume, reverse=True)
        return [c.Symbol for c in sortedCoarse][:1000]

        
    
    def FineFilter(self, fine):

        #sortedByDollarVolume = sorted(fine, key=lambda x: x.DollarVolume, reverse=True)
        #final = [x.Symbol for x in sortedByDollarVolume if x.HasFundamentalData and x.price > 1
        #         ][:self.final_universe_size]
        #and x.SecurityReference.ExchangeId == "NAS"

        # competition fine filter
        sortedByPE = sorted(fine, key=lambda x: x.MarketCap)
        final = [x.Symbol for x in sortedByPE if x.MarketCap > 0][:200]

        return final

    def OnSecuritiesChanged(self, changes):
        # close positions in removed securities
        for x in changes.RemovedSecurities:
            #self.Liquidate(x.Symbol)
            if self.TickerToAnalyze in str(x.Symbol):
                self.Log("removed " + str(x.Symbol) + " from universe")
            if x.Symbol in self.activeStocks:
                self.activeStocks.remove(x.Symbol)
        
        # can't open positions here since data might not be added correctly yet
        for x in changes.AddedSecurities:
            history_trade_bar = self.history[TradeBar](x.Symbol, self.volumeBarsDaysSize, Resolution.Minute)
            history_trade_bar_decline = self.history[TradeBar](x.Symbol, self.decline_window, Resolution.Minute)
            self.volumes[x.Symbol] = RollingWindow[float](self.volumeBarsDaysSize)
            self.rolling_prices[x.Symbol] = RollingWindow[float](self.decline_window)

            for trade_bar in history_trade_bar:
                self.volumes[x.Symbol].Add(trade_bar.Volume)
            for trade_bar in history_trade_bar_decline:
                self.rolling_prices[x.Symbol].Add(trade_bar.Close)
            self.bollingers[x.symbol] = self.bb(x.Symbol, 20, 2)
            self.activeStocks.add(x.Symbol)   

    def OnData(self, data):

        # Check for shorting and exit opportunities
        for symbol in self.activeStocks:
            if not data.ContainsKey(symbol) or data[symbol] is None:
                continue

            self.volumes[symbol].Add(data[symbol].Volume)
            self.rolling_prices[symbol].Add(data[symbol].price)
            
            if symbol not in self.shortCandidates:
                self.IsShortingCandidate(data, symbol)   
                if symbol in self.shortCandidates:
                    self.Log("Stock: " + str(symbol.value) + " was in list, check for rapid decline")
                    self.RapidDecline(data, symbol)
            elif self.IsShortExitCandidate(data, symbol):
                self.Log("Covering stock: " + str(symbol.value) + " at price: " + str(data[symbol].price) + " at time/date: " + str(self.Time))
                self.SetHoldings(symbol, 0)
            else:
                self.RapidDecline(data, symbol)
                
            

    def RapidDecline(self, data, symbol):
        # check if stock is in a rapid decline over last x days
        if not data.ContainsKey(symbol) or data[symbol] is None:
            #self.Log("No data for stock: " + str(symbol.value))
            return False
        if not symbol in self.bollingers:
            return False
        if not self.bollingers[symbol].IsReady:
            #self.Log("bollingers aint ready")
            return False
        
        #self.Log("Checking for rapid decline for stock: " + str(symbol.value) + " at time/date: " + str(self.Time))

        old_prices_1 = np.fromiter([i for i in self.rolling_prices[symbol]], dtype=float, count=self.rolling_prices[symbol].Count)[30:60]
        old_prices_2 = np.fromiter([i for i in self.rolling_prices[symbol]], dtype=float, count=self.rolling_prices[symbol].Count)[0:30]
        mean_old_price_1 = np.max(old_prices_1)
        mean_old_price_2 = np.mean(old_prices_2)
        price_decline = mean_old_price_2 / mean_old_price_1

        volumecomparesList1 = np.fromiter([i for i in self.volumes[symbol]], dtype=float, count=self.volumes[symbol].Count)[60:120]
        volumecomparesList1 = volumecomparesList1[volumecomparesList1 != 0]
        volumecomparesList2 = np.fromiter([i for i in self.volumes[symbol]], dtype=float, count=self.volumes[symbol].Count)[0:60]
        volumecomparesList2 = volumecomparesList2[volumecomparesList2 != 0]
        lasthourvolume = np.mean(volumecomparesList1)
        thishourvolume = np.mean(volumecomparesList2)

        min_volume_jump = thishourvolume / lasthourvolume

        if "ENSV" in str(symbol):
            self.Log("----Price decline: " + str(price_decline) + " at time/date: " + str(self.Time))
            self.Log("----price averaged over back half of hour: " + str(mean_old_price_1) + " at time/date: " + str(self.Time))
            self.Log("----min volume jump: " + str(min_volume_jump) + " at time/date: " + str(self.Time))

        #  min_volume_jump >= self.min_recent_volume_decrease and (
        if price_decline < 1 - self.min_price_decline:# data[symbol].price <= self.bollingers[symbol].UpperBand.Current.Value:
            self.Log("----Price decline: " + str(price_decline) + " at time/date: " + str(self.Time))
            self.Log("----price averaged over back half of hour: " + str(mean_old_price_1) + " at time/date: " + str(self.Time))
            self.Log("----min volume jump: " + str(min_volume_jump) + " at time/date: " + str(self.Time))

            self.Log("shorting stock: " + str(symbol.value) + " at price: " + str(data[symbol].price) + " at time/date: " + str(self.Time))
            self.SetHoldings(symbol, -self.split_ratio)
            self.shortCandidates.remove(symbol)
            self.trailing_stop_thresholds[symbol] = self.trailing_stop_percents[0]
            return True

        return False

    def IsShortingCandidate(self, data, symbol):
        # Check for recent volume increase, price spike, and confirmation decline

        # check if stock is ready to analyze
        if not data.ContainsKey(symbol) or data[symbol] is None:
            if self.TickerToAnalyze in str(symbol.value):
                self.Log("No data for stock: " + str(symbol.value))
            return 
        if not self.volumes[symbol].IsReady:
            if self.TickerToAnalyze in str(symbol.value):
                self.Log("Volumes not ready for stock: " + str(symbol.value))
            return 
        
        # check for recent volume increase
        # find the median volume over the volumes window and compare to today
        #convert volume rolling window to list, remove 0 values and find minimum
        volumecomparesList1 = np.fromiter([i for i in self.volumes[symbol]], dtype=float, count=self.volumes[symbol].Count)[60:120]
        volumecomparesList1 = volumecomparesList1[volumecomparesList1 != 0]
        volumecomparesList2 = np.fromiter([i for i in self.volumes[symbol]], dtype=float, count=self.volumes[symbol].Count)[0:60]
        volumecomparesList2 = volumecomparesList2[volumecomparesList2 != 0]

        if len(volumecomparesList1) == 0:
            return

        max_volume_jump = np.mean(volumecomparesList2) / np.mean(volumecomparesList1)

        if max_volume_jump < self.min_recent_volume_increase and symbol not in self.volumePassed:
            #if "ENSV" in str(symbol):
                #self.Log("Stock: " + str(symbol.value) + " did not pass volume check with jump: " + str(max_volume_jump) + " at time/date: " + str(self.Time))
            return 
        
        if symbol not in self.volumePassed:
            self.volumePassed.add(symbol)
            self.Log("*********" + str(symbol.value) + " Passed volume check with jump: " + str(max_volume_jump) + " at time/date: " + str(self.Time) + "*********")

        old_prices1 = np.fromiter([i for i in self.rolling_prices[symbol]], dtype=float, count=self.rolling_prices[symbol].Count)[20:120]
        old_prices2 = np.fromiter([i for i in self.rolling_prices[symbol]], dtype=float, count=self.rolling_prices[symbol].Count)[0:20]
        price_spike = np.mean(old_prices2) / np.mean(old_prices1)

        if price_spike < self.min_price_spike:
            #if "ENSV" in str(symbol):
            self.Log("$$$$$$$$$$$$$Stock: " + str(symbol.value) + " did not pass price spike check with spike: " + str(price_spike) + " at time/date: " + str(self.Time))
            return 
        
        self.volumePassed.remove(symbol)

        self.Log("$$$$$$$$$$$$$$$" + str(symbol.value) + " Passed price spike check, adding to candidate list wiht spike: " + str(price_spike) + " at time/date: " + str(self.Time) + "*********")

        # Add stock to candidates to watch to short
        self.shortCandidates.add(self.AddEquity(symbol, Resolution.Minute).Symbol)
        


    def IsShortExitCandidate(self, data, symbol):
        # Exit on stop-loss or take-profit

        if not data.ContainsKey(symbol) or data[symbol] is None or symbol not in self.hold_times:
            return False

        if self.Portfolio[symbol].Invested:
            entry_price = self.Portfolio[symbol].AveragePrice
        else:
            return False
        
        current_price = data[symbol].close
        if current_price <= entry_price * (1 - self.take_profit_pct) and self.hold_times[symbol] >= self.min_hold_time:
            self.Log("Take profit hit for stock: " + str(symbol.value) + " at price: " + str(data[symbol].price) + " at time/date: " + str(self.Time))
            self.bollingers.pop(symbol)

            return True
        elif current_price >= entry_price * (1 + self.stop_loss_pct) and self.hold_times[symbol] >= self.min_hold_time:
            self.Log("Stop loss hit for stock: " + str(symbol.value) + " at price " + str(data[symbol].price) + " at time/date: " + str(self.Time))
            self.bollingers.pop(symbol)

            return True
        elif current_price >= entry_price * (1 + self.trailing_stop_thresholds[symbol][1]) and self.hold_times[symbol] >= self.min_hold_time:
            self.Log("Trailing stop hit for stock: " + str(symbol.value) + " at price " + str(data[symbol].price) + " at time/date: " + str(self.Time))
            self.bollingers.pop(symbol)

            return True
        
        for thresh in self.trailing_stop_percents:
            if thresh[0] >= (data[symbol].close - entry_price) / entry_price and thresh[0] < self.trailing_stop_thresholds[symbol][0]:
                self.trailing_stop_thresholds[symbol] = thresh

        # finally, sell if price moved above upper bollinger band
        #if data[symbol].price > self.bollingers[symbol].UpperBand.Current.Value:
        #    self.Log("Price moved above upper bollinger band for stock: " + str(symbol.value) + " at price " + str(data[symbol].price) + " at time/date: " + str(self.Time))
        #    return True

        return False