Overall Statistics
Total Orders
10
Average Win
5.24%
Average Loss
0%
Compounding Annual Return
1061373.816%
Drawdown
1.100%
Expectancy
0
Start Equity
100000000
End Equity
128913643.93
Net Profit
28.914%
Sharpe Ratio
2426.933
Sortino Ratio
0
Probabilistic Sharpe Ratio
100.000%
Loss Rate
0%
Win Rate
100%
Profit-Loss Ratio
0
Alpha
1410.047
Beta
-2.748
Annual Standard Deviation
0.58
Annual Variance
0.337
Information Ratio
2228.393
Tracking Error
0.632
Treynor Ratio
-512.552
Total Fees
$358698.81
Estimated Strategy Capacity
$310000.00
Lowest Capacity Asset
CNXA XZBZTQ4K6NHH
Portfolio Turnover
74.01%
# region imports
from AlgorithmImports import *
# endregion

# Your New Python File


class Consolidator:
    """Per-symbol helper that watches an open position and flattens it
    once its unrealized gain exceeds the algorithm's naive profit target."""

    def __init__(self, symbol, algo):
        """Bind the symbol and owning algorithm.

        Note: the target percentage is snapshotted once from
        algo.naive_tgt_pct at construction and not re-read afterwards.
        """
        self.symbol = symbol
        self.algo = algo
        self.tgt_pct = algo.naive_tgt_pct

    def profit_target(self):
        """Liquidate the position when unrealized profit exceeds the target."""
        threshold = self.tgt_pct
        sym = self.symbol
        gain = self.algo.Portfolio[sym].UnrealizedProfitPercent
        # Nothing to do until the gain crosses the target threshold.
        if gain <= threshold:
            return
        self.algo.Debug(f"Flattening {sym} -- {gain} > {threshold}")
        self.algo.Liquidate(sym, tag=f"TP  -- {gain*100}% gain")
# region imports
from AlgorithmImports import *
# endregion


