Overall Statistics |
Total Orders 10 Average Win 1.52% Average Loss -0.91% Compounding Annual Return 168.796% Drawdown 16.100% Expectancy 0.609 Start Equity 100000000 End Equity 102746024.17 Net Profit 2.746% Sharpe Ratio 7.044 Sortino Ratio 0 Probabilistic Sharpe Ratio 78.315% Loss Rate 40% Win Rate 60% Profit-Loss Ratio 1.68 Alpha 1.656 Beta -0.865 Annual Standard Deviation 0.157 Annual Variance 0.025 Information Ratio 2.062 Tracking Error 0.225 Treynor Ratio -1.275 Total Fees $321572.15 Estimated Strategy Capacity $750000.00 Lowest Capacity Asset CNXA XZBZTQ4K6NHH Portfolio Turnover 76.01% |
# region imports
from AlgorithmImports import *
# endregion


class Consolidator:
    """Per-symbol state holder tying a Symbol to its owning algorithm.

    Currently just records the pair; consolidator/event wiring can be
    added later via the stored back-reference.
    """

    def __init__(self, symbol, algo):
        # Back-reference to the algorithm so future handlers can reach
        # Debug/History/etc.
        self.symbol = symbol
        self.algo = algo
# region imports
from AlgorithmImports import *
# endregion


class Graveyard:
    """Parking lot for retired gap-calculation code (kept for reference).

    These methods are written against the QCAlgorithm API (self.Time,
    self.History, self.Debug, self.symbol_data) and were presumably mixed
    into the algorithm class before being retired here — confirm before
    reusing.
    """

    # PROBABLY want to do this w scheduler, at 4am, 9:30am
    # DONT know if this will work! BC we may not have todays history data TODAY!
    def try_get_gaps(self, symbols: List[str], assert_date: bool = True):
        """Compute today's RTH and ETH opening gaps (percent) per symbol.

        Returns a dict keyed by symbol with gap percentages plus the raw
        open/close inputs, or None when the minute history does not
        include today's date and assert_date is True. Symbols missing
        from any of the history frames are skipped with a Debug message.
        """
        gap_data_by_symbol = {}
        today = self.Time.date()

        # Get daily data for prior close (RTH only)
        daily = self.History(symbols, timedelta(days=3), Resolution.Daily)
        history_minute = self.History(symbols, timedelta(days=3), Resolution.Minute)
        history_minute_eth = self.History(symbols, timedelta(days=3), Resolution.Minute, extendedMarketHours=True)

        # Get the latest date in the minute data (today's date)
        latest_minute_date = history_minute.index.get_level_values('time').date.max()

        # Check if latest minute data date matches today's date
        if latest_minute_date != today and assert_date:
            self.Debug(f"Warning: Data is not from today! Latest data date: {latest_minute_date}, Expected date: {today}")
            return None

        # Filter today's minute data for RTH and ETH
        today_minute_data_rth = history_minute[history_minute.index.get_level_values('time').date == latest_minute_date]
        today_minute_data_eth = history_minute_eth[history_minute_eth.index.get_level_values('time').date == latest_minute_date]

        # NOTE(review): .last() is the most recent daily bar in the 3-day
        # window; assumes daily history ends at the prior session — confirm.
        prior_close_rth = daily.groupby('symbol')['close'].last()

        # Get the last close price from ETH for prior day
        prior_close_eth = history_minute_eth[history_minute_eth.index.get_level_values('time').date == latest_minute_date - timedelta(days=1)]
        prior_close_eth = prior_close_eth.groupby('symbol')['close'].last()

        # Get today's ETH open (first price of the day during ETH)
        today_open_eth = today_minute_data_eth.groupby('symbol')['open'].first()

        # Get today's RTH open (first price during regular trading hours)
        today_open_rth = today_minute_data_rth.groupby('symbol')['open'].first()

        # Loop through symbols and store all values in a result object
        for symbol in symbols:
            try:
                # Calculate the gaps
                gap_rth = ((today_open_rth[symbol] - prior_close_rth[symbol]) / prior_close_rth[symbol]) * 100
                gap_eth = ((today_open_eth[symbol] - prior_close_eth[symbol]) / prior_close_eth[symbol]) * 100

                # Store the calculated data for each symbol
                gap_data_by_symbol[symbol] = {
                    "gap_rth": gap_rth,
                    "gap_eth": gap_eth,
                    "today_open_rth": today_open_rth[symbol],
                    "today_open_eth": today_open_eth[symbol],
                    "prior_close_rth": prior_close_rth[symbol],
                    "prior_close_eth": prior_close_eth[symbol]
                }
            except KeyError:
                self.Debug(f"Symbol {symbol} data not available for gap calculation.")

        # FIX: the original print()-ed the entire accumulating dict on every
        # iteration of the loop above; log the final result once instead.
        self.Debug(f"Gap data: {gap_data_by_symbol}")
        return gap_data_by_symbol

    def RunGapCalculation(self):
        """Fetch gaps for all tracked symbols and cache them on self.gap_data."""
        symbols = list(self.symbol_data.keys())
        gap_data = self.try_get_gaps(symbols)
        if gap_data:
            self.Debug(f"Gap Data: {gap_data}")
            self.gap_data = gap_data
        else:
            self.Debug("No gap data available.")
            self.gap_data = {}
