Overall Statistics
Total Trades
344
Average Win
0.39%
Average Loss
-0.36%
Compounding Annual Return
6.275%
Drawdown
11.400%
Expectancy
0.278
Net Profit
36.337%
Sharpe Ratio
0.852
Probabilistic Sharpe Ratio
33.684%
Loss Rate
39%
Win Rate
61%
Profit-Loss Ratio
1.09
Alpha
0.054
Beta
-0.024
Annual Standard Deviation
0.062
Annual Variance
0.004
Information Ratio
-0.021
Tracking Error
0.179
Treynor Ratio
-2.168
Total Fees
$469.49
from datetime import timedelta
import numpy as np
import pandas as pd
import traceback

# https://www.cboe.com/micro/vix/vixwhite.pdf
# look here for some pandas inspiration.
# https://github.com/khrapovs/vix/blob/master/vix/reproduce_vix.py
# https://github.com/michaelchu/optopsy 

def compute_iv(chain,interest1mnth=0.0003,interest2mnth=0.00031):

    if len([x for x in chain]) < 5:
        #raise ValueError("not enought data?")
        return np.nan
        
    try:
        near_term_expiry = sorted([x for x in chain if (x.Expiry-x.Time).days <= 30], key=lambda x: x.Expiry)[-1].Expiry
        next_term_expiry = sorted([x for x in chain if (x.Expiry-x.Time).days > 30], key=lambda x: x.Expiry)[0].Expiry
    except:
        #raise ValueError("near and next term option not found?")
        return np.nan
        
    data = {}
    # filter the call and put options from the contracts
    for name,expiry,interest_rate in [('near',near_term_expiry,interest1mnth),('next',next_term_expiry,interest2mnth)]:
    
        calls = {x.Strike:x for x in chain if x.Expiry == expiry and x.Right == 0}
        puts = {x.Strike:x for x in chain if x.Expiry == expiry and x.Right == 1}
        
        if len(calls) < 5 or len(puts) < 5:
            #raise ValueError("bad data??")
            return np.nan
            
        # transform
        mydata = []
        strikes = sorted([x for x in calls.keys()])
        price = list(calls.values())[0].UnderlyingLastPrice
        timenow = list(calls.values())[0].Time
        nt = (expiry-timenow).days*24*60
        t = nt / 525600
        r = interest_rate
        
        p_ind = np.argmin([np.abs(x-price) for x in strikes])
        p_strike = strikes[p_ind]
        
        call_df = pd.DataFrame([ \
            dict(strike=x.Strike,call_bid=x.BidPrice,call_ask=x.AskPrice) \
            for x in chain if x.Expiry == expiry and x.Right == 0]).sort_values("strike")

        put_df = pd.DataFrame([ \
            dict(strike=x.Strike,put_bid=x.BidPrice,put_ask=x.AskPrice) \
            for x in chain if x.Expiry == expiry and x.Right == 1]).sort_values("strike")
        
        merged = call_df.merge(put_df,on='strike')
        merged['call_mid']=(merged.call_bid+merged.call_ask)/2
        merged['put_mid']=(merged.put_bid+merged.put_ask)/2
        
        
        # merged columns:
        # call_bid,call_mid,call_ask,strike,put_bid,put_mid,put_ask
        
        atm_ind = np.argmin(np.abs((merged.call_mid-merged.put_mid).values))
        y = interest_rate
        c = merged.call_mid.values[atm_ind]
        p = merged.put_mid.values[atm_ind]
        s = merged.strike.values[atm_ind]
        f = s+np.exp(y*t)*(c-p)
        k0 = np.max([x for x in strikes if x<f])
        
        merged = merged.sort_values('strike',ascending=True)
        merged['call_bid_zero_cumsum']=(merged['call_bid']==0).cumsum()
        merged = merged.sort_values('strike',ascending=False)
        merged['put_bid_zero_cumsum']=(merged['put_bid']==0).cumsum()
        merged = merged.sort_values('strike',ascending=True)
        # filter out super otm contracts, i.e. all(cumsum<=1,bid!=0,strike>price)
        merged['use_put'] = np.logical_and((merged.put_bid!=0).values,np.logical_and((merged.put_bid_zero_cumsum <= 1).values,(merged.strike < k0).values))
        merged['use_put_and_call'] = merged.strike == k0
        merged['use_call'] = np.logical_and((merged.call_bid!=0).values,np.logical_and((merged.call_bid_zero_cumsum <= 1).values,(merged.strike > k0).values))
        merged['strike_p1'] = merged.strike.shift(-1)
        merged['strike_m1'] = merged.strike.shift(1)
        
        
        def compute_contrib(x):
            
            if np.isnan(x.strike_m1) or np.isnan(x.strike_p1):
                return np.nan

            if x.use_put_and_call:
                qk = np.mean([x.put_bid,x.put_ask,x.call_bid,x.call_ask])
            elif x.use_call:
                qk = np.mean([x.call_bid,x.call_ask])
            elif x.use_put:
                qk = np.mean([x.put_bid,x.put_ask])
            else:
                return np.nan
                
            dk = (x.strike_p1-x.strike_m1)/2
            k2 = x.strike**2
            contrib = (dk/k2)*np.exp(r*t)*qk
        
            return contrib
        
        merged['contrib'] = merged.apply(lambda x: compute_contrib(x), axis=1)
        
        # compute voaltility at x-term
        sigsqr = (2/t)*np.nansum(merged.contrib) - (1/t)*((f/k0)-1)**2
        
        data[name] = dict(
            t=t,
            nt=nt,
            sigsqr=sigsqr,
        )
    
    if len(data) != 2:
        return np.nan
    
    n365 = 525600
    n30 = 43200
    t1 = data['near']['t']
    t2 = data['next']['t']
    nt1 = data['near']['nt']
    nt2 = data['next']['nt']
    sigsqr1 = data['near']['sigsqr']
    sigsqr2 = data['next']['sigsqr']
    
    iv = 100 *np.sqrt(
        (t1*sigsqr1*((nt2-n30)/(nt2-nt1))+t2*sigsqr2*((n30-nt1)/(nt2-nt1)))*(n365/n30)
    )
    
    return iv
    

