Overall Statistics
Total Trades
158
Average Win
2.81%
Average Loss
-2.95%
Compounding Annual Return
50.831%
Drawdown
13.500%
Expectancy
0.241
Net Profit
75.519%
Sharpe Ratio
1.658
Probabilistic Sharpe Ratio
72.593%
Loss Rate
36%
Win Rate
64%
Profit-Loss Ratio
0.95
Alpha
0.384
Beta
0.569
Annual Standard Deviation
0.217
Annual Variance
0.047
Information Ratio
1.954
Tracking Error
0.206
Treynor Ratio
0.632
Total Fees
$343.00
Estimated Strategy Capacity
$400000.00
Lowest Capacity Asset
SPY Y7Z53BL1TIW6|SPY R735QTJ8XC9X
Portfolio Turnover
9.11%
# region imports
from AlgorithmImports import *
# endregion

import numpy as np
from scipy.signal import argrelextrema
import talib
from datetime import timedelta

class StochasticOptionsAlgorithm(QCAlgorithm):
    def Initialize(self):
        """Configure the backtest, subscribe to SPY and build the indicators.

        Sets the backtest window and starting cash, installs the custom
        security initializer, adds SPY at minute resolution, registers a
        30-minute consolidator whose bars are delivered to ``TradeOptions``,
        and creates the Stochastic and Bollinger Bands indicators that
        ``ComputeStochastic`` updates manually from those bars.
        """
        self.SetStartDate(2022, 1, 1)
        self.SetEndDate(2023, 5, 15)
        self.SetCash(100000)

        # Every security added later is initialised through the custom
        # initializer, seeded via GetLastKnownPrices.
        self.SetSecurityInitializer(
            CustomSecurityInitializer(
                self.BrokerageModel,
                FuncSecuritySeeder(self.GetLastKnownPrices)
            )
        )
        self.spy = self.AddEquity("SPY", Resolution.Minute).Symbol

        # State of the option trade. Despite its name, strike_price holds the
        # contract symbol chosen by the entry signal (it is what gets passed
        # to AddOptionContract), not a numeric strike.
        self.option_symbol = None
        self.strike_price = None
        self.expiration_date = None
        self.position = None  # None, 'long' or 'short'

        # Latest SPY prices, refreshed in OnData. Initialised here so the
        # attributes exist even before the first SPY bar arrives.
        self.underlying_price = None
        self.high_price = None
        self.low_price = None

        # Previous %K / %D readings, used for crossover detection.
        self.k_prev = 0
        self.d_prev = 0

        self.lookback = 14
        self.low_period = 3
        self.high_period = 3

        # Build 30-minute bars from the minute SPY data and route each
        # completed bar to TradeOptions.
        consolidator = TradeBarConsolidator(timedelta(minutes=30))
        consolidator.DataConsolidated += self.TradeOptions
        self.SubscriptionManager.AddConsolidator(self.spy, consolidator)
        # Keep a reference to the consolidator on the algorithm.
        self.consolidator = consolidator

        # Use the configured periods instead of repeating the literals.
        # NOTE(review): low_period/high_period are taken to be the %K and %D
        # smoothing periods (both are 3 today) -- confirm the intended mapping.
        self.stochastic = Stochastic(
            self.lookback, self.low_period, self.high_period
        )

        self.bb = BollingerBands(20, 2)

        self.SetWarmup(50)



    def OnData(self, data):
        if data.ContainsKey(self.spy):
            if data[self.spy]:
                self.underlying_price = data[self.spy].Close
                self.high_price = data[self.spy].High
                self.low_price = data[self.spy].Low

    def TradeOptions(self, sender, bar):
        """Consolidator callback: refresh the signals, then open a pending option trade.

        ``ComputeStochastic`` runs first and may select a contract (stored in
        ``self.strike_price``) and a direction (``self.position``). If no
        option trade is currently open, any SPY share holdings are flattened
        and, when a contract is pending, it is subscribed and a market order
        is sent: a buy when ``self.position`` is ``'long'``, otherwise a sell.

        ``self.option_symbol`` is only recorded once an order has actually
        been placed. If the contract has no usable ask price or the computed
        size is zero, the pending entry is abandoned and the state is reset so
        a later signal can try again, instead of leaving the algorithm
        believing it holds a position it never opened.

        Args:
            sender: The consolidator that raised the event (unused).
            bar: The consolidated SPY trade bar.
        """
        self.ComputeStochastic(bar)

        # An option trade is already open; exits are handled in ComputeStochastic.
        if self.option_symbol is not None:
            return

        # No option trade is open: make sure no SPY shares are held.
        if self.Securities.ContainsKey(self.spy):
            self.SetHoldings(self.spy, 0)

        # Despite its name, strike_price holds the selected contract symbol.
        contract = self.strike_price
        if not contract:
            return

        option_symbol = self.AddOptionContract(contract, Resolution.Minute).Symbol

        quantity = 0
        if contract in self.Securities:
            ask = self.Securities[contract].AskPrice
            if ask > 0:
                # Size the order from 2% of portfolio value.
                # NOTE(review): 100.0 * 0.5 * ask presumably means a 100-share
                # contract multiplier and a 50% premium loss -- confirm.
                target_risk = 0.02 * self.Portfolio.TotalPortfolioValue
                risk_per_contract = 100.0 * 0.5 * ask
                quantity = int(target_risk / risk_per_contract)

        if quantity <= 0:
            # No quote yet or the contract is too expensive for the risk
            # budget: drop this entry rather than sending a zero-size order.
            self.Debug(f"Skipping option entry for {contract}: no ask price or zero quantity")
            self.position = None
            self.strike_price = None
            return

        if self.position == 'long':
            self.MarketOrder(contract, quantity)
        else:
            self.MarketOrder(contract, -quantity)

        # Record the open trade only after the order has been submitted.
        self.option_symbol = option_symbol

    def ComputeStochastic(self, bar):
        """Update the indicators with ``bar`` and evaluate exit and entry signals.

        Exits (only while an option trade is open):
            * 'long':  %K >= 93, or %K > 80 and %K crossed below %D.
            * 'short': %K <= 10.7, or %K <= 20 and %K crossed above %D.
          An exit liquidates the contract and clears the trade state.

        Entries (only while ``self.position`` is None):
            * %K >= 93.7, or %K > 80 and %K > %D: pick a contract through
              ``GetPutStrikePrice`` and mark the position 'short'.
            * %K <= 8, or %K <= 20 and %K < %D: pick a contract through
              ``GetCallStrikePrice`` and mark the position 'long'.
          Both entries target an expiry two calendar days after the current
          algorithm date. The order itself is placed by ``TradeOptions``.

        The method returns early, without touching any state, until enough
        SPY history and an underlying price are available and both indicators
        are ready with non-zero readings.

        Args:
            bar: The consolidated SPY trade bar used to update the indicators.
        """
        # Require at least `lookback` rows of recent SPY history.
        if len(self.History(self.spy, 2*self.lookback)) < self.lookback:
            return

        if self.underlying_price is None:
            return

        self.stochastic.Update(bar)
        self.bb.Update(bar.EndTime, bar.Close)

        # A zero %K or upper band is treated the same as "not ready".
        if (
            not self.stochastic.IsReady
            or self.stochastic.StochK.Current.Value == 0
            or not self.bb.IsReady
            or self.bb.UpperBand.Current.Value == 0
        ):
            return

        k_current = self.stochastic.StochK.Current.Value
        d_current = self.stochastic.StochD.Current.Value

        holding_option = (
            self.option_symbol is not None
            and self.option_symbol.SecurityType == SecurityType.Option
        )

        if holding_option and self.position == 'long':
            # Exit conditions for call options
            if k_current >= 93 or (
                k_current > 80
                and self.CrossedBelowSignalLine(k_current, d_current, self.k_prev, self.d_prev)
            ):
                self._CloseOptionPosition()
        elif holding_option and self.position == 'short':
            # Exit conditions for put options
            if k_current <= 10.7 or (
                k_current <= 20
                and self.CrossedAboveSignalLine(k_current, d_current, self.k_prev, self.d_prev)
            ):
                self._CloseOptionPosition()

        # Entry conditions for put options. An exit above can be followed by
        # an entry on the same bar because the position has just been cleared.
        if (k_current >= 93.7 or (k_current > 80 and k_current > d_current)) and self.position is None:
            self.expiration_date = self.Time.date() + timedelta(days=2)
            self.strike_price = self.GetPutStrikePrice(1, OptionRight.Put)
            if self.strike_price:
                self.position = 'short'

        # Entry conditions for call options
        if (k_current <= 8 or (k_current <= 20 and k_current < d_current)) and self.position is None:
            self.expiration_date = self.Time.date() + timedelta(days=2)
            self.strike_price = self.GetCallStrikePrice(3, OptionRight.Call)
            if self.strike_price:
                self.position = 'long'

        # Remember this bar's readings for the next crossover check.
        self.k_prev = k_current
        self.d_prev = d_current

    def _CloseOptionPosition(self):
        """Liquidate the open option contract and clear the trade state."""
        self.Liquidate(self.option_symbol)
        self.option_symbol = None
        self.position = None
        self.strike_price = None
        


    def GetPutStrikePrice(self, num_strikes, option_right):
        
        option_chain = self.OptionChainProvider.GetOptionContractList(self.spy, self.Time.date())
        '''
        expiries = [
            x.ID.Date.date() for x in option_chain if x.ID.Date.date() >= self.expiration_date
        ]
        # Get unique expiries as list
        expiries = list(set(expiries))
        expiries.sort()

        expiry_time = expiries[0]
        '''
        # Sort list earliest to latest
        option_chain = sorted(option_chain, key=lambda x: self.underlying_price - x.ID.StrikePrice)
        #option_Chain = [contract for contract in option_chain if contract.ID.StrikePrice < self.underlying_price]
        option_chain = sorted(option_chain, key=lambda x: x.ID.StrikePrice)
        option_chain = [contract for contract in option_chain if contract.ID.StrikePrice > self.underlying_price]
        #option_chain.reverse()
        range_of_expiration = self.expiration_date + timedelta(days=1)
        option_chain = [contract for contract in option_chain if contract.ID.Date.date() >= self.expiration_date and contract.ID.Date.date() < range_of_expiration]

        if len(option_chain) < num_strikes:
            return None

        return option_chain[num_strikes - 1]

    def GetCallStrikePrice(self, num_strikes, option_right):
        option_chain = self.OptionChainProvider.GetOptionContractList(self.spy, self.Time.date())

        '''
        expiries = [
            x.ID.Date.date() for x in option_chain if x.ID.Date.date() >= self.expiration_date
        ]
        # Get unique expiries as list
        expiries = list(set(expiries))
        expiries.sort()

        expiry_time = expiries[0]
        '''
        option_chain = [contract for contract in option_chain if contract.ID.OptionRight == option_right]
        option_chain = sorted(option_chain, key=lambda x: self.underlying_price - x.ID.StrikePrice)
        #option_Chain = [contract for contract in option_chain if contract.ID.StrikePrice < self.underlying_price]
        option_chain = sorted(option_chain, key=lambda x: x.ID.StrikePrice)
        option_chain = [contract for contract in option_chain if contract.ID.StrikePrice < self.underlying_price]
        option_chain.reverse()
        range_of_expiration = self.expiration_date + timedelta(days=1)
        option_chain = [contract for contract in option_chain if contract.ID.Date.date() >= self.expiration_date and contract.ID.Date.date() < range_of_expiration]

        if len(option_chain) < num_strikes:
            return None

        return option_chain[num_strikes - 1]

    def GetLowerBollingerBand(self, period):
        """Return mean minus two standard deviations of recent SPY closes.

        Args:
            period: Number of minute bars of SPY history to request.
        """
        closes = self.History(self.spy, period, Resolution.Minute)['close']
        band_offset = 2 * np.std(closes)
        return np.mean(closes) - band_offset

    def GetUpperBollingerBand(self, period):
        """Return mean plus two standard deviations of recent SPY closes.

        Args:
            period: Number of minute bars of SPY history to request.
        """
        closes = self.History(self.spy, period, Resolution.Minute)['close']
        band_offset = 2 * np.std(closes)
        return np.mean(closes) + band_offset

    def CrossedAboveSignalLine(self, value, signal, prev_value, prev_signal):
        """Tell whether ``value`` moved from at-or-below to above its signal line.

        Args:
            value: Current reading.
            signal: Current signal-line reading.
            prev_value: Previous reading.
            prev_signal: Previous signal-line reading.

        Returns:
            True when the previous reading was at or below the previous signal
            and the current reading is strictly above the current signal.
        """
        # It cannot be an upward cross if the reading was already above the signal.
        if not (prev_value <= prev_signal):
            return False
        return value > signal

    def CrossedBelowSignalLine(self, value, signal, prev_value, prev_signal):
        """Tell whether ``value`` moved from at-or-above to below its signal line.

        Args:
            value: Current reading.
            signal: Current signal-line reading.
            prev_value: Previous reading.
            prev_signal: Previous signal-line reading.

        Returns:
            True when the previous reading was at or above the previous signal
            and the current reading is strictly below the current signal.
        """
        # It cannot be a downward cross if the reading was already below the signal.
        if not (prev_value >= prev_signal):
            return False
        return value < signal


class CustomSecurityInitializer(BrokerageModelSecurityInitializer):
    """Security initializer that applies the brokerage defaults, then raw pricing."""

    def __init__(
        self,
        brokerage_model: IBrokerageModel,
        security_seeder: ISecuritySeeder,
    ) -> None:
        # Nothing extra to store: both arguments go straight to the base class.
        super().__init__(brokerage_model, security_seeder)

    def Initialize(self, security: Security) -> None:
        """
        Set up the models of a security as it is added to the algorithm.

        The base class runs first so the security receives the default reality
        models of the brokerage model; the data normalization mode is then
        switched to raw.
        """
        super().Initialize(security)

        security.SetDataNormalizationMode(DataNormalizationMode.Raw)

        # Fee, slippage, fill and buying power models are not overridden here;
        # SetFeeModel / SetSlippageModel / SetFillModel / SetBuyingPowerModel
        # calls would go at this point if custom models were needed.