Overall Statistics
Total Orders
543
Average Win
1.03%
Average Loss
-1.56%
Compounding Annual Return
198.229%
Drawdown
35.900%
Expectancy
0.396
Start Equity
1000000
End Equity
16575810.02
Net Profit
1557.581%
Sharpe Ratio
2.42
Sortino Ratio
3.532
Probabilistic Sharpe Ratio
88.850%
Loss Rate
16%
Win Rate
84%
Profit-Loss Ratio
0.66
Alpha
1.459
Beta
0.637
Annual Standard Deviation
0.608
Annual Variance
0.37
Information Ratio
2.407
Tracking Error
0.603
Treynor Ratio
2.31
Total Fees
$4169.46
Estimated Strategy Capacity
$2000000.00
Lowest Capacity Asset
NVMI RTSESF5CFJHH
Portfolio Turnover
1.17%
# region imports
from AlgorithmImports import *
# endregion

class CreativeApricotShark(QCAlgorithm):

    def initialize(self):
        self.SetStartDate(2022, 1, 1)
        self.SetEndDate(2024, 12, 30)
        self.SetCash(1000000)
        #self.AddEquity("SPY", Resolution.Daily)
        #self.tickers = ['AAPL','MSFT','GOOGL','AMZN', 'META','TSLA', 'SPY']#: 287% # winners
        #self.tickers = ['XOM','JNJ','KO','MCD', 'MDT','SHW', 'CTAS'] #182.98 % # high dividend
        self.tickers = ['FIVE','GPCR','STRL','NVMI', 'ONTO','ASML', 'VKTX'] # losers: 1,050.50 %
        self.candles = {}
        self.percent = 0.25
        self.open_positions = []
        self.transactions_history = pd.DataFrame(columns=['Date', 'Stock', 'Type of Transaction', 'Candle', 'Buy Price', 'Qty', 'Sell Price', 'P/L'])
        self.last_sell_date = self.StartDate
        # determines the investment percentage of the totalCash
        self.percent = 0.05

        # We are also going to have a stop-loss metric associated with each position
        # to reduce the drawdown
        self.stop_loss_threshold = -0.2

        # Let us also explore the possibility of SMA anf FMA crossovers

        self.fast_moving_averages = {}
        self.slow_moving_averages = {}
        self.mv_state = {}  # Track the moving average state for each stock

        # Set warm-up period to ensure indicators are ready
        self.SetWarmUp(200, Resolution.Daily)

        # Initialize moving averages
        for ticker in self.tickers:
            equity = self.AddEquity(ticker, Resolution.Daily)
            self.candles[ticker] = Candle(self, ticker)
            
            # Create the moving averages
            self.fast_moving_averages[ticker] = self.SMA(ticker, 50, Resolution.Daily)
            self.slow_moving_averages[ticker] = self.SMA(ticker, 200, Resolution.Daily)

            self.mv_state[ticker] = None  # Initialize the state to None

        

    def OnData(self, data):
        if self.IsWarmingUp:
            return
        for ticker, candle in self.candles.items():
            if ticker not in data.Bars:
                self.debug(ticker)
                continue
            bar = data.Bars[ticker]
            candle.Update(bar)

            # # Check for moving average crossover
            # if self.fast_moving_averages[ticker].IsReady and self.slow_moving_averages[ticker].IsReady:
            #     fast_mv = self.fast_moving_averages[ticker].Current.Value
            #     slow_mv = self.slow_moving_averages[ticker].Current.Value
                
            #     # Check if slow moving average crosses below the fast moving average
            #     if slow_mv < fast_mv and self.mv_state[ticker] != "sell":
            #         self.Debug(f"Slow MV crossed below Fast MV for {ticker} at {self.Time}")
            #         self.sell_all_positions()
            #         self.mv_state[ticker] = "sell"
            #         break  # Exit loop after selling all

            #     # Check if fast moving average crosses above the slow moving average
            #     if fast_mv > slow_mv and self.mv_state[ticker] == "sell":
            #         self.Debug(f"Fast MV crossed above Slow MV for {ticker} at {self.Time}")
            #         self.enter_new_positions(data)
            #         self.mv_state[ticker] = "buy"
            #         break  # Exit loop after entering new positions

            if candle.shouldExit():
                self.close_positions([position for position in self.open_positions if position['Stock'] == ticker], data[ticker].Close, 'SELL', candleStick=candle.getPatternName())
            elif candle.shouldEnter():
                portfolio_value = self.Portfolio.TotalPortfolioValue
                allocation = portfolio_value * self.percent  # Allocate self.percent of portfolio value to each position
                quantity = allocation // data[ticker].Close
                # quantity = (10000 / data[ticker].Close + 1)
                #self.Debug(f"Buying {quantity} shares of {ticker} at {data[ticker].Close} on {self.Time}")
                self.MarketOrder(ticker, quantity)
                self.open_positions.append({'Date': self.Time, 'Qty': quantity, 'Buy Price': data[ticker].Close, 'Stock': ticker, 'Paper P/L': 0, 'Paper P/L %': 0})
        # Everyday we will calculate the Paper profit of each open position
        self.calculate_paper_pl(data)
        # Each day we will calculate to see if our stop-loss thresholf is being hit
        self.check_stop_loss(data)

        #self.check_and_sell_every_30_days(data)
            
    def sell_all_positions(self):
        '''Sell all open positions'''
        for position in self.open_positions:
            ticker = position['Stock']
            qty = position['Qty']
            self.MarketOrder(ticker, -qty)
        self.open_positions = []

    def enter_new_positions(self, data):
        '''Enter a new position for each stock'''
        for ticker in self.tickers:
            if ticker in data.Bars:
                portfolio_value = self.Portfolio.TotalPortfolioValue
                allocation = portfolio_value * self.percent # Allocate 5% of portfolio value to each position
                quantity = allocation // data[ticker].Close
                self.Debug(f"Entering new position: Buying {quantity} shares of {ticker} at {data[ticker].Close} on {self.Time}")
                self.MarketOrder(ticker, quantity)
                self.open_positions.append({'Date': self.Time, 'Qty': quantity, 'Buy Price': data[ticker].Close, 'Stock': ticker, 'Paper P/L': 0, 'Paper P/L %': 0, 'Max Price': data[ticker].Close})

    def check_and_sell_every_30_days(self, data):
        if (self.Time - self.last_sell_date) >= timedelta(days=60):
            self.sell_20_percent_profit(data)
            self.last_sell_date = self.Time

    def sell_20_percent_profit(self, data):
        getProfit = []
        for position in self.open_positions:
            ticker = position['Stock']
            if ticker not in data.Bars:
                continue
            
            price = data.Bars[ticker].Open
            qty = position['Qty']
            paperValue = qty * price
            paperPL = paperValue - qty * position['Buy Price']

            if paperPL > 0:
                price = data.Bars[ticker].Open
                qty = position['Qty']
                sellQty = 0.25 * qty
                if qty <= 4:
                    sellQty = qty
                position['Qty'] -= sellQty
                transaction = {'Date': self.Time, 'Stock': ticker, 'Type of Transaction': 'SELL 30D POSITION', 'Buy Price': position['Buy Price'],
                            'Sell Price': price, 'Qty': sellQty, 'P/L': (price - position['Buy Price']) * sellQty}
                getProfit.append(transaction)
        
        for transaction in getProfit:
            ticker = transaction['Stock']
            self.transactions_history.loc[len(self.transactions_history)] = transaction
            self.Debug(f"Transaction {transaction}")
            self.MarketOrder(ticker, -transaction['Qty'])

        self.open_positions = [p for p in self.open_positions if p['Qty'] > 0]
        
    def close_positions(self, open_positions, price, heading, candleStick=""):
        '''
            We sell 25% of each open position whenever our exit position candle occurs
        '''
        for o in open_positions:
            qty = o['Qty']
            sellQty = self.percent * qty
            if qty <= 4:
                sellQty = qty

            leftQty = qty - sellQty
            paperValue = sellQty * price 
            PL = paperValue - sellQty * o['Buy Price']
            transaction = {'Date': o["Date"], 'Stock' : o['Stock'],'Type of Transaction' : heading, 'Candle': candleStick, 'Buy Price' : o['Buy Price'], 'Sell Price' : price, 'Qty': sellQty , 'P/L': PL}
            self.transactions_history.loc[len(self.transactions_history)] = transaction
            #self.Debug(f"Transacton {transaction}")
            self.MarketOrder(o['Stock'], -sellQty)
            o['Qty'] = leftQty
        
        self.open_positions = [o for o in open_positions if o['Qty'] > 0]


    def calculate_paper_pl(self, data):
        '''
            We regularly take out profits, if a current open position has a unrealized profit of greater than 30%
        '''
        getProfit = []
        for position in self.open_positions:
            ticker = position['Stock']
            if ticker not in data.Bars:
                continue

            # Calculating the paper profit
            price = data.Bars[ticker].Open
            qty = position['Qty']
            paperValue = qty * price
            paperPL = paperValue - qty * position['Buy Price']
            paperPLPercentage = paperPL / (qty * position['Buy Price'])

            if paperPLPercentage > 0.3:
                # Selling 25% of the position if paper profit is > 30%
                sellQty = 0.25 * qty
                if qty <= 4:
                    sellQty = qty
                position['Qty'] -= sellQty
                transaction = {'Date': self.Time, 'Stock': ticker, 'Type of Transaction': 'SELL FRAC', 'Buy Price': position['Buy Price'],
                               'Sell Price': price, 'Qty': sellQty, 'P/L': paperPL * (sellQty / qty)}
                getProfit.append(transaction)
        
        for transaction in getProfit:
            ticker = transaction['Stock']
            self.transactions_history.loc[len(self.transactions_history)] = transaction
            #self.Debug(f"Transacton {transaction}")
            #self.transactions_history = self.transactions_history.append(transaction, ignore_index=True)
            # Selling a part of the position
            self.MarketOrder(ticker, -transaction['Qty'])  

        self.open_positions = [p for p in self.open_positions if p['Qty'] > 0]

    def check_stop_loss(self, data):
        '''
            This function iterates through all the open positions and liquidates
            the entire position if our stop-loss is hit
        '''

        positions_to_liquidate = []
        for position in self.open_positions:
            ticker = position['Stock']
            if ticker not in data.Bars:
                continue

            price = data.Bars[ticker].Open
            qty = position['Qty']
            paperValue = qty * price
            paperPL = paperValue - qty * position['Buy Price']
            paperPLPercentage = paperPL / (qty * position['Buy Price'])

            if paperPLPercentage <= self.stop_loss_threshold:
                self.Debug(f"Stop loss hit for position: {position}, current price = {price}, loss = {paperPL} ,loss per = {paperPLPercentage}")
                positions_to_liquidate.append(position)
        
        for position in positions_to_liquidate:
            ticker = position['Stock']
            # Liquidating the entire position
            self.MarketOrder(ticker, -qty)
            #self.close_positions([position], data[ticker].Close, 'STOP LOSS')
            # We need to remove the position from open positions since we have liquidated
            # the entire position
            self.open_positions.remove(position)


