Overall Statistics
Total Orders
498
Average Win
0.41%
Average Loss
-0.22%
Compounding Annual Return
-0.323%
Drawdown
2.500%
Expectancy
0.182
Start Equity
100000
End Equity
99033
Net Profit
-0.967%
Sharpe Ratio
-0.81
Sortino Ratio
-0.475
Probabilistic Sharpe Ratio
0.666%
Loss Rate
59%
Win Rate
41%
Profit-Loss Ratio
1.86
Alpha
-0.016
Beta
0.044
Annual Standard Deviation
0.012
Annual Variance
0
Information Ratio
-1.52
Tracking Error
0.095
Treynor Ratio
-0.217
Total Fees
$282.00
Estimated Strategy Capacity
$16000.00
Lowest Capacity Asset
SPXW 2ZV9BLSEJP7DA|SPX 31
Portfolio Turnover
0.09%
#region imports
from AlgorithmImports import *
#endregion

"""  
To support MANY symbols, most of main would be replaced with this class.

One Consolidator is created per underlying, right after its equity/index and
option subscriptions are added. The UNIVERSE (set filter) stays in main.
self.strategies[underlying_symbol] = Consolidator( ... )


In on_data, we loop through self.strategies and
    call each strategy's on_data, passing it the slice.

We REMOVE all regions, except order_handling, from main.

EXAMPLE:

# <Class variables>

underlying_universe = ['SPY','QQQ','IWM']


# <Initialize>

self.strategies = {}
for underlying in underlying_universe:
    # (special case for SPX -- or indexes probably -- diff list?)

    if underlying in ['SPX','VIX']:
        underlying_symbol = self.add_index(underlying).symbol
        option = self.add_index_option(underlying_symbol, f'{underlying}W')
    else:
        underlying_symbol = self.add_equity(underlying).symbol
        option = self.add_option(underlying)

    option.SetFilter(lambda x: x.IncludeWeeklys().Strikes(0, 100).Expiration(timedelta(days=0), timedelta(days=1)))
    self.strategies[underlying_symbol] = Consolidator(underlying_symbol, option.symbol, self)


# <OnData>

for underlying_symbol, strat in self.strategies.items():
    strat.on_data(slice)

"""

