Overall Statistics
Total Trades
10
Average Win
13.67%
Average Loss
0%
Compounding Annual Return
18.827%
Drawdown
52.900%
Expectancy
0
Net Profit
85.168%
Sharpe Ratio
0.667
Probabilistic Sharpe Ratio
19.819%
Loss Rate
0%
Win Rate
100%
Profit-Loss Ratio
0
Alpha
0.083
Beta
0.746
Annual Standard Deviation
0.239
Annual Variance
0.057
Information Ratio
0.294
Tracking Error
0.196
Treynor Ratio
0.214
Total Fees
$18.50
Estimated Strategy Capacity
$23000.00
Lowest Capacity Asset
SPY YYFADQB3UR1I|SPY R735QTJ8XC9X
Portfolio Turnover
0.24%
from AlgorithmImports import *
import numpy as np
from math import log, sqrt, exp
from scipy.stats import norm
from QuantConnect.DataSource import *

class NOKLeapsBreakout(QCAlgorithm):

    def Initialize(self):
        self.Balance = 100000  
        self.SetStartDate(2020, 1, 1)
        #self.SetEndDate(2023, 3, 20)
        self.SetCash(self.Balance)
        equity = self.AddEquity("SPY", Resolution.Minute)
        equity.SetDataNormalizationMode(DataNormalizationMode.Raw)
        self.equity = equity.Symbol
        self.SetBenchmark(self.equity)
        self.SetSecurityInitializer(lambda x: x.SetMarketPrice(self.GetLastKnownPrice(x)))
        option = self.AddOption("SPY", Resolution.Minute)
        self.option_symbol = option.Symbol
        self.Minimum_Expiry = 648
        self.Maximum_Expiry = 470
        self.Profit_Taking = 1.5
        self.number_options = 2
        self.Min_Option_Price = 0
        option.SetFilter(0, 5, timedelta(self.Minimum_Expiry), timedelta(99999))
        self.yieldCurveTwo = self.AddData(USTreasuryYieldCurveRate, "USTYCR").Symbol
        history = self.History(USTreasuryYieldCurveRate, self.yieldCurveTwo, 60, Resolution.Daily)
        self.SetWarmUp(timedelta(10))
        self.sigma = 0.165
        self.r = None
        self.LastTime = None
        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", 0), self.Plotting)
        self.mkt = []



    def OnData(self,data):
        if self.IsWarmingUp: return
        self.SetRuntimeStatistic("Total Profit", (f"$ {round(self.Portfolio.TotalPortfolioValue - 10000)}"))
        if data.ContainsKey(self.yieldCurveTwo):
            rates = data[self.yieldCurveTwo]
            self.r = ((rates.TwoYear) / 100)
            self.Log(f"| {self.Time} Rates found - {self.r}")
        option_invested = [x.Key for x in self.Portfolio if x.Value.Invested and x.Value.Type==SecurityType.Option]
        if option_invested:
            self.SetRuntimeStatistic("Total Profit", (f"$ {round(self.Portfolio.TotalPortfolioValue - 10000)}"))
            for i in range(len(option_invested)):

                if (self.Time + timedelta(self.Maximum_Expiry) > option_invested[i].ID.Date):
                    self.Liquidate(option_invested[i], "Too close to expiration")
                    self.Log (f"| {self.Time}   [+]---  Liquidate @ {str(self.Portfolio[option_invested[i]].Symbol.Value)} || Stock @ {str(self.Portfolio[self.equity].Price)}|| Profit/Loss @ {str(self.Portfolio[option_invested[i]].LastTradeProfit)}")
                    self.Log (f"| {self.Time}   [-]--- REASON: ||  <{(self.Maximum_Expiry)} DTE  | {(option_invested[i].ID.Date - self.Time).days} DTE")
                    
                if not self.LastTime == self.Time.day:
                    if len(option_invested)> ((self.number_options)-1):
                        self.Log("Currently Holding 2 Contracts - MAXIMUM") 
                        return
                    else: 
                        self.Log("only holding one contract")
                        for i in data.OptionChains:
                            chains = i.Value
                            self.BuyCall(chains)
          

        if not option_invested:
            self.SetRuntimeStatistic("Total Profit", (f"$ {round(self.Portfolio.TotalPortfolioValue - 100000)}"))
            self.Log("No Options invested")
            for i in data.OptionChains:
                chains = i.Value
                self.BuyCall(chains)
                             
        # Buy options - Call
    def BuyCall(self,chains):
        self.S = self.Securities[self.equity].Close
        
        expiry = sorted(chains,key = lambda x: x.Expiry, reverse=True)[0].Expiry
        calls = [i for i in chains if i.Expiry == expiry and i.Right == OptionRight.Call]
        call_contracts = sorted(calls,key = lambda x: abs(x.Strike - x.UnderlyingLastPrice < 0))

        if len(call_contracts) == 0: 
            self.Debug("No Contracts Found") 
            return

        self.Log("Contracts Found!")
        self.call = call_contracts[0]
        self.K = self.call.Strike
        self.T = ((self.call.Expiry - self.Time).days)/(365)
        option_invested = [x.Key for x in self.Portfolio if x.Value.Invested and x.Value.Type==SecurityType.Option]
        if (option_invested):
            for i in range(len(option_invested)): 
               if (str(self.Portfolio[option_invested[i]].Symbol.Value) == str(self.call)):
                self.Log("Already bought the Contract")
                return
        if self.r is None: 
            history = self.History(USTreasuryYieldCurveRate, self.yieldCurveTwo, 60, Resolution.Daily)
            twoyear = history['twoyear'][-1]
            self.r = twoyear / 100
            self.Debug((f"| Previous Rates Used -- Succesful {self.Time} -- {self.r}"))
        self.option_price = self.BlackScholesCall(self.S, self.K, self.T, self.r, self.sigma)
        self.Log((f"| Option Price Calculated - Succesful {self.Time} [+]--  Contracts  {str(self.call)} || Stock @ {str(self.call.UnderlyingLastPrice)} Option Price BS @ {(self.option_price)} rates: {self.r} Option Price Market @ {self.call.AskPrice} DTE @ {(self.call.Expiry - self.Time).days}"))
        if self.call.AskPrice > self.option_price:
            self.Log((f"| Option too Expensive! {self.Time})"))
            return
        if self.call.AskPrice == 0: return    
        self.SetHoldings(self.call.Symbol, 0.3)
        #quantity = self.Portfolio[self.call.Symbol].Quantity   # <-- quantity is zero and the following orders will be invalid
        #if quantity != 0:
            #self.LimitOrder(self.call.Symbol, -quantity, (self.call.AskPrice * self.Profit_Taking))
            #self.Log ("\r+-------------")
            #self.Log (f"| {self.Time} [+] Bought Contracts -- {str(self.call)}")
            #self.Log(f"Order Quantity filled: {self.Portfolio[self.call.Symbol].Quantity}; Fill price: {self.Portfolio[self.call.Symbol].AveragePrice} Limit price: {self.call.AskPrice * self.Profit_Taking}")
            #self.LastTime = self.Time.day
            #self.Log ("\r+-------------")
        #else: self.Log(f"| {self.Time} -- No Money")    
        
        

    def OnOrderEvent(self, orderEvent):
        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        if order.Status == OrderStatus.Filled:
            if self.Portfolio[order.Symbol].Invested:
                quantity = self.Portfolio[order.Symbol].Quantity
                fill_price = self.Portfolio[order.Symbol].AveragePrice
                limit_price = fill_price * self.Profit_Taking
                self.LimitOrder(order.Symbol, -quantity, limit_price)
                self.Log(f"Order Quantity filled: {quantity}; Fill price: {fill_price} Limit price: {limit_price}")
        # Cancel remaining order if limit order or stop loss order is executed
        #if order.Status == OrderStatus.Filled:
            #if order.Type == OrderType.Limit or OrderType.StopMarket:
                #self.Transactions.CancelOpenOrders(order.Symbol)
        if order.Status == OrderStatus.Filled and order.Type == OrderType.Limit:
            self.Log(f"{self.Time}   [+]---  SELL  {str(order.Quantity)} || Price @ {str(order.LimitPrice)}|| Contract @ {str(order.Symbol)}")
        if order.Status == OrderStatus.Invalid:
            self.Log(f"| {self.Time} -- Insufficient Money")
        # Liquidate before options are exercised
        if order.Type == OrderType.OptionExercise:
            self.Liquidate()

    
    def BlackScholesCall(self, S, K, T, r, sigma):
        d1 = (log(S / K) + (r + sigma ** 2 / 2) * T) / (sigma * sqrt(T))
        d2 = d1 - sigma * sqrt(T)
        return S * norm.cdf(d1) - K * exp(-r * T) * norm.cdf(d2)

    def Plotting(self):  
        hist = self.History(self.equity, 2, Resolution.Daily)['close'].unstack(level= 0).dropna() 
        self.mkt.append(hist[self.equity].iloc[-1])
        spy_perf = self.mkt[-1] / self.mkt[0] * self.Balance
        self.Plot('Strategy Equity', 'SPY', spy_perf)