from QuantConnect.Data.Custom import Quandl
from QuantConnect.Python import PythonData
from QuantConnect.Data import SubscriptionDataSource
from datetime import datetime, timedelta
import decimal

class MyYield(PythonData):
    def GetSource(self, config, date, isLiveMode):
        url = "na"
        return SubscriptionDataSource(url,SubscriptionTransportMedium.RemoteFile)
    def Reader(self, config, line, date, isLiveMode):
        if not (line.strip() and line[0].isdigit()): return None
        inst = MyYield()
        inst.Symbol = config.Symbol
        try:
            data = line.split(',')
            # # Make sure we only get this data AFTER trading day - don't want forward bias.
            inst.Time = datetime.strptime(data[0], '%Y-%m-%d')+timedelta(hours=20)
            inst.Value = decimal.Decimal(data[1])
        except:
            return None
        return inst
        
class MyYield1mnth(MyYield):
    def GetSource(self, config, date, isLiveMode):
        url = "https://fred.stlouisfed.org/graph/fredgraph.csv?id=TB4WK"
        return SubscriptionDataSource(url,SubscriptionTransportMedium.RemoteFile)
class MyYield3mnth(MyYield):
    def GetSource(self, config, date, isLiveMode):
        url = "https://fred.stlouisfed.org/graph/fredgraph.csv?id=TB3MS"
        return SubscriptionDataSource(url,SubscriptionTransportMedium.RemoteFile)

from collections import deque
from scipy import stats

