Overall Statistics
Total Orders
62
Average Win
7.24%
Average Loss
-2.52%
Compounding Annual Return
1824.749%
Drawdown
18.400%
Expectancy
0.523
Start Equity
100000
End Equity
127516.09
Net Profit
27.516%
Sharpe Ratio
12.108
Sortino Ratio
23.287
Probabilistic Sharpe Ratio
69.300%
Loss Rate
61%
Win Rate
39%
Profit-Loss Ratio
2.88
Alpha
15.015
Beta
-3.517
Annual Standard Deviation
1.161
Annual Variance
1.348
Information Ratio
11.656
Tracking Error
1.183
Treynor Ratio
-3.997
Total Fees
$0.00
Estimated Strategy Capacity
$5000.00
Lowest Capacity Asset
SNTG XPY9HNNNXVFP
Portfolio Turnover
157.50%
# region imports
from AlgorithmImports import *
# endregion

''' 
We need to do the 'gap' check -- let's do this with a scheduled event on the open.
Then we need to do a continuous check of the daily pct change -- let's reference the open price (store it in the SymbolData, in the scheduled market-open handler).

1% trailing
3% fixed stop.
Exit EOD

'''

from datetime import timedelta

class TechnicalUniverseAlgorithm(QCAlgorithm):
    """Gap-fade algorithm over a dynamically selected equity universe.

    Universe selection keeps one ``SymbolData`` per symbol and splits the
    symbols into those whose latest price is above / below their SMA.  A
    scheduled handler shortly after the open records each symbol's opening
    bar and prunes symbols whose gap has the wrong sign.  ``OnData`` applies
    a percentage stop and runs the entry logic until a position has been
    opened for the current calendar day.  All positions are liquidated by a
    second scheduled handler shortly before the close.
    """

    # Gap threshold, in the same percent units as SymbolData.GapPct:
    # "above" symbols need a gap >= this, "below" symbols a gap <= -1 * this.
    gap_pct = 0
    # day_chg_pct = 0 # requires a day pct change (open of day to current) > this, or < -1 * this.

    # Number of most-extreme gaps kept on each side before the final cut.
    # (Original author's note: "this serves no purpose anymore".)
    top_gaps = 50
    # Number of symbols actually traded on each side.
    top_final = 1

    # Leverage applied to universe securities and used when sizing positions.
    lvg = 1.0

    # Trade direction:
    #   1 == long only.
    #  -1 == short only.
    #   0 == both sides (allocation is halved in EntryLogic).
    direction = -1

    # need to think more about this...
    # cont = 1

    # Stop threshold compared against the holding's unrealized_profit_percent
    # in ExitLogic; 0 disables the stop.
    # NOTE(review): presumably a fraction (-0.01 == -1%) -- confirm against the LEAN API.
    stop_pct = -0.01

    # Portion of the book held back from the gap trades when sizing positions.
    hedge_pct = .05

    def Initialize(self):
        '''Initialise the data and resolution required, as well as the cash and start-end dates for your algorithm. All algorithms must initialized.'''

        # self.SetStartDate(2023,11,29)  #Set Start Date
        self.SetStartDate(2024, 1, 1)
        self.SetEndDate(2024,1,30)    #Set End Date
        self.SetCash(100000)           #Set Strategy Cash

        # Author's open question: "Confused about universe resolution, why is it not minute?"
        # NOTE(review): with Resolution.Daily the universe symbols may not have
        # an intraday bar in the slice at the open -- confirm this is intended.
        self.UniverseSettings.Resolution = Resolution.Daily
        self.UniverseSettings.Leverage = self.lvg
        # NOTE(review): a second security initializer is installed further
        # down; presumably it replaces this one for securities added later.
        self.SetSecurityInitializer(self.CustomSecurityInitializer)

        # Benchmark symbol; also drives the scheduled-event calendar below.
        self.bm = self.AddEquity('SPY').Symbol

        self.coarse_count = 10
        # Symbol -> SymbolData, filled by CoarseSelectionFilter.
        self.averages = { }

        # These are lists of SymbolData instances.
        self.Above = []
        self.Below = []
        # Symbols of the instances above, kept in step with the two lists.
        self.BelowSymbols = []
        self.AboveSymbols = []

        self.Univ = []

        # Calendar date -> True once a position has been opened on that date.
        self.entries_by_date = {}
        # False until CoarseSelectionFilter has seeded the SMAs from history.
        self.warmed_up = False

        self.set_security_initializer(MySecurityInitializer(self.brokerage_model, FuncSecuritySeeder(self.get_last_known_prices)))

        self.SetWarmup(timedelta(days=1))

        self.AddUniverse(self.CoarseSelectionFilter)
        # self.AddUniverse(self.CoarseSelectionFilter, self.FineSelectionFunction)

        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                 self.TimeRules.AfterMarketOpen("SPY", 1), # try 1 after? maybe 0?
                 self.OnMarketOpen)

        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                 self.TimeRules.BeforeMarketClose("SPY", 1),
                 self.EOD)


    # region Schedule Handlers


    def CustomSecurityInitializer(self, security):
        """Set the configured leverage on a newly created security."""
        security.SetLeverage(self.lvg)

    def EOD(self):
        """Scheduled before the close: flatten every open position."""
        self.Liquidate()

    # Capture open of day metrics, and prune the universe.
    def OnMarketOpen(self):
        """Record each candidate's opening bar and drop wrong-signed gaps.

        For every tracked instance that has a bar in the current slice, store
        the bar's open and volume on the instance, then remove it from
        ``Above`` when its gap is below ``gap_pct`` or from ``Below`` when its
        gap is above ``-gap_pct``.  Instances without a bar are left untouched.
        """
        data = self.CurrentSlice
        if data is None: return

        # The concatenation is a snapshot, so removing from self.Above /
        # self.Below inside the loop does not disturb the iteration.
        for inst in self.Above + self.Below:
            symbol = inst.symbol

            try:
                bar = data.Bars[symbol]
            except Exception:
                # Author's note: why is there never data at these times? It
                # would be SO rare to have no data a few minutes into the open.
                # Best effort: skip symbols that have no bar in this slice.
                continue

            inst.day_open = bar.Open
            inst.open_volume = bar.Volume

            if inst in self.Above:
                if inst.GapPct < self.gap_pct:
                    self.Above.remove(inst)
                    self.AboveSymbols.remove(symbol)
                    # TODO: could also remove from Securities, here -- make it 'faster'

            if inst in self.Below:
                if inst.GapPct > -1 * self.gap_pct:
                    self.Below.remove(inst)
                    self.BelowSymbols.remove(symbol)

    # endregion


    # region Universe

    def CoarseSelectionFilter(self, coarse: List[Fundamental]):
        """Update per-symbol SMAs and return the symbols to subscribe to.

        The first call seeds every surviving symbol's ``SymbolData`` from 50
        daily history bars; later calls feed each symbol's latest adjusted
        price.  Returns the symbols of all tracked instances currently flagged
        above or below their SMA.
        """
        # Same three screens as before, evaluated in the same order.
        coarse = [
            x for x in coarse
            if x.CompanyProfile.SharesOutstanding > 1e6 # Maybe?
            and x.DollarVolume > 1e6 * x.AdjustedPrice
            and x.CompanyProfile.MarketCap > 3e6
        ]

        if not self.warmed_up:
            history = self.History([c.Symbol for c in coarse], 50, Resolution.Daily) # Why failing?
            for c in coarse:
                symbol = c.Symbol
                try:
                    df = history.loc[symbol]
                except Exception:
                    # Not in the batched result: retry this symbol on its own,
                    # and give up on it quietly if that fails as well.
                    try:
                        hist = self.History(symbol, 50, Resolution.Daily)
                        df = hist.loc[symbol]
                    except Exception:
                        continue

                # The SymbolData is created lazily, so a symbol with an empty
                # history frame does not get an entry.
                for idx, row in df.iterrows():
                    if symbol not in self.averages:
                        self.averages[symbol] = SymbolData(symbol)
                    self.averages[symbol].update(idx, row['close'])
            self.warmed_up = True

        else:
            for cf in coarse:
                if cf.Symbol not in self.averages:
                    self.averages[cf.Symbol] = SymbolData(cf.Symbol)

                # Updates the SymbolData object with current EOD price
                self.averages[cf.Symbol].update(cf.EndTime, cf.AdjustedPrice)

        # Split every tracked instance by its position relative to the SMA.
        # NOTE(review): this walks all of self.averages, including symbols
        # that did not pass today's screens -- confirm that is intended.
        tracked = self.averages.values()
        self.Above = [inst for inst in tracked if inst.above]
        self.Below = [inst for inst in tracked if inst.below]

        self.AboveSymbols = [i.symbol for i in self.Above]
        self.BelowSymbols = [i.symbol for i in self.Below]

        # we need to return only the symbol objects
        return [ x.symbol for x in self.Above + self.Below ]

    # def FineSelectionFunction(self, fine: List[FineFundamental]) -> List[str]:
    #     # Filter securities with a calculated market cap greater than $500 million
    #     filtered = [
    #         f.Symbol for f in fine if f.ValuationRatios.MarketCap > 5e8
    #     ]
    #     return filtered

    def OnSecuritiesChanged(self, changes):
        """Liquidate holdings in removed securities and log the number added."""
        for security in changes.RemovedSecurities:
            if security.Invested:
                self.Liquidate(security.Symbol)

        self.Log(f'Added: {len(changes.AddedSecurities)}')

    # endregion


    def OnData(self, slice):
        """Run the stop check, then the entry logic until today has an entry."""
        self.ExitLogic()

        # Key on the calendar date itself.  The uncalled ``self.time.date``
        # attribute is a fresh bound-method object on every access, so it never
        # matched a stored key and the once-per-day guard had no effect.
        today = self.time.date()
        if today not in self.entries_by_date:
            self.EntryLogic()
            if self.portfolio.invested:
                self.entries_by_date[today] = True

    # region entry + exit

    def EntryLogic(self):
        """Open positions in the most extreme gaps according to ``direction``.

        Refreshes each candidate's intraday return, logs how many positive and
        negative gaps exist, and, when flat, shorts the ``top_final`` largest
        gaps (``direction <= 0``) and/or buys the ``top_final`` smallest gaps
        (``direction >= 0``).
        """
        # if self.Portfolio.Invested: return
        bars = self.CurrentSlice.Bars
        both = self.Above + self.Below
        for inst in both:
            if bars.ContainsKey(inst.symbol):
                inst.DayPctReturn(bars[inst.symbol].Close)

        # # Extra remove of 0's -- dangerous.
        n_set = sum(1 for i in both if i.GapPct > 0)
        n_set_below = sum(1 for i in both if i.GapPct < 0)
        self.Log(f'pos gaps: {n_set}')
        self.Log(f'neg gaps: {n_set_below}')

        if n_set == 0 and n_set_below == 0: return

        # Descending for the largest gaps up, ascending for the most negative.
        largest_pos_gaps = sorted(both, key=lambda x: x.GapPct, reverse=True)[:self.top_gaps]
        largest_neg_gaps = sorted(both, key=lambda x: x.GapPct)[:self.top_gaps]

        top_all = [(i.symbol, i.GapPct) for i in largest_pos_gaps]
        btm_all = [(i.symbol, i.GapPct) for i in largest_neg_gaps]

        # Largest gaps up, we really want to SHORT them (I think)

        # Divide the leveraged amount (less the hedge reserve) between the picks.
        allocation = (.99 * self.lvg - self.hedge_pct) / self.top_final
        long_short = self.direction == 0
        # (we use top twice, in long short -- so divide allocation by 2)
        if long_short: allocation /= 2

        # This is the point we know what we are buying, and at what scale.
        if not self.Portfolio.Invested:
            # Short Fades

            # THIS also conflicts with the direction! we need to change this to include cont.
            # short_fade_sign = 1 if self.cont else -1
            # NOTE(review): the picks are not filtered by gap sign, so with no
            # positive gaps the "largest" gap shorted here can be zero or negative.
            if self.direction <= 0:
                for symbol, gap in top_all[:self.top_final]:
                    self.SetHoldings(symbol, -1 * allocation)
                    self.log(f'SELLING large gap in {symbol}')

            # Long Fades
            if self.direction >= 0:
                for symbol, gap in btm_all[:self.top_final]:
                    self.SetHoldings(symbol, allocation)
                    self.log(f'BUYING large gap down in {symbol}')

        # hedge_alloc = self.hedge_pct * self.direction * -1
        # if long_short:
        #     self.SetHoldings(self.bm, hedge_alloc )


    def ExitLogic(self):
        """Liquidate any holding whose unrealized profit percent is below ``stop_pct``."""
        if self.stop_pct == 0:
            # A zero threshold means the stop is switched off.
            return

        invested = [symbol for symbol, holding in self.Portfolio.items() if holding.Invested]
        for symbol in invested:
            urpnl = self.portfolio[symbol].unrealized_profit_percent
            if urpnl < self.stop_pct:
                self.liquidate(symbol, tag=f"SL -- {urpnl}")


    # endregion


