Overall Statistics |
Total Orders 2 Average Win 4.94% Average Loss 0% Compounding Annual Return 8021.440% Drawdown 1.600% Expectancy 0 Start Equity 100000 End Equity 104936.72 Net Profit 4.937% Sharpe Ratio 132.787 Sortino Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 100% Profit-Loss Ratio 0 Alpha 61.431 Beta -1.589 Annual Standard Deviation 0.452 Annual Variance 0.205 Information Ratio 120.982 Tracking Error 0.49 Treynor Ratio -37.81 Total Fees $53.66 Estimated Strategy Capacity $300000.00 Lowest Capacity Asset PRSC SRALEY945R39 Portfolio Turnover 47.06% |
# region imports
from AlgorithmImports import *
# endregion

# Your New Python File


class Consolidator:
    """Per-symbol helper holding an RSI indicator and naive exit rules.

    One instance is created per symbol. It copies the take-profit and
    stop-loss percentages from the owning algorithm at construction time and
    exposes the RSI value through the ``RSI`` property.
    """

    # Look-back length passed to the RSI indicator.
    period = 7

    def __init__(self, symbol, algo):
        """Bind the helper to *symbol* and to the owning algorithm *algo*.

        Reads ``algo.naive_tgt_pct`` and ``algo.naive_stp_pct`` and requests a
        Wilders-smoothed RSI from ``algo.RSI``.
        """
        self.symbol = symbol
        self.algo = algo
        self.tgt_pct = algo.naive_tgt_pct
        self.stp_pct = algo.naive_stp_pct
        # NOTE(review): algo.RSI(...) presumably registers the indicator for
        # automatic updates, while on_bar() also calls Update() by hand, so
        # each bar may be fed twice -- confirm against the LEAN docs.
        self.rsi = algo.RSI(symbol, self.period, MovingAverageType.Wilders)
        # this runs on add, we cannot do this.
        # if self.algo.extended_hours and self.algo.res == Resolution.MINUTE:
        #     # we cant really warm it up here, unless we're using minute,
        #     # and extended hours (only minute uses ETH)

    def warmup_rsi(self):
        """Feed ``period + 3`` historical bars into the RSI.

        Does nothing unless the algorithm runs with extended hours at minute
        resolution.
        """
        algo = self.algo
        if not (algo.extended_hours and algo.res == Resolution.MINUTE):
            return

        frame = algo.History(
            [self.symbol],
            self.period + 3,
            algo.res,
            extendedMarketHours=algo.extended_hours,
        )
        history = frame.loc[self.symbol]
        for row in history.itertuples():
            self.rsi.Update(row.Index, row.close)
            algo.Debug(f'warming up... with {row.Index}, {row.close}')

    def _flatten(self, message, tag):
        """Log *message*, then liquidate this symbol with order tag *tag*."""
        self.algo.Debug(message)
        self.algo.Liquidate(self.symbol, tag=tag)

    def profit_target(self):
        """Liquidate when unrealized profit exceeds ``tgt_pct``.

        A target of 0.0 disables the check.
        """
        symbol = self.symbol
        target_threshold = self.tgt_pct
        holding = self.algo.Portfolio[symbol]
        if target_threshold == 0.0:
            return

        urpnl_pct = holding.UnrealizedProfitPercent
        # Exit once the open profit is above the target threshold.
        if urpnl_pct > target_threshold:
            self._flatten(
                f"Flattening {symbol} -- {urpnl_pct} > {target_threshold}",
                f"TP -- {urpnl_pct*100}% gain",
            )

    def stop_loss(self):
        """Liquidate when unrealized profit falls below ``-abs(stp_pct)``.

        A stop of 0.0 disables the check; the sign of ``stp_pct`` is ignored.
        """
        symbol = self.symbol
        raw_stop = self.stp_pct
        holding = self.algo.Portfolio[symbol]
        if raw_stop == 0.0:
            return

        stp_threshold = -abs(raw_stop)
        urpnl_pct = holding.UnrealizedProfitPercent
        # Exit once the open loss is below the (negative) stop threshold.
        if urpnl_pct < stp_threshold:
            self._flatten(
                f"Flattening {symbol} -- {urpnl_pct} < {stp_threshold}",
                f"SL -- {urpnl_pct*100}%",
            )

    def on_bar(self, bar):
        """Push the bar's close, stamped with its end time, into the RSI."""
        self.rsi.Update(bar.EndTime, bar.Close)

    @property
    def warmed_up(self):
        """Whether the RSI reports itself ready."""
        return self.rsi.IsReady

    @property
    def RSI(self):
        """Current RSI value, or 0.0 while the indicator is not ready."""
        # Bit of a hack: 0.0 stands in for "no value yet".
        return self.rsi.current.value if self.warmed_up else 0.0
