Overall Statistics
Total Trades
55
Average Win
6.84%
Average Loss
-13.09%
Compounding Annual Return
15.629%
Drawdown
32.300%
Expectancy
0.456
Net Profit
276.612%
Sharpe Ratio
0.53
Sortino Ratio
0.622
Probabilistic Sharpe Ratio
5.995%
Loss Rate
4%
Win Rate
96%
Profit-Loss Ratio
0.52
Alpha
0.04
Beta
0.925
Annual Standard Deviation
0.209
Annual Variance
0.044
Information Ratio
0.218
Tracking Error
0.157
Treynor Ratio
0.12
Total Fees
$56.80
Estimated Strategy Capacity
$0
Lowest Capacity Asset
QQQ YTG30PLFBECM|QQQ RIWIV7K5Z9LX
Portfolio Turnover
0.26%
# region imports
from AlgorithmImports import *
from QuantConnect.DataSource import *
import numpy as np
from math import log, sqrt, exp
from scipy.stats import norm
# endregion

# Strategy tuning constants.
VOLA = 126      # daily bars kept in the rolling close-price history (VOLA + 1 rows)
BASE_RET = 85   # scales volatility into the wait period and return look-back
LEV = 0.99      # leverage-style constant; not referenced elsewhere in this file
class FormalOrangeBadger(QCAlgorithm):

    def Initialize(self):
        """Configure cash, brokerage model, data subscriptions, indicators, schedules and state.

        All mutable algorithm state is created before any data request so
        that an early exit (no price history available) cannot leave an
        attribute undefined for the handlers that run later.
        """
        self.SetStartDate(2015, 1, 1)
        self.SetCash(10000)

        # Per-underlying containers, keyed by the equity Symbol.
        self.DCH_previous_Up = dict()
        self.DCH_previous_Down = dict()
        self.DCH_previous_Middle = dict()
        self.dch = dict()
        self.roc = dict()
        self.aps = dict()
        self.adx = dict()
        self.std_log_roc = dict()
        self.std_log_roc_MIN = dict()
        self.option_symbol = dict()
        self.option = dict()

        # Volatility-regime settings and state.
        self.IVlvl = 0.5
        self.Ivlvl_High = 0.9
        self.threshold = 0.02
        self.volatility = 0
        self.rank = 0  # defined up-front so VIX_check/Plotting never hit a missing attribute

        # In/out signal state used by daily_check.
        self.bull = 1
        self.count = 0
        self.outday = 0
        self.wt = {}
        self.real_wt = {}
        self.spy = []

        # Contracts already bought through buyoptions(); created here so it
        # exists even when the history request below returns nothing.
        self.existing_option_positions = set()

        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Cash)
        # Seed every newly added security with its last known price.
        self.SetSecurityInitializer(lambda x: x.SetMarketPrice(self.GetLastKnownPrice(x)))

        self.vix = self.AddData(CBOE, "VIX").Symbol
        self.yieldCurveTwo = self.AddData(USTreasuryYieldCurveRate, "USTYCR").Symbol

        # Other tickers previously tried: "IWM", "NVDA", "AAPL", "AMD", "AMZN", "MSFT",
        # "DIS", "GOOGL", "META", "GOOG", "INTC", "XOM", "DD", "UPS", "JNJ", "JPM", "PG"
        for ticker in ["SPY", "QQQ", ]:
            symbol = self.AddEquity(ticker, Resolution.Minute).Symbol
            self.option[symbol] = self.AddOption(symbol, Resolution.Minute)
            self.option[symbol].SetFilter(lambda universe: universe.IncludeWeeklys().Strikes(-15, 15).Expiration(600, 999999))
            self.option_symbol[symbol] = self.option[symbol].Symbol
            self.option[symbol].PriceModel = OptionPriceModels.CrankNicolsonFD()
            self.dch[symbol] = self.DCH(symbol, 20, 20, Resolution.Daily)
            self.DCH_previous_Up[symbol] = None
            self.DCH_previous_Down[symbol] = None
            self.DCH_previous_Middle[symbol] = None
            self.roc[symbol] = self.ROC(symbol, 60, Resolution.Minute)

        # Daily pairs whose relative returns drive the in/out signal.
        self.SLV = self.AddEquity('SLV', Resolution.Daily).Symbol
        self.GLD = self.AddEquity('GLD', Resolution.Daily).Symbol
        self.XLI = self.AddEquity('XLI', Resolution.Daily).Symbol
        self.XLU = self.AddEquity('XLU', Resolution.Daily).Symbol
        self.DBB = self.AddEquity('DBB', Resolution.Daily).Symbol
        self.UUP = self.AddEquity('UUP', Resolution.Daily).Symbol

        # NOTE(review): this flag is set after the DCH/ROC indicators above were
        # created - confirm whether they were meant to be auto-warmed as well.
        self.EnableAutomaticIndicatorWarmUp = True
        self.SetWarmUp(100, Resolution.Daily)
        self.SetBenchmark("SPY")

        self.MKT = self.AddEquity('SPY', Resolution.Daily).Symbol
        self.pairs = [self.SLV, self.GLD, self.XLI, self.XLU, self.DBB, self.UUP]

        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.AfterMarketOpen('SPY', 30),
            self.daily_check)
        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", 30), self.CheckDTECalls)
        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", 10), self.DCA_Bear)

        # One daily consolidator per symbol keeps self.history up to date.
        symbols = [self.MKT] + self.pairs
        for symbol in symbols:
            consolidator = TradeBarConsolidator(timedelta(days=1))
            consolidator.DataConsolidated += self.consolidation_handler
            self.SubscriptionManager.AddConsolidator(symbol, consolidator)

        # Rolling table of daily closes: one column per symbol, VOLA + 1 rows.
        self.history = self.History(symbols, VOLA + 1, Resolution.Daily)
        if self.history.empty or 'close' not in self.history.columns:
            return
        self.history = self.history['close'].unstack(level=0).dropna()
        
        
    def consolidation_handler(self, sender, consolidated):
        """Record a consolidated bar's close in the rolling history table.

        The close is written at row ``consolidated.EndTime`` and column
        ``consolidated.Symbol``; the table is then cut back to its most
        recent ``VOLA + 1`` rows.
        """
        bar_time, bar_symbol = consolidated.EndTime, consolidated.Symbol
        self.history.loc[bar_time, bar_symbol] = consolidated.Close

        rows_to_keep = VOLA + 1
        self.history = self.history.iloc[-rows_to_keep:]
    
    def daily_check(self):
        """Update the bull/bear flag from market volatility, pair returns and VIX rank.

        Steps:
          1. Annualised volatility of ``self.MKT`` sets both the wait period
             (``wait_days``) and the return look-back (``period``).
          2. The VIX rank is (current - 90-day low) / (90-day high - 90-day low).
          3. The algorithm goes risk-off (``self.bull = 0``) when SLV lags GLD,
             XLI lags XLU, DBB lags UUP and the VIX rank exceeds ``self.IVlvl``.
          4. It returns to risk-on once ``wait_days`` checks have passed since
             the last risk-off signal.
        """
        # .std() on a one-column frame yields a one-element Series; take the
        # scalar explicitly instead of relying on int(Series).
        vola = float(self.history[[self.MKT]].pct_change().std().iloc[0] * np.sqrt(252))
        wait_days = int(vola * BASE_RET)
        # Clamp to one bar: a zero or negative period (annualised vol >= 100%)
        # would compare prices with themselves or with bars that do not exist.
        period = max(1, int((1.0 - vola) * BASE_RET))
        r = self.history.pct_change(period).iloc[-1]

        # VIX rank: (Current - Min) / (Max - Min) over the last 90 daily bars.
        # When it cannot be computed, keep the last known rank (0 if none yet).
        rank = getattr(self, "rank", 0)
        vix_history = self.History(CBOE, self.vix, 90, Resolution.Daily)
        if not vix_history.empty and "low" in vix_history.columns and "high" in vix_history.columns:
            vix_low = vix_history["low"].min()
            vix_high = vix_history["high"].max()
            vix_range = vix_high - vix_low
            if vix_range > 0:
                rank = (self.Securities[self.vix].Price - vix_low) / vix_range
        self.rank = rank

        risk_off = (
            (r[self.SLV] < r[self.GLD])
            and (r[self.XLI] < r[self.XLU])
            and (r[self.DBB] < r[self.UUP])
            and (self.rank > self.IVlvl)
        )

        if risk_off:
            self.bull = 0
            self.outday = self.count
        if self.count >= self.outday + wait_days:
            self.bull = 1
        self.count += 1

        #self.Debug(f"{self.Time} ----- Current Signal - {self.bull}")


    def VIX_check(self):
        if self.rank > self.IVlvl and self.rank < self.Ivlvl_High:
            self.volatility = 1
        elif self.rank > self.Ivlvl_High:
            self.volatility = 2

        else: self.volatility = 0    

    def VIX(self):
        """Refresh ``self.rank``, the position of VIX inside its 90-day range.

        rank = (current price - 90-day low) / (90-day high - 90-day low)

        If the history request returns no usable bars, or the range is not
        positive, the previous rank is kept (0 when none was ever computed)
        rather than raising.
        """
        vix_history = self.History(CBOE, self.vix, 90, Resolution.Daily)
        if vix_history.empty or "low" not in vix_history.columns or "high" not in vix_history.columns:
            self.rank = getattr(self, "rank", 0)
            return

        lowest = vix_history["low"].min()
        highest = vix_history["high"].max()
        spread = highest - lowest
        # `not spread > 0` also rejects NaN.
        if not spread > 0:
            self.rank = getattr(self, "rank", 0)
            return

        self.rank = (self.Securities[self.vix].Price - lowest) / spread
        
    
    def CheckDTECalls(self):
        """Liquidate every held call option that expires within 300 days of now."""

        def is_held_call(entry):
            # Checks run in this order so OptionRight is only read for options.
            holding = entry.Value
            if holding.Invested and holding.Type == SecurityType.Option:
                return entry.Key.ID.OptionRight == OptionRight.Call
            return False

        # Snapshot the contracts first, then act on them.
        held_calls = [entry.Key for entry in self.Portfolio if is_held_call(entry)]

        for contract in held_calls:
            if self.Time + timedelta(300) > contract.ID.Date:
                self.Liquidate(contract, tag = "Sold due to Expiration")
                    #self.Debug (f"| {self.Time}   [+]---  Liquidate condition 1 ----- @ {str(self.Portfolio[option_invested[i]].Symbol.Value)} || Stock @ {str(self.Securities[option_invested[i].Underlying.Value].Price)}|| Profit/Loss @ {str(self.Portfolio[option_invested[i]].LastTradeProfit)}")


    def DCA_Bear(self):
        """Average down once on held calls while the bear signal is active.

        For each held call with quantity 1 or less, one more contract is
        bought when the current ask is more than 40% below the position's
        average price. Contracts without a usable quote are skipped: an ask
        of 0 would otherwise read as a 100% drawdown and trigger a buy on
        missing data.
        """
        if self.bull != 0:
            return

        held_calls = [
            x.Key for x in self.Portfolio
            if x.Value.Invested
            and x.Value.Type == SecurityType.Option
            and x.Key.ID.OptionRight == OptionRight.Call
        ]

        for underlying in self.option_symbol:
            for contract in held_calls:
                if not (contract.Underlying == underlying.Value):
                    continue

                position = self.Portfolio[contract]
                if position.Quantity > 1:
                    continue  # already averaged down

                option_buy = self.AddOptionContract(contract, Resolution.Minute)
                ask_price = option_buy.AskPrice
                average_price = position.AveragePrice
                if ask_price <= 0 or average_price <= 0:
                    continue  # no quote / no cost basis to compare against

                if (average_price - ask_price) / average_price > 0.4:
                    self.ticket_dca_1 = self.Buy(option_buy.Symbol, 1)
    
    def customSecurityInitializer(self, security):
        """Set ``security``'s market price from its last known price."""
        security.SetMarketPrice(self.GetLastKnownPrice(security))
    
    def OnData(self, slice: Slice):
        """Open one long call per underlying while the bull signal is on.

        Nothing happens during warm-up, before every ROC indicator is ready,
        before 10:00, or while ``self.bull`` is not 1. For each underlying
        without an open call position, the furthest-expiry chain contracts
        are filtered to calls with strike above the underlying price and
        implied volatility below 0.2, and one contract of the first match
        is bought.
        """
        if self.IsWarmingUp:
            return
        if not all(roc.IsReady for roc in self.roc.values()):
            return
        if self.Time.hour < 10:
            return
        if self.bull != 1:
            return

        # Scan the portfolio once; each underlying only looks at its own contracts.
        held_calls = [
            x.Key for x in self.Portfolio
            if x.Value.Invested
            and x.Value.Type == SecurityType.Option
            and x.Key.ID.OptionRight == OptionRight.Call
        ]

        for symbol, option_symbol in self.option_symbol.items():
            if any(contract.Underlying == symbol.Value for contract in held_calls):
                continue  # already holding a call on this underlying

            # `continue`, not `return`: a missing chain for one underlying
            # must not stop the remaining underlyings from being evaluated.
            chain = slice.OptionChains.get(option_symbol, None)
            if not chain:
                continue

            # Furthest expiration date among the chain's contracts.
            expiry = max(contract.Expiry for contract in chain)

            calls = [
                i for i in chain
                if i.Expiry == expiry
                and i.Right == OptionRight.Call
                and i.Strike > i.UnderlyingLastPrice
                and i.ImpliedVolatility < 0.2
            ]
            if not calls:
                continue

            # NOTE(review): the first match in chain order is taken; the list is
            # not sorted by strike - confirm this is the intended selection.
            self.ticket_initial = self.Buy(calls[0].Symbol, 1)
                        
                    
                    #self.buyoptions(security)
                    #self.Debug(f"{self.Time} --- stock {symbol} --- Price {self.price} --- DCH UpperValue -- {self.DCH_previous_Up[symbol]} -- DCH MiddleValue {self.DCH_previous_Middle} ---- ROC-- {self.roc[symbol].Current.Value} ---- Volatility {self.std_log_roc[symbol].Current.Value} ----- VMIN {self.std_log_roc_MIN[symbol].Current.Value}")
                
                    #self.SetHoldings(symbol, 0.1)
                    #self.DCH_previous_Up[symbol] = self.dch[symbol].UpperBand.Current.Value
                    #self.DCH_previous_Down[symbol] = self.dch[symbol].LowerBand.Current.Value    

            #self.DCH_previous_Up[symbol] = self.dch[symbol].UpperBand.Current.Value
            #self.DCH_previous_Down[symbol] = self.dch[symbol].LowerBand.Current.Value
            #self.DCH_previous_Middle[symbol] = self.dch[symbol].Current.Value    


    def OnOrderEvent(self, orderEvent):
        """React to order events: place take-profit limits and log fills.

        * Filled option buy -> place a limit sell for the whole position at
          150% of the average price. When the position is larger than one
          contract (an averaging-down fill), open orders on that same
          contract are cancelled first so the new limit replaces the old one.
        * Filled option sell -> log the exit.
        * Option exercise order -> liquidate the order's symbol.
        """
        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        filled = order.Status == OrderStatus.Filled

        if filled and order.Direction == OrderDirection.Buy and order.SecurityType == SecurityType.Option:
            quantity = self.Portfolio[order.Symbol].Quantity

            if quantity > 1:
                # Cancel by symbol rather than through self.ticket: that
                # attribute holds the most recent first-entry limit order,
                # which may belong to a different contract or not exist yet.
                cancelled = self.Transactions.CancelOpenOrders(order.Symbol, "Cancelled Trade")
                if len(cancelled) > 0:
                    self.Debug("Order successfully cancelled")
                fill_price = self.Portfolio[order.Symbol].AveragePrice
                limit_price = fill_price * 1.5
                self.ticket_1 = self.LimitOrder(order.Symbol, -quantity, limit_price, tag = "Sold at 50% Profit")
                self.Debug(f" {self.Time} -- -- Bought & Update -- {order.Symbol} Average Price {self.Portfolio[order.Symbol].AveragePrice} // Contract Price at {self.Securities[order.Symbol].AskPrice}   // {self.Securities[order.Symbol].Underlying.Price} ---- Limit Price {limit_price} ")

            elif quantity == 1:
                fill_price = self.Portfolio[order.Symbol].AveragePrice
                limit_price = fill_price * 1.5
                self.ticket = self.LimitOrder(order.Symbol, -quantity, limit_price, tag = "Sold at 50% Profit")
                self.Debug(f" {self.Time} -- -- Bought {order.Symbol} for {self.Portfolio[order.Symbol].AveragePrice} //  // {self.Securities[order.Symbol].Underlying.Price} ----- Limit Price {limit_price}")

        # Option sold by anything other than the take-profit limit order.
        if filled and order.Direction == OrderDirection.Sell and order.SecurityType == SecurityType.Option and not order.Type == OrderType.Limit:
            self.Debug(f" {self.Time} -- -- Sold {order.Symbol} for {self.Portfolio[order.Symbol].Price} //  stock price // {self.Securities[order.Symbol].Underlying.Price} ---- Profit/Loss {self.Portfolio[order.Symbol].LastTradeProfit} ")

        # Take-profit limit order filled.
        if filled and order.Type == OrderType.Limit and (order.Direction == OrderDirection.Sell):
            self.Debug(f" {self.Time} -- -- Closed{order.Symbol} for {self.Portfolio[order.Symbol].Price} //  stock price // {self.Securities[order.Symbol].Underlying.Price} ---- Profit/Loss {self.Portfolio[order.Symbol].LastTradeProfit} ")

        # NOTE(review): this liquidates the exercise order's own symbol -
        # confirm whether the underlying received on exercise should be
        # liquidated as well.
        if order.Type == OrderType.OptionExercise:
            self.Liquidate(order.Symbol)

    
    def buyoptions(self, security):
        """Buy a long-dated out-of-the-money call on ``security`` (30% of the portfolio).

        Candidates are calls whose strike is more than 5% above the
        security's close and that expire at least 450 days from now; the
        first contract in sorted order is used. A contract is only bought
        once: bought symbols are remembered in
        ``self.existing_option_positions``.

        Args:
            security: the underlying Security object.
        """
        security.SetDataNormalizationMode(DataNormalizationMode.Raw)
        option_chain = self.OptionChainProvider.GetOptionContractList(security.Symbol, self.Time)

        underlying_price = security.Close
        if underlying_price <= 0:
            # Without a price the moneyness ratio below would divide by zero.
            self.Debug("No prices")
            return

        otm_call_options = sorted(
            option for option in option_chain
            if option.ID.OptionRight == OptionRight.Call
            and (option.ID.StrikePrice - underlying_price) / underlying_price > 0.05
            and (option.ID.Date - self.Time).days >= 450
        )
        if not otm_call_options:
            return

        self.call = otm_call_options[0]
        # Subscribe to the contract so that it can be traded.
        option_buy = self.AddOptionContract(self.call, Resolution.Minute)
        option_buy.PriceModel = OptionPriceModels.CrankNicolsonFD()
        if option_buy.AskPrice == 0:
            self.Debug("No prices")
            return

        # The set is normally created in Initialize; create it here if that
        # step was skipped so this method never fails on a missing attribute.
        if not hasattr(self, "existing_option_positions"):
            self.existing_option_positions = set()

        contract_symbol = option_buy.Symbol
        if contract_symbol not in self.existing_option_positions:
            self.SetHoldings(contract_symbol, 0.3)
            self.existing_option_positions.add(contract_symbol)
                #self.MarketOrder(option_buy.Symbol, quantity1)
                #self.MarketOrder(option_buy.Symbol, 1)

    def buyputs(self, security):
        """Buy a short-dated out-of-the-money put on ``security`` (30% of the portfolio).

        Candidates are puts expiring within 60 days whose strike is more
        than 5% below the security's close; the last contract in sorted
        order is used. Nothing is bought when that contract is already
        held or has no ask price.

        Args:
            security: the underlying Security object.
        """
        security.SetDataNormalizationMode(DataNormalizationMode.Raw)
        option_chain = self.OptionChainProvider.GetOptionContractList(security.Symbol, self.Time)

        underlying_price = security.Close
        if underlying_price <= 0:
            # Without a price the moneyness ratio below would divide by zero.
            self.Debug("No prices")
            return

        otm_puts = sorted(
            (
                option for option in option_chain
                if option.ID.OptionRight == OptionRight.Put
                and (option.ID.Date - self.Time).days <= 60
                and (option.ID.StrikePrice - underlying_price) / underlying_price < -0.05
            ),
            reverse = True,
        )
        if not otm_puts:
            return

        self.put = otm_puts[0]
        # Subscribe to the contract so that it can be traded.
        option_buyput = self.AddOptionContract(self.put, Resolution.Minute)
        if self.Portfolio[option_buyput.Symbol].Invested:
            return
        if option_buyput.AskPrice == 0:
            self.Debug("No prices")
            return
        self.SetHoldings(option_buyput.Symbol, 0.3)
         
    def BlackScholesCall(self, S, K, T, r, sigma):
        """Return the Black-Scholes price of a European call.

        Args:
            S: current price of the underlying.
            K: strike price.
            T: time to expiry, in the same time unit as ``r`` and ``sigma``.
            r: continuously compounded risk-free rate.
            sigma: volatility of the underlying.

        Returns:
            The call price. For ``T <= 0`` the intrinsic value
            ``max(S - K, 0)`` is returned, and for ``sigma <= 0`` the
            zero-volatility limit ``max(S - K * exp(-r * T), 0)``; both
            cases would otherwise divide by zero.
        """
        if T <= 0:
            return max(S - K, 0.0)
        if sigma <= 0:
            return max(S - K * exp(-r * T), 0.0)

        vol_sqrt_t = sigma * sqrt(T)
        d1 = (log(S / K) + (r + sigma ** 2 / 2) * T) / vol_sqrt_t
        d2 = d1 - vol_sqrt_t
        return S * norm.cdf(d1) - K * exp(-r * T) * norm.cdf(d2)


    def Plotting(self):
        """Plot the current VIX rank as series 'Rank' on the 'VIX' chart."""
        chart_name, series_name = 'VIX', 'Rank'
        self.Plot(chart_name, series_name, self.rank)