class MySecurityInitializer(BrokerageModelSecurityInitializer):
    """Brokerage-model security initializer that gives every security a zero constant fee."""

    def __init__(self, brokerage_model: IBrokerageModel, security_seeder: ISecuritySeeder) -> None:
        # No extra state of our own: pass both collaborators to the base class.
        super().__init__(brokerage_model, security_seeder)

    def initialize(self, security: Security) -> None:
        """Apply the base-class initialization, then override the fee model.

        The base-class call sets the security's reality models from the
        brokerage model's defaults; afterwards the fee model is replaced with
        ``ConstantFeeModel(0)``.
        """
        super().initialize(security)

        # Overwrite the fee model chosen by the base class.
        zero_fee = ConstantFeeModel(0)
        security.set_fee_model(zero_fee)
         


class SymbolData(object):
    """Per-symbol state: an SMA trend flag plus the current session's open.

    ``update`` feeds a new closing price into the 50-period SMA and refreshes
    the ``above`` / ``below`` flags.  ``day_open`` and ``open_volume`` are
    assigned by the algorithm once the session's opening bar is seen, and
    ``GapPct`` / ``DayPctReturn`` derive percentages from them.
    """

    def __init__(self, symbol):
        self.symbol = symbol
        self.sma = SimpleMovingAverage(50)
        # Latest value relative to the SMA; both stay False until the SMA is ready.
        self.above = False
        self.below = False
        # Most recent value passed to update().
        self.last_close = None
        # Opening price / volume of the current session, set by the algorithm.
        self.day_open = None
        self.open_volume = None
        # Last result computed by DayPctReturn().
        self.day_ret_pct = None

    def update(self, time, value):
        """Feed a new close into the SMA and start a fresh session.

        Args:
            time: Timestamp of the data point, forwarded to the SMA.
            value: Closing price for that timestamp.
        """
        self.sma.Update(time, value)
        self.last_close = value

        # A new close invalidates the previous session's open.  Without this
        # reset, a day on which no opening bar is captured would compute its
        # gap from an old open against the new close.
        self.day_open = None
        self.open_volume = None
        self.day_ret_pct = None

        if self.sma.IsReady:
            sma_value = self.sma.Current.Value
            self.above = value > sma_value
            self.below = value < sma_value

    @property
    def GapPct(self):
        """Gap from the last close to today's open, in percent.

        Returns 0 when either price is missing (or zero).
        """
        if self.last_close and self.day_open:
            return ((self.day_open - self.last_close) / self.last_close) * 100
        return 0

    def DayPctReturn(self, current_close):
        """Return and store the move from today's open to ``current_close``, in percent.

        Returns 0, leaving ``day_ret_pct`` unchanged, when no open has been
        recorded for the session.
        """
        if not self.day_open:
            return 0
        self.day_ret_pct = ((current_close - self.day_open) / self.day_open) * 100
        return self.day_ret_pct
    
#region imports
from AlgorithmImports import *
#endregion


# Your New Python File

'''
for above_obj in self.Above[:]:
    symbol = above_obj.symbol
    if not data.Bars.ContainsKey(symbol): continue
    bar = data.Bars[symbol]
    above_obj.day_open = bar.Open

    if above_obj.GapPct < self.gap_pct:
        self.Above.remove(above_obj)
        self.AboveSymbols.remove(symbol)
    else:
        above_obj.open_volume = bar.Volume

for below_obj in self.Below[:]:
    symbol = below_obj.symbol
    if not data.Bars.ContainsKey(symbol): continue
    bar = data.Bars[symbol]
    below_obj.day_open = bar.Open
    if below_obj.GapPct > -1 * self.gap_pct: # Negative gap desired.
        self.Below.remove(above_obj)
        self.BelowSymbols.remove(symbol)
    else:
        below_obj.open_volume = bar.Volume

'''