class Consolidator:
    """Call credit-spread strategy for one underlying, driven by a 15-minute ATR.

    The owning algorithm (``algo``) supplies the tunables read here:
    ``length``, ``entry_strike_atrs``, ``exit_strike_atrs``,
    ``pct_of_max_loss_stop`` and ``pct_of_max_gain_tgt``. All orders, logging,
    history requests and portfolio look-ups go through that algorithm object.
    """

    def __init__(self, underlying_symbol, option_symbol, algo):
        """Wire the ATR to a 15-minute consolidator and warm it up from history.

        Args:
            underlying_symbol: Symbol of the underlying whose bars drive the ATR.
            option_symbol: Canonical option symbol used to look up the chain.
            algo: The owning algorithm instance.
        """
        # universe created in init
        self.a = algo

        idx = underlying_symbol
        self.underlying_symbol = underlying_symbol
        self._symbol = option_symbol

        self._contracts = 1

        # Per-trade state. Defined up front so the exit path can tell
        # "no spread has been opened yet" apart from a real position.
        self.open_prices = {}
        self.long_ct = None
        self.short_ct = None
        self.sell_price = None
        self.buy_price = None
        self.ul_stop_lvl = None
        self.max_gain = None
        self.max_loss = None
        self.initialCredit = None

        # Note -- we could simply pull this down via a history call (each day) -- but this is more efficient.
        self.cons = self.a.consolidate(idx, timedelta(minutes=15), self.on_15m)
        self.atr = AverageTrueRange(self.a.length, MovingAverageType.Simple)
        # register_indicator lives on the algorithm, not on this helper class.
        self.a.register_indicator(idx, self.atr, self.cons)
        self.warmup_data(idx)

    # region Data Handling

    def warmup_data(self, idx):
        """Feed minute history through the consolidator so the ATR starts warm.

        Requests ``length * 15 + 1`` minute bars, i.e. enough minutes to build
        ``length`` 15-minute bars. Volume is passed as 0 because it is not read
        from the history frame.
        https://www.quantconnect.com/docs/v2/research-environment/datasets/indices
        """
        hist = self.a.history([idx], self.a.length * 15 + 1, Resolution.Minute)
        if hist.empty:
            # .loc[idx] raises KeyError on an empty frame; skip instead of crashing.
            self.a.log(f'no history returned for {idx} -- skipping warmup.')
            return

        # NOTE(review): row.Index is used as the bar time; confirm whether the
        # history index is the bar start or the bar end time.
        for row in hist.loc[idx].itertuples():
            bar = TradeBar(row.Index, idx, row.open, row.high, row.low, row.close, 0)
            self.cons.Update(bar)
        self.a.log('successfully warmed up.')

    def on_15m(self, bar: "TradeBar") -> None:
        """Consolidator callback: push each finished 15-minute bar into the ATR."""
        # NOTE(review): the ATR is also registered against this consolidator in
        # __init__, so it may be offered every bar twice -- confirm the
        # indicator ignores the repeated input.
        self.atr.update(bar)

    def on_data(self, data: "Slice"):
        """Handle one slice: open a spread on the first bar, otherwise manage it.

        On the bar ending 09:31 the opening price is recorded and, if no spread
        is open and the ATR is ready, the strike targets are computed and
        entry_logic runs. On every bar while a spread is open, exit_logic runs.
        Strike targets are not recomputed while a spread is open, so the
        underlying stop level stays tied to the open position.
        """
        bars = data.bars
        if not data.contains_key(self.underlying_symbol):
            return
        ul = bars[self.underlying_symbol]

        spread_open = self._spread_is_open()
        bar_end = ul.end_time

        # first bar of day?
        if bar_end.hour == 9 and bar_end.minute == 31:
            open_price = ul.open
            self.open_prices[bar_end.date()] = open_price

            if not spread_open:
                if self.IsReady:
                    atr = self.atr.current.value
                    self.sell_price = open_price + atr * self.a.entry_strike_atrs
                    self.buy_price = self.sell_price + atr * self.a.exit_strike_atrs
                    self.a.log(f'open: {open_price}, atr: {atr}, sell: {self.sell_price}, buy: {self.buy_price}')

                    # self.max_risk_pts = abs(self.sell_price - self.buy_price)
                    self.ul_stop_lvl = self.buy_price
                    self.entry_logic(data)
                return

        if spread_open:
            self.exit_logic()

    # endregion

    # region Strategy Logic

    def entry_logic(self, slice):
        """Open the call credit spread from the option chain in ``slice``.

        Both legs use the nearest expiry in the chain and ``self._contracts``
        contracts each:

        - The written (short) call is the one whose strike is closest to
          ``self.sell_price`` (open + entry_strike_atrs * ATR).
        - The bought (long) call is the one whose strike is closest to
          ``self.buy_price`` (sell_price + exit_strike_atrs * ATR).

        Nothing is traded when a position is already open, the chain is
        missing, or both targets resolve to the same contract.
        """
        if self.a.portfolio[self.underlying_symbol].invested or self._spread_is_open():
            return
        if self.sell_price is None or self.buy_price is None:
            return

        chain = slice.OptionChains.get(self._symbol)
        if not chain:
            return

        contracts = list(chain)
        if not contracts:
            return

        # nearest expiry
        expiry = min(ct.Expiry for ct in contracts)
        self.a.log(f'date: {self.a.time.date()} -- expiry: {expiry}')

        calls = [ct for ct in contracts if ct.Expiry == expiry and ct.Right == OptionRight.Call]
        if not calls:
            return

        sell_contract = min(calls, key=lambda ct: abs(self.sell_price - ct.Strike))
        buy_contract = min(calls, key=lambda ct: abs(self.buy_price - ct.Strike))

        if sell_contract.Symbol == buy_contract.Symbol:
            # A sparse chain can map both targets to one strike; buying and
            # selling the same contract would only pay fees.
            self.a.log(f'skipping entry: both legs resolve to {sell_contract.Symbol}')
            return

        self.short_ct = sell_contract
        self.long_ct = buy_contract

        self.a.log(f'symbol: {sell_contract.Symbol} -- {sell_contract.BidPrice} / {sell_contract.AskPrice}')
        self.a.log(f'symbol: {buy_contract.Symbol} -- {buy_contract.BidPrice} / {buy_contract.AskPrice}')

        self.max_gain, self.max_loss = self.credit_spread_max_gain_loss(buy_contract, sell_contract)
        self.initialCredit = self.max_gain

        # Long leg first, then the short leg.
        self.a.Buy(buy_contract.Symbol, self._contracts)
        self.a.Sell(sell_contract.Symbol, self._contracts)

    def exit_logic(self):
        """Close the spread when one of three exit conditions is met.

        Checked in order; the first hit closes both legs and returns:

        - SL: unrealized loss of at least ``pct_of_max_loss_stop`` of max loss.
        - TP: unrealized gain of at least ``pct_of_max_gain_tgt`` of max gain.
        - Underlying SL: underlying price at or above ``self.buy_price``.
        """
        exit_cost = self.spread_price()
        if exit_cost is None or self.initialCredit is None:
            # No spread has been priced yet -- nothing to manage.
            return

        # so -- we sold it for 100, can buy it back for 50. pnl is 50.
        # or -- we sold it for 100, can buy it back for 150, pnl is -50
        current_pnl = self.initialCredit - exit_cost

        # max_loss is a positive number of points, so the stop level is its negative.
        stop_lvl = -(self.a.pct_of_max_loss_stop * self.max_loss)
        if current_pnl <= stop_lvl:
            self.a.log(f'stop condition: {current_pnl} <= {stop_lvl} -- (-{self.a.pct_of_max_loss_stop} * {self.max_loss})')
            self._close_spread("SL")
            return

        target_pnl = self.a.pct_of_max_gain_tgt * self.max_gain
        if current_pnl >= target_pnl:
            self.a.log(f'tp condition: {current_pnl} >= {target_pnl} -- ({self.a.pct_of_max_gain_tgt} * {self.max_gain})')
            self._close_spread("TP")
            return

        ul_price = self.a.portfolio[self.underlying_symbol].price
        if self.buy_price is not None and ul_price >= self.buy_price:
            self.a.log(f'underlying stop condition: {ul_price} >= {self.buy_price}')
            self._close_spread("Underlying SL")

    # endregion

    # region Utilities

    def _spread_is_open(self):
        """Return True while the portfolio still holds either leg of the spread."""
        return any(
            ct is not None and self.a.portfolio[ct.Symbol].invested
            for ct in (self.short_ct, self.long_ct)
        )

    def _close_spread(self, tag):
        """Liquidate both option legs (short leg first), tagging the orders."""
        for ct in (self.short_ct, self.long_ct):
            if ct is not None:
                self.a.liquidate(ct.Symbol, tag=tag)

    def spread_price(self):
        """Return the cost to exit the spread, or None if no legs are recorded.

        The cost is what is paid to buy back the short leg (its ask) minus what
        is received for selling the long leg (its bid).
        """
        # NOTE(review): prices are read from the contract objects captured at
        # entry -- confirm they reflect live quotes rather than the entry snapshot.
        if self.long_ct and self.short_ct:
            # debit to close the short - credit from closing the long
            return self.short_ct.AskPrice - self.long_ct.BidPrice
        return None

    def credit_spread_max_gain_loss(self, buy, sell):
        """Return ``(max_gain, max_loss)`` of the spread in points, not dollars.

        Args:
            buy: Contract to be bought; priced at its ask.
            sell: Contract to be written; priced at its bid.

        Returns:
            The net credit, and the strike distance minus that credit.
        """
        credit = sell.BidPrice - buy.AskPrice
        stk_distance = abs(buy.Strike - sell.Strike)
        max_gain = credit
        max_loss = stk_distance - credit

        self.a.log(f'Strike distance: {stk_distance}')
        self.a.log(f'max gain: {credit}')
        self.a.log(f'max loss: {max_loss}')
        return max_gain, max_loss

    @property
    def IsReady(self):
        """True once the ATR reports it is ready."""
        return self.atr.is_ready

    #endregion
