Overall Statistics
Total Orders
2
Average Win
4.94%
Average Loss
0%
Compounding Annual Return
8021.440%
Drawdown
1.600%
Expectancy
0
Start Equity
100000
End Equity
104936.72
Net Profit
4.937%
Sharpe Ratio
132.787
Sortino Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
100%
Profit-Loss Ratio
0
Alpha
61.431
Beta
-1.589
Annual Standard Deviation
0.452
Annual Variance
0.205
Information Ratio
120.982
Tracking Error
0.49
Treynor Ratio
-37.81
Total Fees
$53.66
Estimated Strategy Capacity
$300000.00
Lowest Capacity Asset
PRSC SRALEY945R39
Portfolio Turnover
47.06%
# region imports
from AlgorithmImports import *
# endregion

# Your New Python File


class Consolidator:
    period = 7

    def __init__(self, symbol, algo):
        self.symbol = symbol
        self.algo = algo
        self.tgt_pct = self.algo.naive_tgt_pct
        self.stp_pct = self.algo.naive_stp_pct
        self.rsi = self.algo.RSI(self.symbol, self.period, MovingAverageType.Wilders)

        # this runs on add, we cannot do this.
        # if self.algo.extended_hours and self.algo.res == Resolution.MINUTE:
        #     # we cant really warm it up here, unless we're using minute, and extended hours (only minute uses ETH)


    def warmup_rsi(self):
        if self.algo.extended_hours and self.algo.res == Resolution.MINUTE:
            history = self.algo.History([self.symbol], self.period + 3, self.algo.res, extendedMarketHours=self.algo.extended_hours).loc[self.symbol]
            for row in history.itertuples():
                self.rsi.Update(row.Index, row.close)
                self.algo.Debug(f'warming up... with {row.Index}, {row.close}')

    
    def profit_target(self):
        target_threshold = self.tgt_pct
        symbol = self.symbol 
        holding = self.algo.Portfolio[symbol]

        if target_threshold == 0.0: return 

        urpnl_pct = holding.UnrealizedProfitPercent
        # Check if the current price is above the target threshold
        if urpnl_pct  > target_threshold:
            self.algo.Debug(f"Flattening {symbol} -- {urpnl_pct} > {target_threshold}")
            self.algo.Liquidate(symbol, tag=f"TP  -- {urpnl_pct*100}% gain")

    def stop_loss(self):
        stp_threshold = self.stp_pct
        symbol = self.symbol 
        holding = self.algo.Portfolio[symbol]

        if stp_threshold == 0.0: return 

        stp_threshold = -1 * abs(stp_threshold)
        urpnl_pct = holding.UnrealizedProfitPercent
        # Check if the current price is above the target threshold
        if urpnl_pct  < stp_threshold:
            self.algo.Debug(f"Flattening {symbol} -- {urpnl_pct} < {stp_threshold}")
            self.algo.Liquidate(symbol, tag=f"SL  -- {urpnl_pct*100}%")

    def on_bar(self, bar):
        self.rsi.Update(bar.EndTime, bar.Close)


    @property
    def warmed_up(self):
        return self.rsi.IsReady

    @property
    def RSI(self):
        if self.warmed_up:
            return self.rsi.current.value
        else:
            return 0.0 # Bit of a hack
# region imports
from AlgorithmImports import *
# endregion