class OptionAlgorithm(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2015, 3, 1)
        self.SetEndDate(2020, 4, 1)
        
        self.SetCash(100000)
        
        resolution = Resolution.Minute
        
        self.SetBenchmark("SPY")
        
        self.spy = self.AddEquity("SPY", resolution).Symbol

        option = self.AddOption(self.spy, resolution)
        option.SetFilter(self.UniverseFunc)
        
        self.vix = self.AddData(Quandl,"CHRIS/CBOE_VX1", Resolution.Daily).Symbol
        
        self.ivr = None
        self.slice = None
        self.period = 125
        self.queue = deque(maxlen=self.period)
        
        self.interest1mnth = np.nan
        self.interest2mnth = np.nan
        self.interest3mnth = np.nan

        self.yield_1mnth = self.AddData(MyYield1mnth, "MYYIELD").Symbol
        self.yield_3mnth = self.AddData(MyYield3mnth, "MYYIELD").Symbol
        
        self.Schedule.On(
            self.DateRules.EveryDay(),
            self.TimeRules.AfterMarketOpen(self.spy, 60),
            Action(self.MyCompute)
        )

        self.Schedule.On(
            self.DateRules.EveryDay(),
            self.TimeRules.AfterMarketOpen(self.spy, 95),
            Action(self.MyTrade)
        )
        
        myplot = Chart('myplot')
        # plot actual, expected, rank
        myplot.AddSeries(Series('iv', SeriesType.Line, 0))
        myplot.AddSeries(Series('vix', SeriesType.Line, 0))
        myplot.AddSeries(Series('ivr', SeriesType.Line, 0))

    def UniverseFunc(self, universe):
        # obtain 30 DTE options +/- 2*sd from last price.
        #return universe.IncludeWeeklys().Expiration(timedelta(20), timedelta(40)).Strikes(-5,5)
        data = self.History(self.spy,timedelta(days=360*2),Resolution.Daily)
        sd = np.std(data['close'].pct_change(30).dropna().values)
        price = data['close'].values[-1]
        l_lim,u_lim = int(-2*sd*price),int(2*sd*price)
        return universe.IncludeWeeklys().Expiration(timedelta(20), timedelta(40)).Strikes(l_lim,u_lim)
        
    def OnData(self,slice):
        self.slice = slice

        if slice.ContainsKey(self.yield_1mnth):
            self.interest1mnth = slice[self.yield_1mnth].Value
            
        if slice.ContainsKey(self.yield_3mnth):
            self.interest3mnth = slice[self.yield_3mnth].Value

        if not np.isnan(self.interest1mnth) and not np.isnan(self.interest3mnth):
            self.interest2mnth = np.mean([self.interest1mnth,self.interest3mnth])
    
    def MyTrade(self):
        if self.ivr is None:
            return
        if self.ivr > 70:
            self.SetHoldings(self.spy,-0.2)
        else:
            # risk on.
            self.SetHoldings(self.spy, 0.8)

    def MyCompute(self):
        slice = self.slice
        
        if slice is None:
            return
        
        if slice.OptionChains.Count == 0:
            return
        
        for sliceitem in slice.OptionChains:
            
            chain = sliceitem.Value
            
            if len([x for x in chain]) < 1:
                continue
            
            iv = compute_iv(chain,interest1mnth=self.interest1mnth*0.01,interest2mnth=self.interest2mnth*0.01)
            
            if np.isnan(iv):
                continue
            
            self.queue.append(iv)
            self.Plot('myplot', 'iv', iv)

            # try:
            #     pass
            # except:
            #     self.Log(traceback.format_exc())

            data = self.History(self.vix,timedelta(days=10),Resolution.Daily)
            if len(data)>0:
                values = data['close'].values
                self.Plot('myplot', 'vix', values[-1])
            
            if len(self.queue) > 10: # == self.period:
                self.ivr = stats.percentileofscore(self.queue, iv)
                self.Plot('myplot', 'ivr', self.ivr)