# region imports
from AlgorithmImports import *
from Consolidator import Consolidator
# endregion

"""
Capitalization notes
mega cap -- 200b+ (big 5)
large cap -- 10b - 200b (mcdonalds, nike)
mid cap -- 2b - 10b (etsy, roku)
small cap -- 300m - 2b (five below, cargurus)
micro cap -- 50m - 300m
"""

market_cap_categories = {
    "Mega Cap": {"min": 2.00e11, "max": float('inf')},
    "Large Cap": {"min": 1.00e10, "max": 2.00e11},
    "Mid Cap": {"min": 2.00e9, "max": 1.00e10},
    "Small Cap": {"min": 3.00e8, "max": 2.00e9},
    "Micro Cap": {"min": 5.00e7, "max": 3.00e8},
    "Nano Cap": {"min": 0, "max": 5.00e7}
}


class EnergeticVioletShark(QCAlgorithm):
    """Gap-and-go day-trading algorithm.

    Universe: $5-$20 stocks below micro-cap size. At 9:31 the algorithm
    looks for symbols whose RTH open gaps up more than `min_gap_pct` vs the
    prior close, filters them by pre-market / regular-market volume
    multiples (today vs the recent hourly average), and buys the
    intersection of the top pre-market and top regular-market names.
    Everything is liquidated one minute before the close.
    """

    max_univ_price = 20
    min_univ_price = 5
    # TODO: fit these!
    max_market_cap = market_cap_categories['Micro Cap']['max']  # TEMP
    min_market_cap = 0  # market_cap_categories['Micro Cap']['min']
    volume_lookback_days = 7
    # LONG only, for now.
    min_premkt_vol_mult = 1.5
    min_regmkt_vol_mult = 1.5
    # SHORT filter
    max_regmkt_vol_mult = 1.5
    min_gap_pct = 15
    # volume multiple is today vs avg of X days
    res = Resolution.HOUR
    debug_lvl = 1

    def initialize(self):
        # FIX: the original called set_start_date twice (Oct 1 then Oct 2);
        # only the last call takes effect, so keep just that one.
        self.set_start_date(2024, 10, 2)
        self.set_cash(100_000_000)
        self.bm = self.add_equity("SPY", self.res).symbol

        # FIX: universe settings must be configured BEFORE the universe is
        # added, otherwise they do not apply to its subscriptions.
        self.extended_hours = True
        # self.UniverseSettings.Resolution = self.res
        if self.extended_hours:
            self.universe_settings.extended_market_hours = True
        self.AddUniverse(self.SelectCoarse, self.SelectFine)

        # self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(9, 31), self.RunGapCalculation)
        # self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(5, 00), self.AfterETHOpen) # DOES NOT WORK
        self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(9, 31), self.AfterRTHOpen)
        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", 1), self.EODX)

        self.symbol_list = []
        self.symbol_data = {}  # symbol -> Consolidator
        self.gap_data = {}     # symbol -> today's gap/volume stats
        self.longs = {}
        self.shorts = {}

    def EODX(self):
        """Flatten all positions one minute before the close (day-trade only)."""
        self.liquidate(tag="EOD Exit")

    """
    def AfterETHOpen(self):
        symbols = list(self.symbol_data.keys())
        self.gap_data = self.get_prior_closes(symbols)
        data = self.current_slice
        if data.contains_key(self.bm):
            bars = data.bars
            for symbol in symbols:
                if data.contains_key(symbol):
                    eth_open = bars[symbol].open
                    self.gap_data[symbol]['eth_open'] = eth_open
                    self.Debug(f'updated {symbol} at time {self.Time} with eth open: {eth_open}')
                else:
                    self.Debug(f'failed to update {symbol} at time {self.time} -- dropping.')
                    # self.gap_data.pop(symbol) # SO little will work in ETH, so don't do it.
        else:
            self.debug("no data... wtf (ETH)")
    """

    def AfterRTHOpen(self):
        """9:31 handler: find gappers, filter by volume multiples, enter longs."""
        symbols = list(self.symbol_data.keys())
        self.gap_data = self.get_prior_closes(symbols)
        data = self.current_slice
        if not data.contains_key(self.bm):
            self.debug("no data... wtf")
            return
        bars = data.bars
        for symbol in symbols:
            # FIX: get_prior_closes() may have skipped this symbol (no
            # history available); indexing self.gap_data[symbol] below
            # would raise KeyError and kill the whole scheduled event.
            if symbol not in self.gap_data:
                continue
            if data.contains_key(symbol):
                _open = bars[symbol].open
                _prior_rth_close = self.gap_data[symbol].get('prior_close_rth', None)
                self.gap_data[symbol]['rth_open'] = _open
                if _prior_rth_close:
                    _gap_rth_pct = ((_open - _prior_rth_close) / _prior_rth_close) * 100
                    if _gap_rth_pct > self.min_gap_pct:
                        self.gap_data[symbol]['rth_gap_pct'] = _gap_rth_pct
                        if self.debug_lvl > 2:
                            self.Debug(f'{str(symbol)} --> viable gap pct: {_gap_rth_pct}')
                    else:
                        if self.debug_lvl > 2:
                            self.Debug(f'{str(symbol)} dropped -- nonviable gap {_gap_rth_pct}')
                        self.gap_data.pop(symbol)
                if self.debug_lvl > 2:
                    self.Debug(f'updated {symbol} at time {self.Time} with rth open: {_open}')
            else:
                self.Debug(f'failed to update {symbol} at time {self.time} -- dropping')
                # could drop this.
                # FIX: pop with a default -- the key may already be absent.
                self.gap_data.pop(symbol, None)

        self.Debug(f'viable gappers: {len(self.gap_data)}')
        to_fetch = list(self.gap_data.keys())
        if len(to_fetch) == 0:
            return
        volume_multiples_df = self.fetch_volume_multiples(to_fetch)
        if volume_multiples_df.empty:
            return
        self.Debug(f'volume multiples df shape -- {volume_multiples_df.shape}')
        # NOTE(review): .loc slicing is inclusive on both ends, so hour 8
        # lands in BOTH the pre-market and regular-market windows -- confirm
        # that overlap is intended.
        pre_mkt = volume_multiples_df.loc[4:8]
        reg_mkt = volume_multiples_df.loc[8:]

        # ---------------------------- ENTRY filter
        pma = pre_mkt.mean()
        rma = reg_mkt.mean()
        maybe_longs = []
        maybe_shorts = []
        # Maybe easier to filter HERE, we should have this data now...
        for symbol in to_fetch:
            # FIX: a gapper can be missing from the volume frame (no minute
            # history returned); drop it instead of raising KeyError.
            if symbol not in pma.index:
                self.gap_data.pop(symbol, None)
                continue
            _pma = pma[symbol]
            _rma = rma[symbol]
            ok_for_long = _pma > self.min_premkt_vol_mult and _rma > self.min_regmkt_vol_mult
            ok_for_short = _pma > self.min_premkt_vol_mult and _rma < self.max_regmkt_vol_mult
            # NOTE -- we want to find things that are NOT in this _rma > min reg for SHORTS !
            if ok_for_long or ok_for_short:
                data = self.gap_data[symbol]
                data['volume_df'] = volume_multiples_df[symbol]
                data['pre_mkt_mult'] = _pma
                data['reg_mkt_mult'] = _rma
                if ok_for_long:
                    maybe_longs.append(symbol)
                if ok_for_short:
                    maybe_shorts.append(symbol)
            else:
                self.gap_data.pop(symbol)

        # LONG filtering!
        # for longs we want to find the LARGEST large mult reg mkt, WITH big pre mkt volume.
        # WE want something that is on pace for a 5x multiple, really.
        top_by_pma = self.get_top_by_key(self.gap_data, key='pre_mkt_mult', asc=False, n=20)
        top_by_rma = self.get_top_by_key(self.gap_data, key='reg_mkt_mult', asc=False, n=5)
        # Extract the symbols from both lists
        pma_symbols = {item[0] for item in top_by_pma}
        rma_symbols = {item[0] for item in top_by_rma}
        common_symbols = pma_symbols & rma_symbols
        self.longs = {symbol: self.gap_data[symbol] for symbol in common_symbols if symbol in maybe_longs}
        # SHORTS we want to sort on the LARGEST gap, and find a WEAK volume for. it's a bit different of a pipeline, really.
        # self.shorts = {symbol: self.gap_data[symbol] for symbol in common_short_symbols if symbol in maybe_shorts}
        # NOW we need to do high of day tracking, probably -- fuck man.
        n_longs = len(self.longs)
        self.Debug(f'# longs: {n_longs}')
        for symbol, data in self.longs.items():
            self.Debug(f'buying: {data}')
            self.set_holdings(symbol, .95 / n_longs)

    @staticmethod
    def get_top_by_key(gap_data, key='reg_mkt_mult', asc=True, n=5):
        """Return the n (symbol, data) pairs ranked by data[key] (desc when asc=False)."""
        return sorted(gap_data.items(), key=lambda x: x[1][key], reverse=not asc)[:n]

    # region Universe
    def SelectCoarse(self, coarse):
        """Coarse filter: price and market cap inside the configured bands."""
        filtered = [x for x in coarse if self.min_univ_price < x.Price < self.max_univ_price]
        filtered = [x for x in filtered
                    if x.HasFundamentalData
                    and self.min_market_cap < x.MarketCap < self.max_market_cap]
        chosen = [x.Symbol for x in filtered]
        if self.debug_lvl > 0:
            # FIX: removed an unused list of symbol strings the original built here.
            self.Debug(f'Universe ( {len(chosen)} symbols )')
        self.symbol_list = chosen
        return self.symbol_list

    def SelectFine(self, fine):
        """Fine selection: pass-through (no additional filtering yet)."""
        return [f.Symbol for f in fine]

    def on_securities_changed(self, changes):
        """Track each added security with a Consolidator; drop removed ones."""
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            self.symbol_data[symbol] = Consolidator(symbol, self)
        for security in changes.RemovedSecurities:
            # Handle the removed securities if necessary
            if security.Symbol in self.symbol_data:
                self.unsubscribe(security.Symbol)

    def unsubscribe(self, symbol):
        """Remove the data subscription and forget the symbol's tracking state."""
        self.RemoveSecurity(symbol)
        self.symbol_data.pop(symbol)
    # endregion

    def on_data(self, data: Slice):
        """Per-bar hook; currently only a placeholder for high-of-day tracking."""
        if not data.contains_key(self.bm):
            return
        for symbol, instance in self.symbol_data.items():
            if not data.contains_key(symbol):
                continue
            # self.set_holdings(symbol, .01)
            # HIGH of day tracking.
    # for symbol, data in self.longs.items():

    # region Helpers
    def get_prior_closes(self, symbols: List[str]):
        """Return {symbol: {"prior_close_rth", "prior_close_eth"}} from daily history.

        Symbols missing from either history frame are skipped with a Debug
        message rather than raising.
        """
        prior_closes_by_symbol = {}
        daily_rth = self.History(symbols, timedelta(days=3), Resolution.Daily, extendedMarketHours=False)
        daily_eth = self.History(symbols, timedelta(days=3), Resolution.Daily, extendedMarketHours=True)
        # Use the latest available date in the data
        # NOTE(review): .last() is simply the most recent daily bar in the
        # 3-day window; at 9:31 that is presumably the prior session's close,
        # but if the window ever includes today's bar this would NOT be the
        # prior close -- confirm against the scheduler timing.
        latest_rth = daily_rth.groupby('symbol')['close'].last()
        latest_eth = daily_eth.groupby('symbol')['close'].last()
        # Loop through symbols and store the prior closes in the result object
        for symbol in symbols:
            try:
                # Store prior RTH and ETH close for each symbol
                prior_closes_by_symbol[symbol] = {
                    "prior_close_rth": latest_rth[symbol],
                    "prior_close_eth": latest_eth[symbol]
                }
            except KeyError:
                self.Debug(f"Symbol {symbol} data not available for prior close calculation.")
        return prior_closes_by_symbol

    def fetch_volume_multiples(self, symbols: List[str]):
        """Return a DataFrame (rows = hour 0-23, cols = symbol) of volume multiples.

        Each cell is today's volume in that hour divided by the average
        volume in the same hour over the preceding days of the lookback
        window. Cells with no data are NaN. Returns an empty DataFrame when
        no minute history is available.
        """
        # Fetch minute-level historical data for the past 7 days with extended market hours
        history_minute = self.History(symbols, timedelta(days=self.volume_lookback_days), Resolution.Minute, extendedMarketHours=True)
        history_minute_flat = history_minute.reset_index()
        if history_minute.empty:
            self.Debug("History minute data is empty")
            return pd.DataFrame()
        history_minute_flat = history_minute_flat.set_index('time')
        # Resample to hourly intervals for all symbols at once, summing the volume
        # NOTE(review): pandas >= 2.2 deprecates the 'H' alias in favor of
        # 'h' -- confirm the runtime pandas version before changing.
        history_hourly_resampled = history_minute_flat.groupby('symbol').resample('H').agg({'volume': 'sum'}).reset_index()
        last_date = history_hourly_resampled['time'].dt.date.max()
        # Split data into 'today' (last available date) and 'before_today' for all symbols
        today_data = history_hourly_resampled[history_hourly_resampled['time'].dt.date == last_date].copy()
        before_today_data = history_hourly_resampled[history_hourly_resampled['time'].dt.date < last_date].copy()
        # Create 'hour' column for grouping purposes
        today_data['hour'] = today_data['time'].dt.hour
        before_today_data['hour'] = before_today_data['time'].dt.hour
        # Calculate the average volume per hour for past days, grouped by symbol and hour
        avg_volume_before_today = before_today_data.groupby(['symbol', 'hour'])['volume'].mean().unstack()
        # Calculate today's volume by hour for each symbol
        # (mean over a single row per (symbol, hour) == today's hourly volume)
        avg_volume_today = today_data.groupby(['symbol', 'hour'])['volume'].mean().unstack()
        # Reindex both DataFrames to ensure all hours (0-23) are present for each symbol
        all_hours = range(24)
        avg_volume_before_today = avg_volume_before_today.reindex(columns=all_hours, fill_value=np.nan)
        avg_volume_today = avg_volume_today.reindex(columns=all_hours, fill_value=np.nan)
        # Calculate the safe volume multiples (today's volume / average past volume)
        volume_multiples = avg_volume_today / avg_volume_before_today
        # Display the results
        # Transpose so callers can slice by hour with .loc[...] and index
        # columns by symbol.
        vol_mults_safe = volume_multiples.T
        return vol_mults_safe

    def _check_today_in_hist(self):
        """Debug helper: report whether SPY minute history reaches today's date."""
        symbols = ["SPY"]
        today = self.Time.date()
        # NOTE(review): Resolution.MINUTE (new-style casing) vs
        # Resolution.Minute used elsewhere in this file -- both resolve in
        # current LEAN, but pick one for consistency.
        history = self.History(symbols, timedelta(days=3), Resolution.MINUTE)
        latest_date = history.index.get_level_values('time').date.max()
        if latest_date == today:
            self.Debug(f"History includes today's data: {latest_date}")
        else:
            self.Debug(f"History does not include today's data. Latest data is from: {latest_date}")
    # endregion