Overall Statistics
Total Trades
336
Average Win
0.58%
Average Loss
-0.04%
Compounding Annual Return
1.358%
Drawdown
3.000%
Expectancy
0.967
Net Profit
7.118%
Sharpe Ratio
0.631
Probabilistic Sharpe Ratio
14.558%
Loss Rate
86%
Win Rate
14%
Profit-Loss Ratio
13.37
Alpha
0.007
Beta
0.028
Annual Standard Deviation
0.015
Annual Variance
0
Information Ratio
-0.465
Tracking Error
0.174
Treynor Ratio
0.336
Total Fees
$336.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
AAPL YCSFRQ6LAL52|AAPL R735QTJ8XC9X
Portfolio Turnover
0.03%
# region imports
from AlgorithmImports import *
import datetime as dt
import ast
import json
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel 
from io import StringIO
import pandas as pd
from collections import defaultdict
# endregion

class AdaptableVioletJaguar(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2018, 1, 1)  # Set Start Date
        self.SetCash(50000)  # Set Strategy Cash
        
        self.tickers = ["AAPL"]
        
        self.AddUniverse(self.CoarseSelection, self.FineSelection)
        self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw
        
        self.symbolDataBySymbol = {}
        self.fineFundamentals = {}

        self.SAVE_EARNINGS = False
        self.loaded_earnings = {}

        self.slice = None

        if not self.SAVE_EARNINGS:
            self.SetEndDate(2023, 2, 4)
            earnings = json.loads(self.ObjectStore.Read("Earnings_dates"))
            
            temp_earnings = dict(earnings)
            for ticker, str_dates in earnings.items():
                temp_earnings[ticker] = [dt.datetime.strptime(x, '%m/%d/%Y').date() for x in ast.literal_eval(str_dates)]

            self.loaded_earnings = dict(temp_earnings)
            self.Debug(temp_earnings)
            del temp_earnings
            del earnings
            
        elif self.SAVE_EARNINGS and self.ObjectStore.ContainsKey("Earnings_dates"):
            self.SetEndDate(2023, 2, 10)
            self.ObjectStore.Delete("Earnings_dates")     

        self.spy = self.AddEquity("SPY", Resolution.Daily)

        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                 self.TimeRules.AfterMarketOpen("SPY", -10),
                 self.CheckEarningsRules)

        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                 self.TimeRules.BeforeMarketClose("SPY", 60),
                 self.UpdateEarningsObject)

        if not self.SAVE_EARNINGS:
            self.Schedule.On(self.DateRules.EveryDay("SPY"),
                    self.TimeRules.BeforeMarketClose("SPY", 190),
                    self.AddOptions)

            self.Schedule.On(self.DateRules.EveryDay("SPY"),
                    self.TimeRules.BeforeMarketClose("SPY", 90),
                    self.TradeOptions)
                    
            self.Schedule.On(self.DateRules.EveryDay("SPY"),
                    self.TimeRules.BeforeMarketClose("SPY", 150),
                    self.CheckExits)

            self.Schedule.On(self.DateRules.EveryDay("SPY"),
                    self.TimeRules.BeforeMarketClose("SPY", 30),
                    self.AppendIV)

    def AppendIV(self):
        for symbol, symbolData in self.symbolDataBySymbol.items():
            for contract, contract_object in symbolData.all_contracts.items():
                contract_object.update_IV()

    def UpdateEarningsObject(self):
        if self.SAVE_EARNINGS:
            for symbol, symbolData in self.symbolDataBySymbol.items():
                symbolData.update_earnings(self.fineFundamentals[symbol].EarningReports.FileDate)

    # Long Entry Criteria
    #    at least 30 days since the last earnings report
    #    over 30 days until the next earnings report
    #
    # Short Entry Criteria
    #    less than 10 days until the next earnings report
    #
    # Short Exit Criteria
    #    over 30 days since last earnings report
    #
    def CheckEarningsRules(self):
        if self.SAVE_EARNINGS: return

        cur_date = self.Time.date()

        for symbol, symbolData in self.symbolDataBySymbol.items():
            symbol_earnings = symbolData.earnings_dates
            symbol_earnings_series = pd.Series(symbol_earnings)
            insertion_index = symbol_earnings_series.searchsorted(cur_date) ## Result is the index at which it will exit in the next list

            prev_earnings_date = symbol_earnings[insertion_index-1]
            next_earnings_date = symbol_earnings[insertion_index]  ## WHEN RUNNING ALGORITHM, MAKE SURE self.EndDate IS LESS THAN THE LAST EARNINGS VALUE

            delta_prev = cur_date - prev_earnings_date
            delta_next = next_earnings_date - cur_date

            if delta_prev.days >= 30 and delta_next.days > 30:
                symbolData.long_entry_earnings = True
            else:
                symbolData.long_entry_earnings = False

            if delta_next.days < 10:
                symbolData.short_entry_earnings = True
            else:
                symbolData.short_entry_earnings = False

            if delta_prev.days >= 30:
                symbolData.k = True
            else:
                symbolData.short_exit_earnings = False

            if symbolData.long_entry_earnings or symbolData.short_entry_earnings:

                self.Debug(f"{insertion_index} {self.Time.date()} {symbol.Value} : long etr - {symbolData.long_entry_earnings}, short etr - {symbolData.short_entry_earnings}, short ext - {symbolData.short_exit_earnings}")

    def ContractFilter(self, symbol, min_strike, max_strike, min_expiry_days, max_expiry_days):
        contracts = self.OptionChainProvider.GetOptionContractList(symbol, self.Time.date())
        if len(contracts) == 0 : return []
        
        contract_list = [i for i in contracts if min_expiry_days < (i.ID.Date.date() - self.Time.date()).days < max_expiry_days]
        
        if len(contract_list) == 0: return []
        
        min_strike_price = sorted(contract_list, key = lambda x: abs(x.ID.StrikePrice - min_strike))[0].ID.StrikePrice
        max_strike_price = sorted(contract_list, key = lambda x: abs(x.ID.StrikePrice - max_strike))[0].ID.StrikePrice
        
        strike_list = sorted(set([i.ID.StrikePrice for i in contract_list]))
                
        min_strike_rank = strike_list.index(min_strike_price)
        max_strike_rank = strike_list.index(max_strike_price)
        
        try: 
            strikes = strike_list[min_strike_rank:max_strike_rank]
        except:
            strikes = strike_list
        
        filtered_contracts = [i for i in contract_list if i.ID.StrikePrice in strikes]

        return filtered_contracts 

    def AddContract(self, slice, symbol, symbolData, position):
        security_price = self.Securities[symbol].Price
        lower_atr_strike = (symbolData.atr.Current.Value * 8) + security_price
        upper_atr_strike = (symbolData.atr.Current.Value * 12) + security_price
        cur_atr_strike = (symbolData.atr.Current.Value * 10) + security_price
        cur_atr = symbolData.atr.Current.Value
        if position == 'long':
            # self.Debug('long')
            # self.Debug(f"p: {security_price}, atr_l: {lower_atr_strike}, atr_u: {upper_atr_strike}, atr_c: {cur_atr_strike}, atr: {cur_atr}")
            filtered_contracts = self.ContractFilter(symbol, lower_atr_strike, upper_atr_strike, 300, 400)
        elif position == 'short':
            # self.Debug('short')
            # self.Debug(f"atr_l: {lower_atr_strike}, atr_u: {upper_atr_strike}, atr_c: {cur_atr_strike}, atr: {cur_atr}")
            filtered_contracts = self.ContractFilter(symbol, lower_atr_strike, upper_atr_strike, 20, 30)

        if len(filtered_contracts) == 0: 
            return []
        else:
            calls = [x for x in filtered_contracts if x.ID.OptionRight == OptionRight.Call] 
            test = [x.ID.StrikePrice for x in calls]
            test2 = [x.ID.Date.date() for x in calls]
            self.Debug(f"Strikes: {test}")
            self.Debug(f"Expirys: {test2}")

            contracts = sorted(sorted(calls, key = lambda x: abs(self.Securities[symbol].Price- x.ID.StrikePrice)), 
                                            key = lambda x: x.ID.Date, reverse=True)
            if len(contracts) == 0:
                return str()

            for contract in contracts:
                if contract not in self.symbolDataBySymbol[symbol].all_contracts:
                    option = self.AddOptionContract(contract, Resolution.Hour)
                    option.PriceModel = OptionPriceModels.BjerksundStensland()
                    historical_IV = self.GetHistoricalIV(contract, symbol)
                    self.symbolDataBySymbol[symbol].all_contracts[contract] = OptionContractIV(contract, historical_IV, position, symbol, self)

    def GetHistoricalIV(self, option_symbol, equity_symbol):
        Expiry = []
        Strike = []
        Type = []
        Ticker = []
        EndDate = []
        IV = []

        df_dict = defaultdict(list)

        requests = []
        history_securities = []

        look_back = 200
        values = []
        t_requests = []
        option_security = None

        # FOR TESTING PURPOSES
        ##############
        # for security in self.Securities.Values:
        #     all_symbol = security.Symbol
        #     if all_symbol == option_symbol:
        #         option_security = security

        # while len(values) == 0 or look_back > 100:
        #     for subscription in option_security.Subscriptions:
        #         t_requests.append(HistoryRequest(subscription, security.Exchange.Hours, self.Time.date()-timedelta(150), self.StartDate))
        #     t_history = self.History(t_requests)
        #     for s in t_history:
        #         for bar in s.Bars.Values:
        #             values.append(bar)
        #             break
        #     look_back -= 1
        ###################


        for security in self.Securities.Values:
            all_symbol = security.Symbol
            if all_symbol == option_symbol or all_symbol == equity_symbol:
                history_securities.append(security)

        ### RESOLUTION OF HISTORY REQUEST WILL BE WHAT THE SECURITY RESOLUTION IS
        for security in sorted(history_securities, key=lambda x: x.Type):
            for subscription in security.Subscriptions:
                requests.append(HistoryRequest(subscription, security.Exchange.Hours, self.Time.date()-timedelta(look_back), self.StartDate))
        history = self.History(requests)


        ### SOMETIMES THERE IS NO OPTION HISTORICAL DATA. WHY ONLY SOMETIMES????
        prev_date = None
        for s in history:
            underlying_price = None
            underlying_volatility = None

            for bar in s.QuoteBars.Values:
                self.Securities[bar.Symbol].SetMarketPrice(bar)

            for bar in s.Bars.Values:
                symbol = bar.Symbol
                security = self.Securities[symbol]
                security.SetMarketPrice(bar)
                
                if security.Type == SecurityType.Equity:
                    underlying_price = security.Price
                    continue
                
                ### WILL GET THE IV AT 10:00 (HOUR RESOLUTION)), MAYBE SHOULD TRY TO GET EOD?
                if len(df_dict['EndDate']) == 0 or bar.EndTime.date() > df_dict['EndDate'][-1].date():
                    contract = OptionContract.Create(symbol, symbol.Underlying, bar.EndTime, security, underlying_price)
                    contract.LastPrice = bar.Close
                    lastPrice = contract.LastPrice
                    result = security.PriceModel.Evaluate(security, None, contract)

                    IV.append(float(result.ImpliedVolatility))
                    df_dict['Expiry'].append(symbol.ID.Date)
                    df_dict['Strike'].append(symbol.ID.StrikePrice)
                    df_dict['EndDate'].append(bar.EndTime)
                    df_dict['IV'].append(result.ImpliedVolatility)

        df = pd.DataFrame.from_dict(df_dict)

        # FOR DEBUGGING PURPOSES SPECIFICALLY FOR SETTING BREAKPOINT
        ###
        # if len(df.index) == 0:
        #     self.Debug("")
        ###

        return df


    def AddOptions(self):
        for symbol, symbolData in self.symbolDataBySymbol.items():
            if symbolData.long_call == None and symbolData.long_entry_earnings:
                self.AddContract(self.slice, symbol, symbolData, 'long')
            if symbolData.short_call == None and symbolData.long_entry_earnings:
                self.AddContract(self.slice, symbol, symbolData, 'short')

    def TradeOptions(self):
        if self.SAVE_EARNINGS: return
        for symbol, symbolData in self.symbolDataBySymbol.items():
            for contract, contract_object in symbolData.all_contracts.items():
                if symbolData.long_call == None and symbolData.long_entry_earnings:
                    symbolData.make_trade_option_constraints(contract, 'long', self.slice.OptionChains.Values)
                if symbolData.short_call == None and symbolData.long_call != None and symbolData.short_entry_earnings:
                    symbolData.make_trade_option_constraints(contract, 'short', self.slice.OptionChains.Values)

    def CheckExits(self):
        if self.SAVE_EARNINGS: return
        remove_contracts = {'long':False, 'short':False}
        pop_out_of_universe = []
        for symbol, symbolData in self.symbolDataBySymbol.items():
            for contract, contract_object in symbolData.all_contracts.items():
                if symbolData.long_call == contract: # and self.Portfolio[contract].Invested:
                    exit_df = contract_object.option_exit_data()
                    pnl = exit_df['Pnl']
                    dt_expiry = exit_df['Dt_Expiry']
                    self.Debug(exit_df)
                    if pnl >= 1 or pnl <= -0.5 or dt_expiry <= 90:
                        self.Liquidate(contract)
                        remove_contracts['long'] = True
                        symbolData.long_call = None
                        self.Debug('Liquidating Long Contract')

                elif symbolData.short_call == contract: # and self.Portfolio[contract].Invested:
                    exit_df = contract_object.option_exit_data()
                    pnl = exit_df['Pnl']
                    dt_expiry = exit_df['Dt_Expiry']
                    self.Debug(exit_df)
                    if pnl >= 0.5 or self.Securities[symbol].Price > contract.Strke or symbolData.long_call == None: # NEED TO ADD 30 DAYS SINCE LAST EARNINGS REPORT
                        self.Liquidate(contract)
                        remove_contracts['short'] = True
                        symbolData.short_call = None
                        self.Debug('Liquidating Short Contract')

                ### NEED TO IMPLEMENT STRIKE OTM REMOVAL LOGIC
                ### IE. IF CONTRACT NO LONGER FITS STRIKE OTM 10xATR, THEN REMOVE
                elif contract_object.position == 'long':
                    if contract_object.option_exit_data()['Dt_Expiry'] < 300:
                        pop_out_of_universe.append(contract)
                        self.RemoveOptionContract(contract)

                elif contract_object.position == 'short':
                    if contract_object.option_exit_data()['Dt_Expiry'] < 20:
                        pop_out_of_universe.append(contract)
                        self.RemoveOptionContract(contract)

        for position, status in remove_contracts.items():
            if status == True:
                remove = []
                for symbol, symbolData in self.symbolDataBySymbol.items():
                    remove = [k for k,v in symbolData.all_contracts.items() if v.position == 'long']
                for contract in remove:
                    self.Debug(f"Removing : {contract} : {contract.SecurityType}")
                    self.symbolDataBySymbol[contract.Underlying].all_contracts.pop(contract)
                    self.RemoveOptionContract(contract)

    def OnData(self, data: Slice):
        self.slice = data
        
    def OnSecuritiesChanged(self, changes):
        for security in changes.AddedSecurities:
            if security.Symbol.Value == "SPY" or security.Symbol.SecurityType != SecurityType.Equity: continue

            # NEED TO LOOK INTO THE IMPACT OF THIS ON IV, SPECIFICALLY THE RESOLUTION
            self.Securities[security.Symbol].VolatilityModel = StandardDeviationOfReturnsVolatilityModel(30)#, Resolution.Daily) 
            # WITH SOME SECURITIES SETTING TO RAW DIDNT WORK, FIXED BY JUST SETTING UNIVERSE TO RAW
            # security.SetDataNormalizationMode(DataNormalizationMode.Raw)
            
            atr = self.ATR(security.Symbol, 20, MovingAverageType.Simple, Resolution.Daily)
            history = self.History(security.Symbol, 20, Resolution.Daily)
            for bar in history.itertuples():
                tradebar = TradeBar(bar.Index[1], security.Symbol, bar.open, bar.high, bar.low, bar.close, bar.volume)
                atr.Update(tradebar)

            symbolData = SymbolData(self, security, atr)
            if not self.SAVE_EARNINGS:
                symbolData.earnings_dates = self.loaded_earnings[security.Symbol.Value]
            self.symbolDataBySymbol[security.Symbol] = symbolData

        for security in changes.RemovedSecurities:
            if security.Symbol.SecurityType == SecurityType.Equity and self.symbolDataBySymbol[security.Symbol].remove_symbol == True:
                self.symbolDataBySymbol.pop(security.Symbol)
            elif security.Symbol.SecurityType == SecurityType.Option:
                ### This if statement is there to prevent a bug where key i not found
                if security.Symbol in self.symbolDataBySymbol[security.Underlying.Symbol].all_contracts:
                    self.symbolDataBySymbol[security.Underlying.Symbol].all_contracts.pop(security.Symbol)
                for symbol in self.Securities.Keys:
                    if symbol.SecurityType == SecurityType.Option and symbol.Underlying == security.Symbol:
                        self.RemoveSecurity(symbol)

    def CoarseSelection(self, coarse):
        filteredCoarse = [x.Symbol for x in coarse if x.Symbol.Value in self.tickers and x.HasFundamentalData]
        return filteredCoarse
    
    def FineSelection(self, fine):
        for x in fine: 
            self.fineFundamentals[x.Symbol] = x
        
        return [x.Symbol for x in fine]

    def OnEndOfAlgorithm(self):
        if self.SAVE_EARNINGS:
            earnings_dict = {}
            for symbol, symbolData in self.symbolDataBySymbol.items():
                self.Debug(f"{symbol.Value} : {symbolData.earnings_dates}")
                string_earnings = [t.strftime('%m/%d/%Y') for t in symbolData.earnings_dates]
                earnings_dict[str(symbol.Value)] = str(string_earnings)

            dump = json.dumps(earnings_dict)
            self.ObjectStore.Save("Earnings_dates", dump)
            self.Debug(earnings_dict)

        else:
            for symbol, symbolData in self.symbolDataBySymbol.items():
                self.Debug(symbolData.earnings_dates)

    