class Graveyard:


    # PROBABLY want to do this w scheduler, at 4am, 9:30am
    # DONT know if this will work! BC we may not have todays history data TODAY!
    def try_get_gaps(self, symbols: List[str], assert_date: bool = True):
        gap_data_by_symbol = {}
        today = self.Time.date()

        # Get daily data for prior close (RTH only)
        daily = self.History(symbols, timedelta(days=3), Resolution.Daily)
        history_minute = self.History(symbols, timedelta(days=3), Resolution.Minute)
        history_minute_eth = self.History(symbols, timedelta(days=3), Resolution.Minute, extendedMarketHours=True)

        # Get the latest date in the minute data (today's date)
        latest_minute_date = history_minute.index.get_level_values('time').date.max()

        # Check if latest minute data date matches today's date
        if latest_minute_date != today and assert_date:
            self.Debug(f"Warning: Data is not from today! Latest data date: {latest_minute_date}, Expected date: {today}")
            return None

        # Filter today's minute data for RTH and ETH
        today_minute_data_rth = history_minute[history_minute.index.get_level_values('time').date == latest_minute_date]
        today_minute_data_eth = history_minute_eth[history_minute_eth.index.get_level_values('time').date == latest_minute_date]
        prior_close_rth = daily.groupby('symbol')['close'].last()

        # Get the last close price from ETH for prior day
        prior_close_eth = history_minute_eth[history_minute_eth.index.get_level_values('time').date == latest_minute_date - timedelta(days=1)]
        prior_close_eth = prior_close_eth.groupby('symbol')['close'].last()

        # Get today's ETH open (first price of the day during ETH)
        today_open_eth = today_minute_data_eth.groupby('symbol')['open'].first()

        # Get today's RTH open (first price during regular trading hours)
        today_open_rth = today_minute_data_rth.groupby('symbol')['open'].first()
        # Loop through symbols and store all values in a result object
        for symbol in symbols:
            try:
                # Calculate the gaps
                gap_rth = ((today_open_rth[symbol] - prior_close_rth[symbol]) / prior_close_rth[symbol]) * 100
                gap_eth = ((today_open_eth[symbol] - prior_close_eth[symbol]) / prior_close_eth[symbol]) * 100

                # Store the calculated data for each symbol
                gap_data_by_symbol[symbol] = {
                    "gap_rth": gap_rth,
                    "gap_eth": gap_eth,
                    "today_open_rth": today_open_rth[symbol],
                    "today_open_eth": today_open_eth[symbol],
                    "prior_close_rth": prior_close_rth[symbol],
                    "prior_close_eth": prior_close_eth[symbol]
                }
                print(gap_data_by_symbol)
            except KeyError:
                self.Debug(f"Symbol {symbol} data not available for gap calculation.")

        return gap_data_by_symbol


    def RunGapCalculation(self):
        symbols = list(self.symbol_data.keys())
        gap_data = self.try_get_gaps(symbols)

        if gap_data:
            self.Debug(f"Gap Data: {gap_data}")
            self.gap_data = gap_data
        else:
            self.Debug("No gap data available.")
            self.gap_data = {}
# region imports
from AlgorithmImports import *
from Consolidator import Consolidator
# endregion


"""
Capitalization notes

mega cap    -- 200b+ 
    (big 5)
large cap   -- 10b - 200b 
    (mcdonalds, nike)

mid cap     -- 2b - 10b 
    (etsy, roku)

small cap   -- 300m - 2b
    (five below, cargurus)

micro cap    -- 50m - 300m
"""

market_cap_categories = {
    "Mega Cap": {"min": 2.00e11, "max": float('inf')},
    "Large Cap": {"min": 1.00e10, "max": 2.00e11},
    "Mid Cap": {"min": 2.00e9, "max": 1.00e10},
    "Small Cap": {"min": 3.00e8, "max": 2.00e9},
    "Micro Cap": {"min": 5.00e7, "max": 3.00e8},
    "Nano Cap": {"min": 0, "max": 5.00e7}
}


"""
TODO: replace this big stupid ass dictionary with our consolidator class + fields!
move some of the calculations to that class, as well, even as statics.

Add targets! I think stops are probably a bad idea, maybe impossible, but we can certainly try.
consider adding filters for signal, say high of day for longs, etc.

"""

