Created with Highcharts 12.1.2EquityJan 2024Jan…Feb 2024Mar 2024Apr 2024May 2024Jun 2024Jul 2024Aug 2024Sep 2024Oct 2024Nov 2024Dec 2024Jan 2025Feb 202575k100k125k150k-10-5000.51010100M200M050k100k051015
Overall Statistics
Total Orders
36
Average Win
2.10%
Average Loss
-0.92%
Compounding Annual Return
24.943%
Drawdown
8.300%
Expectancy
1.415
Start Equity
100000
End Equity
127329.05
Net Profit
27.329%
Sharpe Ratio
1.219
Sortino Ratio
1.553
Probabilistic Sharpe Ratio
80.171%
Loss Rate
27%
Win Rate
73%
Profit-Loss Ratio
2.29
Alpha
0.037
Beta
0.628
Annual Standard Deviation
0.096
Annual Variance
0.009
Information Ratio
-0.131
Tracking Error
0.08
Treynor Ratio
0.186
Total Fees
$36.00
Estimated Strategy Capacity
$98000000.00
Lowest Capacity Asset
FB V6OIPNZEM8V9
Portfolio Turnover
0.70%
from AlgorithmImports import *
from scipy.optimize import brentq
import math
from scipy.stats import norm

class StockBuyingWithIV(QCAlgorithm):
    def Initialize(self):
        self.SetStartDate(2024, 1, 1)
        self.SetEndDate(2025, 1, 30)
        self.SetCash(100000)
        
        # List of stocks to trade
        self.tech_stocks = ["AAPL", "MSFT", "GOOGL", "AMZN", "META", "TSLA", "NVDA"]
        self.equities = {}
        self.rsi = {}
        self.ema = {}
        self.options = {}

        for stock in self.tech_stocks:
            equity = self.AddEquity(stock, Resolution.Minute).Symbol
            self.equities[stock] = equity
            
            # Add technical indicators
            # 14 day RSI - overbought (RSI > 70) or oversold (RSI < 30).
            self.rsi[stock] = self.RSI(equity, 14, MovingAverageType.Wilders, Resolution.Minute, Field.Close)
            # 50 day EMA 
            # EMA based on last 50 minutes close - short term 
            # self.ema[stock] = self.EMA(equity, 50, Resolution.Minute, Field.Close)
            # based on 50 day - long term
            self.ema[stock] = self.EMA(equity, 50, Resolution.Daily, Field.Close)

            # Add options to calculate implied volatility
            option = self.AddOption(stock, Resolution.Minute)
            option.SetFilter(self.UniverseFilter)
            self.options[stock] = option

            # Schedule daily evaluation for each stock
            self.Schedule.On(self.DateRules.EveryDay(self.equities[stock]), 
                             self.TimeRules.AfterMarketOpen(self.equities[stock], 30), 
                             lambda stock=stock: self.Evaluate(stock))

    def UniverseFilter(self, universe):
        # Select strikes within +/- 2 of the ATM strike and expirations up to 30 days
        return universe.Strikes(-2, 2).Expiration(timedelta(0), timedelta(30))

    def Evaluate(self, stock):
        # Fetch the current stock price
        price = self.Securities[self.equities[stock]].Price
        
        # Get the RSI and EMA values for the stock
        current_rsi = self.rsi[stock].Current.Value
        current_ema = self.ema[stock].Current.Value

        # Fetch the implied volatility for the stock using the options data
        iv = self.GetImpliedVolatility(stock)
        
        # Condition to buy based on RSI, EMA, and IV strategy
        # Buy when RSI is below 30 (oversold), price is above EMA (indicating uptrend), and IV is low
        if current_rsi < 30 and price > current_ema and iv < 0.5:
            self.BuyStock(stock)
        # SELL CONDITION 
        elif current_rsi > 70 and price < current_ema:
            self.SellStock(stock)

    def GetImpliedVolatility(self, stock):
        # Get the option chain for the stock
        chain = self.CurrentSlice.OptionChains.get(self.options[stock].Symbol)
        if not chain:
            return 0.4  # Default IV if no options are found
        
        # Calculate underlying price and identify ATM options
        underlying_price = self.Securities[self.equities[stock]].Price
        atm_call, atm_put = self.GetATMOptions(chain, underlying_price)
        if not atm_call or not atm_put:
            return 0.4  # Default IV if ATM options are not found

        # Calculate and log the implied volatilities of the ATM options
        call_iv = self.CalculateIV(atm_call, underlying_price)
        put_iv = self.CalculateIV(atm_put, underlying_price)
        if call_iv is None or put_iv is None:
            return 0.4  # Default IV if calculation fails

        # Return the average of the call and put IV
        return (call_iv + put_iv) / 2

    def GetATMOptions(self, chain, underlying_price):
        # Select the option contracts that are closest to being ATM
        atm_contract = min(chain, key=lambda x: abs(x.Strike - underlying_price))
        atm_calls = [o for o in chain if o.Strike == atm_contract.Strike and o.Right == OptionRight.Call]
        atm_puts = [o for o in chain if o.Strike == atm_contract.Strike and o.Right == OptionRight.Put]
        
        return (atm_calls[0] if atm_calls else None, atm_puts[0] if atm_puts else None)

    def CalculateIV(self, contract, underlying_price):
        # Calculate implied volatility using the Black-Scholes model and brentq numerical method
        market_price = (contract.BidPrice + contract.AskPrice) / 2
        if market_price <= 0:
            return None

        T = (contract.Expiry - self.Time).days / 365.0
        if T <= 0:
            self.Debug("Skipping contract with non-positive time to expiry")
            return None

        def bs_price(sigma):
            # Define the Black-Scholes pricing formula dependent on sigma
            d1 = (math.log(underlying_price / contract.Strike) + (0.01 + 0.5 * sigma**2) * T) / (sigma * math.sqrt(T))
            d2 = d1 - sigma * math.sqrt(T)
            if contract.Right == OptionRight.Call:
                return underlying_price * norm.cdf(d1) - contract.Strike * math.exp(-0.01 * T) * norm.cdf(d2)
            else:  # Put
                return contract.Strike * math.exp(-0.01 * T) * norm.cdf(-d2) - underlying_price * norm.cdf(-d1)

        # Use brentq to find the sigma that makes the theoretical price equal to the market price
        try:
            return brentq(lambda sigma: bs_price(sigma) - market_price, 0.01, 2)
        except ValueError:
            return None

    def BuyStock(self, stock):
        # Execute market order to buy stock if not already holding
        # if self.Portfolio[stock].Invested == False:
        
        # self.MarketOrder(stock, 100)  # Buy 100 shares of stock
        self.SetHoldings(stock, 0.1)
        self.Debug(f"Bought {stock}")

        # Debug to show current holdings
        self.Debug(f"Current Holdings: {[f'{symbol}: {holding.Quantity}' for symbol, holding in self.Portfolio.items() if holding.Invested]}")

    def SellStock(self, stock):
        # if you hold stock 
        if self.Portfolio[stock].Invested:
            self.Liquidate(stock)
            self.Debug(f"Sold all shares of {stock}")

    # need way to balance protfolio of what we hold