# region imports
from AlgorithmImports import *
# endregion


class Graveyard:
    """Parking lot for retired gap-calculation code.

    NOTE(review): the methods use ``self.History``, ``self.Debug``,
    ``self.Time``, ``self.symbol_data`` and ``self.gap_data``, none of which
    this class defines -- presumably they were lifted out of the algorithm
    class and only work when bound to one. Confirm before reuse.
    """

    # PROBABLY want to do this w scheduler, at 4am, 9:30am
    # DONT know if this will work! BC we may not have todays history data TODAY!
    def try_get_gaps(self, symbols: List[str], assert_date: bool = True):
        """Compute opening gaps (in percent) for *symbols*.

        Two gaps are computed per symbol from three days of history:
        ``gap_rth`` (first regular-hours open of the latest day vs. the last
        daily close) and ``gap_eth`` (first extended-hours open of the latest
        day vs. the last extended-hours close of the previous session found in
        the data).

        Args:
            symbols: Symbols to request history for.
            assert_date: When true, give up unless the latest minute bar is
                dated today.

        Returns:
            A dict keyed by symbol with the gaps and the prices they were
            derived from, or ``None`` when the required history is missing
            or stale. Symbols lacking any of the inputs are skipped.
        """
        gap_data_by_symbol = {}
        today = self.Time.date()

        # Daily data for the prior close (RTH only), minute data for opens.
        daily = self.History(symbols, timedelta(days=3), Resolution.Daily)
        history_minute = self.History(symbols, timedelta(days=3), Resolution.Minute)
        history_minute_eth = self.History(
            symbols, timedelta(days=3), Resolution.Minute, extendedMarketHours=True
        )

        # An empty frame has no 'time' index level to inspect; bail out early
        # instead of raising.
        if daily.empty or history_minute.empty or history_minute_eth.empty:
            self.Debug("Warning: history request returned no data for gap calculation.")
            return None

        rth_dates = history_minute.index.get_level_values('time').date
        eth_dates = history_minute_eth.index.get_level_values('time').date

        # Latest date present in the minute data (expected to be today).
        latest_minute_date = rth_dates.max()
        if latest_minute_date != today and assert_date:
            self.Debug(f"Warning: Data is not from today! Latest data date: {latest_minute_date}, Expected date: {today}")
            return None

        # The previous session is the most recent date *before* the latest one
        # that actually has data. Subtracting one calendar day would land on a
        # weekend or holiday and find nothing.
        earlier_dates = eth_dates[eth_dates < latest_minute_date]
        if len(earlier_dates) == 0:
            self.Debug("Warning: no prior session in extended-hours history.")
            return None
        prior_session_date = earlier_dates.max()

        # Latest day's minute bars, regular and extended hours.
        today_minute_data_rth = history_minute[rth_dates == latest_minute_date]
        today_minute_data_eth = history_minute_eth[eth_dates == latest_minute_date]

        prior_close_rth = daily.groupby('symbol')['close'].last()

        # Last extended-hours close of the previous session.
        prior_session_eth = history_minute_eth[eth_dates == prior_session_date]
        prior_close_eth = prior_session_eth.groupby('symbol')['close'].last()

        # First extended-hours and first regular-hours open of the latest day.
        today_open_eth = today_minute_data_eth.groupby('symbol')['open'].first()
        today_open_rth = today_minute_data_rth.groupby('symbol')['open'].first()

        for symbol in symbols:
            try:
                gap_rth = ((today_open_rth[symbol] - prior_close_rth[symbol]) / prior_close_rth[symbol]) * 100
                gap_eth = ((today_open_eth[symbol] - prior_close_eth[symbol]) / prior_close_eth[symbol]) * 100

                gap_data_by_symbol[symbol] = {
                    "gap_rth": gap_rth,
                    "gap_eth": gap_eth,
                    "today_open_rth": today_open_rth[symbol],
                    "today_open_eth": today_open_eth[symbol],
                    "prior_close_rth": prior_close_rth[symbol],
                    "prior_close_eth": prior_close_eth[symbol],
                }
            except KeyError:
                self.Debug(f"Symbol {symbol} data not available for gap calculation.")

        return gap_data_by_symbol

    def RunGapCalculation(self):
        """Refresh ``self.gap_data`` from ``try_get_gaps`` for all tracked symbols.

        Stores an empty dict when no gap data could be produced.
        """
        symbols = list(self.symbol_data.keys())
        gap_data = self.try_get_gaps(symbols)

        if gap_data:
            self.Debug(f"Gap Data: {gap_data}")
            self.gap_data = gap_data
        else:
            self.Debug("No gap data available.")
            self.gap_data = {}
