Overall Statistics |
Total Orders 543 Average Win 1.03% Average Loss -1.56% Compounding Annual Return 198.229% Drawdown 35.900% Expectancy 0.396 Start Equity 1000000 End Equity 16575810.02 Net Profit 1557.581% Sharpe Ratio 2.42 Sortino Ratio 3.532 Probabilistic Sharpe Ratio 88.850% Loss Rate 16% Win Rate 84% Profit-Loss Ratio 0.66 Alpha 1.459 Beta 0.637 Annual Standard Deviation 0.608 Annual Variance 0.37 Information Ratio 2.407 Tracking Error 0.603 Treynor Ratio 2.31 Total Fees $4169.46 Estimated Strategy Capacity $2000000.00 Lowest Capacity Asset NVMI RTSESF5CFJHH Portfolio Turnover 1.17% |
# region imports
from AlgorithmImports import *
# endregion


class CreativeApricotShark(QCAlgorithm):
    """Daily candlestick-pattern strategy with staged profit taking and a stop loss.

    Every buy is tracked as its own "lot" (a dict in ``self.open_positions``)
    so that profit taking and the stop loss are evaluated per entry price
    rather than per ticker.  Lot dicts carry the keys ``'Date'``, ``'Qty'``,
    ``'Buy Price'``, ``'Stock'``, ``'Paper P/L'`` and ``'Paper P/L %'``
    (plus ``'Max Price'`` for lots opened by ``enter_new_positions``).
    Realised partial sales are appended to ``self.transactions_history``.
    """

    def initialize(self):
        """Configure the backtest window, universe, sizing and indicators."""
        self.SetStartDate(2022, 1, 1)
        self.SetEndDate(2024, 12, 30)
        self.SetCash(1000000)

        # Alternative universes from earlier experiments, kept with their
        # original result notes:
        #   ['AAPL','MSFT','GOOGL','AMZN','META','TSLA','SPY']  #: 287%  # winners
        #   ['XOM','JNJ','KO','MCD','MDT','SHW','CTAS']  # 182.98 %  # high dividend
        self.tickers = ['FIVE', 'GPCR', 'STRL', 'NVMI', 'ONTO', 'ASML', 'VKTX']  # losers: 1,050.50 %

        self.candles = {}
        self.open_positions = []
        self.transactions_history = pd.DataFrame(columns=[
            'Date', 'Stock', 'Type of Transaction', 'Candle',
            'Buy Price', 'Qty', 'Sell Price', 'P/L',
        ])
        self.last_sell_date = self.StartDate

        # Fraction of total portfolio value committed to each new entry.
        # close_positions() also uses it as the fraction of a lot sold on an
        # exit candle.  (An earlier 0.25 assignment was dead code: it was
        # overwritten by this value before ever being read.)
        self.percent = 0.05

        # A lot is liquidated once its unrealised return falls to this level.
        self.stop_loss_threshold = -0.2

        # Building blocks of the (currently disabled) SMA crossover overlay.
        self.fast_moving_averages = {}
        self.slow_moving_averages = {}
        self.mv_state = {}  # last crossover action per ticker, None until set

        # Warm-up long enough for the 200-day SMA registered below.
        self.SetWarmUp(200, Resolution.Daily)

        for ticker in self.tickers:
            self.AddEquity(ticker, Resolution.Daily)
            self.candles[ticker] = Candle(self, ticker)
            self.fast_moving_averages[ticker] = self.SMA(ticker, 50, Resolution.Daily)
            self.slow_moving_averages[ticker] = self.SMA(ticker, 200, Resolution.Daily)
            self.mv_state[ticker] = None

    def OnData(self, data):
        """Feed each ticker's candle tracker and act on its entry/exit signal.

        While the algorithm is warming up the candle trackers are still
        updated (so their manually-updated indicators are primed by the time
        trading starts) but no orders are placed.  After the per-ticker
        signals, profit taking and the stop loss run over all open lots.
        """
        warming_up = self.IsWarmingUp

        for ticker, candle in self.candles.items():
            if ticker not in data.Bars:
                if not warming_up:
                    self.Debug(ticker)
                continue

            bar = data.Bars[ticker]
            candle.Update(bar)
            if warming_up:
                continue

            # NOTE: a 50/200-day SMA crossover overlay was prototyped here and
            # is disabled.  It sold everything (sell_all_positions) when the
            # slow SMA was below the fast SMA and re-entered
            # (enter_new_positions) when the fast SMA was above the slow SMA
            # after a sell.  Both triggers test the same inequality, so the
            # logic needs revisiting before it is re-enabled.

            if candle.shouldExit():
                lots = [p for p in self.open_positions if p['Stock'] == ticker]
                self.close_positions(lots, bar.Close, 'SELL',
                                     candleStick=candle.getPatternName())
            elif candle.shouldEnter():
                self._buy(ticker, bar.Close)

        if warming_up:
            return

        # Take partial profits on lots that have run up far enough.
        self.calculate_paper_pl(data)
        # Liquidate lots whose loss has reached the stop-loss threshold.
        self.check_stop_loss(data)
        # self.check_and_sell_every_30_days(data)  # periodic profit taking, disabled

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _partial_quantity(qty, fraction):
        """Return the whole number of shares to sell from a lot of ``qty``.

        Lots of four shares or fewer are sold outright.  Larger lots sell
        ``fraction`` of the lot, truncated to whole shares but never less
        than one share so that a sale always makes progress.  Keeping the
        book in whole shares stops it from drifting away from real holdings.
        NOTE(review): assumes equity orders cannot be filled fractionally -
        confirm against LEAN's lot-size handling.
        """
        if qty <= 4:
            return qty
        return max(1, int(qty * fraction))

    @staticmethod
    def _unrealized_return(position, price):
        """Return the lot's unrealised return at ``price`` as a fraction of cost."""
        buy_price = position['Buy Price']
        return (price - buy_price) / buy_price

    def _record_transaction(self, transaction):
        """Append ``transaction`` to the history frame, aligned on column names.

        Keys missing from ``transaction`` (e.g. ``'Candle'`` for sales not
        triggered by a candle) are stored as ``None``.
        """
        history = self.transactions_history
        history.loc[len(history)] = [transaction.get(column) for column in history.columns]

    def _prune_closed_positions(self):
        """Drop lots whose remaining quantity has reached zero."""
        self.open_positions = [p for p in self.open_positions if p['Qty'] > 0]

    def _buy(self, ticker, price):
        """Buy ``self.percent`` of portfolio value of ``ticker`` and track the lot.

        Returns the new lot dict, or ``None`` when the price is not positive
        or the allocation does not cover a single share (no order is sent).
        """
        if price <= 0:
            return None
        allocation = self.Portfolio.TotalPortfolioValue * self.percent
        quantity = int(allocation // price)
        if quantity <= 0:
            return None

        self.MarketOrder(ticker, quantity)
        position = {'Date': self.Time, 'Qty': quantity, 'Buy Price': price,
                    'Stock': ticker, 'Paper P/L': 0, 'Paper P/L %': 0}
        self.open_positions.append(position)
        return position

    def _sell_part(self, position, price, fraction, heading, candle_stick=None, date=None):
        """Sell part of one lot, log the sale and reduce the lot's quantity.

        ``date`` overrides the logged date (defaults to the current algorithm
        time); ``candle_stick`` is logged in the ``'Candle'`` column when given.
        Returns the transaction dict that was recorded.
        """
        sell_qty = self._partial_quantity(position['Qty'], fraction)
        transaction = {
            'Date': self.Time if date is None else date,
            'Stock': position['Stock'],
            'Type of Transaction': heading,
            'Buy Price': position['Buy Price'],
            'Sell Price': price,
            'Qty': sell_qty,
            'P/L': (price - position['Buy Price']) * sell_qty,
        }
        if candle_stick is not None:
            transaction['Candle'] = candle_stick

        self._record_transaction(transaction)
        self.MarketOrder(position['Stock'], -sell_qty)
        position['Qty'] -= sell_qty
        return transaction

    # ------------------------------------------------------- position handling

    def sell_all_positions(self):
        '''Sell every tracked lot in full and clear the book.'''
        for position in self.open_positions:
            self.MarketOrder(position['Stock'], -position['Qty'])
        self.open_positions = []

    def enter_new_positions(self, data):
        '''Open a new lot in every configured ticker that has a bar in ``data``.'''
        for ticker in self.tickers:
            if ticker not in data.Bars:
                continue
            price = data[ticker].Close
            position = self._buy(ticker, price)
            if position is None:
                continue
            position['Max Price'] = price
            self.Debug(f"Entering new position: Buying {position['Qty']} shares of {ticker} at {price} on {self.Time}")

    def check_and_sell_every_30_days(self, data):
        """Run ``sell_20_percent_profit`` once the sell interval has elapsed.

        NOTE: despite the method name the interval in the code is 60 days.
        """
        if (self.Time - self.last_sell_date) >= timedelta(days=60):
            self.sell_20_percent_profit(data)
            self.last_sell_date = self.Time

    def sell_20_percent_profit(self, data):
        """Sell a quarter of every lot that is currently in profit.

        Profit is measured at the bar's open against the lot's buy price.
        Lots of four shares or fewer are sold in full.
        """
        for position in self.open_positions:
            ticker = position['Stock']
            if ticker not in data.Bars:
                continue
            price = data.Bars[ticker].Open
            if price <= position['Buy Price']:
                continue  # not in profit
            transaction = self._sell_part(position, price, 0.25, 'SELL 30D POSITION')
            self.Debug(f"Transaction {transaction}")
        self._prune_closed_positions()

    def close_positions(self, open_positions, price, heading, candleStick=""):
        '''Sell ``self.percent`` of each given lot when an exit candle occurs.

        ``open_positions`` is the subset of lots to reduce (the same dict
        objects held in ``self.open_positions``).  Lots of four shares or
        fewer are sold in full.  Only emptied lots are removed from the book;
        lots that were not passed in are left untouched.

        NOTE: the logged 'Date' is the lot's entry date, unlike the other
        sale types which log the sale time - kept as in the original log
        format; confirm which is intended.
        '''
        for lot in open_positions:
            self._sell_part(lot, price, self.percent, heading,
                            candle_stick=candleStick, date=lot['Date'])
        # Prune the full book, not the subset: rebuilding the book from the
        # subset would silently forget every lot of the other tickers.
        self._prune_closed_positions()

    def calculate_paper_pl(self, data):
        '''Take partial profits on lots with an unrealised gain above 30%.

        The gain is measured at the bar's open.  A qualifying lot sells a
        quarter of its shares (all of them when four or fewer remain) and the
        sale is logged as 'SELL FRAC'.  The lot's 'Paper P/L' fields are not
        updated here.
        '''
        for position in self.open_positions:
            ticker = position['Stock']
            if ticker not in data.Bars:
                continue
            price = data.Bars[ticker].Open
            if self._unrealized_return(position, price) > 0.3:
                self._sell_part(position, price, 0.25, 'SELL FRAC')
        self._prune_closed_positions()

    def check_stop_loss(self, data):
        '''Liquidate every lot whose unrealised return hit the stop-loss level.

        The return is measured at the bar's open against the lot's buy price
        and compared with ``self.stop_loss_threshold``.  Stop-loss sales are
        not written to ``self.transactions_history``.
        '''
        for position in self.open_positions:
            ticker = position['Stock']
            if ticker not in data.Bars:
                continue

            price = data.Bars[ticker].Open
            loss_pct = self._unrealized_return(position, price)
            if loss_pct > self.stop_loss_threshold:
                continue

            loss = (price - position['Buy Price']) * position['Qty']
            self.Debug(f"Stop loss hit for position: {position}, current price = {price}, loss = {loss} ,loss per = {loss_pct}")
            # Sell this lot's own quantity (not a value left over from
            # another lot) and mark it empty so it is pruned below.
            self.MarketOrder(ticker, -position['Qty'])
            position['Qty'] = 0

        self._prune_closed_positions()


class Candle:
    """Per-ticker tracker of the last two bars plus manually updated indicators.

    Pattern checks look at the most recent bar (and the previous one for the
    engulfing patterns) and additionally require the close to sit near the
    upper Bollinger band or at/below the lower band.  ``frac`` scales the
    upper band in that test.  The name of the most recently detected pattern
    is kept in ``pattern_name``.
    """

    # Added to the high-low range in shadow ratios to avoid dividing by zero.
    _RANGE_EPSILON = 0.001

    def __init__(self, algorithm, ticker, frac=0.9):
        self.algorithm = algorithm
        self.ticker = ticker
        self.frac = frac
        self.bb = BollingerBands(20, 2, MovingAverageType.Simple)
        # The indicators below are updated on every bar but not read by any
        # pattern check in this class.
        self.bb2 = BollingerBands(20, 1, MovingAverageType.Simple)
        self.rsi = RelativeStrengthIndex(14, MovingAverageType.Simple)
        self.macd = MovingAverageConvergenceDivergence(12, 26, 9, MovingAverageType.Simple)
        self.sma = SimpleMovingAverage(50)
        self.data = []  # at most the two most recent bars, oldest first
        self.pattern_name = ""

    def Update(self, bar):
        """Store ``bar`` as the latest candle and push its close into the indicators."""
        self.data.append(bar)
        if len(self.data) > 2:
            self.data.pop(0)
        for indicator in (self.bb, self.bb2, self.rsi, self.macd, self.sma):
            indicator.Update(bar.EndTime, bar.Close)

    def return_OHLC(self, candle):
        """Return ``(open, high, low, close)`` of ``candle``."""
        return candle.Open, candle.High, candle.Low, candle.Close

    def return_stats(self, candle):
        """Return ``(close - open, top of body, bottom of body)`` of ``candle``."""
        body = candle.Close - candle.Open
        body_top = max(candle.Close, candle.Open)
        body_bottom = min(candle.Close, candle.Open)
        return body, body_top, body_bottom

    def _near_band(self, close, upper_scale):
        """Return True when ``close`` is at/above the scaled upper band or at/below the lower band.

        Always False until the Bollinger indicator reports ready, so that no
        pattern fires on bands that are not yet meaningful.
        """
        if not self.bb.IsReady:
            return False
        upper = self.bb.UpperBand.Current.Value
        lower = self.bb.LowerBand.Current.Value
        return close >= upper * upper_scale or close <= lower

    @staticmethod
    def _has_doji_body(open_, close, high, low, body):
        """Return True when the body is empty or under 10% of the bar's range."""
        if open_ == close:
            return True
        bar_range = high - low
        # A zero range with a non-empty body is inconsistent data; treat it
        # as "not a doji" instead of dividing by zero.
        return bar_range != 0 and abs(body) / bar_range < 0.1

    def shouldEnter(self):
        """Return True when the latest bar shows one of the entry patterns."""
        if len(self.data) < 2:
            return False
        # Order matters: the first matching pattern sets pattern_name.
        return bool(self.isHangingMan() or self.isBullishEngulfing() or self.isDragonFlyDoji())

    def shouldExit(self):
        """Return True when the latest bar shows the exit pattern."""
        if len(self.data) < 2:
            return False
        # TODO: look into why the number of trades decreases when more
        # candlestick patterns are added here.
        return bool(self.isInvertedHammer())

    def isHangingMan(self):
        """Detect the "Hanging Man" shape: open and close both in the top of the range."""
        if not self.data:
            return False
        candle = self.data[-1]
        open_, high, low, close = self.return_OHLC(candle)
        body, _, _ = self.return_stats(candle)
        span = self._RANGE_EPSILON + high - low

        if (
            (high - low) > -4 * body
            and (close - low) / span > 0.6
            and (open_ - low) / span > 0.6
            and self._near_band(close, self.frac)
        ):
            self.pattern_name = "Hanging Man"
            return True
        return False

    def isInvertedHammer(self):
        """Detect the "Inverted Hammer" shape: open and close both in the bottom of the range."""
        if not self.data:
            return False
        candle = self.data[-1]
        open_, high, low, close = self.return_OHLC(candle)
        body, _, _ = self.return_stats(candle)
        span = self._RANGE_EPSILON + high - low

        # Both the unscaled and the frac-scaled band tests are required, as
        # in the original rule set.
        if (
            (high - low) > -3 * body
            and (high - close) / span > 0.6
            and (high - open_) / span > 0.6
            and self._near_band(close, 1)
            and self._near_band(close, self.frac)
        ):
            self.pattern_name = "Inverted Hammer"
            return True
        return False

    def isDragonFlyDoji(self):
        """Detect the "DragonFly Doji" shape: tiny body, short upper and long lower shadow."""
        if not self.data:
            return False
        candle = self.data[-1]
        open_, high, low, close = self.return_OHLC(candle)
        body, body_top, body_bottom = self.return_stats(candle)
        reach = 3 * abs(body)

        if (
            self._has_doji_body(open_, close, high, low, body)
            and (high - body_top) < reach
            and (body_bottom - low) > reach
            and self._near_band(close, self.frac)
        ):
            self.pattern_name = "DragonFly Doji"
            return True
        return False

    def isGravestoneDoji(self):
        """Detect the "Gravestone Doji" shape: tiny body, long upper and short lower shadow."""
        if not self.data:
            return False
        candle = self.data[-1]
        open_, high, low, close = self.return_OHLC(candle)
        body, body_top, body_bottom = self.return_stats(candle)
        reach = 3 * abs(body)

        if (
            self._has_doji_body(open_, close, high, low, body)
            and (high - body_top) > reach
            and (body_bottom - low) <= reach
            and self._near_band(close, self.frac)
        ):
            self.pattern_name = "Gravestone Doji"
            return True
        return False

    def isBullishEngulfing(self):
        """Detect a rising bar whose body engulfs the previous falling bar's body."""
        if len(self.data) < 2:
            return False
        curr_open, _, _, curr_close = self.return_OHLC(self.data[-1])
        prev_open, _, _, prev_close = self.return_OHLC(self.data[-2])

        if (
            curr_close >= prev_open
            and prev_open > prev_close
            and curr_close > curr_open
            and prev_close >= curr_open
            and (curr_close - curr_open) > (prev_open - prev_close)
            and self._near_band(curr_close, self.frac)
        ):
            self.pattern_name = "Bullish Engulfing"
            return True
        return False

    def isBearishEngulfing(self):
        """Detect a falling bar whose body engulfs the previous rising bar's body."""
        if len(self.data) < 2:
            return False
        curr_open, _, _, curr_close = self.return_OHLC(self.data[-1])
        prev_open, _, _, prev_close = self.return_OHLC(self.data[-2])

        if (
            curr_open >= prev_close
            and prev_close > prev_open
            and curr_close < curr_open
            and prev_open >= curr_close
            and (curr_open - curr_close) > (prev_close - prev_open)
            and self._near_band(curr_close, self.frac)
        ):
            self.pattern_name = "Bearish Engulfing"
            return True
        return False

    def getPatternName(self):
        """Return the name of the most recently detected pattern ("" if none yet)."""
        return self.pattern_name