# region imports
from AlgorithmImports import *
from consolidator import Consolidator
# endregion

"""
0DTE Put Write
dev: @zoakes

Commit Equivalent History
- initial commits, data handling, aggregation, consolidator + time bars.
- handling of pricing, risk, reward, and basics of entry logic.
- handling of orders, entries exits, and pricing wrt risk.


Version History:
0.1 -- Initial Build
0.2 -- Index support performs better -- use that.
0.3 -- Consolidator (supported, not needed) -- for multi-product design.


NOTES:
XSP is not supported, SPX is
https://www.quantconnect.com/docs/v2/writing-algorithms/datasets/algoseek/us-index-options#07-Supported-Assets

"""

class DeterminedSkyBlueFly(QCAlgorithm):
    """Short-dated SPXW credit-spread algorithm sized off a 15-minute ATR.

    On the first minute bar of the session the opening price and the ATR give
    two strike targets; the option nearest the first is written and the option
    nearest the second is bought. ``_option_type`` selects put spreads (targets
    below the open) or call spreads (targets above the open).
    """

    # # can create custom alerts, and a mailing list here
    # # may start there -- we can extend as desired.
    to_notify = [
        "test@test.com"
    ]

    # ATR period, in 15-minute bars.
    length = 130

    entry_strike_atrs = 1
    exit_strike_atrs = 2
    # exit strike atrs are counted from the entry strike
    # -- i.e. it would be entry_strike_atrs + exit_strike_atrs from open

    pct_of_max_loss_stop = .5
    # .5 == 50%

    pct_of_max_gain_tgt = .9
    # .9 == 90%

    # Switches for the option-PnL stop and the underlying-price stop.
    option_sl = False
    ul_sl = False

    target_dte = 0

    _contracts = 1

    _option_type = OptionRight.PUT
    # or OptionRight.CALL

    _debug_lvl = 3
    # higher debug_lvl is more details logged. (0 - 3)

    def Initialize(self):
        """Set dates and cash, subscribe to SPX/SPXW, and build the warm ATR."""
        self.SetStartDate(2012,1,1)
        self.SetEndDate(2015,1,1)

        # self.SetStartDate(2022, 10, 30)
        # self.SetEndDate(2022,11,15)

        self.SetCash(100000)

        # Benchmark
        self.spy = self.AddEquity("SPY", Resolution.Minute).symbol
        self.set_benchmark(self.spy)
        self.universe_settings.data_normalization_mode = DataNormalizationMode.RAW

        # Index Options (SPX)
        # https://www.quantconnect.com/docs/v2/writing-algorithms/universes/index-options
        self.idx = self.add_index("SPX").symbol
        option = self.add_index_option(self.idx, "SPXW")
        self.underlyingsymbol = self.idx
        self._symbol = option.symbol

        # Equity Options (SPY)
        ## self.idx = self.add_equity("SPY")
        # self.idx = self.spy
        # option = self.add_option("SPY")
        # self.underlyingsymbol = self.idx
        # self._symbol = option.symbol

        # Puts only need strikes at/below the money, calls at/above it.
        lower_bound = 0 if self._option_type == OptionRight.CALL else -100
        upper_bound = 0 if self._option_type == OptionRight.PUT else 100

        # https://www.quantconnect.com/docs/v2/writing-algorithms/universes/equity-options
        option.SetFilter(
            lambda x: x.IncludeWeeklys()
                        .Strikes(lower_bound, upper_bound)
                        .Expiration(
                            timedelta(days=self.target_dte),
                            timedelta(days=self.target_dte + 1)
                        )
        )

        # Per-trade state. Defined up front so the exit path can tell
        # "no spread has been opened yet" apart from a real position.
        self.open_prices = {}
        self.long_ct = None
        self.short_ct = None
        self.sell_price = None
        self.buy_price = None
        self.ul_stop_lvl = None
        self.max_gain = None
        self.max_loss = None
        self.initialCredit = None

        self.SetSecurityInitializer(self.reality_model)

        # Note -- we could simply pull this down via a history call (each day) -- but this is more efficient.
        self.cons = self.consolidate(self.idx, timedelta(minutes=15), self.on_15m)
        self.atr = AverageTrueRange(self.length, MovingAverageType.Simple)
        self.register_indicator(self.idx, self.atr, self.cons)
        self.warmup_data(self.idx)

    def reality_model(self, security):
        """Security initializer: apply the Interactive Brokers fee model."""
        security.SetFeeModel(InteractiveBrokersFeeModel())

    # region Data Handling

    def warmup_data(self, idx):
        """Feed minute history through the consolidator so the ATR starts warm.

        Requests ``length * 15 + 1`` minute bars, i.e. enough minutes to build
        ``length`` 15-minute bars. Volume is passed as 0 because it is not read
        from the history frame.
        https://www.quantconnect.com/docs/v2/research-environment/datasets/indices
        """
        hist = self.history([idx], self.length * 15 + 1, Resolution.Minute)
        if hist.empty:
            # .loc[idx] raises KeyError on an empty frame; skip instead of crashing.
            self.log(f'no history returned for {idx} -- skipping warmup.')
            return

        # NOTE(review): row.Index is used as the bar time; confirm whether the
        # history index is the bar start or the bar end time.
        for row in hist.loc[idx].itertuples():
            bar = TradeBar(row.Index, idx, row.open, row.high, row.low, row.close, 0)
            self.cons.Update(bar)
        self.log('successfully warmed up.')

    def on_15m(self, bar: TradeBar) -> None:
        """Consolidator callback: push each finished 15-minute bar into the ATR."""
        # NOTE(review): the ATR is also registered against this consolidator in
        # Initialize, so it may be offered every bar twice -- confirm the
        # indicator ignores the repeated input.
        self.atr.update(bar)

    def OnData(self, data: Slice):
        """Handle one slice: open a spread on the first bar, otherwise manage it.

        On the bar ending 09:31 the opening price is recorded and, if the
        portfolio is flat and the ATR is ready, the strike targets are computed
        and entry_logic runs. On every bar while invested, exit_logic runs.
        Strike targets are not recomputed while invested, so the underlying
        stop level stays tied to the open position.
        """
        bars = data.bars
        if not data.contains_key(self.underlyingsymbol):
            return
        ul = bars[self.underlyingsymbol]

        invested = self.portfolio.invested
        bar_end = ul.end_time

        # first bar of day?
        if bar_end.hour == 9 and bar_end.minute == 31:
            open_price = ul.open
            self.open_prices[bar_end.date()] = open_price

            if not invested:
                if self.IsReady:
                    self._set_strike_targets(open_price)
                    self.entry_logic(data)
                return

        if invested:
            self.exit_logic()

    def _set_strike_targets(self, open_price):
        """Compute sell/buy strike targets from the open and the current ATR."""
        atr = self.atr.current.value

        # NOTE -------------- THIS is the primary place the logic is flipped (here and universe filters)
        if self._option_type == OptionRight.PUT:
            self.sell_price = open_price - atr * self.entry_strike_atrs
            self.buy_price = self.sell_price - atr * self.exit_strike_atrs
        else:
            self.sell_price = open_price + atr * self.entry_strike_atrs
            self.buy_price = self.sell_price + atr * self.exit_strike_atrs

        if self._debug_lvl >= 2:
            self.log(f'open: {open_price}, atr: {atr}, sell: {self.sell_price}, buy: {self.buy_price}')

        # self.max_risk_pts = abs(self.sell_price - self.buy_price)
        self.ul_stop_lvl = self.buy_price

    #endregion

    # region Strategy Logic

    def entry_logic(self, slice):
        """Open the credit spread from the option chain in ``slice``.

        Both legs use the nearest expiry in the chain, the right given by
        ``_option_type``, and ``_contracts`` contracts each:

        - The written (short) option is the one whose strike is closest to
          ``self.sell_price``.
        - The bought (long) option is the one whose strike is closest to
          ``self.buy_price``.

        Nothing is traded when the portfolio is already invested, the chain is
        missing, or both targets resolve to the same contract.
        """
        if self.Portfolio.Invested:
            return
        if self.sell_price is None or self.buy_price is None:
            return

        chain = slice.OptionChains.get(self._symbol)
        if not chain:
            return

        contracts = list(chain)
        if not contracts:
            return

        # nearest expiry
        expiry = min(ct.Expiry for ct in contracts)

        if self._debug_lvl >= 2:
            self.log(f'date: {self.time.date()} -- expiry: {expiry}')

        chosen = [ct for ct in contracts if ct.Expiry == expiry and ct.Right == self._option_type]
        if not chosen:
            return

        sell_contract = min(chosen, key=lambda ct: abs(self.sell_price - ct.Strike))
        buy_contract = min(chosen, key=lambda ct: abs(self.buy_price - ct.Strike))

        if sell_contract.Symbol == buy_contract.Symbol:
            # A sparse chain can map both targets to one strike; buying and
            # selling the same contract would only pay fees.
            self.log(f'skipping entry: both legs resolve to {sell_contract.Symbol}')
            return

        self.short_ct = sell_contract
        self.long_ct = buy_contract

        if self._debug_lvl >= 2:
            self.log(f'symbol: {sell_contract.Symbol} -- {sell_contract.BidPrice} / {sell_contract.AskPrice}')
            self.log(f'symbol: {buy_contract.Symbol} -- {buy_contract.BidPrice} / {buy_contract.AskPrice}')

        self.max_gain, self.max_loss = self.credit_spread_max_gain_loss(buy_contract, sell_contract)
        self.initialCredit = self.max_gain

        # Long leg first, then the short leg.
        self.Buy(buy_contract.Symbol, self._contracts)
        self.Sell(sell_contract.Symbol, self._contracts)

        if self._debug_lvl >= 1:
            self.log(f'selling {sell_contract.symbol} at {sell_contract.BidPrice}')
            self.log(f'buying {buy_contract.symbol} at {buy_contract.AskPrice}')

    def exit_logic(self):
        """Liquidate when one of the exit conditions is met.

        Checked in order; the first hit liquidates the portfolio and returns:

        - SL (only if ``option_sl``): unrealized loss of at least
          ``pct_of_max_loss_stop`` of the spread's max loss.
        - TP: unrealized gain of at least ``pct_of_max_gain_tgt`` of max gain.
        - Underlying SL (only if ``ul_sl``): the underlying has moved through
          ``self.buy_price`` -- below it for puts, above it for calls.
        """
        exit_cost = self.spread_price()
        if exit_cost is None or self.initialCredit is None:
            # No spread has been priced yet -- nothing to manage.
            return

        # so -- we sold it for 100, can buy it back for 50. pnl is 50.
        # or -- we sold it for 100, can buy it back for 150, pnl is -50
        current_pnl = self.initialCredit - exit_cost

        if self.option_sl:
            # max_loss is a positive number of points, so the stop level is its negative.
            stop_lvl = -(self.pct_of_max_loss_stop * self.max_loss)
            if current_pnl <= stop_lvl:
                self.log(
                    f'stop condition: {current_pnl} <= {stop_lvl} '
                    f'-- (-{self.pct_of_max_loss_stop} * {self.max_loss})'
                )
                self.liquidate(tag="SL")
                return

        # if our pnl is >= 90% of max gain -- take profit.
        target_pnl = self.pct_of_max_gain_tgt * self.max_gain
        if current_pnl >= target_pnl:
            self.log(
                f"tp condition: {current_pnl} >= {target_pnl} "
                f"-- ({self.pct_of_max_gain_tgt} * {self.max_gain})"
            )
            self.liquidate(tag="TP")
            return

        if self.ul_sl and self.buy_price is not None:
            ul_price = self.portfolio[self.underlyingsymbol].price
            if self._option_type == OptionRight.PUT:
                # Put spreads sit below the open, so the stop is a move DOWN through the long strike target.
                breached, relation = ul_price <= self.buy_price, "<="
            else:
                breached, relation = ul_price >= self.buy_price, ">="
            if breached:
                self.log(f"underlying stop condition: {ul_price} {relation} {self.buy_price}")
                self.liquidate(tag="Underlying SL")

    # endregion

    # region Utilities

    # Not in use, currently -- QC has alerting natively, dont think we need this.
    def notify_mailing_list(self, msg: str, subject: Optional[str] = None):
        """Email ``msg`` to every address in ``to_notify``.

        Args:
            msg: Message body.
            subject: Subject line; defaults to "QuantConnect Notification".

        https://www.quantconnect.com/docs/v2/cloud-platform/live-trading/notifications
        https://www.quantconnect.com/forum/discussion/11677/formatting-problems-with-self-notify-email/
        """
        if not subject:
            subject = "QuantConnect Notification"

        for addr in self.to_notify:
            self.notify.email(addr, subject, msg, None, None)

    @property
    def IsReady(self):
        """True once the ATR reports it is ready."""
        return self.atr.is_ready

    def spread_price(self):
        """Return the cost to exit the spread, or None if no legs are recorded.

        The cost is what is paid to buy back the short leg (its ask) minus what
        is received for selling the long leg (its bid).
        """
        # NOTE(review): prices are read from the contract objects captured at
        # entry -- confirm they reflect live quotes rather than the entry snapshot.
        if self.long_ct and self.short_ct:
            # debit to close the short - credit from closing the long
            return self.short_ct.AskPrice - self.long_ct.BidPrice
        return None

    def credit_spread_max_gain_loss(self, buy, sell):
        """Return ``(max_gain, max_loss)`` of the spread in points, not dollars.

        Args:
            buy: Contract to be bought; priced at its ask.
            sell: Contract to be written; priced at its bid.

        Returns:
            The net credit, and the strike distance minus that credit.
        """
        credit = sell.BidPrice - buy.AskPrice
        stk_distance = abs(buy.Strike - sell.Strike)
        max_gain = credit
        max_loss = stk_distance - credit

        if self._debug_lvl >= 2:
            self.log(f'Strike distance: {stk_distance}')
            self.log(f'max gain: {credit}')
            self.log(f'max loss: {max_loss}')

        return max_gain, max_loss

    #endregion

    # region Order Events

    def OnOrderEvent(self, order_event: OrderEvent):
        """Log every order event.

        https://www.quantconnect.com/docs/v2/writing-algorithms/trading-and-orders/order-events
        """
        self.Log(str(order_event))

    def on_assignment_order_event(self, assignment_event):
        """Liquidate the underlying of an assigned option and log it.

        https://www.quantconnect.com/docs/v2/writing-algorithms/reality-modeling/options-models/assignment
        """
        underlying_symbol = assignment_event.Symbol.Underlying
        self.Liquidate(underlying_symbol)
        self.log(f"Option assigned: {assignment_event.Symbol} -- flattening all {underlying_symbol}")

    #endregion