# region imports
from AlgorithmImports import *
from Consolidator import Consolidator
# endregion

"""
Capitalization notes
    mega cap  -- 200b+ (big 5)
    large cap -- 10b - 200b (mcdonalds, nike)
    mid cap   -- 2b - 10b (etsy, roku)
    small cap -- 300m - 2b (five below, cargurus)
    micro cap -- 50m - 300m
"""

# Market-cap buckets in dollars; "min"/"max" bound each bucket.
market_cap_categories = {
    "Mega Cap": {"min": 2.00e11, "max": float('inf')},
    "Large Cap": {"min": 1.00e10, "max": 2.00e11},
    "Mid Cap": {"min": 2.00e9, "max": 1.00e10},
    "Small Cap": {"min": 3.00e8, "max": 2.00e9},
    "Micro Cap": {"min": 5.00e7, "max": 3.00e8},
    "Nano Cap": {"min": 0, "max": 5.00e7},
}

"""
TODO:
    replace the large gap_data dictionary with our consolidator class + fields!
    move some of the calculations to that class, as well, even as statics.
    Add targets! I think stops are probably a bad idea, maybe impossible,
    but we can certainly try.
    consider adding filters for signal, say high of day for longs, etc.
"""


class EnergeticVioletShark(QCAlgorithm):
    """Intraday gap strategy on micro-cap equities.

    Each day at 9:31 the algorithm looks for universe members whose open
    gapped up versus the prior daily close, filters them by RSI and by
    hourly volume multiples (today vs. the trailing average), buys the
    survivors at equal weight, and liquidates everything one minute before
    the close. Per-symbol profit-target / stop-loss checks run on every bar.
    """

    # Basic Universe Params
    max_univ_price = 20
    min_univ_price = 5
    max_market_cap = market_cap_categories['Micro Cap']['max']
    min_market_cap = market_cap_categories['Micro Cap']['min']

    # Finer Universe Filters
    volume_lookback_days = 7  # volume multiple is today vs avg of X days
    min_gap_pct = 15

    # Long filters
    min_premkt_vol_mult = 1.0
    min_regmkt_vol_mult = 1.5
    min_rsi = 50

    # We keep the top n1 by premkt volume multiple
    # (todays premkt volume / avg premkt, by resolution, averaged)
    n1 = 40
    # we keep the top n2 by regmkt volume multiple
    # (todays regmkt volume / avg regmkt, by resolution, averaged)
    n2 = 10

    # Short filter (not enabled yet)
    max_regmkt_vol_mult = 1.5

    # If target or stop is 0.0, not used.
    # .05 = 5%
    naive_tgt_pct = 0.05
    # naive stp pct can be positive or negative, it's taken as abs, and used as negative
    naive_stp_pct = 0.00

    res = Resolution.MINUTE

    # testing only
    debug_lvl = 1
    flip_short = False

    def initialize(self):
        """Configure dates, cash, benchmark, universe, schedules and state."""
        self.set_start_date(2024, 10, 1)
        self.set_end_date(2024, 10, 5)
        self.set_cash(100_000)

        self.bm = self.add_equity("SPY", self.res).symbol
        self.AddUniverse(self.SelectCoarse, self.SelectFine)

        self.extended_hours = True
        # self.UniverseSettings.Resolution = self.res
        if self.extended_hours:
            self.universe_settings.extended_market_hours = True

        # self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(9, 31), self.RunGapCalculation)
        # self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(5, 00), self.AfterETHOpen)  # DOES NOT WORK
        self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(9, 31), self.AfterRTHOpen)
        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", 1), self.EODX)

        self.symbol_list = []
        self.symbol_data = {}  # symbol -> Consolidator
        self.gap_data = {}     # symbol -> dict of gap / rsi / volume fields
        self.longs = {}
        self.shorts = {}

    # region Event Handlers
    def EODX(self):
        """End-of-day exit: liquidate the whole portfolio."""
        self.liquidate(tag="EOD Exit")

    """
    def AfterETHOpen(self):
        symbols = list(self.symbol_data.keys())
        self.gap_data = self.get_prior_closes(symbols)
        data = self.current_slice
        if data.contains_key(self.bm):
            bars = data.bars
            for symbol in symbols:
                if data.contains_key(symbol):
                    eth_open = bars[symbol].open
                    self.gap_data[symbol]['eth_open'] = eth_open
                    self.Debug(f'updated {symbol} at time {self.Time} with eth open: {eth_open}')
                else:
                    self.Debug(f'failed to update {symbol} at time {self.time} -- dropping.')
                    # self.gap_data.pop(symbol)
                    # SO little will work in ETH, so don't do it.
        else:
            self.debug("no data... wtf (ETH)")
    """

    def _drop_candidate(self, symbol):
        """Forget a rejected symbol: remove its gap record and unsubscribe it."""
        self.gap_data.pop(symbol, None)
        if symbol in self.symbol_data:
            self.unsubscribe(symbol)

    def AfterRTHOpen(self):
        """Scheduled at 9:31: select gappers, filter them, and enter longs.

        Steps: load prior closes, keep symbols whose open gapped more than
        ``min_gap_pct`` and whose RSI exceeds ``min_rsi``, compute hourly
        volume multiples, keep symbols that rank in the top ``n1`` by
        pre-market multiple and the top ``n2`` by regular-market multiple,
        then allocate 95% of the portfolio equally among them.
        """
        symbols = list(self.symbol_data.keys())
        try:
            self.gap_data = self.get_prior_closes(symbols)
        except Exception as err:
            # Best effort: a failed history request skips today's entries
            # rather than stopping the algorithm.
            self.Debug(f'failed to get any data... strange. ({err})')
            return

        data = self.current_slice
        if not data.contains_key(self.bm):
            self.debug("no data... wtf")
            return

        bars = data.bars
        for symbol in symbols:
            if symbol not in self.gap_data:
                continue

            if not data.contains_key(symbol):
                # No bar for this symbol at the open -- drop it entirely.
                # self.Debug(f'failed to update {symbol} at time {self.time} -- dropping')
                self._drop_candidate(symbol)
                continue

            _open = bars[symbol].open
            _prior_rth_close = self.gap_data[symbol].get('prior_close_rth', None)
            self.gap_data[symbol]['rth_open'] = _open

            if _prior_rth_close:
                _gap_rth_pct = ((_open - _prior_rth_close) / _prior_rth_close) * 100
                if _gap_rth_pct > self.min_gap_pct:
                    self.gap_data[symbol]['rth_gap_pct'] = _gap_rth_pct
                    if self.debug_lvl > 2:
                        self.Debug(f'{str(symbol)} --> viable gap pct: {_gap_rth_pct}')
                else:
                    if self.debug_lvl > 2:
                        self.Debug(f'{str(symbol)} dropped -- nonviable gap {_gap_rth_pct}')
                    self.gap_data.pop(symbol)
                    continue

            if self.debug_lvl > 2:
                self.Debug(f'updated {symbol} at time {self.Time} with rth open: {_open}')

            # RSI Checks
            symbol_data = self.symbol_data.get(symbol, None)
            if symbol_data:
                # NOTE -- we could try this, warmup w extended NOW
                # DOES not work.... strange.
                # symbol_data.warmup_rsi()
                rsi_value = symbol_data.RSI
                if rsi_value > self.min_rsi:
                    self.gap_data[symbol]['rsi'] = rsi_value
                else:
                    self.gap_data.pop(symbol, None)
            else:
                self.gap_data.pop(symbol, None)

        # ----------- NOTE -- we're now done with symbol data
        # thats taken us as far as we can get with traditional flow.
        # the rest is history call hacks to grab ETH data day of, prior to this moment.
        self.Debug(f'viable gappers: {len(self.gap_data)}')
        to_fetch = list(self.gap_data.keys())
        if not to_fetch:
            return

        try:
            # NOTE(review): dropna() removes an hour row when ANY symbol is
            # NaN for that hour, so one illiquid symbol can blank an hour for
            # every candidate -- confirm this coupling is intended.
            volume_multiples_df = self.fetch_volume_multiples(to_fetch).dropna()
        except Exception as err:
            self.Debug(f'failed to fetch volume multiples: {err}')
            return
        if volume_multiples_df.empty:
            return
        self.Debug(f'volume multiples df shape -- {volume_multiples_df.shape}')

        # Rows are hours of the day; label slicing is inclusive, so hour 8
        # contributes to both windows.
        pre_mkt = volume_multiples_df.loc[4:8]
        reg_mkt = volume_multiples_df.loc[8:]

        # ---------------------------- ENTRY filter
        pma = pre_mkt.mean()
        rma = reg_mkt.mean()

        maybe_longs = set()
        maybe_shorts = set()
        # Maybe easier to filter HERE, we should have this data now...
        for symbol in to_fetch:
            try:
                _pma = pma[symbol]
                _rma = rma[symbol]
            except KeyError:
                # No history column came back for this symbol; treat it as a
                # rejected candidate instead of crashing the scheduled event.
                self._drop_candidate(symbol)
                continue

            ok_for_long = _pma > self.min_premkt_vol_mult and _rma > self.min_regmkt_vol_mult
            ok_for_short = _pma > self.min_premkt_vol_mult and _rma < self.max_regmkt_vol_mult
            # NOTE -- we want to find things that are NOT in this _rma > min reg for SHORTS !

            if not (ok_for_long or ok_for_short):
                self._drop_candidate(symbol)
                continue

            record = self.gap_data[symbol]
            record['volume_df'] = volume_multiples_df[symbol]
            record['pre_mkt_mult'] = _pma
            record['reg_mkt_mult'] = _rma
            if ok_for_long:
                maybe_longs.add(symbol)
            if ok_for_short:
                maybe_shorts.add(symbol)

        # LONG filtering!
        # for longs we want to find the LARGEST mult reg mkt, WITH big pre mkt volume.
        # WE want something that is on pace for a 5x multiple, really.
        top_by_pma = self.get_top_by_key(self.gap_data, key='pre_mkt_mult', asc=False, n=self.n1)
        top_by_rma = self.get_top_by_key(self.gap_data, key='reg_mkt_mult', asc=False, n=self.n2)

        # Symbols that rank in both lists.
        pma_symbols = {item[0] for item in top_by_pma}
        rma_symbols = {item[0] for item in top_by_rma}
        common_symbols = pma_symbols & rma_symbols

        self.longs = {symbol: self.gap_data[symbol] for symbol in common_symbols if symbol in maybe_longs}

        # SHORTS we want to sort on the LARGEST gap, and find a WEAK volume for.
        # it's a bit different of a pipeline, really.
        # self.shorts = {symbol: self.gap_data[symbol] for symbol in common_short_symbols if symbol in maybe_shorts}

        # NOW we need to do high of day tracking, probably.
        n_longs = len(self.longs)
        if n_longs == 0:
            return

        # flip_short turns the same picks into shorts (testing only).
        mult = -1 if self.flip_short else 1
        full_wt = .95
        wt = mult * full_wt / n_longs
        self.Debug(f'# entries: {n_longs}')
        for symbol, data in self.longs.items():
            self.Debug(f'buying: {data}')
            self.set_holdings(symbol, wt)

    @staticmethod
    def get_top_by_key(gap_data, key='reg_mkt_mult', asc=True, n=5):
        """Return the first *n* ``(symbol, record)`` pairs sorted by ``record[key]``.

        Sorted ascending when *asc* is true, descending otherwise. Every
        record must contain *key*.
        """
        return sorted(gap_data.items(), key=lambda x: x[1][key], reverse=not asc)[:n]
    # endregion

    # region Universe
    def SelectCoarse(self, coarse):
        """Coarse filter: price and market cap strictly inside the class bounds."""
        chosen = [
            x.Symbol
            for x in coarse
            if self.min_univ_price < x.Price < self.max_univ_price
            and x.HasFundamentalData
            and self.min_market_cap < x.MarketCap < self.max_market_cap
        ]
        if self.debug_lvl > 0:
            self.Debug(f'Universe ( {len(chosen)} symbols )')
        self.symbol_list = chosen
        return self.symbol_list

    def SelectFine(self, fine):
        """Fine filter: pass every symbol through unchanged."""
        return [f.Symbol for f in fine]

    def on_securities_changed(self, changes):
        """Create a Consolidator per added security; unsubscribe removed ones."""
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            self.symbol_data[symbol] = Consolidator(symbol, self)

        for security in changes.RemovedSecurities:
            if security.Symbol in self.symbol_data:
                self.unsubscribe(security.Symbol)

    def unsubscribe(self, symbol):
        """Remove *symbol* from the algorithm and discard its Consolidator."""
        self.RemoveSecurity(symbol)
        # Tolerate a symbol that was already discarded.
        self.symbol_data.pop(symbol, None)
    # endregion

    def on_data(self, data: Slice):
        """Feed each tracked symbol's bar to its Consolidator and run exit checks.

        Slices without benchmark data are ignored entirely.
        """
        if not data.contains_key(self.bm):
            return

        for symbol, instance in self.symbol_data.items():
            if not data.contains_key(symbol):
                continue

            trade_bar = data[symbol]
            if trade_bar is None:
                continue
            instance.on_bar(trade_bar)

            if self.debug_lvl >= 2:
                maybe_rsi = instance.rsi.current.value if instance.warmed_up else 0
                self.Debug(f"{self.time} -- {symbol}: warmed up? {instance.warmed_up} -- rsi {maybe_rsi}")

            if self.portfolio[symbol].invested:
                instance.profit_target()
                instance.stop_loss()

    # region Helpers
    def get_prior_closes(self, symbols: List[str]):
        """Return the latest daily close per symbol, RTH and ETH variants.

        Requests three days of daily history twice (with and without
        extended hours) and takes the last close of each. Symbols missing
        from either result are skipped.

        Returns:
            Dict keyed by symbol with ``prior_close_rth`` and
            ``prior_close_eth`` entries.
        """
        prior_closes_by_symbol = {}

        daily_rth = self.History(symbols, timedelta(days=3), Resolution.Daily, extendedMarketHours=False)
        daily_eth = self.History(symbols, timedelta(days=3), Resolution.Daily, extendedMarketHours=True)

        # Use the latest available date in the data.
        latest_rth = daily_rth.groupby('symbol')['close'].last()
        latest_eth = daily_eth.groupby('symbol')['close'].last()

        for symbol in symbols:
            try:
                prior_closes_by_symbol[symbol] = {
                    "prior_close_rth": latest_rth[symbol],
                    "prior_close_eth": latest_eth[symbol],
                }
            except KeyError:
                self.Debug(f"Symbol {symbol} data not available for prior close calculation.")

        return prior_closes_by_symbol

    def fetch_volume_multiples(self, symbols: List[str]):
        """Compute hourly volume multiples (latest day vs. earlier days).

        Minute history with extended hours over ``volume_lookback_days`` is
        summed into hourly buckets per symbol. For each hour of the day the
        latest day's volume is divided by the mean volume of that hour on
        the earlier days.

        Returns:
            DataFrame with hours 0-23 as rows and symbols as columns; empty
            when no history is available.
        """
        history_minute = self.History(
            symbols,
            timedelta(days=self.volume_lookback_days),
            Resolution.Minute,
            extendedMarketHours=True,
        )
        # Check before touching the index: an empty frame has nothing to flatten.
        if history_minute.empty:
            self.Debug("History minute data is empty")
            return pd.DataFrame()

        history_minute_flat = history_minute.reset_index().set_index('time')

        # Resample to hourly intervals for all symbols at once, summing the volume.
        history_hourly_resampled = (
            history_minute_flat.groupby('symbol').resample('H').agg({'volume': 'sum'}).reset_index()
        )
        bar_dates = history_hourly_resampled['time'].dt.date
        last_date = bar_dates.max()

        # Split into the last available date and everything before it.
        today_data = history_hourly_resampled[bar_dates == last_date].copy()
        before_today_data = history_hourly_resampled[bar_dates < last_date].copy()

        # 'hour' column for grouping.
        today_data['hour'] = today_data['time'].dt.hour
        before_today_data['hour'] = before_today_data['time'].dt.hour

        # Per symbol and hour: mean volume on past days, and volume on the last day.
        avg_volume_before_today = before_today_data.groupby(['symbol', 'hour'])['volume'].mean().unstack()
        avg_volume_today = today_data.groupby(['symbol', 'hour'])['volume'].mean().unstack()

        # Reindex so every hour (0-23) is present for each symbol.
        all_hours = range(24)
        avg_volume_before_today = avg_volume_before_today.reindex(columns=all_hours, fill_value=np.nan)
        avg_volume_today = avg_volume_today.reindex(columns=all_hours, fill_value=np.nan)

        # Today's volume / average past volume; transpose so hours index the rows.
        volume_multiples = avg_volume_today / avg_volume_before_today
        vol_mults_safe = volume_multiples.T
        return vol_mults_safe

    def _check_today_in_hist(self):
        """Debug helper: report whether SPY minute history includes today."""
        symbols = ["SPY"]
        today = self.Time.date()
        history = self.History(symbols, timedelta(days=3), Resolution.MINUTE)
        latest_date = history.index.get_level_values('time').date.max()
        if latest_date == today:
            self.Debug(f"History includes today's data: {latest_date}")
        else:
            self.Debug(f"History does not include today's data. Latest data is from: {latest_date}")
    # endregion