class SymbolData:
    def __init__(self, algo, security, atr):
        self.algo = algo
        self.security = security
        self.symbol = security.Symbol
        self.earnings_dates = []
        self.long_entry_earnings = False
        self.short_entry_earnings = False
        self.short_exit_earnings = False
        self.long_call = None
        self.short_call = None
        self.atr = atr
        self.all_contracts = {}
        self.remove_symbol = False

    def update_earnings(self, date):
        if date.date() not in self.earnings_dates:
            self.earnings_dates.append(date.date())

    ## RUN THIS EVERYONE MORNING 
    def check_option_constraints(self):
        pass

    def make_trade_option_constraints(self, contract, position, optionChain):
        call = None
        
        for chain in optionChain:
            for x in chain:
                if x.Symbol == contract:
                    call = x
                    self.algo.Debug(f"Looking at {call} : {call.Expiry} : {call.Strike} - {position}")

        # SOMETIMES A CONTRACT HAS IV SOMETIMES IT DOESNT. NEED TO LOOK INTO WHY.
        # CANT FIGURE OUT WHY VIA RESEARCH NOTEBOOK. NEED TO CREATE SEPARATE ALGO TO TEST
        try:
            cur_IV = float(call.ImpliedVolatility)
        except:
            self.algo.Debug(f"No IV: {contract}")
            return
        if len(self.all_contracts[contract].historical_IV.index) == 0:
            hist_iv_df = self.algo.GetHistoricalIV(contract, self.symbol)
        else:
            hist_iv_df = self.all_contracts[contract].historical_IV

        # SOMETIMES A CONTRACT HAS HISTORICAL IV SOMETIMES IT DOESNT. NEED TO LOOK INTO WHY.
        # CANT FIGURE OUT WHY VIA RESEARCH NOTEBOOK. NEED TO CREATE SEPARATE ALGO TO TEST
        try:
            hist_iv = list(hist_iv_df.IV)
        except:
            self.algo.Debug(f"No HIST IV: {self.algo.Time.date()} - {contract} ")#- {contract.Expiry}")
            return
        hist_iv = hist_iv[:100]
        IV_Percentile = sum(cur_IV < i for i in hist_iv) / 100
        self.algo.Debug(f"IV_Percentile : {IV_Percentile}")
        if position == 'long' and IV_Percentile <= 0.1 and IV_Percentile > 0:
            self.algo.Buy(contract,1)
            self.long_call = contract
            self.algo.Debug(f"Trade on : {position} : {self.algo.Time} : {call.Strike} : {call.Expiry}")
        elif position == 'short' and IV_Percentile > 0.9:
            self.algo.Sell(contract,1)
            self.short_call = contract

class OptionContractIV:
    def __init__(self, symbol, historical_IV, position, ticker, algo):
        self.symbol = symbol
        self.historical_IV = historical_IV
        self.position = position
        self.ticker = ticker
        self.algo = algo

    def update_IV(self):
        call = None
        for chain in self.algo.slice.OptionChains.Values:
            for x in chain:
                if x.Symbol == self.symbol:
                    call = x

        if call == None:
            return

        df_dict = defaultdict(list)
        df_dict['Expiry'].append(self.symbol.ID.Date)
        df_dict['Strike'].append(self.symbol.ID.StrikePrice)
        df_dict['EndDate'].append(self.algo.Time)
        df_dict['IV'].append(call.ImpliedVolatility)

        df = pd.DataFrame.from_dict(df_dict)
        self.historical_IV = pd.concat([self.historical_IV, df], ignore_index=True)

    def option_exit_data(self):
        exits = {}
        exits['Pnl'] = self.algo.Portfolio[self.symbol].UnrealizedProfitPercent
        delta = (self.symbol.ID.Date.date() - self.algo.Time.date())
        exits['Dt_Expiry'] = delta.days
        return exits