class Candle:
    def __init__(self, algorithm, ticker, frac=0.9):
        self.algorithm = algorithm
        self.ticker = ticker
        self.frac = frac
        self.bb = BollingerBands(20, 2, MovingAverageType.Simple)
        self.bb2 = BollingerBands(20, 1, MovingAverageType.Simple)
        self.rsi = RelativeStrengthIndex(14, MovingAverageType.Simple)
        self.macd = MovingAverageConvergenceDivergence(12, 26, 9, MovingAverageType.Simple)
        self.sma = SimpleMovingAverage(50)
        self.data = []
        self.pattern_name = ""

    def Update(self, bar):
        self.data.append(bar)
        if len(self.data) > 2:
            self.data.pop(0)
        self.bb.Update(bar.EndTime, bar.Close)
        self.bb2.Update(bar.EndTime, bar.Close)
        self.rsi.Update(bar.EndTime, bar.Close)
        self.macd.Update(bar.EndTime, bar.Close)
        self.sma.Update(bar.EndTime, bar.Close)

    def return_OHLC(self, candle):
        return candle.Open, candle.High, candle.Low, candle.Close

    def return_stats(self, candle):
        delta_v = candle.Close - candle.Open
        max_vi = max(candle.Close, candle.Open)
        min_vi = min(candle.Close, candle.Open)
        return delta_v, max_vi, min_vi

    def shouldEnter(self):
        if len(self.data) < 2:
            return False
        if self.isHangingMan() or self.isBullishEngulfing() or self.isDragonFlyDoji():
            return True
        return False

    def shouldExit(self):
        if len(self.data) < 2:
            return False
        if self.isInvertedHammer() : # look into why no. of trades is decreasing on more candle sticks
            return True
        return False

    def isHangingMan(self):
        candle = self.data[-1]
        curr_open, curr_high, curr_low, curr_close = self.return_OHLC(candle)
        curr_delta_v, curr_max_vi, curr_min_vi = self.return_stats(candle)
        if ( (curr_high - curr_low) > -4 * curr_delta_v) and \
               ((curr_close - curr_low)/(0.001 + curr_high - curr_low ) > 0.6) and \
               (curr_open - curr_low)/(0.001 + curr_high - curr_low ) > 0.6 and \
               (curr_close >= self.bb.UpperBand.Current.Value * self.frac or curr_close <= self.bb.LowerBand.Current.Value):
            self.pattern_name = "Hanging Man"
            return True
        return False
    
    def isInvertedHammer(self):
        candle = self.data[-1]
        curr_open, curr_high, curr_low, curr_close = self.return_OHLC(candle)
        curr_delta_v, curr_max_vi, curr_min_vi = self.return_stats(candle)
        if ( (curr_high - curr_low) > -3 * curr_delta_v) and \
               ((curr_high - curr_close)/(0.001 + curr_high - curr_low ) > 0.6) and \
               (curr_high - curr_open)/(0.001 + curr_high - curr_low ) > 0.6 and \
               (curr_close >= self.bb.UpperBand.Current.Value or curr_close <= self.bb.LowerBand.Current.Value) and \
               (curr_close >= self.bb.UpperBand.Current.Value * self.frac or curr_close <= self.bb.LowerBand.Current.Value):
            self.pattern_name = "Inverted Hammer"
            return True
        return False
    
    def isDragonFlyDoji(self):
        candle = self.data[-1]
        curr_open, curr_high, curr_low, curr_close = self.return_OHLC(candle)
        curr_delta_v, curr_max_vi, curr_min_vi = self.return_stats(candle)
        if ( curr_open == curr_close or (abs(curr_delta_v) / (curr_high - curr_low) < 0.1 )) and \
               ((curr_high - curr_max_vi) < (3 * abs(curr_delta_v))  ) and \
               ((curr_min_vi - curr_low) > (3 * abs(curr_delta_v)) ) and \
               (curr_close >= self.bb.UpperBand.Current.Value * self.frac or curr_close <= self.bb.LowerBand.Current.Value):
            self.pattern_name = "DragonFly Doji"
            return True
        return False
    
    def isGravestoneDoji(self):
        candle = self.data[-1]
        curr_open, curr_high, curr_low, curr_close = self.return_OHLC(candle)
        curr_delta_v, curr_max_vi, curr_min_vi = self.return_stats(candle)
        if ( curr_open == curr_close or(abs(curr_delta_v) / (curr_high - curr_low) < 0.1 )) and \
               ((curr_high - curr_max_vi) > (3 * abs(curr_delta_v))   ) and \
               ((curr_min_vi - curr_low) <= (3 * abs(curr_delta_v)) ) and \
               (curr_close >= self.bb.UpperBand.Current.Value * self.frac or curr_close <= self.bb.LowerBand.Current.Value):
            self.pattern_name = "Gravestone Doji"
            return True
        return False

    def isBullishEngulfing(self):
        if len(self.data) < 2:
            return False
        candle = self.data[-1]
        prev_candle = self.data[-2]
        curr_open, curr_high, curr_low, curr_close = self.return_OHLC(candle)
        prev_open, prev_high, prev_low, prev_close = self.return_OHLC(prev_candle)
        curr_delta_v, curr_max_vi, curr_min_vi = self.return_stats(candle)
        prev_delta_v, prev_max_vi, prev_min_vi = self.return_stats(prev_candle)
        if curr_close >= prev_open and prev_open > prev_close and \
                curr_close > curr_open and prev_close >= curr_open and \
                (curr_close - curr_open) > (prev_open - prev_close) and \
               (curr_close >= self.bb.UpperBand.Current.Value * self.frac or curr_close <= self.bb.LowerBand.Current.Value):
            self.pattern_name = "Bullish Engulfing"
            return True
        return False
    
    def isBearishEngulfing(self):
        if len(self.data) < 2:
            return False
        candle = self.data[-1]
        prev_candle = self.data[-2]
        curr_open, curr_high, curr_low, curr_close = self.return_OHLC(candle)
        prev_open, prev_high, prev_low, prev_close = self.return_OHLC(prev_candle)
        curr_delta_v, curr_max_vi, curr_min_vi = self.return_stats(candle)
        prev_delta_v, prev_max_vi, prev_min_vi = self.return_stats(prev_candle)
        if curr_open >= prev_close and prev_close > prev_open and \
                curr_close < curr_open and prev_open >= curr_close and \
                (curr_open - curr_close) > (prev_close - prev_open) and \
               (curr_close >= self.bb.UpperBand.Current.Value * self.frac or curr_close <= self.bb.LowerBand.Current.Value):
            self.pattern_name = "Bearish Engulfing"
            return True
        return False

    def getPatternName(self):
        return self.pattern_name