Overall Statistics
Total Trades
Average Win
Average Loss
Compounding Annual Return
Net Profit
Sharpe Ratio
Probabilistic Sharpe Ratio
Loss Rate
Win Rate
Profit-Loss Ratio
Annual Standard Deviation
Annual Variance
Information Ratio
Tracking Error
Treynor Ratio
Total Fees
Estimated Strategy Capacity
Lowest Capacity Asset
import numpy as np
from scipy.stats import norm
from statsmodels.tsa.arima.model import ARIMA

class SignalProcessor:
    ''' Models for volatility (ARIMAX) and price (Monte Carlo) prediction '''

    def __init__(self, algo):
        ''' initiate a new instance of the model holder

            Args:
                algo: the algorithm instance
        '''
        self.algo = algo
        # fitted ARIMAX results; stays None until ARIMAXFitting succeeds once
        self.arimax = None

    def processAltHistory(self, altHistoryDf):
        ''' process the raw Brain Language Metric of Company Filings historical data dataframe

            Args:
                altHistoryDf: (n x m df) the raw historical data dataframe, indexed by a
                    MultiIndex that has a "symbol" level and a datetime level

            Return:
                managementSentiment: sentiment level of management analysis of financial condition and operations
                reportSentiment: sentiment level of financial report
                riskSentiment: sentiment level of risk factor statement
                Each is a dataframe with one column per symbol, indexed by calendar date
                shifted one day forward.
        '''
        managementSentiment = altHistoryDf.managementdiscussionanalyasisoffinancialconditionandresultsofoperations.unstack("symbol")
        reportSentiment = altHistoryDf.reportsentiment.unstack("symbol")
        riskSentiment = altHistoryDf.riskfactorsstatementsentiment.unstack("symbol")

        def getSentiment(value):
            # entries without data are NaN (a float) and carry no .Sentiment: score them 0
            if isinstance(value, float):
                return 0
            sentiment = value.Sentiment
            if sentiment >= 0:
                return sentiment
            # negative sentiment is turned positive and weighted 1.5x
            return abs(sentiment) * 1.5

        def extract(frame):
            # element-wise extraction through Series.map, which does not rely on the
            # deprecated DataFrame.applymap
            scores = frame.apply(lambda column: column.map(getSentiment))
            # align index: plain dates, moved one calendar day forward
            scores.index = scores.index.date + timedelta(days=1)
            return scores

        return extract(managementSentiment), extract(reportSentiment), extract(riskSentiment)

    def ARIMAXFitting(self, volatilities, altHistoryDf):
        ''' fitting a ARIMAX model for volatility modelling

            Args:
                volatilities: (n x 1 array) volatility data series
                altHistoryDf: (n x m df) the raw historical data dataframe

            Return:
                (bool) True if the model was fitted and stored in self.arimax,
                False if fitting raised (any previously fitted model is kept)
        '''
        # data processing
        managementSentiment, reportSentiment, riskSentiment = self.processAltHistory(altHistoryDf)
        # convert volatility series into 2-d
        volatilities = pd.DataFrame(volatilities)
        # align our data on the date index; days without an entry count as 0
        df = pd.concat([volatilities, managementSentiment, reportSentiment, riskSentiment], axis=1).fillna(0)
        df.columns = ["volatility", "managementSentiment", "reportSentiment", "riskSentiment"]
        # ARIMAX, p & d & q from research ACF/PACF
        arima = ARIMA(df["volatility"], df[["managementSentiment", "reportSentiment", "riskSentiment"]],
                      order=(1, 1, 1))
        try:
            self.arimax = arima.fit()
        except Exception:
            # a failed fit is reported to the caller instead of stopping the algorithm
            return False
        return True

    def VolatilityPrediction(self, n):
        ''' volatility prediction from ARIMAX model

            Args:
                n: (int) number of steps of prediction

            Return:
                (n size series) out-of-sample forecast of the next n steps of volatility
        '''
        # set up zeros matrix (day without sentiment data consider 0, suppose no entry within 1 month)
        exog = np.zeros((n, 3))
        # forecast() counts steps beyond the end of the sample; predict(n, ...) would
        # treat n as the index of the first in-sample observation to predict from
        return self.arimax.forecast(steps=n, exog=exog)

    def PricePrediction(self, S_0, mu, volatility, iters=20000):
        ''' underlying stock price prediction from SDE

            Args:
                S_0: (float or 1-element series) initial stock price at time 0
                mu: (float) expectation of return
                volatility: (n x 1 array) the predicted volatility series
                iters: (int) number of iteration of Monte Carlo simulation

            Return:
                (iters x 1 array) simulated terminal price of each path
        '''
        # each time step length
        t = 1/365
        # accept a plain float as well as a 1-element series/array for the start price,
        # so the paths always form an (iters, 1) column instead of broadcasting to a matrix
        start = float(np.asarray(S_0, dtype=float).ravel()[0])
        sigmas = np.asarray(volatility, dtype=float).ravel()
        # terminal price of each path, at time 0 = current price
        S_t = np.full((iters, 1), start)
        # we predict the number of steps the volatility series provided
        for sigma in sigmas:
            # MonteCarlo simulation: one standard normal shock per path and step
            shocks = norm.rvs(size=iters).reshape(-1, 1)
            S_t = S_t * np.exp(t * (mu - sigma**2/2) + sigma * np.sqrt(t) * shocks)
        return S_t