class EnergeticVioletShark(QCAlgorithm):

    # Basic Universe Params
    max_univ_price = 20
    min_univ_price = 5

    max_market_cap = market_cap_categories['Micro Cap']['max']
    min_market_cap = market_cap_categories['Micro Cap']['min']



    # Finer Universe Filters 

    volume_lookback_days = 7
    # volume multiple is today vs avg of X days
    min_gap_pct = 15


    # Long filters
    min_premkt_vol_mult = 1.0
    min_regmkt_vol_mult = 1.5
    min_rsi = 50

    # We keep the top n1 by premkt volume multiple (todays premkt volume / avg premkt, by resolution, averaged)
    n1 = 40

    # we keep the top n2 by regmkt volume multiple (todays regmkt volume / avg regmkt, by resolution, averaged)
    n2 = 10

    # Short filter (not enabled yet)
    max_regmkt_vol_mult = 1.5

    # If target or stop is 0.0, not used.
    # .05 = 5%
    naive_tgt_pct = 0.05
    # naive stp pct can be positive or negative, it's taken as abs, and used as negative
    naive_stp_pct = 0.00

    res = Resolution.MINUTE

    # testing only

    debug_lvl = 1
    flip_short = False

    def initialize(self):
        self.set_start_date(2024, 10, 1)
        self.set_end_date(2024, 10, 5)

        self.set_cash(100_000)
        self.bm = self.add_equity("SPY", self.res).symbol
        self.AddUniverse(self.SelectCoarse, self.SelectFine)

        self.extended_hours = True

        # self.UniverseSettings.Resolution = self.res
        if self.extended_hours:
            self.universe_settings.extended_market_hours = True

        # self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(9, 31), self.RunGapCalculation)

        # self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(5, 00), self.AfterETHOpen) # DOES NOT WORK
        self.Schedule.On(self.DateRules.EveryDay(self.bm), self.TimeRules.At(9, 31), self.AfterRTHOpen)
        
        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", 1), self.EODX)

        self.symbol_list = []
        self.symbol_data = {}
        self.gap_data = {}
        self.longs = {}
        self.shorts = {}

    # region Event Handlers 

    def EODX(self):
        self.liquidate(tag="EOD Exit")

    """
    def AfterETHOpen(self):
        symbols = list(self.symbol_data.keys())
        self.gap_data = self.get_prior_closes(symbols)

        data = self.current_slice
        if data.contains_key(self.bm):
            bars = data.bars
            for symbol in symbols:
                if data.contains_key(symbol):
                    eth_open = bars[symbol].open
                    self.gap_data[symbol]['eth_open'] = eth_open
                    self.Debug(f'updated {symbol} at time {self.Time} with eth open: {eth_open}')
                else:
                    self.Debug(f'failed to update {symbol} at time {self.time} -- dropping.')
                    # self.gap_data.pop(symbol)
                    # SO little will work in ETH, so don't do it.
        else:
            self.debug("no data... wtf (ETH)")
    """

    def AfterRTHOpen(self):
        symbols = list(self.symbol_data.keys())
        try:
            self.gap_data = self.get_prior_closes(symbols)
        except:
            self.Debug(f'failed to get any data... strange.')
            return 

        data = self.current_slice
        if data.contains_key(self.bm):
            bars = data.bars
            for symbol in symbols:
                if symbol not in self.gap_data: continue
                if data.contains_key(symbol):
                    _open = bars[symbol].open
                    _prior_rth_close = self.gap_data[symbol].get('prior_close_rth', None)
                    self.gap_data[symbol]['rth_open'] = _open
                    if _prior_rth_close:
                        _gap_rth_pct = ((_open - _prior_rth_close) / _prior_rth_close) * 100
                        if _gap_rth_pct > self.min_gap_pct:
                            self.gap_data[symbol]['rth_gap_pct'] = _gap_rth_pct
                            if self.debug_lvl > 2: self.Debug(f'{str(symbol)} --> viable gap pct: {_gap_rth_pct}')
                        else:
                            if self.debug_lvl > 2: self.Debug(f'{str(symbol)} dropped -- nonviable gap {_gap_rth_pct}')
                            self.gap_data.pop(symbol)
                            continue

                    if self.debug_lvl > 2: self.Debug(f'updated {symbol} at time {self.Time} with rth open: {_open}')

                    # RSI Checks
                    symbol_data = self.symbol_data.get(symbol, None)
                    if symbol_data:
                        # NOTE -- we could try this, warmup w extended NOW
                        # DOES not work.... strange.
                        # symbol_data.warmup_rsi()
                        rsi_value = symbol_data.RSI
                        if rsi_value > self.min_rsi:
                            self.gap_data[symbol]['rsi'] = rsi_value
                        else:
                            self.gap_data.pop(symbol, None)
                    else:
                        self.gap_data.pop(symbol, None)



                else:
                    # self.Debug(f'failed to update {symbol} at time {self.time} -- dropping')
                    if symbol in self.gap_data:
                        self.gap_data.pop(symbol)
                    if symbol in self.symbol_data:
                        self.unsubscribe(symbol)
        else:
            self.debug("no data... wtf")
            return 

        # ----------- NOTE -- we're now down with symbol data
        # thats taken us as far as we can get with traditional flow.
        # the rest is history call hacks to grab ETH data day of, prior to this moment.

        self.Debug(f'viable gappers: {len(self.gap_data)}')

        to_fetch = list(self.gap_data.keys())
        if len(to_fetch) == 0: 
            return 

        try:
            volume_multiples_df = self.fetch_volume_multiples(to_fetch).dropna()
        except:
            return

        if volume_multiples_df.empty:
            return 

        self.Debug(f'volume multiples df shape -- {volume_multiples_df.shape}')

        pre_mkt = volume_multiples_df.loc[4:8]
        reg_mkt = volume_multiples_df.loc[8:]

        # ---------------------------- ENTRY filter 
        pma = pre_mkt.mean()
        rma = reg_mkt.mean()

        maybe_longs = []
        maybe_shorts = []

        # Maybe easier to filter HERE, we should have this data now...
        for symbol in to_fetch:
            _pma = pma[symbol]
            _rma = rma[symbol]

            ok_for_long = _pma > self.min_premkt_vol_mult and _rma > self.min_regmkt_vol_mult
            ok_for_short = _pma > self.min_premkt_vol_mult and _rma < self.max_regmkt_vol_mult
            # NOTE -- we want to find things that are NOT in this _rma > min reg for SHORTS !
            if ok_for_long or ok_for_short:
                data = self.gap_data[symbol]
                data['volume_df'] = volume_multiples_df[symbol]
                data['pre_mkt_mult'] = _pma
                data['reg_mkt_mult'] = _rma
                if ok_for_long:
                    maybe_longs.append(symbol)
                
                if ok_for_short:
                    maybe_shorts.append(symbol)

            else:
                if symbol in self.gap_data:
                    self.gap_data.pop(symbol)
                if symbol in self.symbol_data:
                    self.unsubscribe(symbol)
        
        # LONG filtering! 
        # for longs we want to find the LARGEST mult reg mkt, WITH big pre mkt volume.
        # WE want something that is on pace for a 5x multiple, really.
        top_by_pma = self.get_top_by_key(self.gap_data, key='pre_mkt_mult', asc=False, n=self.n1)
        top_by_rma = self.get_top_by_key(self.gap_data, key='reg_mkt_mult', asc=False, n=self.n2)

        # Extract the symbols from both lists
        pma_symbols = {item[0] for item in top_by_pma} 
        rma_symbols = {item[0] for item in top_by_rma}
        common_symbols = pma_symbols & rma_symbols
        self.longs = {symbol: self.gap_data[symbol] for symbol in common_symbols if symbol in maybe_longs}

        # SHORTS we want to sort on the LARGEST gap, and find a WEAK volume for. it's a bit different of a pipeline, really.
        # self.shorts = {symbol: self.gap_data[symbol] for symbol in common_short_symbols if symbol in maybe_shorts}
        
        # NOW we need to do high of day tracking, probably -- fuck man.

        n_longs = len(self.longs)
        if n_longs == 0: return

        mult = -1 if self.flip_short else 1
        
        full_wt = .95
        wt = mult * full_wt / n_longs
        self.Debug(f'# entries: {n_longs}')
        for symbol, data in self.longs.items():
            self.Debug(f'buying: {data}')
            self.set_holdings(symbol, wt)




    
    @staticmethod
    def get_top_by_key(gap_data, key='reg_mkt_mult', asc=True, n=5):
        return sorted(gap_data.items(), key=lambda x: x[1][key], reverse=not asc)[:n]

    # endregion 

    # region Universe

    def SelectCoarse(self, coarse):
        filtered = [ x for x in coarse if x.Price < self.max_univ_price and x.Price > self.min_univ_price]
        filtered = [x for x in filtered if x.HasFundamentalData and x.MarketCap < self.max_market_cap and x.MarketCap > self.min_market_cap]
        chosen = [x.Symbol for x in filtered]
        if self.debug_lvl > 0:
            _symbol_str_list = [str(i) for i in chosen]
            self.Debug(f'Universe ( {len(chosen)} symbols )')
        self.symbol_list = chosen
        return self.symbol_list 
        
        
    def SelectFine(self, fine):
        return [f.Symbol for f in fine]


    def on_securities_changed(self, changes):
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            self.symbol_data[symbol] = Consolidator(symbol, self)
        
        for security in changes.RemovedSecurities:
            # Handle the removed securities if necessary
            if security.Symbol in self.symbol_data:
                self.unsubscribe(security.Symbol)


    def unsubscribe(self, symbol):
        self.RemoveSecurity(symbol)
        self.symbol_data.pop(symbol)


    # endregion 

    def on_data(self, data: Slice):
        if not data.contains_key(self.bm): return

        n_symbols = len(self.symbol_data)
        for symbol, instance in self.symbol_data.items():
            if not data.contains_key(symbol): continue
            trade_bar = data[symbol]
            instance.on_bar(trade_bar)

            maybe_rsi = instance.rsi.current.value if instance.warmed_up else 0
            if self.debug_lvl >= 2: self.Debug(f"{self.time} -- {symbol}: warmed up? {instance.warmed_up} -- rsi {maybe_rsi}")
            if self.portfolio[symbol].invested:
                instance.profit_target()
                instance.stop_loss()


    # region Helpers 

    def get_prior_closes(self, symbols: List[str]):
        prior_closes_by_symbol = {}
        daily_rth = self.History(symbols, timedelta(days=3), Resolution.Daily, extendedMarketHours=False)
        daily_eth = self.History(symbols, timedelta(days=3), Resolution.Daily, extendedMarketHours=True)

        # Use the latest available date in the data
        latest_rth = daily_rth.groupby('symbol')['close'].last()
        latest_eth = daily_eth.groupby('symbol')['close'].last()

        # Loop through symbols and store the prior closes in the result object
        for symbol in symbols:
            try:
                # Store prior RTH and ETH close for each symbol
                prior_closes_by_symbol[symbol] = {
                    "prior_close_rth": latest_rth[symbol],
                    "prior_close_eth": latest_eth[symbol]
                }
            except KeyError:
                self.Debug(f"Symbol {symbol} data not available for prior close calculation.")

        return prior_closes_by_symbol

    def fetch_volume_multiples(self, symbols: List[str]):
        # Fetch minute-level historical data for the past 7 days with extended market hours
        history_minute = self.History(symbols, timedelta(days=self.volume_lookback_days), Resolution.Minute, extendedMarketHours=True)
        history_minute_flat = history_minute.reset_index()

        if history_minute.empty:
            self.Debug("History minute data is empty")
            return pd.DataFrame()

        history_minute_flat = history_minute_flat.set_index('time')

        # Resample to hourly intervals for all symbols at once, summing the volume
        history_hourly_resampled = history_minute_flat.groupby('symbol').resample('H').agg({'volume': 'sum'}).reset_index()


        last_date = history_hourly_resampled['time'].dt.date.max()

        # Split data into 'today' (last available date) and 'before_today' for all symbols
        today_data = history_hourly_resampled[history_hourly_resampled['time'].dt.date == last_date].copy()
        before_today_data = history_hourly_resampled[history_hourly_resampled['time'].dt.date < last_date].copy()

        # Create 'hour' column for grouping purposes
        today_data['hour'] = today_data['time'].dt.hour
        before_today_data['hour'] = before_today_data['time'].dt.hour

        # Calculate the average volume per hour for past days, grouped by symbol and hour
        avg_volume_before_today = before_today_data.groupby(['symbol', 'hour'])['volume'].mean().unstack()

        # Calculate today's volume by hour for each symbol
        avg_volume_today = today_data.groupby(['symbol', 'hour'])['volume'].mean().unstack()

        # Reindex both DataFrames to ensure all hours (0-23) are present for each symbol
        all_hours = range(24)
        avg_volume_before_today = avg_volume_before_today.reindex(columns=all_hours, fill_value=np.nan)
        avg_volume_today = avg_volume_today.reindex(columns=all_hours, fill_value=np.nan)

        # Calculate the safe volume multiples (today's volume / average past volume)
        volume_multiples = avg_volume_today / avg_volume_before_today

        # Display the results
        vol_mults_safe = volume_multiples.T
        return vol_mults_safe

    def _check_today_in_hist(self):
        symbols = ["SPY"]
        today = self.Time.date()
        history = self.History(symbols, timedelta(days=3), Resolution.MINUTE)
        latest_date = history.index.get_level_values('time').date.max()
        if latest_date == today:
            self.Debug(f"History includes today's data: {latest_date}")
        else:
            self.Debug(f"History does not include today's data. Latest data is from: {latest_date}")


    # endregion