#region imports
from AlgorithmImports import *
#endregion


"""
class Garbage:

    def TradeOptions(self):
        # https://www.quantconnect.com/docs/v2/writing-algorithms/universes/index-options
        # seems to do it differently with set_filter -- this is slower, but maybe betteR?

        contracts = self.OptionChainProvider.GetOptionContractList(self.underlyingsymbol, self.Time.date())
        if len(contracts) > 1: 
            # get the NEAREST expiry available -- great. (I think this is right -- TEST)

            # TODO: why no 0 dte options?
            expiry = sorted(contracts,key = lambda x: x.ID.Date, reverse=False)[0].ID.Date
            self.log(f'now: {self.time.date()}  exp: {expiry}')

            # filter the call options from the contracts expires on that date
            call = [i for i in contracts if i.ID.Date == expiry and i.ID.OptionRight == 0]
            self.log(f'calls: {[i for i in call]}')


            # # get options that are nearest our target levels.
            self.call_to_sell = sorted(call, key=lambda x: abs(x.ID.StrikePrice - self.sell_price))[0]
            self.call_to_buy = sorted(call, key=lambda x: abs(x.ID.StrikePrice - self.buy_price))[0]
            
            sell = self.AddOptionContract(self.call_to_sell, Resolution.Minute)
            buy = self.AddOptionContract(self.call_to_buy, Resolution.Minute)

            # TODO: how do we get the prices?
            # https://www.quantconnect.com/docs/v2/writing-algorithms/securities/asset-classes/equity-options/requesting-data

            # # Works -- but it returns 0.0? 
            # try:
            #     self.log(f'sell price? : {sell.ask_price}')
            # except:
            #     self.log(f'nope2 ')


            # # works -- but it returns 0.0?
            # try:
            #     self.log(f'sell price? : {sell.AskPrice}')
            # except:
            #     self.log(f'nope4 ')

            # try:
            #     self.log(f'price? : {sell.price}')
            # except:
            #     self.log(f'nope 5')
            

            # # TODO: deal with position sizes... hmm.
            self.Buy(self.call_to_buy, 1)
            self.Sell(self.call_to_sell, 1)
            # Why arent these firing? 
            
            # QC seems to have made a lot of changes, broken a lot of things, 
            # and made a lot of the documentation only partially accurate.


            # self.market_order(sell, -1)
            # self.market_order(buy, 1)

"""