class Functions:
    ''' Option contract selection and position sizing utilities for the Iron Condor '''

    def __init__(self, algo):
        ''' a new instance of class Functions

            Args:
                algo: algorithm instance
        '''
        self.algo = algo

    def GetOptionContracts(self, symbol, shortPutPrice, shortCallPrice, longPutPrice, longCallPrice):
        ''' get the contract for trading the Iron Condor

            Args:
                symbol: the underlying symbol
                shortPutPrice: desire strike for put contract to be short selled
                shortCallPrice: desire strike for call contract to be short selled
                longPutPrice: desire strike for put contract to be long brought
                    (not read: the long put strike is derived from the call wing width)
                longCallPrice: desire strike for call contract to be long brought

            Return:
                (option contracts x4) short put, short call, long put, long call as returned
                by AddOptionContract, or four None when no suitable set exists
        '''
        nothing = (None, None, None, None)
        # get list of options
        contracts = self.algo.OptionChainProvider.GetOptionContractList(symbol, self.algo.Time)
        # return None if no option contract available
        if not contracts: return nothing
        # only within 30 days and within 20 strikes as wider ones are prone to have slippage
        contracts = self.InitialFilter(symbol, contracts, -20, 20, 0, 30)
        if not contracts: return nothing
        # get calls
        calls = [contract for contract in contracts if contract.ID.OptionRight == OptionRight.Call]
        if not calls: return nothing
        # strike closest to the short call price; among equally close strikes the
        # latest expiry wins because the candidates are pre-sorted by expiry descending
        shortCallContract = min(sorted(calls, key=lambda x: x.ID.Date, reverse=True),
                                key=lambda x: abs(shortCallPrice - x.ID.StrikePrice))
        # get contract closest to long call price with the same expiration date
        sameExpiryCalls = [contract for contract in calls if contract.ID.Date == shortCallContract.ID.Date]
        longCallContract = min(sameExpiryCalls, key=lambda x: abs(longCallPrice - x.ID.StrikePrice))
        # get puts with same expiration
        sameExpiryPuts = [contract for contract in contracts
                          if contract.ID.OptionRight == OptionRight.Put
                          and contract.ID.Date == shortCallContract.ID.Date]
        if not sameExpiryPuts: return nothing
        # get contract closest to short put price with the same expiration date
        shortPutContract = min(sameExpiryPuts, key=lambda x: abs(shortPutPrice - x.ID.StrikePrice))
        # the long put must sit exactly one call-wing width below the short put
        wingWidth = longCallContract.ID.StrikePrice - shortCallContract.ID.StrikePrice
        targetStrike = shortPutContract.ID.StrikePrice - wingWidth
        match = [contract for contract in sameExpiryPuts if contract.ID.StrikePrice == targetStrike]
        # discontinue if no contract match Iron Condor requirement
        if not match: return nothing
        longPutContract = match[0]
        # subscribe
        return self.algo.AddOptionContract(shortPutContract, Resolution.Minute), \
               self.algo.AddOptionContract(shortCallContract, Resolution.Minute), \
               self.algo.AddOptionContract(longPutContract, Resolution.Minute), \
               self.algo.AddOptionContract(longCallContract, Resolution.Minute)

    def InitialFilter(self, underlyingsymbol, symbol_list, min_strike_rank, max_strike_rank, min_expiry, max_expiry):
        ''' This method is an initial filter of option contracts
            based on the range of strike price and the expiration date

            Args:
                underlyingsymbol: symbol of the underlying, used to look up its current price
                symbol_list: option contract symbols to filter
                min_strike_rank: (int) lower bound of the strike window, in strike steps relative to ATM
                max_strike_rank: (int) upper bound of the strike window, in strike steps relative to ATM
                min_expiry: (int) minimum days to expiration, inclusive
                max_expiry: (int) maximum days to expiration, inclusive

            Return:
                (list) contracts inside the expiry and strike windows that are not weekly
                options; empty list when nothing qualifies
        '''
        if len(symbol_list) == 0: return []
        # filter the contracts based on the expiry range
        today = self.algo.Time.date()
        contract_list = [i for i in symbol_list if min_expiry <= (i.ID.Date.date() - today).days <= max_expiry]
        # nothing expires inside the window: there is no ATM strike to look for
        if not contract_list: return []
        # find the strike price of ATM option
        price = self.algo.Securities[underlyingsymbol].Price
        atm_strike = min(contract_list, key=lambda x: abs(x.ID.StrikePrice - price)).ID.StrikePrice
        strike_list = sorted({i.ID.StrikePrice for i in contract_list})
        # find the index of ATM strike in the sorted strike list
        atm_strike_rank = strike_list.index(atm_strike)
        # clamp both bounds to the available strikes; an unclamped negative index would
        # silently wrap around to the far end of the list instead of raising
        last = len(strike_list) - 1
        low = min(max(atm_strike_rank + min_strike_rank + 1, 0), last)
        high = min(max(atm_strike_rank + max_strike_rank - 1, 0), last)
        min_strike = strike_list[low]
        max_strike = strike_list[high]
        # skip weekly options
        filtered_contracts = [i for i in contract_list
                              if min_strike <= i.ID.StrikePrice <= max_strike
                              and not OptionSymbol.IsWeekly(i)]

        return filtered_contracts

    def GetPositionSize(self, shortPut, shortCall, longPut, longCall):
        ''' get the postion size as discrete individual bet by Kelly Criterion

            Args:
                shortPut: put contract plan to be short selled
                shortCall: call contract plan to be short selled
                longPut: put contract plan to be long brought
                longCall: call contract plan to be long brought

            Return:
                (int) the absolute number of contracts should be buy/short for each contract,
                between 1 and algo.maxSize; None when the trade should be skipped
                (no positive premium, or a degenerate spread)
        '''
        # get the spread first
        spread = longCall.StrikePrice - shortCall.StrikePrice
        # both call legs on the same strike is not a condor and would divide by zero below
        if spread <= 0: return None
        # get all contract's current cost
        shortCallValue = shortCall.BidPrice
        shortPutValue = shortPut.BidPrice
        longCallValue = longCall.AskPrice
        longPutValue = longPut.AskPrice
        # total premium, minus commission 0.5*4
        premium = shortCallValue + shortPutValue - longCallValue - longPutValue - 2
        # return None to discontinue if no profitability
        if premium <= 0: return None
        # bet size: total value of the four legs
        bet = shortCallValue + shortPutValue + longCallValue + longPutValue
        # maximum profit and loss %
        maxProfitPercent = premium / bet
        maxLossPercent = (spread - premium) / bet
        # breakeven point, we need either one only as it's symmetric
        # if we factor spread as 1, this is the width of upper triangle (profit)
        breakEvenRatio = premium / spread
        shortLevel = self.algo.shortPercentileLevel
        longLevel = self.algo.longPercentileLevel
        # triangular area, up side is +, down side is -,
        # we don't need to divide by 2 as there's 2 tails like this
        triangularArea = maxProfitPercent*breakEvenRatio - maxLossPercent*(1 - breakEvenRatio)
        # multiply by the ratio of all possibilities (difference in Percentile, already represent 2 tails)
        triangularArea *= longLevel - shortLevel
        denominator = maxProfitPercent * maxLossPercent * (shortLevel - longLevel + 1)
        # premium equal to the spread leaves no loss side, so the ratio is undefined
        if denominator == 0: return None
        # bet ratio, follows the equation in research notebook
        f = (maxProfitPercent * shortLevel - maxLossPercent * (longLevel - 1)) / denominator \
            - triangularArea
        # half the size such that half the risk but only 3/4 the expected return (Ed Thorp)
        f *= 0.5
        # scale the ratio to a contract count, capped at maxSize and floored at 1
        return max(min(int(f/100), self.algo.maxSize), 1)
