Overall Statistics
Total Trades
1412
Average Win
0.98%
Average Loss
-0.64%
Compounding Annual Return
6.081%
Drawdown
28.600%
Expectancy
0.089
Net Profit
45.660%
Sharpe Ratio
0.419
Probabilistic Sharpe Ratio
3.992%
Loss Rate
57%
Win Rate
43%
Profit-Loss Ratio
1.55
Alpha
-0.002
Beta
0.527
Annual Standard Deviation
0.116
Annual Variance
0.014
Information Ratio
-0.429
Tracking Error
0.11
Treynor Ratio
0.093
Total Fees
$2474.99
Estimated Strategy Capacity
$23000000.00
Lowest Capacity Asset
SPY R735QTJ8XC9X
Portfolio Turnover
60.64%
# region imports
from AlgorithmImports import *
# endregion

import numpy as np
from scipy.signal import argrelextrema
import talib
from datetime import timedelta

class StochasticOptionsAlgorithm(QCAlgorithm):
    """Long-only SPY strategy driven by a stochastic oscillator on 30-minute bars.

    Minute bars for SPY are consolidated into 30-minute bars. On each
    consolidated bar the stochastic oscillator and Bollinger Bands are
    updated and the entry/exit rules in ``ComputeStochastic`` are evaluated.
    The option-selection helpers (``TradeOptions``, ``GetPutStrikePrice``,
    ``GetCallStrikePrice``) are kept for future use but are not wired into
    any schedule or event in this class.
    """

    def Initialize(self):
        """Configure the backtest window, the SPY subscription and the indicators."""
        self.SetStartDate(2017, 1, 1)
        self.SetEndDate(2023, 5, 15)
        self.SetCash(100000)

        # Must be registered before AddEquity so that SPY is initialised by it.
        self.SetSecurityInitializer(self.SecurityInitializer)

        self.spy = self.AddEquity("SPY", Resolution.Minute).Symbol

        # State used by the (currently unwired) option-trading helpers.
        self.option_symbol = None
        self.strike_price = None
        self.expiration_date = None

        # 'long', 'short' or None; tracks which side the strategy believes it holds.
        self.position = None

        # Latest minute-bar prices, refreshed in OnData.
        self.underlying_price = None
        self.high_price = None
        self.low_price = None

        # Previous %K / %D values, used for crossover detection.
        self.k_prev = 0
        self.d_prev = 0

        self.lookback = 14
        self.low_period = 3
        self.high_period = 3

        # Build 30-minute bars from the minute feed and evaluate the
        # strategy on each of them.
        consolidator = TradeBarConsolidator(timedelta(minutes=30))
        consolidator.DataConsolidated += self.ComputeStochastic
        self.SubscriptionManager.AddConsolidator(self.spy, consolidator)
        # Keep a reference to the consolidator on the algorithm.
        self.consolidator = consolidator

        # Both indicators are updated manually with the consolidated bars.
        self.stochastic = Stochastic(self.lookback, 3, 3)
        self.bb = BollingerBands(20, 2)

        self.SetWarmup(50)

    def OnData(self, data):
        """Cache the latest SPY close, high and low from the incoming slice."""
        if not data.ContainsKey(self.spy):
            return

        spy_bar = data[self.spy]
        if not spy_bar:
            return

        self.underlying_price = spy_bar.Close
        self.high_price = spy_bar.High
        self.low_price = spy_bar.Low

    def TradeOptions(self, sender, bar):
        """Run the stochastic logic, then subscribe to and size the selected option.

        ``sender`` and ``bar`` are forwarded unchanged to ``ComputeStochastic``.
        No option order is submitted here; only the subscription and the
        position sizing are performed.
        """
        # ComputeStochastic takes (sender, bar); forwarding only ``bar``
        # raised a TypeError.
        self.ComputeStochastic(sender, bar)

        # Nothing to do without a selected contract, or if one is already tracked.
        if not self.strike_price or self.option_symbol is not None:
            return

        self.option_symbol = self.AddOptionContract(self.strike_price, Resolution.Minute).Symbol
        if self.strike_price not in self.Securities:
            return

        ask = self.Securities[self.strike_price].AskPrice

        # A zero ask means there is no usable quote yet; retry on a later call.
        if ask == 0:
            self.option_symbol = None
            return

        # Size for 2% of portfolio value, assuming half of the premium
        # (x100 multiplier) is at risk per contract.
        # NOTE: order submission is disabled, so target_qty is not used yet.
        target_risk = 0.02 * self.Portfolio.TotalPortfolioValue
        risk_per_contract = 100.0 * 0.5 * ask
        target_qty = int(target_risk / risk_per_contract)

    def ComputeStochastic(self, sender, bar):
        """Update the indicators with a consolidated bar and apply the trading rules.

        Args:
            sender: The object that raised the consolidation event (unused).
            bar: The consolidated trade bar.

        Exit (long): %K >= 93, or %K > 80 with %K crossing below %D, or the
        cached underlying price outside the Bollinger Bands.
        Exit (short): %K <= 10.7, or %K <= 20 with %K crossing above %D, or
        the cached underlying price outside the Bollinger Bands.
        Entry (long): flat and (%K <= 8, or %K <= 20 with %K below %D).
        """
        if self.underlying_price is None:
            return

        self.stochastic.Update(bar)
        self.bb.Update(bar.EndTime, bar.Close)

        # Feed the indicators during warm-up but never trade on warm-up data.
        if self.IsWarmingUp:
            return

        # Indicator readiness gates trading; the per-bar History requests
        # and talib computation were removed because their results were
        # never used.
        if not (self.stochastic.IsReady and self.bb.IsReady):
            return

        k_current = self.stochastic.StochK.Current.Value
        d_current = self.stochastic.StochD.Current.Value
        bb_upper = self.bb.UpperBand.Current.Value
        bb_lower = self.bb.LowerBand.Current.Value

        # Zero values are treated as "no valid reading" and skipped.
        if k_current == 0 or bb_upper == 0:
            return

        price = self.underlying_price
        outside_bands = price < bb_lower or price > bb_upper

        if self.Portfolio.Invested:
            if self.position == 'long':
                crossed_down = self.CrossedBelowSignalLine(
                    k_current, d_current, self.k_prev, self.d_prev)
                if k_current >= 93 or (k_current > 80 and crossed_down) or outside_bands:
                    self._ClosePosition()

            # Only reachable if a short entry rule is re-enabled.
            if self.position == 'short':
                crossed_up = self.CrossedAboveSignalLine(
                    k_current, d_current, self.k_prev, self.d_prev)
                if k_current <= 10.7 or (k_current <= 20 and crossed_up) or outside_bands:
                    self._ClosePosition()

        elif self.position is None:
            # Short entries are disabled; only the long entry rule is active.
            oversold = k_current <= 8 or (k_current <= 20 and k_current < d_current)
            if oversold and price > 0:
                quantity = int(self.Portfolio.TotalPortfolioValue / price)
                # Only mark the position as open when an order is actually
                # sent; otherwise the flag would block all later entries.
                if quantity > 0:
                    self.MarketOrder(self.spy, quantity)
                    self.position = 'long'

        self.k_prev = k_current
        self.d_prev = d_current

    def _ClosePosition(self):
        """Liquidate SPY and reset the position and option bookkeeping."""
        self.Liquidate(self.spy)
        self.option_symbol = None
        self.position = None
        self.strike_price = None

    def _SelectContract(self, num_strikes, option_right, above_spot):
        """Return the ``num_strikes``-th contract away from the underlying price.

        Only contracts with the requested ``option_right`` that expire on
        ``self.expiration_date`` are considered. With ``above_spot`` true,
        strikes above the cached underlying price are ranked ascending;
        otherwise strikes below it are ranked descending. Returns ``None``
        when the required state is missing, ``num_strikes`` is less than 1,
        or there are fewer than ``num_strikes`` matching contracts.
        """
        if self.expiration_date is None or self.underlying_price is None:
            return None
        if num_strikes < 1:
            return None

        spot = self.underlying_price
        contracts = self.OptionChainProvider.GetOptionContractList(self.spy, self.Time.date())

        candidates = [
            contract for contract in contracts
            if contract.ID.OptionRight == option_right
            and contract.ID.Date.date() == self.expiration_date
            and (contract.ID.StrikePrice > spot if above_spot
                 else contract.ID.StrikePrice < spot)
        ]
        # Nearest strike first in either direction.
        candidates.sort(key=lambda contract: contract.ID.StrikePrice,
                        reverse=not above_spot)

        if len(candidates) < num_strikes:
            return None
        return candidates[num_strikes - 1]

    def GetPutStrikePrice(self, num_strikes, option_right):
        """Return the contract ``num_strikes`` strikes above the underlying price.

        The chain is filtered by ``option_right`` and by
        ``self.expiration_date``. Returns ``None`` if no such contract exists.
        """
        return self._SelectContract(num_strikes, option_right, above_spot=True)

    def GetCallStrikePrice(self, num_strikes, option_right):
        """Return the contract ``num_strikes`` strikes below the underlying price.

        The chain is filtered by ``option_right`` and by
        ``self.expiration_date``. Returns ``None`` if no such contract exists.
        """
        return self._SelectContract(num_strikes, option_right, above_spot=False)

    def _BollingerBand(self, period, num_std):
        """Return mean + ``num_std`` * std of the last ``period`` minute closes.

        Returns ``None`` when the history request yields no rows.
        """
        history = self.History(self.spy, period, Resolution.Minute)
        if history.empty:
            return None
        closes = history['close']
        return np.mean(closes) + num_std * np.std(closes)

    def GetLowerBollingerBand(self, period):
        """Return the lower band (mean - 2 std) over ``period`` minute closes."""
        return self._BollingerBand(period, -2)

    def GetUpperBollingerBand(self, period):
        """Return the upper band (mean + 2 std) over ``period`` minute closes."""
        return self._BollingerBand(period, 2)

    def CrossedAboveSignalLine(self, value, signal, prev_value, prev_signal):
        """Return True if ``value`` moved from at-or-below to above ``signal``."""
        return prev_value <= prev_signal and value > signal

    def CrossedBelowSignalLine(self, value, signal, prev_value, prev_signal):
        """Return True if ``value`` moved from at-or-above to below ``signal``."""
        return prev_value >= prev_signal and value < signal

    def CustomSecurityInitializer(self, security):
        """Give option securities a null assignment model.

        Not registered in ``Initialize``; ``SecurityInitializer`` is the one
        passed to ``SetSecurityInitializer``.
        """
        if Extensions.IsOption(security.Symbol.SecurityType):
            security.SetOptionAssignmentModel(NullOptionAssignmentModel())

    def OnOrderEvent(self, orderEvent):
        """Inspect in-the-money order events; exercise handling is a placeholder."""
        if not orderEvent.IsInTheMoney:
            return

        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        if order.Type == OrderType.OptionExercise:
            # Placeholder: no action is taken on option exercise.
            pass

    def SecurityInitializer(self, security):
        """Seed the price, use raw data, and give options a null assignment model."""
        security.SetMarketPrice(self.GetLastKnownPrice(security))
        security.SetDataNormalizationMode(DataNormalizationMode.Raw)
        if Extensions.IsOption(security.Symbol.SecurityType):
            security.SetOptionAssignmentModel(NullOptionAssignmentModel())


class CustomSecurityInitializer(BrokerageModelSecurityInitializer):
    """Security initializer that applies brokerage defaults and raw price data."""

    def __init__(
        self, brokerage_model: IBrokerageModel, 
        security_seeder: ISecuritySeeder) -> None:
        """Forward the brokerage model and the security seeder to the base class."""
        super().__init__(brokerage_model, security_seeder)

    def Initialize(self, security: Security) -> None:
        """
        Define models to be used for securities as they are added to the 
        algorithm's universe.
        """
        # First, call the superclass definition.
        # This method sets the reality models of each security using the
        # default reality models of the brokerage model.
        super().Initialize(security)

        # Price seeding is left to the security_seeder given to the
        # constructor. The previous self.GetLastKnownPrice(...) call is a
        # QCAlgorithm method that is not defined on this class, so it raised
        # AttributeError.

        # Define the data normalization mode
        security.SetDataNormalizationMode(DataNormalizationMode.Raw)

        # Optional per-security overrides can be added here, e.g.
        # SetFeeModel, SetSlippageModel, SetFillModel, SetBuyingPowerModel.