class Graveyard:
    """Parking lot for retired/experimental gap-calculation helpers.

    NOTE(review): these methods read self.Time / self.History / self.Debug /
    self.symbol_data, so this class only works mixed into a
    QCAlgorithm-like object — it is not standalone.
    """


    # PROBABLY want to do this w scheduler, at 4am, 9:30am
    # DONT know if this will work! BC we may not have todays history data TODAY!
    def try_get_gaps(self, symbols: List[str], assert_date: bool = True):
        """Compute RTH and ETH opening gaps (in percent) for each symbol.

        Returns a dict keyed by symbol containing gap_rth/gap_eth plus the
        raw opens/closes used, or None when assert_date is True and the
        minute history does not reach today's date. Symbols missing from
        the history are skipped with a Debug message.
        """
        gap_data_by_symbol = {}
        today = self.Time.date()

        # Get daily data for prior close (RTH only)
        daily = self.History(symbols, timedelta(days=3), Resolution.Daily)
        history_minute = self.History(symbols, timedelta(days=3), Resolution.Minute)
        history_minute_eth = self.History(symbols, timedelta(days=3), Resolution.Minute, extendedMarketHours=True)

        # Get the latest date in the minute data (today's date)
        latest_minute_date = history_minute.index.get_level_values('time').date.max()

        # Check if latest minute data date matches today's date
        if latest_minute_date != today and assert_date:
            self.Debug(f"Warning: Data is not from today! Latest data date: {latest_minute_date}, Expected date: {today}")
            return None

        # Filter today's minute data for RTH and ETH
        today_minute_data_rth = history_minute[history_minute.index.get_level_values('time').date == latest_minute_date]
        today_minute_data_eth = history_minute_eth[history_minute_eth.index.get_level_values('time').date == latest_minute_date]
        # NOTE(review): .last() takes the newest daily close in the 3-day
        # window — if the daily history already includes a bar for today,
        # this would be today's close rather than the prior close. Confirm.
        prior_close_rth = daily.groupby('symbol')['close'].last()

        # Get the last close price from ETH for prior day
        # (calendar-day subtraction: after a weekend/holiday this date may
        # have no bars, leaving the symbol out of prior_close_eth)
        prior_close_eth = history_minute_eth[history_minute_eth.index.get_level_values('time').date == latest_minute_date - timedelta(days=1)]
        prior_close_eth = prior_close_eth.groupby('symbol')['close'].last()

        # Get today's ETH open (first price of the day during ETH)
        today_open_eth = today_minute_data_eth.groupby('symbol')['open'].first()

        # Get today's RTH open (first price during regular trading hours)
        today_open_rth = today_minute_data_rth.groupby('symbol')['open'].first()
        # Loop through symbols and store all values in a result object
        for symbol in symbols:
            try:
                # Calculate the gaps
                gap_rth = ((today_open_rth[symbol] - prior_close_rth[symbol]) / prior_close_rth[symbol]) * 100
                gap_eth = ((today_open_eth[symbol] - prior_close_eth[symbol]) / prior_close_eth[symbol]) * 100

                # Store the calculated data for each symbol
                gap_data_by_symbol[symbol] = {
                    "gap_rth": gap_rth,
                    "gap_eth": gap_eth,
                    "today_open_rth": today_open_rth[symbol],
                    "today_open_eth": today_open_eth[symbol],
                    "prior_close_rth": prior_close_rth[symbol],
                    "prior_close_eth": prior_close_eth[symbol]
                }
                # NOTE(review): debug leftover — prints the accumulating dict
                # once per symbol; consider removing or routing to Debug.
                print(gap_data_by_symbol)
            except KeyError:
                self.Debug(f"Symbol {symbol} data not available for gap calculation.")

        return gap_data_by_symbol


    def RunGapCalculation(self):
        """Scheduled entry point: compute gaps for all tracked symbols and
        cache the result on self.gap_data ({} when nothing is available)."""
        symbols = list(self.symbol_data.keys())
        gap_data = self.try_get_gaps(symbols)

        if gap_data:
            self.Debug(f"Gap Data: {gap_data}")
            self.gap_data = gap_data
        else:
            self.Debug("No gap data available.")
            self.gap_data = {}
# region imports
from AlgorithmImports import *
from Consolidator import Consolidator
# endregion


"""
Capitalization notes

mega cap    -- 200b+ 
    (big 5)
large cap   -- 10b - 200b 
    (mcdonalds, nike)

mid cap     -- 2b - 10b 
    (etsy, roku)

small cap   -- 300m - 2b
    (five below, cargurus)

micro cap    -- 50m - 300m
"""

# Market-capitalization buckets in USD (see notes above). Each entry gives
# the [min, max) bounds used to slice the universe by company size.
market_cap_categories = {
    "Mega Cap": {"min": 2.00e11, "max": float('inf')},  # $200B and up
    "Large Cap": {"min": 1.00e10, "max": 2.00e11},      # $10B - $200B
    "Mid Cap": {"min": 2.00e9, "max": 1.00e10},         # $2B - $10B
    "Small Cap": {"min": 3.00e8, "max": 2.00e9},        # $300M - $2B
    "Micro Cap": {"min": 5.00e7, "max": 3.00e8},        # $50M - $300M
    "Nano Cap": {"min": 0, "max": 5.00e7}               # below $50M
}


"""
TODO: replace this big stupid ass dictionary with our consolidator class + fields!
move some of the calculations to that class, as well, even as statics.

Add targets! I think stops are probably a bad idea, maybe impossible, but we can certainly try.
consider adding filters for signal, say high of day for longs, etc.

"""