from QuantConnect.DataSource import BrainCompanyFilingLanguageMetricsAll
import pandas as pd
from scipy.stats import norm
from Functions import Functions
from SignalProcessor import SignalProcessor

class CrawlingTanJellyfish(QCAlgorithm):
    ''' Short Iron Condor on a fixed equity list: entries are triggered by newly
        published company-filing sentiment data and an ARIMAX volatility forecast '''

    def Initialize(self):
        ''' set up parameters, subscriptions, rolling windows and the daily entry check '''
        # Parameter: determining the strike zone of iron condor
        # shortPercentileLevel must be smaller than longPercentileLevel
        self.shortPercentileLevel = 0.85
        self.longPercentileLevel = 0.95
        # parameter: maximum order size
        self.maxSize = 5
        self.SetStartDate(2016, 1, 1)
        tickers = ["FB", "AAPL", "AMZN", "NFLX", "GOOG"]
        self.symbols = [self.AddEquity(symbol, Resolution.Minute).Symbol for symbol in tickers]
        # dict contain corresponding alt data symbol for each symbol
        self.altSymbols = {}
        # dict contain corresponding ARIMAX model for each symbol
        self.model = {}
        # dict contain corresponding entry condition check indicator:
        # False when inactive, otherwise the list of four target strike prices
        self.checkOptions = {symbol: False for symbol in self.symbols}
        # dict contain corresponding historical trade bar data, we use 5+1 year (1 year offset annual volatility)
        self.history = {symbol: RollingWindow[TradeBar](252*6) for symbol in self.symbols}
        # dict contain the symbols of the option legs currently held for each underlying
        self.tickets = {symbol: [] for symbol in self.symbols}
        # warm up rolling windows
        data = self.History(self.symbols, 252*6, Resolution.Daily)
        for symbol in self.symbols:
            for time, bar in data.loc[symbol].iterrows():
                tradeBar = TradeBar(time, symbol, bar.open, bar.high, bar.low, bar.close, bar.volume)
                # store the bar, otherwise the window stays empty until live daily bars arrive
                self.history[symbol].Add(tradeBar)
            # set up consolidator for future auto-update
            self.Consolidate(symbol, Resolution.Daily, self.DailyBarHandler)
        # schedule daily check for entry position if signal active
        self.Schedule.On(self.DateRules.EveryDay("AAPL"), self.TimeRules.At(10, 0), self.Entry)
        # class Functions for utilities
        self.fcn = Functions(self)

    def CustomSecurityInitializer(self, security):
        ''' per-security initialization hook

            Args:
                security: the security being initialized
        '''
        # NOTE(review): the body of this method is absent from the reviewed source and no
        # visible code registers it as a security initializer. TODO: restore the intended
        # initialization (and its registration) from the original project.
        pass

    def DailyBarHandler(self, bar):
        ''' consolidator callback: append the newly consolidated daily bar to its rolling window

            Args:
                bar: the consolidated daily trade bar
        '''
        self.history[bar.Symbol].Add(bar)

    def OnData(self, data):
        ''' close the position of any symbol that is about to split

            Args:
                data: the current data slice
        '''
        # If any symbol is going to have a split, we close our position
        for symbol in self.symbols:
            if data.Splits.ContainsKey(symbol):
                split = data.Splits[symbol]
                # will have a split on next day
                # NOTE(review): 0 presumably is SplitType.Warning; confirm against the LEAN enum
                if split.Type == 0:
                    self._ClosePosition(symbol)

    def OnSecuritiesChanged(self, changes):
        ''' subscribe the alternative dataset and build the model for every added equity

            Args:
                changes: the security changes reported by the engine
        '''
        for security in changes.AddedSecurities:
            if security.Type == SecurityType.Equity:
                # store the dataset symbol
                self.altSymbols[security.Symbol] = self.AddData(BrainCompanyFilingLanguageMetricsAll, security.Symbol).Symbol
                # create new ARIMAX model for symbol, Entry looks it up in self.model
                self.CreateModel(security.Symbol)

    def OnOrderEvent(self, orderEvent):
        ''' liquidate the whole condor when one of its legs is assigned

            Args:
                orderEvent: the order event reported by the engine
        '''
        # liquidate if we have options being exercised at expiration
        if orderEvent.IsAssignment:
            order = self.Transactions.GetOrderById(orderEvent.OrderId)
            if order.Type == OrderType.OptionExercise:
                for symbol, legs in self.tickets.items():
                    if orderEvent.Symbol in legs:
                        # also clears the stored legs (existing key, so safe while iterating)
                        self._ClosePosition(symbol)

    def CreateModel(self, symbol):
        ''' creation of ARIMAX model

            Args:
                symbol: the underlying symbol
        '''
        # initiate new model
        self.model[symbol] = SignalProcessor(self)
        # set up the model with 5 year data warm up
        altHistory = self.History(self.altSymbols[symbol], timedelta(days=365*5), Resolution.Daily)
        _, volatility = self._GetCloseAndVolatility(symbol)
        # fit ARIMAX model
        self.model[symbol].ARIMAXFitting(volatility, altHistory)

    def Entry(self):
        ''' check if entry conditions satisfied
            if yes, enter short Iron Condor position'''
        for symbol, symbolData in self.altSymbols.items():
            altHistory = self.History(symbolData, timedelta(days=365*5), Resolution.Daily)
            # if no newly updated data, skip this symbol
            if altHistory.empty or self.Time - altHistory.unstack(0).index[-1] > timedelta(days=1): continue
            # liquidate previous orders if we have updated data
            self._ClosePosition(symbol)
            history, volatility = self._GetCloseAndVolatility(symbol)
            # update the ARIMAX model
            success = self.model[symbol].ARIMAXFitting(volatility, altHistory)
            if not success: continue
            # get next 21 day prediction on volatility
            predictions = self.model[symbol].VolatilityPrediction(21)
            # we continue only if the predicted volatility will drop
            if volatility.values[-1] > predictions.iloc[-1]:
                # get terminal price predicion of n paths
                terminalPrices = self.model[symbol].PricePrediction(history.iloc[-1], np.mean(history.pct_change()[1:].values), predictions)
                # fit for normal distribution, since we're using N(0, sigma^2) for Monte Carlo, we can assume normal
                loc, scale = norm.fit(terminalPrices)
                # get preset level desired price range
                shortPutPrice = norm.ppf((1-self.shortPercentileLevel)/2, loc=loc, scale=scale)
                shortCallPrice = norm.ppf((1+self.shortPercentileLevel)/2, loc=loc, scale=scale)
                longPutPrice = norm.ppf((1-self.longPercentileLevel)/2, loc=loc, scale=scale)
                longCallPrice = norm.ppf((1+self.longPercentileLevel)/2, loc=loc, scale=scale)
                # activate checking; stays active until an order is actually placed
                self.checkOptions[symbol] = [shortPutPrice, shortCallPrice, longPutPrice, longCallPrice]
        # check option for activated symbols
        for symbol, check in self.checkOptions.items():
            if not check: continue
            # get option contracts
            shortPut, shortCall, longPut, longCall = self.fcn.GetOptionContracts(symbol, check[0], check[1], check[2], check[3])
            # if no contract return, discontinue
            if any(contract is None for contract in (shortCall, shortPut, longCall, longPut)): continue
            # get postion size for trading the options
            quantity = self.fcn.GetPositionSize(shortPut, shortCall, longPut, longCall)
            # if no profitability expected, don't trade
            if quantity is None: continue
            # deactivate check
            self.checkOptions[symbol] = False
            # trade short Iron Condor, should be same quantity
            shortPutOrder = self.Sell(shortPut.Symbol, quantity)
            shortCallOrder = self.Sell(shortCall.Symbol, quantity)
            longPutOrder = self.Buy(longPut.Symbol, quantity)
            longCallOrder = self.Buy(longCall.Symbol, quantity)
            # remember the legs so they can be liquidated together later
            self.tickets[symbol].extend(
                ticket.Symbol for ticket in (shortPutOrder, shortCallOrder, longPutOrder, longCallOrder))

    def _ClosePosition(self, symbol):
        ''' flatten every option leg recorded for the underlying plus the underlying itself,
            then forget the recorded legs

            Args:
                symbol: the underlying symbol
        '''
        for leg in self.tickets[symbol]:
            self.SetHoldings(leg, 0)
        self.SetHoldings(symbol, 0)
        self.tickets[symbol] = []

    def _GetCloseAndVolatility(self, symbol):
        ''' build the close price history and its rolling volatility from the rolling window

            Args:
                symbol: the underlying symbol

            Return:
                history: (n x 1 df) close prices in chronological order, indexed by bar end date
                volatility: (k x 1 df) 252-bar rolling standard deviation of the close-to-close
                    percentage change, rows without a full window dropped
        '''
        # the rolling window is reversed here to put the bars in chronological order
        bars = list(self.history[symbol])[::-1]
        history = pd.DataFrame([bar.Close for bar in bars],
                               index=[bar.EndTime.date() for bar in bars])
        # NOTE(review): no sqrt(252) scaling is applied although callers describe this as
        # annualized volatility; confirm which one the model expects
        volatility = history.pct_change().rolling(252).std().dropna()
        return history, volatility