class EnergeticVioletShark(QCAlgorithm):
    """Gap-and-go style intraday strategy on cheap micro-cap stocks.

    At 9:31 it selects symbols gapping up on strong pre-/regular-market
    volume multiples and buys them; positions exit at a naive profit
    target or are flattened one minute before the close.
    """

    # Basic Universe Params
    max_univ_price = 20   # upper share-price bound (exclusive)
    min_univ_price = 5    # lower share-price bound (exclusive)

    max_market_cap = market_cap_categories['Micro Cap']['max']
    min_market_cap = 0 # market_cap_categories['Micro Cap']['min']


    # Finer Universe Filters 

    volume_lookback_days = 7 
    # volume multiple is today vs avg of X days
    min_gap_pct = 15  # minimum RTH open gap (percent) to remain a candidate


    # Long filters
    min_premkt_vol_mult = 1.5   # required pre-market volume multiple
    min_regmkt_vol_mult = 1.5   # required regular-market volume multiple

    # Short filter (not enabled yet)
    max_regmkt_vol_mult = 1.5

    # .05 = 5%?  (fraction of unrealized profit that triggers liquidation)
    naive_tgt_pct = .05

    res = Resolution.HOUR  # subscription resolution for added equities

    debug_lvl = 1  # 0 = quiet; >0 universe counts; >2 per-symbol chatter

    def initialize(self):
        """Set up backtest window, cash, benchmark, universe, and schedules.

        Fix: the original called set_start_date twice in a row
        (2024-10-01 then 2024-10-02); only the last call takes effect, so
        the dead first call was removed. Behavior is unchanged.
        """
        self.set_start_date(2024, 10, 2)
        self.set_cash(100_000_000)
        # SPY doubles as benchmark, schedule anchor, and data-liveness check.
        self.bm = self.add_equity("SPY", self.res).symbol
        self.AddUniverse(self.SelectCoarse, self.SelectFine)

        self.extended_hours = True

        # self.UniverseSettings.Resolution = self.res
        if self.extended_hours:
            self.universe_settings.extended_market_hours = True

        # self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(9, 31), self.RunGapCalculation)

        # self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(5, 00), self.AfterETHOpen) # DOES NOT WORK
        # Candidate selection + entries one minute after the regular open.
        self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(9, 31), self.AfterRTHOpen)

        # Flatten everything one minute before the close.
        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", 1), self.EODX)

        # Per-run state containers.
        self.symbol_list = []   # symbols chosen by the coarse filter
        self.symbol_data = {}   # symbol -> Consolidator
        self.gap_data = {}      # symbol -> gap/volume stats for today
        self.longs = {}
        self.shorts = {}

    # region Event Handlers 

    def EODX(self):
        """End-of-day exit: flatten the entire portfolio (scheduled one
        minute before the market close)."""
        self.liquidate(tag="EOD Exit")

    """
    def AfterETHOpen(self):
        symbols = list(self.symbol_data.keys())
        self.gap_data = self.get_prior_closes(symbols)

        data = self.current_slice
        if data.contains_key(self.bm):
            bars = data.bars
            for symbol in symbols:
                if data.contains_key(symbol):
                    eth_open = bars[symbol].open
                    self.gap_data[symbol]['eth_open'] = eth_open
                    self.Debug(f'updated {symbol} at time {self.Time} with eth open: {eth_open}')
                else:
                    self.Debug(f'failed to update {symbol} at time {self.time} -- dropping.')
                    # self.gap_data.pop(symbol)
                    # SO little will work in ETH, so don't do it.
        else:
            self.debug("no data... wtf (ETH)")
    """

    def AfterRTHOpen(self):
        """9:31 scheduled handler: build today's candidate set and go long.

        Pipeline: fetch prior closes -> compute the RTH gap pct from the
        first regular-session bar -> drop non-gappers -> fetch hourly
        volume multiples -> keep symbols with strong pre-/regular-market
        volume -> rank, intersect, and enter positions.
        """
        symbols = list(self.symbol_data.keys())
        self.gap_data= self.get_prior_closes(symbols)
        data = self.current_slice
        if data.contains_key(self.bm):
            bars = data.bars
            for symbol in symbols:
                if data.contains_key(symbol):
                    _open = bars[symbol].open
                    _prior_rth_close = self.gap_data[symbol].get('prior_close_rth', None)
                    self.gap_data[symbol]['rth_open'] = _open
                    # Truthiness check: a prior close of exactly 0 (or a
                    # missing value) skips the gap computation entirely.
                    if _prior_rth_close:
                        _gap_rth_pct = ((_open - _prior_rth_close) / _prior_rth_close) * 100
                        if _gap_rth_pct > self.min_gap_pct:
                            self.gap_data[symbol]['rth_gap_pct'] = _gap_rth_pct
                            if self.debug_lvl > 2: self.Debug(f'{str(symbol)} --> viable gap pct: {_gap_rth_pct}')
                        else:
                            if self.debug_lvl > 2: self.Debug(f'{str(symbol)} dropped -- nonviable gap {_gap_rth_pct}')
                            self.gap_data.pop(symbol)

                    if self.debug_lvl > 2: self.Debug(f'updated {symbol} at time {self.Time} with rth open: {_open}')

                else:
                    self.Debug(f'failed to update {symbol} at time {self.time} -- dropping')
                    # could drop this.
                    self.gap_data.pop(symbol)
        else:
            self.debug("no data... wtf")
            return 

        self.Debug(f'viable gappers: {len(self.gap_data)}')

        to_fetch = list(self.gap_data.keys())
        if len(to_fetch) == 0: 
            return 

        volume_multiples_df = self.fetch_volume_multiples(to_fetch).dropna()
        if volume_multiples_df.empty:
            return 

        self.Debug(f'volume multiples df shape -- {volume_multiples_df.shape}')

        # Rows of the multiples frame are hours of day (see
        # fetch_volume_multiples). NOTE(review): label-based .loc slicing
        # is inclusive at BOTH ends, so hour 8 lands in both the pre-market
        # and regular-market windows — confirm the overlap is intended.
        pre_mkt = volume_multiples_df.loc[4:8]
        reg_mkt = volume_multiples_df.loc[8:]

        # ---------------------------- ENTRY filter 
        pma = pre_mkt.mean()
        rma = reg_mkt.mean()

        maybe_longs = []
        maybe_shorts = []

        # Maybe easier to filter HERE, we should have this data now...
        for symbol in to_fetch:
            _pma = pma[symbol]
            _rma = rma[symbol]

            ok_for_long = _pma > self.min_premkt_vol_mult and _rma > self.min_regmkt_vol_mult
            ok_for_short = _pma > self.min_premkt_vol_mult and _rma < self.max_regmkt_vol_mult
            # NOTE -- we want to find things that are NOT in this _rma > min reg for SHORTS !
            if ok_for_long or ok_for_short:
                data = self.gap_data[symbol]
                data['volume_df'] = volume_multiples_df[symbol]
                data['pre_mkt_mult'] = _pma
                data['reg_mkt_mult'] = _rma
                if ok_for_long:
                    maybe_longs.append(symbol)
                
                if ok_for_short:
                    maybe_shorts.append(symbol)

            else:
                self.gap_data.pop(symbol)
        
        # LONG filtering! 
        # for longs we want to find the LARGEST large mult reg mkt, WITH big pre mkt volume.
        # WE want something that is on pace for a 5x multiple, really.
        top_by_pma = self.get_top_by_key(self.gap_data, key='pre_mkt_mult', asc=False, n=20)
        top_by_rma = self.get_top_by_key(self.gap_data, key='reg_mkt_mult', asc=False, n=5)

        # Extract the symbols from both lists
        pma_symbols = {item[0] for item in top_by_pma} 
        rma_symbols = {item[0] for item in top_by_rma}
        common_symbols = pma_symbols & rma_symbols
        self.longs = {symbol: self.gap_data[symbol] for symbol in common_symbols if symbol in maybe_longs}

        # SHORTS we want to sort on the LARGEST gap, and find a WEAK volume for. it's a bit different of a pipeline, really.
        # self.shorts = {symbol: self.gap_data[symbol] for symbol in common_short_symbols if symbol in maybe_shorts}
        
        # NOW we need to do high of day tracking, probably -- fuck man.

        # Equal-weight the survivors at ~95% gross exposure.
        n_longs = len(self.longs)
        self.Debug(f'# longs: {n_longs}')
        for symbol, data in self.longs.items():
            self.Debug(f'buying: {data}')
            self.set_holdings(symbol, .95 / n_longs)




    
    @staticmethod
    def get_top_by_key(gap_data, key='reg_mkt_mult', asc=True, n=5):
        """Return the top-n (symbol, data) pairs ranked by data[key].

        asc=True ranks smallest-first; asc=False ranks largest-first.
        """
        ranked = sorted(
            gap_data.items(),
            key=lambda pair: pair[1][key],
            reverse=not asc,
        )
        return ranked[:n]

    # endregion 

    # region Universe

    def SelectCoarse(self, coarse):
        """Coarse universe filter: price band plus market-cap band.

        Keeps symbols priced strictly between min_univ_price and
        max_univ_price that have fundamental data and whose market cap is
        strictly inside (min_market_cap, max_market_cap). Caches and
        returns the chosen symbols.

        Fix: removed a list of str(symbol) that was built every pass but
        never used (dead work).
        """
        filtered = [
            x for x in coarse
            if self.min_univ_price < x.Price < self.max_univ_price
            and x.HasFundamentalData
            and self.min_market_cap < x.MarketCap < self.max_market_cap
        ]
        chosen = [x.Symbol for x in filtered]
        if self.debug_lvl > 0:
            self.Debug(f'Universe ( {len(chosen)} symbols )')
        self.symbol_list = chosen
        return self.symbol_list
        
        
    def SelectFine(self, fine):
        """Fine universe filter: pass-through — accept every candidate
        that survived the coarse filter (no additional screening)."""
        return [candidate.Symbol for candidate in fine]


    def on_securities_changed(self, changes):
        """Track universe churn: attach a Consolidator to each added
        security and unsubscribe removed securities we were tracking."""
        for added in changes.AddedSecurities:
            sym = added.Symbol
            self.symbol_data[sym] = Consolidator(sym, self)

        for removed in changes.RemovedSecurities:
            sym = removed.Symbol
            # Only clean up symbols we actually created state for.
            if sym in self.symbol_data:
                self.unsubscribe(sym)


    def unsubscribe(self, symbol):
        """Remove the security's data subscription and drop its
        per-symbol Consolidator state."""
        self.RemoveSecurity(symbol)
        self.symbol_data.pop(symbol)


    # endregion 

    def on_data(self, data: Slice):
        """Per-bar hook: run each invested symbol's profit-target check.

        The whole slice is skipped when the benchmark has no data, which
        serves as a cheap feed-liveness check.

        Fix: removed an unused local (len of symbol_data) that was
        computed on every bar.
        """
        if not data.contains_key(self.bm):
            return

        for symbol, instance in self.symbol_data.items():
            if not data.contains_key(symbol):
                continue
            if self.portfolio[symbol].invested:
                instance.profit_target()


    # region Helpers 

    def get_prior_closes(self, symbols: List[str]):
        """Fetch the most recent daily RTH and ETH closes for each symbol.

        Returns {symbol: {"prior_close_rth": float, "prior_close_eth": float}};
        symbols absent from the history are skipped with a Debug message.

        NOTE(review): .last() picks the newest daily bar in a 3-day window.
        Called intraday this should be yesterday's close, but if the daily
        history already contains a bar for today it would be today's —
        confirm against the data feed.
        """
        prior_closes_by_symbol = {}
        daily_rth = self.History(symbols, timedelta(days=3), Resolution.Daily, extendedMarketHours=False)
        daily_eth = self.History(symbols, timedelta(days=3), Resolution.Daily, extendedMarketHours=True)

        # Use the latest available date in the data
        latest_rth = daily_rth.groupby('symbol')['close'].last()
        latest_eth = daily_eth.groupby('symbol')['close'].last()

        # Loop through symbols and store the prior closes in the result object
        for symbol in symbols:
            try:
                # Store prior RTH and ETH close for each symbol
                prior_closes_by_symbol[symbol] = {
                    "prior_close_rth": latest_rth[symbol],
                    "prior_close_eth": latest_eth[symbol]
                }
            except KeyError:
                self.Debug(f"Symbol {symbol} data not available for prior close calculation.")

        return prior_closes_by_symbol

    def fetch_volume_multiples(self, symbols: List[str]):
        """Build an hours-by-symbols DataFrame of volume multiples.

        Each cell is (today's volume in that hour) divided by (the mean
        volume in the same hour over the prior lookback days). Rows are
        hours 0-23, columns are symbols; hours with no data are NaN.
        Returns an empty DataFrame when no minute history is available.
        """
        # Fetch minute-level historical data for the past 7 days with extended market hours
        history_minute = self.History(symbols, timedelta(days=self.volume_lookback_days), Resolution.Minute, extendedMarketHours=True)
        history_minute_flat = history_minute.reset_index()

        if history_minute.empty:
            self.Debug("History minute data is empty")
            return pd.DataFrame()

        history_minute_flat = history_minute_flat.set_index('time')

        # Resample to hourly intervals for all symbols at once, summing the volume
        # NOTE(review): 'H' is a deprecated alias for 'h' in recent pandas.
        history_hourly_resampled = history_minute_flat.groupby('symbol').resample('H').agg({'volume': 'sum'}).reset_index()

        # Most recent session date present in the hourly data ("today").
        last_date = history_hourly_resampled['time'].dt.date.max()

        # Split data into 'today' (last available date) and 'before_today' for all symbols
        today_data = history_hourly_resampled[history_hourly_resampled['time'].dt.date == last_date].copy()
        before_today_data = history_hourly_resampled[history_hourly_resampled['time'].dt.date < last_date].copy()

        # Create 'hour' column for grouping purposes
        today_data['hour'] = today_data['time'].dt.hour
        before_today_data['hour'] = before_today_data['time'].dt.hour

        # Calculate the average volume per hour for past days, grouped by symbol and hour
        avg_volume_before_today = before_today_data.groupby(['symbol', 'hour'])['volume'].mean().unstack()

        # Calculate today's volume by hour for each symbol
        # (mean over a single day's rows == that day's hourly volume)
        avg_volume_today = today_data.groupby(['symbol', 'hour'])['volume'].mean().unstack()

        # Reindex both DataFrames to ensure all hours (0-23) are present for each symbol
        all_hours = range(24)
        avg_volume_before_today = avg_volume_before_today.reindex(columns=all_hours, fill_value=np.nan)
        avg_volume_today = avg_volume_today.reindex(columns=all_hours, fill_value=np.nan)

        # Calculate the safe volume multiples (today's volume / average past volume)
        volume_multiples = avg_volume_today / avg_volume_before_today

        # Transpose so rows are hours and columns are symbols (callers
        # slice by hour label, e.g. .loc[4:8]).
        vol_mults_safe = volume_multiples.T
        return vol_mults_safe

    def _check_today_in_hist(self):
        """Diagnostic: report whether a 3-day SPY minute-history request
        reaches today's date (helps debug intraday data availability)."""
        today = self.Time.date()
        hist = self.History(["SPY"], timedelta(days=3), Resolution.MINUTE)
        newest = hist.index.get_level_values('time').date.max()
        if newest != today:
            self.Debug(f"History does not include today's data. Latest data is from: {newest}")
        else:
            self.Debug(f"History includes today's data: {newest}")


    # endregion