Overall Statistics |
Total Trades 58 Average Win 0.08% Average Loss -0.17% Compounding Annual Return -68.254% Drawdown 30.700% Expectancy 0.235 Net Profit -17.102% Sharpe Ratio -0.669 Probabilistic Sharpe Ratio 18.798% Loss Rate 17% Win Rate 83% Profit-Loss Ratio 0.49 Alpha 0 Beta 0 Annual Standard Deviation 0.663 Annual Variance 0.439 Information Ratio -0.669 Tracking Error 0.663 Treynor Ratio 0 Total Fees $40.00 Estimated Strategy Capacity $440000.00 Lowest Capacity Asset SPY R735QTJ8XC9X |
#region imports from AlgorithmImports import * from QuantConnect.Logging import * from collections import deque from itertools import groupby from MarketHours import MarketHours from PortfolioHandler import Handler from PortfolioHandler.OStrategies import BullPutSpread, BearCallSpread from CustomIndicators import RipsterClouds, TTMSqueezePro, ATRLevels from Tools import ConsolidateIndicator, ExpirationRange, MainLogger, TradeCredit, DataHandler from CustomData import SPX0dte #endregion # TODO: class ATRSpreadsAlphaModel(AlphaModel): algorithm = None def __init__(self, algorithm, ticker): self.ticker = ticker self.algorithm = algorithm self.dataHandler = DataHandler(algorithm, ticker) self.symbol = self.dataHandler.AddSymbol() # self.spx0 = self.algorithm.AddData(SPX0dte, "SPX0", Resolution.Minute).Symbol self.marketHours = MarketHours(algorithm, self.ticker) self.portfolio = Handler(self.algorithm) # self.targetExpiration = ExpirationRange(25, 45) # self.neutralExpiration = ExpirationRange(7, 14) self.zeroDTE = ExpirationRange(0, 7) self.Log = MainLogger(self.algorithm) self.Credit = TradeCredit() self.ATRLevels = ATRLevels('ATRLevels', length = 14) algorithm.RegisterIndicator(self.ticker, self.ATRLevels, Resolution.Daily) self.algorithm.WarmUpIndicator(self.ticker, self.ATRLevels, Resolution.Daily) self.squeeze = TTMSqueezePro("Squeeze", length = 20) algorithm.RegisterIndicator(self.ticker, self.squeeze, Resolution.Daily) self.algorithm.WarmUpIndicator(self.ticker, self.squeeze, Resolution.Daily) self.indicators = { # 'ATRLevel': self.ATRLevels, # 'Squeeze' : self.squeeze, # 'Trade Plot': ["Call", "Put"] 'Trade Plot': ["BullPutSpread", "BearCallSpread"] } self.algorithm.benchmark.AddIndicators(self.indicators) def Update(self, algorithm, data): insights = [] if self.ticker not in data.Keys: return insights if algorithm.IsWarmingUp: return insights if not self.squeeze.IsReady or not self.ATRLevels.IsReady: return insights self.algorithm.benchmark.PrintBenchmark() if self.marketHours.get_CurrentClose().hour - 1 > self.algorithm.Time.hour > self.marketHours.get_CurrentOpen().hour + 2: # Monitor the positions if self.algorithm.Time.minute in list(range(10, 60))[0::10]: # insights.extend(self.MonitorBullSpread(data)) insights.extend(self.MonitorBearSpread(data)) # if self.marketHours.get_CurrentClose().hour - 1 == self.algorithm.Time.hour: if self.marketHours.get_CurrentOpen().hour == self.algorithm.Time.hour: # Scan for a new position. if self.algorithm.Time.minute == 45: # insights.extend(self.ScannerBullSpread(data)) insights.extend(self.ScannerBearSpread(data)) return insights # REGION: handling of spreads def MonitorBearSpread(self, data): insights = [] # - close if ITM # - add hedge if passed shortSide - (5$-10$) for bearCall in self.portfolio.BearCallSpreads(self.ticker): close = False hedge = False # pick the shortCall strike (it's the smallest value) shortCallStrike = bearCall.shortCall.Security.StrikePrice hedges = [] hedgeTrade = None for s in self.portfolio.BullPutSpreads(self.ticker): if s.shortPut.Security.StrikePrice == bearCall.longCall.Security.StrikePrice and s.longPut.Security.StrikePrice == bearCall.shortCall.Security.StrikePrice: hedges.append(s) if len(hedges) > 0: hedgeTrade = hedges[0] if self.__StockPrice() >= (shortCallStrike) and not hedgeTrade: hedge = True self.Log.Add("HEDGE bearCall {}: stockPrice({}) >= sortCallStrike({})".format(bearCall.ToString(), self.__StockPrice(), shortCallStrike )) if self.__StockPrice() >= (shortCallStrike): close = True self.Log.Add("CLOSE bearCall {}: stockPrice({}) >= sortCallStrike({})".format(bearCall.ToString(), self.__StockPrice(), shortCallStrike )) if close: self.Credit.Buy(bearCall.AskPrice()) self.Log.Add("[{}] BUY with credit {} / ${}(AskPrice) / {}%(Profit)".format(self.algorithm.Time.strftime("%A"), self.Credit.ToString(), bearCall.AskPrice(), bearCall.UnrealizedProfit()) ) self.Log.Add("ATR {}".format(self.ATRLevels.ToString())) insights.extend( [ Insight.Price(bearCall.shortCall.Symbol, Resolution.Minute, 15, InsightDirection.Flat), Insight.Price(bearCall.longCall.Symbol, Resolution.Minute, 15, InsightDirection.Flat) ] ) if hedge: hedge = self.__GetHedge(optionType = OptionRight.Put, shortStrike = bearCall.longCall.Security.StrikePrice, longStrike = bearCall.shortCall.Security.StrikePrice) self.Credit.Sell(hedge.AskPrice()) self.Log.Add("[{}] SCANNER - hedging with {} with credit {} / ${}(AskPrice)".format(self.algorithm.Time.strftime("%A"), hedge.ToString(), self.Credit.ToString(), hedge.AskPrice()) ) insights.extend( [ Insight.Price(hedge.shortPut, Resolution.Minute, 15, InsightDirection.Down), Insight.Price(hedge.longPut, Resolution.Minute, 15, InsightDirection.Up) ] ) self.Log.Print() return insights def ScannerBearSpread(self, data): insights = [] ## Buying conditions lenBearCalls = len(self.portfolio.BearCallSpreads(self.ticker)) # We have bear spreads already then just skip for now. if lenBearCalls > 0: return insights bearCall = self.__FindBearCall(data) self.Log.Add("SCANNER - Trying to find a bear call spread") if bearCall: self.Credit.Sell(bearCall.AskPrice()) self.Log.Add("[{}] SCANNER - buying {} with credit {} / ${}(AskPrice)".format(self.algorithm.Time.strftime("%A"), bearCall.ToString(), self.Credit.ToString(), bearCall.AskPrice()) ) self.Log.Add("ATR {}".format(self.ATRLevels.ToString())) insights.extend( [ Insight.Price(bearCall.shortCall, Resolution.Minute, 15, InsightDirection.Down), Insight.Price(bearCall.longCall, Resolution.Minute, 15, InsightDirection.Up), ] ) self.Log.Print() return insights def MonitorBullSpread(self, data): insights = [] # - ExpiresIn <= 2 # - profit > 80 # - profit <= -100 for bullPut in self.portfolio.BullPutSpreads(self.ticker): close = False # pick the shortPut strike (it's the smallest value) shortPutStrike = min(bullPut.Strike()) if self.__StockPrice() <= (shortPutStrike + 4): hedge = True self.Log.Add("HEDGE bullPut {}: stockPrice{} >= sortPutStrike - 4{}".format(bullPut.ToString(), self.__StockPrice(), shortPutStrike - 4 )) if self.__StockPrice() <= shortPutStrike: close = True self.Log.Add("CLOSE bullPut {}: stockPrice{} >= sortPutStrike{}".format(bullPut.ToString(), self.__StockPrice(), shortPutStrike )) if close: self.Credit.Buy(bullPut.AskPrice()) self.Log.Add("BUY with credit {} / ${} / {}%".format(self.Credit.ToString(), bullPut.AskPrice(), bullPut.UnrealizedProfit()) ) insights.extend( [ Insight.Price(bullPut.shortPut.Symbol, Resolution.Minute, 15, InsightDirection.Flat), Insight.Price(bullPut.longPut.Symbol, Resolution.Minute, 15, InsightDirection.Flat) ] ) self.Log.Print() return insights def ScannerBullSpread(self, data): insights = [] ## Buying conditions lenPutSpreads = len(self.portfolio.BullPutSpreads(self.ticker)) # We have calls already then just skip for now. if lenPutSpreads > 0: return insights bullPut = self.__FindBullPut(data) self.Log.Add("SCANNER - Trying to find a bull put spread") if bullPut: self.Credit.Sell(bullPut.AskPrice()) self.Log.Add("SCANNER - selling {} for premium {} / ${}".format(bullPut.ToString(), self.Credit.ToString(), bullPut.AskPrice()) ) insights.extend( [ Insight.Price(bullPut.longPut, Resolution.Minute, 15, InsightDirection.Up), Insight.Price(bullPut.shortPut, Resolution.Minute, 15, InsightDirection.Down) ] ) self.Log.Print() return insights # ENDREGION: handling of spreads def __StockPrice(self): return self.algorithm.Securities[self.ticker].Price def __GetHedge(self, optionType, shortStrike, longStrike): contracts = self.dataHandler.OptionContracts(expiration = self.zeroDTE, types = [optionType]) if not contracts: return None # only pick contracts that expire today. contracts = [x for x in contracts if x.ID.Date.date() == self.algorithm.Time.date()] # Iterate over the contracts that are grouped per day. for expiry, group in groupby(contracts, lambda x: x.ID.Date): group = list(group) soldOpt = min(group, key=lambda x: abs(x.ID.StrikePrice - shortStrike)) # strike is above soldCall by value $5 boughtOpt = min(group, key=lambda x: abs(x.ID.StrikePrice - longStrike)) if soldOpt is not None and boughtOpt is not None: self.dataHandler.AddOptions(contracts = [soldOpt, boughtOpt], resolution = Resolution.Minute) if optionType == OptionRight.Put: return BullPutSpread(self.algorithm, soldOpt, boughtOpt) elif optionType == OptionRight.Call: return BearCallSpread(self.algorithm, soldOpt, boughtOpt) def __FindBearCall(self, data): # Check if there is options data for this symbol. contracts = self.dataHandler.OptionContracts(expiration = self.zeroDTE, types = [OptionRight.Call]) if not contracts: return None # only pick contracts that expire today. contracts = [x for x in contracts if x.ID.Date.date() == self.algorithm.Time.date()] # Iterate over the contracts that are grouped per day. for expiry, group in groupby(contracts, lambda x: x.ID.Date): group = list(group) soldCall = min(group, key=lambda x: abs(x.ID.StrikePrice - (self.ATRLevels.NextLevel(2, bull = True) + 2))) # strike is above soldCall by value $5 boughtCall = min(group, key=lambda x: abs(x.ID.StrikePrice - (soldCall.ID.StrikePrice + 1))) if soldCall is not None and boughtCall is not None: self.dataHandler.AddOptions(contracts = [soldCall, boughtCall], resolution = Resolution.Minute) return BearCallSpread(self.algorithm, soldCall, boughtCall) return None def __FindBullPut(self, data): # Check if there is options data for this symbol. contracts = self.dataHandler.OptionContracts(expiration = self.zeroDTE, types = [OptionRight.Put]) if not contracts: return None # Iterate over the contracts that are grouped per day. for expiry, group in groupby(contracts, lambda x: x.ID.Date): group = list(group) # sort the options by credit # options = sorted(group, key = lambda x: self.algorithm.Securities[x.Value].Greeks.Delta) # with 6 delta. # soldPut = min(options, key=lambda x: abs(self.algorithm.Securities[x.Value].Greeks.Delta - .06)) soldPut = min(group, key=lambda x: abs(x.ID.StrikePrice - (self.ATRLevels.NextLevel(2, bull = True) - 5))) # strike is below soldPut by value 5$ boughtPut = min(group, key=lambda x: abs(x.ID.StrikePrice - (soldPut.ID.StrikePrice - 5))) if soldPut is not None and boughtPut is not None: self.dataHandler.AddOptions(contracts = [soldPut, boughtPut], resolution = Resolution.Minute) return BullPutSpread(self.algorithm, soldPut, boughtPut) return None
from System.Drawing import Color from AlgorithmImports import * class Benchmark: def __init__(self, algo, underlying, shares = 100, indicators = {}): self.algo = algo self.underlying = underlying # Variable to hold the last calculated benchmark value self.benchmarkCash = None self.benchmarkShares = shares self.BetaValue = 0 self.SeriesCount = 0 self.AddIndicators(indicators) self.resample = datetime.min self.resamplePeriod = (self.algo.EndDate - self.algo.StartDate) / 2000 def SetBeta(self, value): self.BetaValue = value def AddIndicators(self, indicators): self.indicators = indicators for name, indicator in self.indicators.copy().items(): if name == 'Squeeze': self.algo.AddChart(self.__SqueezeChart()) elif name == 'Trade Plot': self.algo.AddChart(self.__TradePlotChart(indicator)) del self.indicators['Trade Plot'] # TEMP # elif name == 'ATRLevel': # del self.indicators['ATRLevel'] else: self.algo.AddChart(Chart(name)) def PrintBenchmark(self): if self.algo.Time <= self.resample: return self.resample = self.algo.Time + self.resamplePeriod # self.__PrintBuyHold() self.__PrintTrades() self.__PrintCash() self.__PrintIndicators() self.__PrintBeta() def PrintTrade(self, order, trade, quantity): ''' Prints the price of the option on our trade chart. ''' plotTradeName = '' tradeStrike = trade.Strike() if isinstance(tradeStrike, list): if order.Symbol.ID.OptionRight == OptionRight.Put: tradeName = "{0} Put".format(trade.name) elif order.Symbol.ID.OptionRight == OptionRight.Call: tradeName = "{0} Call".format(trade.name) tradeStrike = order.Symbol.ID.StrikePrice else: tradeName = trade.name if quantity < 0: plotTradeName = 'Sell {}'.format(tradeName) elif quantity == 0: plotTradeName = 'Flat {}'.format(tradeName) else: plotTradeName = 'Buy {}'.format(tradeName) self.algo.Plot('Trade Plot', plotTradeName, tradeStrike) def __PrintBeta(self): if self.BetaValue > 0: self.algo.Plot('Trade Plot', 'Beta', self.BetaValue) def __PrintIndicators(self): ''' Prints the indicators array values to the Trade Plot chart. ''' for name, indicator in self.indicators.items(): if name == 'BB': self.__PlotBB(indicator) elif name == 'AROON': self.__PlotAROON(indicator) elif name == 'MACD': self.__PlotMACD(indicator) elif name == 'Squeeze': self.__PlotSqueeze(indicator) elif name == 'ATRLevel': self.__PlotATRLevel(indicator) else: self.algo.PlotIndicator(name, indicator) def __PlotBB(self, indicator): self.algo.Plot('BB', 'Price', self.__UnderlyingPrice()) self.algo.Plot('BB', 'BollingerUpperBand', indicator.UpperBand.Current.Value) self.algo.Plot('BB', 'BollingerMiddleBand', indicator.MiddleBand.Current.Value) self.algo.Plot('BB', 'BollingerLowerBand', indicator.LowerBand.Current.Value) def __PlotMACD(self, indicator): # self.algo.Plot('MACD', 'MACD', indicator.Current.Value) # self.algo.Plot('MACD', 'MACDSignal', indicator.Signal.Current.Value) # self.algo.Plot("MACD", "Price", self.__UnderlyingPrice()) self.algo.Plot("MACD", "Zero", 0) self.algo.Plot('MACD', 'MACDSignal', indicator.Signal.Current.Value) def __PlotAROON(self, indicator): self.algo.Plot('AROON', 'Aroon', indicator.Current.Value) self.algo.Plot('AROON', 'AroonUp', indicator.AroonUp.Current.Value) self.algo.Plot('AROON', 'AroonDown', indicator.AroonDown.Current.Value) def __PlotSqueeze(self, indicator): if indicator.MomentumHistogramColor() == Color.Aqua: self.algo.Plot('Squeeze', 'UP Bull MOM', indicator.Current.Value) elif indicator.MomentumHistogramColor() == Color.Blue: self.algo.Plot('Squeeze', 'DOWN Bull MOM', indicator.Current.Value) elif indicator.MomentumHistogramColor() == Color.Red: self.algo.Plot('Squeeze', 'DOWN Bear MOM', indicator.Current.Value) elif indicator.MomentumHistogramColor() == Color.Yellow: self.algo.Plot('Squeeze', 'UP Bear MOM', indicator.Current.Value) if indicator.Squeeze == 0: self.algo.Plot('Squeeze', 'No SQZ', 0) elif indicator.Squeeze == 1: self.algo.Plot('Squeeze', 'Low SQZ', 0) elif indicator.Squeeze == 2: self.algo.Plot('Squeeze', 'Mid SQZ', 0) elif indicator.Squeeze == 3: self.algo.Plot('Squeeze', 'High SQZ', 0) def __PlotATRLevel(self, indicator): self.algo.Plot('Trade Plot', 'UpperTrigger', indicator.UpperTrigger()) self.algo.Plot('Trade Plot', 'UpperMiddle', indicator.UpperMiddle()) self.algo.Plot('Trade Plot', 'UpperATR', indicator.UpperATR()) self.algo.Plot('Trade Plot', 'UpperExtension', indicator.UpperExtension()) # self.algo.Plot('Trade Plot', 'UpperMiddleExtension', indicator.UpperMiddleExtension()) # self.algo.Plot('Trade Plot', 'Upper2ATR', indicator.Upper2ATR()) # self.algo.Plot('Trade Plot', 'Upper2ATRExtension', indicator.Upper2ATRExtension()) # self.algo.Plot('Trade Plot', 'Upper2ATRMiddleExtension', indicator.Upper2ATRMiddleExtension()) # self.algo.Plot('Trade Plot', 'Upper3ATR', indicator.Upper3ATR()) self.algo.Plot('Trade Plot', 'Close', indicator.PreviousClose()) self.algo.Plot('Trade Plot', 'LowerTrigger', indicator.LowerTrigger()) self.algo.Plot('Trade Plot', 'LowerMiddle', indicator.LowerMiddle()) self.algo.Plot('Trade Plot', 'LowerATR', indicator.LowerATR()) self.algo.Plot('Trade Plot', 'LowerExtension', indicator.LowerExtension()) # self.algo.Plot('Trade Plot', 'LowerMiddleExtension', indicator.LowerMiddleExtension()) # self.algo.Plot('Trade Plot', 'Lower2ATR', indicator.Lower2ATR()) # self.algo.Plot('Trade Plot', 'Lower2ATRExtension', indicator.Lower2ATRExtension()) # self.algo.Plot('Trade Plot', 'Lower2ATRMiddleExtension', indicator.Lower2ATRMiddleExtension()) def __OriginalPlotATRLevel(self, indicator): self.algo.Plot('ATRLevel', 'UpperTrigger', indicator.UpperTrigger()) self.algo.Plot('ATRLevel', 'UpperMiddle', indicator.UpperMiddle()) self.algo.Plot('ATRLevel', 'UpperATR', indicator.UpperATR()) # self.algo.Plot('ATRLevel', 'UpperExtension', indicator.UpperExtension()) # self.algo.Plot('ATRLevel', 'UpperMiddleExtension', indicator.UpperMiddleExtension()) # self.algo.Plot('ATRLevel', 'Upper2ATR', indicator.Upper2ATR()) # self.algo.Plot('ATRLevel', 'Upper2ATRExtension', indicator.Upper2ATRExtension()) # self.algo.Plot('ATRLevel', 'Upper2ATRMiddleExtension', indicator.Upper2ATRMiddleExtension()) # self.algo.Plot('ATRLevel', 'Upper3ATR', indicator.Upper3ATR()) self.algo.Plot('ATRLevel', 'Close', indicator.PreviousClose()) self.algo.Plot('ATRLevel', 'LowerTrigger', indicator.LowerTrigger()) self.algo.Plot('ATRLevel', 'LowerMiddle', indicator.LowerMiddle()) self.algo.Plot('ATRLevel', 'LowerATR', indicator.LowerATR()) # self.algo.Plot('ATRLevel', 'LowerExtension', indicator.LowerExtension()) # self.algo.Plot('ATRLevel', 'LowerMiddleExtension', indicator.LowerMiddleExtension()) # self.algo.Plot('ATRLevel', 'Lower2ATR', indicator.Lower2ATR()) # self.algo.Plot('ATRLevel', 'Lower2ATRExtension', indicator.Lower2ATRExtension()) # self.algo.Plot('ATRLevel', 'Lower2ATRMiddleExtension', indicator.Lower2ATRMiddleExtension()) # self.algo.Plot('ATRLevel', 'Lower3ATR', indicator.Lower3ATR()) def __PrintCash(self): ''' Prints the cash in the portfolio in a separate chart. ''' self.algo.Plot('Cash', 'Options gain', self.algo.Portfolio.Cash) def __PrintTrades(self): ''' Prints the underlying price on the trades chart. ''' self.algo.Plot('Trade Plot', 'Price', self.__UnderlyingPrice()) def __PrintBuyHold(self): ''' Simulate buy and hold the shares. We use the same number of shares as the backtest. In this situation is 100 shares + the cash of the portfolio.''' if not self.benchmarkCash: self.benchmarkCash = self.algo.Portfolio.TotalPortfolioValue - self.benchmarkShares * self.__UnderlyingPrice() self.algo.Plot("Strategy Equity", "Buy & Hold", self.benchmarkCash + self.benchmarkShares * self.__UnderlyingPrice()) def __UnderlyingPrice(self): return self.algo.Securities[self.underlying].Close def __SqueezeChart(self): ''' It creates the Squeeze chart and adds all the series to generate a similar chart to the one available on TV etc.''' sqz = Chart('Squeeze') if self.SeriesCount <= 6: sqz.AddSeries(Series('No SQZ', SeriesType.Scatter, '', Color.Green, ScatterMarkerSymbol.Circle)) sqz.AddSeries(Series('Low SQZ', SeriesType.Scatter, '', Color.Black, ScatterMarkerSymbol.Circle)) sqz.AddSeries(Series('Mid SQZ', SeriesType.Scatter, '', Color.Red, ScatterMarkerSymbol.Circle)) sqz.AddSeries(Series('High SQZ', SeriesType.Scatter, '', Color.Orange, ScatterMarkerSymbol.Circle)) self.SeriesCount += 4 # If we already have 6 series added then just add one for momentum. if self.SeriesCount >= 6: sqz.AddSeries(Series('Momentum', SeriesType.Line, '', Color.White)) self.SeriesCount += 1 else: sqz.AddSeries(Series('UP Bull MOM', SeriesType.Bar, '', Color.Aqua)) sqz.AddSeries(Series('DOWN Bull MOM', SeriesType.Bar, '', Color.Blue)) sqz.AddSeries(Series('DOWN Bear MOM', SeriesType.Bar, '', Color.Red)) sqz.AddSeries(Series('UP Bear MOM', SeriesType.Bar, '', Color.Yellow)) self.SeriesCount += 4 return sqz # @param strategies [Array] can be "Call", "Sold Call", "Straddle", etc. def __TradePlotChart(self, strategies): ''' Create a trading chart that shows the price of the underlying and the trades as triangles. Trades are grouped by strategies that are defined by the indicator.''' tradingChart = Chart('Trade Plot') # On the Trade Plotter Chart we want 3 series: trades and price: for s in strategies: # we don't have enough slots for more Series. if self.SeriesCount > 8: break tradingChart.AddSeries(Series("Sell {}".format(s), SeriesType.Scatter, '$', Color.Red, ScatterMarkerSymbol.TriangleDown)) tradingChart.AddSeries(Series("Buy {}".format(s), SeriesType.Scatter, '$', Color.Green, ScatterMarkerSymbol.Triangle)) self.SeriesCount += 2 if self.SeriesCount < 10: tradingChart.AddSeries(Series('Price', SeriesType.Line, '$', Color.White)) self.SeriesCount += 1 return tradingChart
#region imports from AlgorithmImports import * #endregion #region imports from .Benchmark import Benchmark #endregion # Your New Python File
#region imports from AlgorithmImports import * #endregion # https://github.com/QuantConnect/Lean/blob/03f56481d4baf5cd803ff3a39bdd04d2a6050058/Algorithm.Python/CustomDataSPX0dteAlgorithm.py class SPX0dte(PythonData): '''SPX0dte Custom Data Class''' def GetSource(self, config, date, isLiveMode): source = "https://www.dropbox.com/s/nzb4byxztqw8dd7/UnderlyingOptionsIntervals_1800sec_oi_2021-04-26.csv?dl=1" return SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile) def Reader(self, config, line, date, isLiveMode): if not (line.strip() and line[4].isdigit()): return None # New SPX object index = SPX0dte() index.Symbol = config.Symbol try: # Example File Format: # 'underlying_symbol,quote_datetime,root,expiration,strike,option_type,open,high,low,close,trade_volume,bid_size,bid,ask_size,ask,underlying_bid,underlying_ask,open_interest' # 'A,2021-04-26 10:00:00,A,2021-06-18,70.000,C,0.0000,0.0000,0.0000,0.0000,0,157,66.3000,284,68.8000,137.0600,137.2000,2' # Date, Open High Low Close Volume Turnover # 2011-09-13 7792.9 7799.9 7722.65 7748.7 116534670 6107.78 data = line.split(',') index.Time = datetime.strptime(data[1], "%Y-%m-%d %H:%M:%S") index.EndTime = index.Time + timedelta(minutes=30) index.Value = data[4] index["Open"] = float(data[1]) index["High"] = float(data[2]) index["Low"] = float(data[3]) index["Close"] = float(data[4]) except ValueError: # Do nothing return None return index
#region imports from AlgorithmImports import * from .SPX0dte import SPX0dte #endregion # Your New Python File
#region imports from AlgorithmImports import * from collections import deque from scipy import stats import talib from numpy import mean, array #endregion # Use like this: # # self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol # self.ATRLevels = ATRLevels("ATRLevels", length = 14) # algorithm.RegisterIndicator(self.ticker, self.ATRLevels, Resolution.Daily) # self.algorithm.WarmUpIndicator(self.ticker, self.ATRLevels, Resolution.Daily) # // Set the appropriate timeframe based on trading mode # timeframe_func() => # timeframe = "D" # if trading_type == day_trading # timeframe := "D" # else if trading_type == multiday_trading # timeframe := "W" # else if trading_type == swing_trading # timeframe := "M" # else if trading_type == position_trading # timeframe := "3M" # else # timeframe := "D" class ATRLevels(PythonIndicator): TriggerPercentage = 0.236 MiddlePercentage = 0.618 def __init__(self, name, length = 14): # default indicator definition super().__init__() self.Name = name self.Value = 0 self.Time = datetime.min # set automatic warmup period + 1 day self.WarmUpPeriod = length + 1 self.length = length self.ATR = AverageTrueRange(self.length) # Holds 2 values the current close and the previous day/period close. self.PreviousCloseQueue = deque(maxlen=2) # Indicator to hold the period close, high, low, open self.PeriodHigh = Identity('PeriodHigh') self.PeriodLow = Identity('PeriodLow') self.PeriodOpen = Identity('PeriodOpen') @property def IsReady(self) -> bool: return self.ATR.IsReady def Update(self, input) -> bool: # update all the indicators with the new data dataPoint = IndicatorDataPoint(input.Symbol, input.EndTime, input.Close) bar = TradeBar(input.Time, input.Symbol, input.Open, input.High, input.Low, input.Close, input.Volume) ## Update SMA with data time and volume # symbolSMAv.Update(tuple.Index, tuple.volume) # symbolRSI.Update(tuple.Index, tuple.close) # symbolADX.Update(bar) # symbolATR.Update(bar) # symbolSMA.Update(tuple.Index, tuple.close) self.ATR.Update(bar) self.PreviousCloseQueue.appendleft(dataPoint) self.PeriodHigh.Update(input.Time, input.High) self.PeriodLow.Update(input.Time, input.Low) self.PeriodOpen.Update(input.Time, input.Open) if self.ATR.IsReady and len(self.PreviousCloseQueue) == 2: self.Time = input.Time self.Value = self.PreviousClose().Value return self.IsReady # Returns the previous close value of the period. # @return [Float] def PreviousClose(self): if len(self.PreviousCloseQueue) == 1: return None return self.PreviousCloseQueue[0] # Bear level method. This is represented usually as a yellow line right under the close line. # @return [Float] def LowerTrigger(self): return self.PreviousClose().Value - (self.TriggerPercentage * self.ATR.Current.Value) # biggest value 1ATR # Lower Midrange level. This is under the lowerTrigger (yellow line) and above the -1ATR line(lowerATR) # @return [Float] def LowerMiddle(self): return self.PreviousClose().Value - (self.MiddlePercentage * self.ATR.Current.Value) # Lower -1ATR level. # @return [Float] def LowerATR(self): return self.PreviousClose().Value - self.ATR.Current.Value # Lower Extension level. # @return [Float] def LowerExtension(self): return self.LowerATR() - (self.TriggerPercentage * self.ATR.Current.Value) # Lower Midrange Extension level. # @return [Float] def LowerMiddleExtension(self): return self.LowerATR() - (self.MiddlePercentage * self.ATR.Current.Value) # Lower -2ATR level. # @return [Float] def Lower2ATR(self): return self.LowerATR() - self.ATR.Current.Value # Lower -2ATR Extension level. # @return [Float] def Lower2ATRExtension(self): return self.Lower2ATR() - (self.TriggerPercentage * self.ATR.Current.Value) # Lower -2ATR Midrange Extension level. # @return [Float] def Lower2ATRMiddleExtension(self): return self.Lower2ATR() - (self.MiddlePercentage * self.ATR.Current.Value) # Lower -3ATR level. # @return [Float] def Lower3ATR(self): return self.Lower2ATR() - self.ATR.Current.Value def BearLevels(self): return [ self.LowerTrigger(), self.LowerMiddle(), self.LowerATR(), self.LowerExtension(), self.LowerMiddleExtension(), self.Lower2ATR(), self.Lower2ATRExtension(), self.Lower2ATRMiddleExtension(), self.Lower3ATR() ] # Bull level method. This is represented usually as a blue line right over the close line. # @return [Float] def UpperTrigger(self): return self.PreviousClose().Value + (self.TriggerPercentage * self.ATR.Current.Value) # biggest value 1ATR # Upper Midrange level. # @return [Float] def UpperMiddle(self): return self.PreviousClose().Value + (self.MiddlePercentage * self.ATR.Current.Value) # Upper 1ATR level. # @return [Float] def UpperATR(self): return self.PreviousClose().Value + self.ATR.Current.Value # Upper Extension level. # @return [Float] def UpperExtension(self): return self.UpperATR() + (self.TriggerPercentage * self.ATR.Current.Value) # Upper Midrange Extension level. # @return [Float] def UpperMiddleExtension(self): return self.UpperATR() + (self.MiddlePercentage * self.ATR.Current.Value) # Upper 2ATR level. def Upper2ATR(self): return self.UpperATR() + self.ATR.Current.Value # Upper 2ATR Extension level. # @return [Float] def Upper2ATRExtension(self): return self.Upper2ATR() + (self.TriggerPercentage * self.ATR.Current.Value) # Upper 2ATR Midrange Extension level. # @return [Float] def Upper2ATRMiddleExtension(self): return self.Upper2ATR() + (self.MiddlePercentage * self.ATR.Current.Value) # Upper 3ATR level. # @return [Float] def Upper3ATR(self): return self.Upper2ATR() + self.ATR.Current.Value def BullLevels(self): return [ self.UpperTrigger(), self.UpperMiddle(), self.UpperATR(), self.UpperExtension(), self.UpperMiddleExtension(), self.Upper2ATR(), self.Upper2ATRExtension(), self.Upper2ATRMiddleExtension(), self.Upper3ATR() ] def NextLevel(self, LevelNumber, bull = False, bear = False): dayOpen = self.PreviousClose().Value allLevels = [dayOpen] + self.BearLevels() + self.BullLevels() allLevels = sorted(allLevels, key = lambda x: x, reverse = False) bearLs = sorted(filter(lambda x: x <= dayOpen, allLevels), reverse = True) bullLs = list(filter(lambda x: x >= dayOpen, allLevels)) if bull: return bullLs[LevelNumber] if bear: return bearLs[LevelNumber] return None def Range(self): return self.PeriodHigh.Current.Value - self.PeriodLow.Current.Value def PercentOfAtr(self): return (self.Range() / self.ATR.Current.Value) * 100 def Warmup(self, history): for index, row in history.iterrows(): self.Update(row) # Method to return a string with the bull and bear levels. # @return [String] def ToString(self): return "Bull Levels: [{}]; Bear Levels: [{}]".format(self.BullLevels(), self.BearLevels())
#region imports from AlgorithmImports import * #endregion class RipsterClouds: ma_len1 = 8 # input(title='Short EMA1 Length', defval=8) // pullback ma_len2 = 9 # input(title='Long EMA1 Length', defval=9) // pullback ma_len3 = 5 # input(title='Short EMA2 Length', defval=5) // short trend ma_len4 = 12 # input(title='Long EMA2 Length', defval=12) // short trend ma_len5 = 20 # input(title='Short EMA3 Length', defval=20) // pivot ma_len6 = 21 # input(title='Long EMA3 Length', defval=21) // pivot ma_len7 = 34 # input(title='Short EMA4 Length', defval=34) // medium trend ma_len8 = 50 # input(title='Long EMA4 Length', defval=50) // medium trend ma_len9 = 180 # input(title='Short EMA5 Length', defval=180) // long trend ma_len10 = 200 # input(title='Long EMA5 Length', defval=200) // long trend ma_offset = 0 # input(title='Offset', defval=0) # var bool showLong = input(true, title='Show Long Alerts') # var bool showShort = input(true, title='Show Short Alerts') # showLine = input(false, title='Display EMA Line') # ema1 = input(true, title='Show EMA Cloud-1') # ema2 = input(true, title='Show EMA Cloud-2') # ema3 = input(true, title='Show EMA Cloud-3') # ema4 = input(true, title='Show EMA Cloud-4') # ema5 = input(true, title='Show EMA Cloud-5') def __init__(self, algorithm, ticker, resolution): self.algorithm = algorithm self.ticker = ticker self.resolution = resolution self.htf_ma1 = self.EMA(self.ma_len1) self.htf_ma2 = self.EMA(self.ma_len2) self.htf_ma3 = self.EMA(self.ma_len3) self.htf_ma4 = self.EMA(self.ma_len4) self.htf_ma5 = self.EMA(self.ma_len5) self.htf_ma6 = self.EMA(self.ma_len6) self.htf_ma7 = self.EMA(self.ma_len7) self.htf_ma8 = self.EMA(self.ma_len8) self.htf_ma9 = self.EMA(self.ma_len9) self.htf_ma10 = self.EMA(self.ma_len10) def EMA(self, malen): # src = input(title='Source', defval=hl2) return self.algorithm.EMA(self.ticker, malen, self.resolution, (Field.High + Field.Low) / 2) # no ideal if the hl/2 thing works!! # emacloudleading = input.int(0, minval=0, title='Leading Period For EMA Cloud') # mashort1 = htf_ma1 # malong1 = htf_ma2 # mashort2 = htf_ma3 # malong2 = htf_ma4 # mashort3 = htf_ma5 # malong3 = htf_ma6 # mashort4 = htf_ma7 # malong4 = htf_ma8 # mashort5 = htf_ma9 # malong5 = htf_ma10 # var color GREEN_COLOR = #0DCC0E # var color RED_COLOR = #F50202 # var color GREY_COLOR = #585C60 # var color BLUE_COLOR = #0097A7 # var color ORANGE_COLOR = #F57F17 # cloudcolour1 = mashort1 >= malong1 ? color.new(GREEN_COLOR, 70) : color.new(RED_COLOR, 70) # cloudcolour2 = mashort2 >= malong2 ? color.new(GREEN_COLOR, 65) : color.new(RED_COLOR, 65) # cloudcolour3 = mashort3 >= malong3 ? color.new(GREY_COLOR, 70) : color.new(GREY_COLOR, 70) # cloudcolour4 = mashort4 >= malong4 ? color.new(BLUE_COLOR, 65) : color.new(ORANGE_COLOR, 65) # cloudcolour5 = mashort5 >= malong5 ? color.new(#05bed5, 65) : color.new(#e65100, 65) # //03abc1 # mashortcolor1 = mashort1 >= mashort1[1] ? color.olive : color.maroon # mashortcolor2 = mashort2 >= mashort2[1] ? color.olive : color.maroon # mashortcolor3 = mashort3 >= mashort3[1] ? color.olive : color.maroon # mashortcolor4 = mashort4 >= mashort4[1] ? color.olive : color.maroon # mashortcolor5 = mashort5 >= mashort5[1] ? color.olive : color.maroon # mashortline1 = plot(ema1 ? mashort1 : na, color=showLine ? mashortcolor1 : na, linewidth=1, offset=emacloudleading, title='Short Leading EMA1') # mashortline2 = plot(ema2 ? mashort2 : na, color=showLine ? mashortcolor2 : na, linewidth=1, offset=emacloudleading, title='Short Leading EMA2') # mashortline3 = plot(ema3 ? mashort3 : na, color=showLine ? mashortcolor3 : na, linewidth=1, offset=emacloudleading, title='Short Leading EMA3') # mashortline4 = plot(ema4 ? mashort4 : na, color=showLine ? mashortcolor4 : na, linewidth=1, offset=emacloudleading, title='Short Leading EMA4') # mashortline5 = plot(ema5 ? mashort5 : na, color=showLine ? mashortcolor5 : na, linewidth=1, offset=emacloudleading, title='Short Leading EMA5') # malongcolor1 = malong1 >= malong1[1] ? color.green : color.red # malongcolor2 = malong2 >= malong2[1] ? color.green : color.red # malongcolor3 = malong3 >= malong3[1] ? color.green : color.red # malongcolor4 = malong4 >= malong4[1] ? color.green : color.red # malongcolor5 = malong5 >= malong5[1] ? color.green : color.red # malongline1 = plot(ema1 ? malong1 : na, color=showLine ? malongcolor1 : na, linewidth=3, offset=emacloudleading, title='Long Leading EMA1') # malongline2 = plot(ema2 ? malong2 : na, color=showLine ? malongcolor2 : na, linewidth=3, offset=emacloudleading, title='Long Leading EMA2') # malongline3 = plot(ema3 ? malong3 : na, color=showLine ? malongcolor3 : na, linewidth=3, offset=emacloudleading, title='Long Leading EMA3') # malongline4 = plot(ema4 ? malong4 : na, color=showLine ? malongcolor4 : na, linewidth=3, offset=emacloudleading, title='Long Leading EMA4') # malongline5 = plot(ema5 ? malong5 : na, color=showLine ? malongcolor5 : na, linewidth=3, offset=emacloudleading, title='Long Leading EMA5') # fill(mashortline1, malongline1, color=cloudcolour1, title='MA Cloud1') # fill(mashortline2, malongline2, color=cloudcolour2, title='MA Cloud2') # fill(mashortline3, malongline3, color=cloudcolour3, title='MA Cloud3') # fill(mashortline4, malongline4, color=cloudcolour4, title='MA Cloud4') # fill(mashortline5, malongline5, color=cloudcolour5, title='MA Cloud5') # // store variables for selling and buying. # start = false // trigger for showing the alerts # var state = false # // this is a permanent signal. # var buy_signal = false # shortTrend = math.min(mashort1, malong1) # mediumTrend = math.max(mashort4, malong4) # // figure out a way to add an alert after each day so like a reset. # bool newDay = ta.change(time("D")) # var bool highInterval = timeframe.ismonthly or timeframe.isweekly or timeframe.isdaily # if newDay and not highInterval # start := true # // bullish # if mashort1 >= malong1 and shortTrend >= mediumTrend # buy_signal := true # if state == false # // start hedge # state := true # start := true # start # else if shortTrend < mediumTrend # buy_signal := false # if state == true # state := false # start := true # start # _buying_trigger = state and start # _selling_trigger = state == false and start # // Add pins for buying and selling. # plotshape(showLong and _buying_trigger, style=shape.triangleup, size=size.normal, color=color.new(color.green, 0), text='Go Long', title='Go Long', location=location.belowbar) # plotshape(showShort and _selling_trigger, style=shape.triangledown, size=size.normal, color=color.new(color.red, 0), text='Go Short', title='Go Short') # alertcondition(_buying_trigger, title='Go Long', message='Go Long') # alertcondition(_selling_trigger, title='Go Short', message='Go Short') # // Turn these on for strategy tester when we can do hedging with pine script # // strategy.entry(id="EL", direction=strategy.long, when=_buying_trigger and strategy.position_size == 0) # // strategy.close(id="EL", when=_selling_trigger and strategy.position_size != 0)
#region imports from AlgorithmImports import * from collections import deque from scipy import stats import talib from numpy import mean, array #endregion # Good indicator template: https://www.quantconnect.com/forum/discussion/12691/python-indicator-template/p1 # I did not use it here but it should derive from. # Use like this: # # self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol # NOT SURE WHERE TO PUT THE EXTRA PARAMETERS LIKE SECURITY AND ALGORITHM§ # 1. in this first try i'm sending the algorithm when creating it. # self.squeeze = TTMSqueezePro("squeeze", 21) # self.RegisterIndicator(self.spy, self.squeeze, Resolution.Daily) # history = self.History(self.spy, 21, Resolution.Daily) # self.squeeze.Warmup(history) # TODO: there is a problem with the values being one day late. I'm not sure if it's because of how we close it but it might be the daily period. class TTMSqueezePro(PythonIndicator): Squeeze = 0 # like value this is the second part of the indicator. # //BOLLINGER BANDS BB_mult = 2.0 # input.float(2.0, "Bollinger Band STD Multiplier") # //KELTNER CHANNELS KC_mult_high = 1.0 # input.float(1.0, "Keltner Channel #1") KC_mult_mid = 1.5 # input.float(1.5, "Keltner Channel #2") KC_mult_low = 2.0 # input.float(2.0, "Keltner Channel #3") SQZ_COLORS = ['green', 'black', 'red', 'orange'] # according to this example we don't need to pass security and algo as that # will be passed with RegisterIndicator. # https://github.com/QuantConnect/Lean/blob/b9d3d999170f385e2371fc3fef1823f2ddd4c83b/Algorithm.Python/CustomWarmUpPeriodIndicatorAlgorithm.py def __init__(self, name, length = 20): # default indicator definition super().__init__() self.Name = name self.Value = 0 self.Time = datetime.min self.Squeeze = 0 # set automatic warmup period self.WarmUpPeriod = length * 3 self.length = length self.queue = deque(maxlen=self.length) self.queueMean = deque(maxlen=self.length) self.queueSqz = deque(maxlen=self.length) # define the base indicators to use self.BB = BollingerBands(self.length, self.BB_mult) self.KCHigh = KeltnerChannels(self.length, self.KC_mult_high) self.KCMid = KeltnerChannels(self.length, self.KC_mult_mid) self.KCLow = KeltnerChannels(self.length, self.KC_mult_low) self.MAX = Maximum(self.length) self.MIN = Minimum(self.length) self.SMA = SimpleMovingAverage('SMA', self.length) @property def IsReady(self) -> bool: # it's ready when: # - we have enough data to calculate the momentum oscilator value # - we have enough data to calculate the squeeze data return (len(self.queueMean) >= self.length) and self.BB.IsReady and self.KCHigh.IsReady and self.KCMid.IsReady and self.KCLow.IsReady def ManualUpdate(self, input) -> bool: self.Update(input) if self.IsReady: self.Current = IndicatorDataPoint(input.Symbol, input.Time, self.Value) return self.IsReady def Update(self, input) -> bool: # update all the indicators with the new data dataPoint = IndicatorDataPoint(input.Symbol, input.Time, input.Close) self.BB.Update(dataPoint) self.SMA.Update(dataPoint) # Feed into the Max and Min indicators the highest and lowest values only self.MAX.Update(IndicatorDataPoint(input.Symbol, input.Time, input.High)) self.MIN.Update(IndicatorDataPoint(input.Symbol, input.Time, input.Low)) # Keltner channels indicators self.KCHigh.Update(input) self.KCMid.Update(input) self.KCLow.Update(input) # Calculate the mom oscillator only after we get the proper amount of values for the array. if len(self.queueMean) >= self.length: self.Time = input.Time self.Value = self.MomOscillator() # self.Current = IndicatorDataPoint(input.Symbol, input.Time, self.Value) # self.OnUpdated(self.Current) data = IndicatorDataPoint(input.Symbol, input.Time, self.Value) if len(self.queue) > 0 and data.Time == self.queue[0].Time: self.queue[0] = data else: self.queue.appendleft(data) # calculate momentum oscilator. if self.MAX.IsReady and self.MIN.IsReady and self.SMA.IsReady: data = IndicatorDataPoint(input.Symbol, input.Time, self.MeanPrice(input.Close)) if len(self.queueMean) > 0 and data.Time == self.queueMean[0].Time: self.queueMean[0] = data else: self.queueMean.appendleft(data) # Add the value and sqz status to a queue so we can check later if # we switched status recently. if self.BB.IsReady and self.KCHigh.IsReady and self.KCMid.IsReady and self.KCLow.IsReady: self.Squeeze = self.SqueezeValue() data = IndicatorDataPoint(input.Symbol, input.Time, self.Squeeze) if len(self.queueSqz) > 0 and data.Time == self.queueSqz[0].Time: self.queueSqz[0] = data else: self.queueSqz.appendleft(data) return self.IsReady def KC_basis(self): return self.sma.Current.Value def BB_basis(self): # return self.sma.Current.Value return self.BB.MiddleBand.Current.Value def BB_upper(self): return self.BB.UpperBand.Current.Value def BB_lower(self): return self.BB.LowerBand.Current.Value def KC_upper_high(self): return self.KCHigh.UpperBand.Current.Value def KC_lower_high(self): return self.KCHigh.LowerBand.Current.Value def KC_upper_mid(self): return self.KCMid.UpperBand.Current.Value def KC_lower_mid(self): return self.KCMid.LowerBand.Current.Value def KC_upper_low(self): return self.KCLow.UpperBand.Current.Value def KC_lower_low(self): return self.KCLow.LowerBand.Current.Value # //SQUEEZE CONDITIONS def NoSqz(self): return self.BB_lower() < self.KC_lower_low() or self.BB_upper() > self.KC_upper_low() # NO SQUEEZE: GREEN def LowSqz(self): return self.BB_lower() >= self.KC_lower_low() or self.BB_upper() <= self.KC_upper_low() # LOW COMPRESSION: BLACK def MidSqz(self): return self.BB_lower() >= self.KC_lower_mid() or self.BB_upper() <= self.KC_upper_mid() # MID COMPRESSION: RED def HighSqz(self): return self.BB_lower() >= self.KC_lower_high() or self.BB_upper() <= self.KC_upper_high() # HIGH COMPRESSION: ORANGE # //SQUEEZE DOTS COLOR # sq_color = HighSqz ? color.new(color.orange, 0) : MidSqz ? color.new(color.red, 0) : LowSqz ? color.new(color.black, 0) : color.new(color.green, 0) def SqueezeColor(self): if self.HighSqz(): return 'orange' elif self.MidSqz(): return 'red' elif self.LowSqz(): return 'black' else: return 'green' def SqueezeValue(self): return self.SQZ_COLORS.index(self.SqueezeColor()) def MomentumHistogramColor(self): bullish = Color.Aqua if self.queue[0].Value > self.queue[1].Value else Color.Blue bearish = Color.Red if self.queue[0].Value < self.queue[1].Value else Color.Yellow return bullish if self.Bullish() else bearish def Bullish(self): return self.queue[0].Value > 0 def Bearish(self): return self.queue[0].Value <= 0 # This calculates the mean price value that we'll add to a series type/array and we'll use to # calculate the momentum oscilator (aka linear regression) def MeanPrice(self, price): return price - mean([mean([self.MAX.Current.Value, self.MIN.Current.Value]), self.SMA.Current.Value]) def LosingMomentum(self, maxDays = 2): # reverse the momentum values slicedQueue = list(self.queue)[:maxDays] # if absolute then we also check the momentum `color` # go over the reversed values and make sure they are decreasing return all(earlier.Value > later.Value for later, earlier in zip(slicedQueue, slicedQueue[1:])) def GainingMomentum(self, maxDays = 2): # reverse the momentum values slicedQueue = list(self.queue)[:maxDays] # if absolute then we also check the momentum `color` # go over the reversed values and make sure they are increasing return all(earlier.Value < later.Value for later, earlier in zip(slicedQueue, slicedQueue[1:])) # It's squeezing if the colors are different than green. def Squeezing(self): current = self.queueSqz[0] return current.Value != self.SQZ_COLORS.index('green') def SqueezeChange(self, toColor = 'green'): earlier = self.queueSqz[1] last = self.queueSqz[0] colorIndex = self.SQZ_COLORS.index(toColor) return last.Value == colorIndex and earlier.Value != last.Value def SqueezeDuration(self, over = 2): # pick last `over` days but today/current value slicedQueue = list(self.queueSqz)[1:over+1] colorIndex = self.SQZ_COLORS.index('green') # go over the reversed values and make sure they are increasing return all(val.Value != colorIndex for val in slicedQueue) def MomOscillator(self): ''' //MOMENTUM OSCILLATOR mom = ta.linreg(close - math.avg(math.avg( ta.highest(high, length), ta.lowest(low, length) ), ta.sma(close, length) ), length, 0) https://www.quantconnect.com/forum/discussion/10168/least-squares-linear-regression/p1/comment-28627 https://www.tradingview.com/pine-script-reference/v4/#fun_linreg // linreg = intercept + slope * (length - 1 - offset) // where length is the y argument, // offset is the z argument, // intercept and slope are the values calculated with the least squares method on source series (x argument). -> linreg(source, length, offset) → series[float] ''' # x = [range(len(self.queueMean))] # y = self.queueMean # slope, intercept = stats.linregress(x, y)[0], stats.linregress(x, y)[1] # linreg = intercept + slope * (self.length - 1) # we need to reverse the queue in order to get the most recent regression series = array([m.Value for m in reversed(self.queueMean)]) size = len(series) # considering we are not taking a shorter regression value we are going to get the last value # of the returned array as that is where the linar regression value sits the rest are `nan` linreg = talib.LINEARREG(series, size)[size - 1] return linreg def Warmup(self, history): for index, row in history.iterrows(): self.Update(row)
#region imports from AlgorithmImports import * from .RipsterClouds import RipsterClouds from .TTMSqueezePro import TTMSqueezePro from .ATRLevels import ATRLevels #endregion # Your New Python File
#region imports from AlgorithmImports import * #endregion class MarketHours: def __init__(self, algorithm, symbol): self.algorithm = algorithm self.hours = algorithm.Securities[symbol].Exchange.Hours def get_CurrentOpen(self): return self.hours.GetNextMarketOpen(self.algorithm.Time, False) def get_CurrentClose(self): return self.hours.GetNextMarketClose(self.get_CurrentOpen(), False)
#region imports from AlgorithmImports import * from itertools import groupby from QuantConnect.Logging import * from MarketHours import MarketHours from PortfolioHandler import Handler from PortfolioHandler.OStrategies import Straddle from Tools import ExpirationRange, MainLogger, TradeCredit #endregion # TODO: # - force it to work once an hour # - force it to work every 5 minutes # - force it to work once a day # - check if LEAP is properly rolled so it does not pick a too close one. Ideal is 365 days DTE! # - try to roll for a credit including the LEAP value # - when monitoring not just close the position also roll and leave the scanner just for opening positions. (not expected to cause any results if we include credit checks) # - # TODO: # Replicate the result from this spreadsheet: https://docs.google.com/spreadsheets/d/1-Pr-_arX_mdM2O_oDMHnhFZNRBti4N07/edit#gid=2038006255 # Details of best result # Run Seq. Run Date Code Version Portfolio Return CAGR Percent Change Max Drawdown (%) Max Drawdown ($) Opening Trades Winners Losers Avg Gain (Winners) Avg Loss (Losers) Geometric Sharpe Ratio Baseline # v2.0038 8/25/2020 2.1 962.83% 32.81% 91.54% -36.35% -$282,701.00 322 211 111 $13,975.38 -$17,891.65 0.955 Loss # Environmental Backtest Settings Sequence Parameters # Start Date End Date Initial Cash Fill Orders Close Orders Allocation Straddle Quantity # 4/2/2012 7/31/2020 $100,000 Mid Price Mid Price 0.50 0 # DTE Threshold Price Threshold Open DTE Min Open DTE Max Price Var Min Price Var Max # 35 0.065 335 425 -$0.50 $0.50 # DTE Threshold Loss Limit Threshold Price Call Threshold Price Put Threshold Open DTE Min Open DTE Max DTE Max Expansion Open DTE Multiplier Price Var Min Price Var Max Price Markup # 1 -3.25 0.050 0.050 5 15 30 1.05 -$0.50 $0.50 $0.10 # FINAL FORM: seems like short rolling works great it's the LEAP that gives too little profit (is that the way it needs to be??!) # THIS DOES NOT SEEM TO WORK!!! # https://www.quantconnect.com/forum/discussion/10619/options-margin-calculation-cash-covered-puts/p1 class CustomBuyingPowerModel(BuyingPowerModel): # def GetMaximumOrderQuantityForTargetBuyingPower(self, parameters): # quantity = super().GetMaximumOrderQuantityForTargetBuyingPower(parameters).Quantity # quantity = np.floor(quantity / 100) * 100 # return GetMaximumOrderQuantityResult(quantity) def HasSufficientBuyingPowerForOrder(self, parameters): return HasSufficientBuyingPowerForOrderResult(True) class MilkTheCowAlphaModel(AlphaModel): algorithm = None def __init__(self, algorithm, ticker): self.ticker = ticker self.algorithm = algorithm self.symbol = algorithm.AddEquity(self.ticker) self.marketHours = MarketHours(algorithm, self.ticker) self.portfolio = Handler(self.algorithm) # self.symbol.SetBuyingPowerModel(CustomBuyingPowerModel()) # TEST default expirations: # self.LeapExpiration = ExpirationRange(335, 425) # self.ShortExpiration = ExpirationRange(5, 15) # TEST rolling expirations: # self.LeapExpiration = ExpirationRange(335, 425) # self.ShortExpiration = ExpirationRange(5, 30) self.LeapExpiration = ExpirationRange(335, 425) self.ShortExpiration = ExpirationRange(5, 30) self.Log = MainLogger(self.algorithm) self.Credit = TradeCredit() def Update(self, algorithm, data): insights = [] if algorithm.IsWarmingUp: return insights if self.ticker not in data.Keys: return insights self.algorithm.benchmark.PrintBenchmark() if self.marketHours.get_CurrentClose().hour - 2 > self.algorithm.Time.hour > self.marketHours.get_CurrentOpen().hour + 2: if self.algorithm.Time.minute == 30: insights.extend(self.Monitor(data)) if self.algorithm.Time.minute == 5: insights.extend(self.Scanner(data)) # if self.algorithm.Time.hour > self.marketHours.get_CurrentOpen().hour + 4 and self.algorithm.Time.minute == 15: # insights.extend(self.Monitor(data)) # if 11 > self.algorithm.Time.hour >= 10 and (self.algorithm.Time.minute == 15 or self.algorithm.Time.minute == 30 or self.algorithm.Time.minute == 45): # insights.extend(self.Scanner(data)) return Insight.Group(insights) def Monitor(self, data): insights = [] stockPrice = self.algorithm.Securities[self.ticker].Price longStraddles = self.portfolio.Straddles(self.ticker, expiration = self.LeapExpiration.ToArr(), short = False) shortStraddles = self.portfolio.Straddles(self.ticker, expiration = self.ShortExpiration.ToArr(), short = True) #### EXIT RULES # Exiting the strategy is defined as closing both straddles and is triggered by one of two conditions. # - Long LEAP Straddle DTE < 335 # - Short Straddle indicates over a 325% loss; anything less should just be adjusted, see below. if len(longStraddles) > 0 and len(shortStraddles) > 0: longStraddle = longStraddles[0] shortStraddle = shortStraddles[0] close = False # if longStraddle.ExpiresIn() < 335: # close = True # self.Log.Add("EXIT - longStraddle {}: expires in {} < 335".format(longStraddle.ToString(), longStraddle.ExpiresIn() )) if shortStraddle.UnrealizedProfit() <= -325: close = True self.Log.Add("EXIT - shortStraddle {}: unrealized profit {} <= -335".format(shortStraddle.ToString(), shortStraddle.UnrealizedProfit() )) if close: # I'm selling the leap self.Credit.Sell(leap = longStraddle.AskPrice()) # And buying the short self.Credit.Buy(short = shortStraddle.AskPrice()) self.Log.Add("EXIT - with credit {} / leap(${}) short(${})".format(self.Credit.ToString(), longStraddle.AskPrice(), shortStraddle.AskPrice())) return [ Insight.Price(longStraddle.Call.Symbol, Resolution.Minute, 15, InsightDirection.Flat), Insight.Price(longStraddle.Put.Symbol, Resolution.Minute, 15, InsightDirection.Flat), Insight.Price(shortStraddle.Call.Symbol, Resolution.Minute, 15, InsightDirection.Flat), Insight.Price(shortStraddle.Put.Symbol, Resolution.Minute, 15, InsightDirection.Flat) ] self.Log.Print() #### LONG LEAP STRADDLE # ##### Standard Roll Triggers: # - Less than 335 days expiration (DTE). # - When underlying price moves more than 6.5% from strike price. # - When profit on LEAP straddle is > 0.5% # __Note: IV Opportunity Rolling no longer recommended.__ # __When rolling, select strikes ATM and DTE = 365 +/- 30 (335 – 395).__ if len(longStraddles) > 0: longStraddle = longStraddles[0] close = False if longStraddle.ExpiresIn() < 335: close = True self.Log.Add("ROLL LEAP - longStraddle {}: expires in {} < 335".format(longStraddle.ToString(), longStraddle.ExpiresIn() )) if (longStraddle.Strike() / 1.065) > stockPrice or stockPrice > (longStraddle.Strike() * 1.065): close = True self.Log.Add("ROLL LEAP - longStraddle {0}: strike / 1.065({1}) > {2} or {2} > strike * 1.065({3})".format(longStraddle.ToString(), (longStraddle.Strike() / 1.065), stockPrice, (longStraddle.Strike() * 1.065))) # if longStraddle.UnrealizedProfit() > 0.5: # close = True # self.Log.Add("ROLL LEAP - longStraddle {}: unrealized profit {} > 0.5%".format(longStraddle.ToString(), longStraddle.UnrealizedProfit() )) if close: # I'm selling the leap self.Credit.Sell(leap = longStraddle.AskPrice()) self.Log.Add("ROLL LEAP - sell trigger with credit {} / leap(${})".format(self.Credit.ToString(), longStraddle.AskPrice())) insights.extend( [ Insight.Price(longStraddle.Call.Symbol, Resolution.Minute, 15, InsightDirection.Flat), Insight.Price(longStraddle.Put.Symbol, Resolution.Minute, 15, InsightDirection.Flat) ] ) self.Log.Print() # #### SHORT STRADDLE # ##### Standard Roll Triggers: # - 1 day to expiration (DTE). # - When underlying price moves more than 5% from strike price. # - 2-5 days to expiration (DTE) AND spot price is $1.00 or less away from the strike price. # - When profit on short straddle is > 20% # __When rolling, select strikes ATM and select the minimum DTE to yield a net credit, ideally 7 DTE, but with a maximum DTE of 30.__ # > - LEAP: Roll when the underlying price moves more than 6.5% from strike price. # > - When rolling, select strikes ATM and DTE 335-425 days out. # > - Short: Roll 1 day prior to expiration or when underlying price moves more than 5% from the strike price. # > - When rolling select strikes ATM and select the minimum DTE to yield a net credit (max of 30 DTE). if len(shortStraddles) > 0: shortStraddle = shortStraddles[0] close = False if shortStraddle.ExpiresIn() <= 1: close = True self.Log.Add("ROLL SHORT - shortStraddle {}: expires in {} <= 1".format(shortStraddle.ToString(), shortStraddle.ExpiresIn() )) if (shortStraddle.Strike() / 1.05) > stockPrice or stockPrice > (shortStraddle.Strike() * 1.05): close = True self.Log.Add("ROLL SHORT - shortStraddle {0}: strike / 1.05({1}) > {2} or {2} > strike * 1.05({3})".format(shortStraddle.ToString(), (shortStraddle.Strike() / 1.05), stockPrice, (shortStraddle.Strike() * 1.05))) if 2 <= shortStraddle.ExpiresIn() <= 5 and ((shortStraddle.Strike() - 1) > stockPrice or stockPrice > (shortStraddle.Strike() + 1)): close = True self.Log.Add("ROLL SHORT - shortStraddle {0}: strike - 1 ({1}) > {2} or {2} > strike + 1({3})".format(shortStraddle.ToString(), (shortStraddle.Strike() - 1), stockPrice, (shortStraddle.Strike() + 1))) if shortStraddle.UnrealizedProfit() > 20: close = True self.Log.Add("ROLL SHORT - shortStraddle {}: unrealized profit {} > 20%".format(shortStraddle.ToString(), shortStraddle.UnrealizedProfit() )) if close: # And buying the short self.Credit.Buy(short = shortStraddle.AskPrice()) self.Log.Add("ROLL SHORT - buy trigger with credit {} / short(${})".format(self.Credit.ToString(), shortStraddle.AskPrice())) insights.extend( [ Insight.Price(shortStraddle.Call.Symbol, Resolution.Minute, 15, InsightDirection.Flat), Insight.Price(shortStraddle.Put.Symbol, Resolution.Minute, 15, InsightDirection.Flat) ] ) self.Log.Print() return insights def Scanner(self, data): insights = [] lenShortStraddle = len(self.portfolio.Straddles(self.ticker, expiration = [0, self.ShortExpiration.Stop], short = True)) lenLeapStraddle = len(self.portfolio.Straddles(self.ticker, expiration = self.LeapExpiration.ToArr(), short = False)) # SCAN LEAP # - if no LEAP if lenLeapStraddle == 0: longStraddle = self.__FindStraddle(data, expiration = self.LeapExpiration, idealDTE=365) self.Log.Add("SCANNER LEAP - Trying to find a LEAP") if longStraddle: self.Credit.Buy(leap = longStraddle.AskPrice()) self.Log.Add("SCANNER LEAP - buying LEAP {} with credit {} / ${}".format(longStraddle.ToString(), self.Credit.ToString(), longStraddle.AskPrice()) ) insights.extend([ Insight.Price(longStraddle.Call, Resolution.Minute, 15, InsightDirection.Up), Insight.Price(longStraddle.Put, Resolution.Minute, 15, InsightDirection.Up), ]) self.Log.Print() # SCAN SHORT # - if LEAP and no SHORT # First open the leapStraddle and after that the shortStraddle! if lenShortStraddle == 0 and lenLeapStraddle > 0: shortStraddle = self.__FindStraddle(data, expiration = self.ShortExpiration, minCredit = self.Credit.LastShort * 1.3) self.Log.Add("SCANNER SHORT - Trying to find a SHORT") if shortStraddle: self.Credit.Sell(short = shortStraddle.AskPrice()) self.Log.Add("SCANNER SHORT - selling SHORT {} with credit {} / ${}".format( shortStraddle.ToString(), self.Credit.ToString(), shortStraddle.AskPrice() ) ) insights.extend([ Insight.Price(shortStraddle.Call, Resolution.Minute, 15, InsightDirection.Down), Insight.Price(shortStraddle.Put, Resolution.Minute, 15, InsightDirection.Down), ]) self.Log.Print() return insights def __FindStraddle(self, data, expiration, minCredit = 0.0, idealDTE = None): put = None call = None stockPrice = self.algorithm.Securities[self.ticker].Price contracts = self.algorithm.OptionChainProvider.GetOptionContractList(self.ticker, self.algorithm.Time.date()) if len(contracts) == 0 : return None # # only tradable contracts # # !!IMPORTANT!!: to escape the error `Backtest Handled Error: The security with symbol 'SPY 220216P00425000' is marked as non-tradable.` contracts = [x for x in contracts if self.algorithm.Securities[x.ID.Symbol].IsTradable] contracts = [i for i in contracts if expiration.Start <= (i.ID.Date.date() - self.algorithm.Time.date()).days <= expiration.Stop] if not contracts: return None contracts = sorted(contracts, key = lambda x: x.ID.Date, reverse = False) # Pick all the ATM contracts per day ATMcontracts = [] for expiry, g in groupby(contracts, lambda x: x.ID.Date): # Sort by strike as grouping without sorting does not work. dateGroup = sorted(list(g), key = lambda x: x.ID.StrikePrice, reverse = False) strikeGroup = [] for strike, sg in groupby(dateGroup, lambda x: x.ID.StrikePrice): sg = list(sg) # only select groups that have 2 contracts if len(sg) == 2: # assign if (doing basically a min/sorting function): # - no group is added in the strike group # - previous strike price - stock price difference is bigger than current strike price - stock price difference if len(strikeGroup) == 0 or (abs(stockPrice - strikeGroup[0].ID.StrikePrice) > abs(stockPrice - strike)): strikeGroup = sg GroupPut = next(filter(lambda option: option.ID.OptionRight == OptionRight.Put, strikeGroup), None) GroupCall = next(filter(lambda option: option.ID.OptionRight == OptionRight.Call, strikeGroup), None) ATMcontracts.extend([GroupCall, GroupPut]) contracts = ATMcontracts # add all the option contracts so we can access all the data. for c in contracts: self.algorithm.AddOptionContract(c, Resolution.Minute) # WARNING!! we have to select AskPrice like this in a separate list because otherwise it seems like the filtering might be too fast or the pointers are messed # up in python that by adding the option contract right above this could cause issues where AskPrice might be 0!! puts = [[self.algorithm.Securities[c.Value].AskPrice, c] for c in contracts if c.ID.OptionRight == OptionRight.Put] put = min(puts, key=lambda x: abs(x[0] - minCredit / 2))[1] # only select calls that have the same date as our put above for extra insurance that it's a straddle. calls = [[self.algorithm.Securities[c.Value].AskPrice, c] for c in contracts if c.ID.OptionRight == OptionRight.Call and c.ID.Date == put.ID.Date] call = min(calls, key=lambda x: abs(x[0] - minCredit / 2))[1] # if we have an ideal DTE defined then try and compare that with the one selected if idealDTE != None and self.__ExpiresIn(put) < idealDTE : idealPut = min(puts, key=lambda x: abs((x[1].ID.Date.date() - self.algorithm.Time.date()).days - idealDTE))[1] idealCall = [c for c in contracts if c.ID.Date == idealPut.ID.Date and c.ID.OptionRight == OptionRight.Call][0] # minPremium = self.algorithm.Securities[call.Value].AskPrice + self.algorithm.Securities[put.Value].AskPrice # idealPremium = self.algorithm.Securities[idealCall.Value].AskPrice + self.algorithm.Securities[idealPut.Value].AskPrice call = idealCall put = idealPut # self.algorithm.Securities[put].SetBuyingPowerModel(CustomBuyingPowerModel()) # self.algorithm.Securities[call].SetBuyingPowerModel(CustomBuyingPowerModel()) if not put or not call: return None if put.ID.StrikePrice != call.ID.StrikePrice: return None return Straddle(put, call) # Method that returns a boolean if the security expires in the given days # @param security [Security] the option contract def __ExpiresIn(self, security): return (security.ID.Date.date() - self.algorithm.Time.date()).days def __SignalDeltaPercent(self): return (self.indicators['MACD'].Current.Value - self.indicators['MACD'].Signal.Current.Value) / self.indicators['MACD'].Fast.Current.Value
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. # Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from AlgorithmImports import * from Selection.OptionUniverseSelectionModel import OptionUniverseSelectionModel class MilkTheCowOptionSelectionModel(OptionUniverseSelectionModel): '''Creates option chain universes that select only the two week earliest expiry call contract and runs a user defined optionChainSymbolSelector every day to enable choosing different option chains''' def __init__(self, select_option_chain_symbols, expirationRange = [7, 30]): super().__init__(timedelta(1), select_option_chain_symbols) self.expirationRange = expirationRange def Filter(self, filter): '''Defines the option chain universe filter''' return (filter.Strikes(-2, +2) .Expiration(self.expirationRange[0], self.expirationRange[1]) .IncludeWeeklys() .OnlyApplyFilterAtMarketOpen())
from AlgorithmImports import * class OptionsSpreadExecution(ExecutionModel): '''Execution model that submits orders while the current spread is tight. Note this execution model will not work using Resolution.Daily since Exchange.ExchangeOpen will be false, suggested resolution is Minute ''' def __init__(self, acceptingSpreadPercent=0.005): '''Initializes a new instance of the SpreadExecutionModel class''' self.targetsCollection = PortfolioTargetCollection() # Gets or sets the maximum spread compare to current price in percentage. self.acceptingSpreadPercent = Math.Abs(acceptingSpreadPercent) self.executionTimeThreshold = timedelta(minutes = 10) self.openExecutedOrders = {} def Execute(self, algorithm, targets): '''Executes market orders if the spread percentage to price is in desirable range. Args: algorithm: The algorithm instance targets: The portfolio targets''' # update the complete set of portfolio targets with the new targets self.UniqueTargetsByStrategy(algorithm, targets) self.ResetExecutedOrders(algorithm) # for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call if self.targetsCollection.Count > 0: for target in self.targetsCollection.OrderByMarginImpact(algorithm): symbol = target.Symbol if not self.TimeToProcess(algorithm, symbol): continue # calculate remaining quantity to be ordered unorderedQuantity = OrderSizing.GetUnorderedQuantity(algorithm, target) # check order entry conditions if unorderedQuantity != 0: # get security information security = algorithm.Securities[symbol] # TODO: check this against spreads.!! # if we are selling or buying an option then pick a favorable price # if we are trying to get out of the trade then execute at market price # if (target.Quantity != 0 and self.SpreadIsFavorable(security)) or target.Quantity == 0: stockPrice = security.Underlying.Price algorithm.MarketOrder(symbol, unorderedQuantity, tag = "Current stock price {0}".format(stockPrice)) self.openExecutedOrders[symbol.Value] = algorithm.Time self.targetsCollection.ClearFulfilled(algorithm) def TimeToProcess(self, algorithm, symbol): # if we executed the market order less than the executionTimeThreshold then skip key = symbol.Value openOrders = algorithm.Transactions.GetOpenOrders(symbol) if key in self.openExecutedOrders.keys(): if (self.openExecutedOrders[key] + self.executionTimeThreshold) > algorithm.Time: return False else: # cancel existing open orders for symbol and try again algorithm.Transactions.CancelOpenOrders(key, "The order did not fill in the expected threshold.") return True else: # Order was never processed for this Symbol. return True def ResetExecutedOrders(self, algorithm): # attempt to clear targets that have been filled later self.targetsCollection.ClearFulfilled(algorithm) # reset openExecutedOrders if no targets present if self.targetsCollection.Count == 0: self.openExecutedOrders = {} # TODO: improve this for insight Groups so we can do spreads def UniqueTargetsByStrategy(self, algorithm, targets): # check newly added targets for similar option strategies that have not been filled for target in targets: symbol = target.Symbol targetSecurity = algorithm.Securities[symbol] # TODO this does not work when we are trying to roll leap straddles and at the same time sell short straddles as they have the same quantity. if symbol.SecurityType == SecurityType.Option: # if an old strategy has not been filled then we are going to remove it and allow the new similar one to be tried. for t in self.targetsCollection: tSecurity = algorithm.Securities[t.Symbol] if (t.Symbol.SecurityType == symbol.SecurityType and \ tSecurity.Right == targetSecurity.Right and \ t.Quantity == target.Quantity and \ tSecurity.Expiry == targetSecurity.Expiry): self.targetsCollection.Remove(t.Symbol) self.targetsCollection.Add(target) def SpreadIsFavorable(self, security): '''Determines if the spread is in desirable range.''' # Price has to be larger than zero to avoid zero division error, or negative price causing the spread percentage < 0 by error # Has to be in opening hours of exchange to avoid extreme spread in OTC period return security.Exchange.ExchangeOpen \ and security.Price > 0 and security.AskPrice > 0 and security.BidPrice > 0 \ and (security.AskPrice - security.BidPrice) / security.Price <= self.acceptingSpreadPercent
from AlgorithmImports import * from OStrategies import * import pickle from itertools import chain from collections import deque # TODO: # - implement tracker of bought and sold trades using the ObjectStore. Store date/time bought, store date/time sold. Use RollingWindow for efficiency?! # Class that handles portfolio data. We have here any method that would search the portfolio for any of the contracts we need. class Handler: """ Type of tradable security / underlying asset Option = 2 # Option Security Type (2) FutureOption = 8 # Futures Options Security Type (8) IndexOption = 10 # Index Option Security Type (10) """ OptionClasses = [SecurityType.Option, SecurityType.FutureOption, SecurityType.IndexOption] def __init__(self, algo): self.algorithm = algo # Store max 10 trades. self.tradesTracker = deque(maxlen=10) # Returns all the sold options of the underlying. # @param underlying [String] # @param optionType [OptionRight.Call | OptionRight.Put] # @param maxDays [Integer] number of days in the future that the contracts are filtered by def UnderlyingSoldOptions(self, underlying, optionType, maxDays = 60): contracts = [] for option in self.algorithm.Portfolio.Values: security = option.Security if (option.Type in self.OptionClasses and str(security.Underlying) == underlying and security.Right == optionType and option.Quantity < 0 and (security.Expiry.date() - self.algorithm.Time.date()).days < maxDays): contracts.append(option) return contracts # Returns all the options that have a quantity +- # @param underlying [String] # @param types [OptionRight.Call | OptionRight.Put] def OptionStrategies(self, underlying, types = [OptionRight.Call, OptionRight.Put]): allContracts = {} # select all the puts/calls in our portfolio for option in self.algorithm.Portfolio.Values: security = option.Security if (option.Type in self.OptionClasses and security.Right in types and str(security.Underlying) == underlying and option.Quantity != 0): allContracts.setdefault(int(security.Expiry.timestamp()), []).append(option) return allContracts # Returns an OStrategies class that holds contracts in a certain structure. # @param underlying [String] # @param ignoreStored [Boolean] # @param expiration [Array] // Range # @param short [Boolean] # @return [BaseOptionStrategy::Straddle] def Straddles(self, underlying, ignoreStored = False, expiration = [3, 750], short = None): allContracts = self.OptionStrategies(underlying) contracts = [] for t, options in allContracts.items(): # if we have 2 contracts and they are short if len(options) != 2: continue # - short = None: add straddle # - short = True and Quantity > 0: continue # - short = True and Quantity < 0: add short straddle # - short = False and Quantity > 0: add long straddle # - short = False and Quantity < 0: continue if short != None: if short == True and sum(o.Quantity for o in options) > 0: continue if short == False and sum(o.Quantity for o in options) < 0: continue # WARNING! THIS ASSUMES WE HAVE OPTIONS ON A CERTAIN DAY THAT ARE EITHER SOLD OR BOUGHT AND JUST ONCE SET PER # UNDERLYING. WE MIGHT HAVE TO UPDATE THIS # pick the put and the call Put = next(filter(lambda option: option.Security.Right == OptionRight.Put, options), None) Call = next(filter(lambda option: option.Security.Right == OptionRight.Call, options), None) if not Put or not Call: continue # check expiration if it's in range. if expiration[1] < (Put.Security.Expiry.date() - self.algorithm.Time.date()).days < expiration[0]: continue # if it's a straddle (both strikes are the same) if Put.Security.StrikePrice == Call.Security.StrikePrice: contract = Straddle(self.algorithm, Put, Call) if not ignoreStored and contract.StrategyKey() not in self.ReadTrades("Straddles"): continue contracts.append(contract) return contracts # Returns an OStrategies class that holds contracts in a certain structure. # @param underlying [String] # @param ignoreStored [Boolean] # @param expiration [Array] // Range # @return [BaseOptionStrategy::SoldPut] def SoldPuts(self, underlying, ignoreStored = False, expiration = [0, 30]): # select all the calls in our portfolio allContracts = self.OptionStrategies(underlying, [OptionRight.Put]) contracts = [] for t, puts in allContracts.items(): # if it's more or less than 1 call in the group (per date) skip if len(puts) != 1: continue Put = puts[0] # if the quantity of the contracts is not sold skip if Put.Quantity >= 0: continue # check expiration if it's in range. if expiration[1] < (Put.Security.Expiry.date() - self.algorithm.Time.date()).days < expiration[0]: continue contract = SoldPut(self.algorithm, Put) if not ignoreStored and contract.StrategyKey() not in self.ReadTrades("SoldPuts"): continue contracts.append(contract) return contracts # Returns an OStrategies class that holds contracts in a certain structure. # @param underlying [String] # @param ignoreStored [Boolean] # @param expiration [Array] // Range # @return [BaseOptionStrategy::SoldCall] def SoldCalls(self, underlying, ignoreStored = False, expiration = [0, 30]): # select all the calls in our portfolio allContracts = self.OptionStrategies(underlying, [OptionRight.Call]) contracts = [] for t, calls in allContracts.items(): # if it's more or less than 1 call in the group (per date) skip if len(calls) != 1: continue Call = calls[0] # if the quantity of the contracts is not sold skip if Call.Quantity >= 0: continue # check expiration if it's in range. if expiration[1] < (Call.Security.Expiry.date() - self.algorithm.Time.date()).days < expiration[0]: continue contract = SoldCall(self.algorithm, Call) if not ignoreStored and contract.StrategyKey() not in self.ReadTrades("SoldCalls"): continue contracts.append(contract) return contracts # Returns an OStrategies class that holds contracts in a certain structure. # @param underlying [String] # @param ignoreStored [Boolean] # @param expiration [Array] // Range # @return [BaseOptionStrategy::OCall] def Calls(self, underlying, ignoreStored = False, expiration = [0, 30]): # select all the calls in our portfolio allContracts = self.OptionStrategies(underlying, [OptionRight.Call]) contracts = [] for t, calls in allContracts.items(): # if it's more or less than 1 call in the group (per date) skip if len(calls) != 1: continue Call = calls[0] # if the quantity of the contracts is sold skip if Call.Quantity <= 0: continue # check expiration if it's in range. if expiration[1] < (Call.Security.Expiry.date() - self.algorithm.Time.date()).days < expiration[0]: continue contract = OCall(self.algorithm, Call) if not ignoreStored and contract.StrategyKey() not in self.ReadTrades("Calls"): continue contracts.append(contract) return contracts # Returns an OStrategies class that holds contracts in a certain structure. # @param underlying [String] # @param ignoreStored [Boolean] # @param expiration [Array] // Range # @return [BaseOptionStrategy::OPut] def Puts(self, underlying, ignoreStored = False, expiration = [0, 30]): # select all the puts in our portfolio allContracts = self.OptionStrategies(underlying, [OptionRight.Put]) contracts = [] for t, puts in allContracts.items(): # if it's more or less than 1 call in the group (per date) skip if len(puts) != 1: continue Put = puts[0] # if the quantity of the contracts is sold skip if Put.Quantity <= 0: continue # check expiration if it's in range. if expiration[1] < (Put.Security.Expiry.date() - self.algorithm.Time.date()).days < expiration[0]: continue contract = OPut(self.algorithm, Put) if not ignoreStored and contract.StrategyKey() not in self.ReadTrades("Puts"): continue contracts.append(contract) return contracts # Returns an OStrategies class that holds contracts in a certain structure. # @param underlying [String] # @param ignoreStored [Boolean] # @return [BaseOptionStrategy::BullPutSpread] def BullPutSpreads(self, underlying, ignoreStored = False): # select all the puts in our portfolio allContracts = self.OptionStrategies(underlying, [OptionRight.Put]) contracts = [] # if we have 2 contracts per expiration then we have a put spread. Let's filter for bull put spreads now. # shortPut: higher strike than longPut // sold # longPut: lower strike than shortPut // bought for t, puts in allContracts.items(): # if we have 2 puts with equal quantities then we have a put spread if len(puts) == 2 and sum(put.Quantity for put in puts) == 0: shortPut = next(filter(lambda put: put.Quantity < 0, puts), None) longPut = next(filter(lambda put: put.Quantity > 0, puts), None) if shortPut.Security.StrikePrice > longPut.Security.StrikePrice: # TODO replace the OptionStrategies with the existing Lean code classes. # OptionStrategies.BullPutSpread(canonicalOption, shortPut.Security.StrikePrice, longPut.Security.StrikePrice, shortPut.Security.Expiry) contract = BullPutSpread(self.algorithm, shortPut, longPut) if not ignoreStored and contract.StrategyKey() not in self.ReadTrades("BullPutSpreads"): continue contracts.append(contract) return contracts # Returns an OStrategies class that holds contracts in a certain structure. # @param underlying [String] # @param ignoreStored [Boolean] # @return [BaseOptionStrategy::BearCallSpread] def BearCallSpreads(self, underlying, ignoreStored = False): # select all the calls in our portfolio allContracts = self.OptionStrategies(underlying, [OptionRight.Call]) contracts = [] # if we have 2 contracts per expiration then we have a call spread. Let's filter for bear call spreads now. # shortCall: lower strike than longCall // sold # longCall: higher strike than shortCall // bought for t, calls in allContracts.items(): # if we have 2 calls with equal quantities then we have a call spread if len(calls) == 2 and sum(call.Quantity for call in calls) == 0: shortCall = next(filter(lambda call: call.Quantity < 0, calls), None) longCall = next(filter(lambda call: call.Quantity > 0, calls), None) if shortCall.Security.StrikePrice < longCall.Security.StrikePrice: contract = BearCallSpread(self.algorithm, shortCall, longCall) if not ignoreStored and contract.StrategyKey() not in self.ReadTrades("BearCallSpreads"): continue contracts.append(contract) return contracts # Returns an OStrategies class that holds contracts in a certain structure. # @param underlying [String] # @param ignoreStored [Boolean] # @return [BaseOptionStrategy::IronCondor] def IronCondors(self, underlying, ignoreStored = False): allContracts = self.OptionStrategies(underlying) contracts = [] # if we have 4 allContracts per expiration then we have an iron condor for t, c in allContracts.items(): if len(c) == 4: calls = [call for call in c if call.Security.Right == OptionRight.Call] puts = [put for put in c if put.Security.Right == OptionRight.Put] # if we have 2 calls and 2 puts with equal quantities then we have a condor if (len(calls) == 2 and sum(call.Quantity for call in calls) == 0 and len(puts) == 2 and sum(put.Quantity for put in puts) == 0): shortCall = next(filter(lambda call: call.Quantity < 0, calls), None) longCall = next(filter(lambda call: call.Quantity > 0, calls), None) shortPut = next(filter(lambda put: put.Quantity < 0, puts), None) longPut = next(filter(lambda put: put.Quantity > 0, puts), None) contract = IronCondor(self.algorithm, longCall, shortCall, longPut, shortPut) if not ignoreStored and contract.StrategyKey() not in self.ReadTrades("IronCondors"): continue contracts.append(contract) return contracts def FindStrategy(self, key, symbol, strategies = ["IronCondors", "BearCallSpreads", "BullPutSpreads"]): for strategy in strategies: contract = next(filter(lambda contract: contract.StrategyKey() == key, getattr(self, strategy)(symbol, True))) if contract: return contract return None # Method to fix an option strat after an assignment. This would unwind the whole strat to make sure we are not left holding # the rest of the options. # 1. It unwinds calls and puts in case the underlying stock was assigned. # 2. It liquidates the remaining options of the original option strat. # @param order [OrderEvent] # @param strategies [Array] // Eg: ["IronCondors", "SoldPuts", "Calls", "Puts"] # @return [BaseOptionStrategy] // Eg: OPut, OCall, SoldCall, IronCondor def FixAssignment(self, order, strategies = ["Straddles"]): security = order.Symbol underlying = security.Underlying.Value trade = self.GetStoredTrade(order, strategies = strategies) if not trade: return # Only sold options can be assigned the bought one expire worthless. # Sell or buy the shares for the current order if security.ID.OptionRight == OptionRight.Put: self.algorithm.MarketOrder(underlying, - order.AbsoluteQuantity * 100, False) elif security.ID.OptionRight == OptionRight.Call: self.algorithm.MarketOrder(underlying, order.AbsoluteQuantity * 100, False) portfolioOptions = chain.from_iterable(self.OptionStrategies(underlying).values()) portfolioOptions = [c.Symbol.Value for c in portfolioOptions] # sell/buy the other options in the strategy/trade for contract in trade.optionLegs: contractKey = contract.Value if contractKey in portfolioOptions: self.algorithm.Liquidate(contractKey, tag = "Liquidating {0} from {1}".format(contractKey, trade.StrategyKey())) return trade # Returns the option strat based on provided order by using the ObjectStore to create a OStrategies class with all the Securities. # This does not look into the current portfolio holdings it just tries to build the class based on the order. # It's perfect/needed when an option strat or just one piece of it is removed/assigned as we can find the original strategy this way. # @param order [OrderEvent] # @param strategies [Array] // Eg: ["IronCondors", "SoldPuts", "Calls", "Puts"] # @return [BaseOptionStrategy] // Eg: OPut, OCall, SoldCall, IronCondor def GetStoredTrade(self, order, strategies = ["Straddles"]): security = order.Symbol orderKey = "".join(security.Value.split()) for strategy in strategies: trades = self.ReadTrades(strategy) for t in trades: tradeElements = t.split("_") strategyName = tradeElements[0] # for Put, Call buying we have a separate name with `O` as prefix. if strategyName in ['Put', 'Call']: strategyName = 'O' + strategyName # select all keys except first that would be the StrategyName strategyKeys = tradeElements[1:] if orderKey in strategyKeys: # add 3 spaces between the underlying symbol and the option details. Eg: SPX210115P0382500 we replace SPX with `SPX + 3 spaces`. optionValues = [c.replace(security.Underlying.Value, security.Underlying.Value + " ") for c in strategyKeys] return eval(strategyName)(self.algorithm, *[self.algorithm.Securities[o].Symbol for o in optionValues]) return None # Returns the current trade based on strategies and order (that is usually just one piece of the whole option strat). # This method ignores the ObjectStore and tries to find the option strat in the portfolio as it currently is. That means # it ignores sold options. # @param order [OrderEvent] # @param strategies [Array] // Eg: ["IronCondors", "BearCallSpreads"] def GetCurrentTrade(self, order, strategies = ["Straddles"]): symbol = order.Symbol security = symbol.ID # get all trades of all strategies for strategy in strategies: contracts = getattr(self, strategy)(security.Symbol, ignoreStored = True) for t in contracts: if symbol.Value in t.StrategyKeys(): return t return None # Updates the data in the ObjectStore to reflect the trades/strategies in the portfolio. # @param symbol [Symbol] # @param strategies [Array] // Eg: ["IronCondors", "BearCallSpreads"] def SyncStored(self, symbol, strategies): for strategy in strategies: strategyKeys = [c.StrategyKey() for c in getattr(self, strategy)(symbol, ignoreStored = True)] self.update_ObjectStoreKey(strategy, strategyKeys) # Removes all keys from the object store thus clearing all data. def clear_ObjectStore(self): # TODO: fix this as we only want to clear the current project keys not all. keys = [str(j).split(',')[0][1:] for _, j in enumerate(self.algorithm.ObjectStore.GetEnumerator())] for key in keys: self.algorithm.ObjectStore.Delete(key) # Updates the object store key with the new value without checking for the existing data. # @param key [String] # @param value [Array] def update_ObjectStoreKey(self, key, value): key = f"{self.algorithm.ProjectId}/{key}" self.algorithm.ObjectStore.SaveBytes(key, pickle.dumps(value)) # Add trades to the object store like the following params # @param key [String] // IronCondors # @param value [OptionStrategy] // Eg: IronCondor def AddTrade(self, key, value): jsonObj = self.ReadTrades(key) if value not in jsonObj: jsonObj.append(value) self.update_ObjectStoreKey(key, jsonObj) # Remove trades from the object store by these params # @param key [String] // IronCondors # @param value [OptionStrategy] // Eg: IronCondor def RemoveTrade(self, key, value): jsonObj = self.ReadTrades(key) jsonObj.remove(value) self.update_ObjectStoreKey(key, jsonObj) # Method that reads the ObjectStore for the stored trades. It just reads the bytes and parses the json. # @param key [String] # @return [List] def ReadTrades(self, key): key = f"{self.algorithm.ProjectId}/{key}" jsonObj = [] if self.algorithm.ObjectStore.ContainsKey(key): deserialized = bytes(self.algorithm.ObjectStore.ReadBytes(key)) jsonObj = (pickle.loads(deserialized)) if jsonObj is None: jsonObj = [] jsonObj = list(set(jsonObj)) # there should be unique values in our array return jsonObj
#region imports from AlgorithmImports import * #endregion # TODO update this to use the LEAN versions of the strategies and expand on them. Maybe we don't even have to do that. OPEN and CLOSE are not needed! class BaseOptionStrategy(QCAlgorithm): name = "" optionLegs = [] securityOptionLegs = [] expiryList = [] def __init__(self, algorithm, name, optionLegs): self.algorithm = algorithm self.name = name self.optionLegs = optionLegs def ToString(self): strikeDiffStr = "" if not isinstance(self.Strike(), list): difference = round( ( ( self.Strike() - self.UnderlyingPrice() ) / self.UnderlyingPrice() ) * 100, 2) strikeDiffStr = " - (${underlyingSymbol} @ ${underlying}) = {difference}%".format(underlyingSymbol = self.Underlying(), underlying = self.UnderlyingPrice(), difference = difference) return "{name}[${strike}{strikeDiffStr}; Exp. date: {expiration}; Exp. in: {expiresIn} days]".format( name = self.name, strike = self.Strike(), strikeDiffStr = strikeDiffStr, expiration = self.Expiration(), expiresIn = self.ExpiresIn() ) # Method that returns the number of days this strategy expires in. If we have multiple explirations we return an array. def ExpiresIn(self): expirations = list(set(self.ExpiryList())) if len(expirations) > 1: return [(ex - self.algorithm.Time.date()).days for ex in expirations] else: return (expirations[0] - self.algorithm.Time.date()).days def StrategyKey(self): keysStr = "_".join(["".join(o.split()) for o in self.StrategyKeys()]) return "{}_{}".format(self.NameKey(), keysStr) def StrategyKeys(self): if self.IsContract(): ids = [o.Value for o in self.optionLegs] else: ids = [o.Symbol.Value for o in self.SecurityOptionLegs()] return ids def UnrealizedProfit(self): if not self.IsHolding(): raise Exception("The {} strategy does not hold OptionHolding instances.".format(self.name)) return round(sum([c.UnrealizedProfitPercent for c in self.optionLegs]) / len(self.optionLegs) * 100, 2) # Checks if the expiration is the same def SameExpiration(self): expirations = list(set(self.ExpiryList())) if len(expirations) > 1: return False else: return True def Open(self): self.algorithm.portfolio.AddTrade("{}s".format(self.NameKey()), self.StrategyKeys()) def Close(self): self.algorithm.portfolio.RemoveTrade("{}s".format(self.NameKey()), self.StrategyKeys()) def ExpiryList(self): if self.IsContract(): exList = [x.Date.date() for x in self.SecurityOptionLegs()] else: exList = [x.Expiry.date() for x in self.SecurityOptionLegs()] self.expiryList = self.expiryList or exList return self.expiryList def Expiration(self): return self.ExpiryList()[0] def AskPrice(self): if self.IsContract(): prices = [self.algorithm.Securities[o.Value].AskPrice for o in self.optionLegs] else: prices = [o.AskPrice for o in self.SecurityOptionLegs()] return round(sum(prices), 2) def Quantity(self): return self.QuantityList()[0] # This method returns the quantity of the trade. It checks the portfolio for the values. def QuantityList(self): # check the trade state we are in. quantities = [] for leg in self.__PortfolioLegs(): quantities.append(leg.Quantity) return quantities # Returns total fees per trade. def TotalFees(self): fees = [] for leg in self.__PortfolioLegs(): fees.append(leg.TotalFees) return round(sum(fees), 2) def SaleAmount(self): # if the current holding amount is 0 then it's worthless # so we return 0. if self.HoldingCost() == 0: return 0 return self.HoldingCost() - self.RealizedProfit() def SalePrice(self): return self.SaleAmount()/1000 def UnrealizedProfit(self): profit = [] for leg in self.__PortfolioLegs(): profit.append(leg.UnrealizedProfit) return round(sum(profit), 2) def RealizedProfit(self): profit = [] for leg in self.__PortfolioLegs(): profit.append(leg.Profit) return round(sum(profit), 2) def BuyPrice(self): return self.HoldingCost()/1000 def HoldingCost(self): return round(sum(self.HoldingCostList()), 2) # Returns the cost (buy/entry $ amount) for all legs in the strat. def HoldingCostList(self): costs = [] for leg in self.__PortfolioLegs(): costs.append(leg.HoldingsCost) return costs def UnderlyingPrice(self): return self.algorithm.Securities[self.Underlying()].Price def Strike(self): if self.IsHolding() or self.IsContract(): strikes = [c.StrikePrice for c in self.SecurityOptionLegs()] else: strikes = [c.Strike for c in self.SecurityOptionLegs()] strikes = list(set(strikes)) if len(strikes) > 1: return strikes else: return strikes[0] def SecurityOptionLegs(self): if self.IsHolding(): self.securityOptionLegs = self.securityOptionLegs or [x.Security for x in self.optionLegs] # is this a contract Symbol? elif self.IsContract(): self.securityOptionLegs = self.securityOptionLegs or [x.ID for x in self.optionLegs] else: self.securityOptionLegs = self.securityOptionLegs or self.optionLegs return self.securityOptionLegs def IsContract(self): return hasattr(self.optionLegs[0], 'ID') def IsHolding(self): return isinstance(self.optionLegs[0], OptionHolding) def IsOption(self): return isinstance(self.optionLegs[0], Option) def Underlying(self): if self.IsHolding() or self.IsContract(): # This is a str. (it might not be in the case of the Holding though?!?) return self.SecurityOptionLegs()[0].Underlying.Symbol else: return self.SecurityOptionLegs()[0].UnderlyingSymbol def NameKey(self): return "".join(self.name.split()) # Returns the legs of the strat as porfolio holdings. def __PortfolioLegs(self): legs = [] for k in self.StrategyKeys(): legs.append(self.algorithm.Portfolio[k]) return legs class Straddle(BaseOptionStrategy): Put = None Call = None def __init__(self, algo, Put, Call): BaseOptionStrategy.__init__(self, algo, "Straddle", [Put, Call]) self.Call = Call self.Put = Put self.__StrikeCheck() if self.SameExpiration() == False: raise Exception("The expiration should be the same for all options.") def __StrikeCheck(self): # is this an option holding? if self.IsHolding(): callStrike = self.Call.Security.StrikePrice putStrike = self.Put.Security.StrikePrice # is this a contract Symbol? elif self.IsContract(): callStrike = self.Call.ID.StrikePrice putStrike = self.Put.ID.StrikePrice # is this a OptionChain Symbol? else: callStrike = self.Call.Strike putStrike = self.Put.Strike if callStrike != putStrike: raise Exception("The Call strike has to be equal to the Put strike.") # TODO fix this CoveredCall (SoldCall) strategy here so it works. Change the name also as it's not a CoveredCall that implies the buying of the stock. class SoldPut(BaseOptionStrategy): Put = None def __init__(self, algo, put): BaseOptionStrategy.__init__(self, algo, "Sold Put", [put]) self.Put = put class SoldCall(BaseOptionStrategy): Call = None def __init__(self, algo, call): BaseOptionStrategy.__init__(self, algo, "Sold Call", [call]) self.Call = call class OPut(BaseOptionStrategy): Option = None def __init__(self, algo, option): BaseOptionStrategy.__init__(self, algo, "Put", [option]) self.Option = option class OCall(BaseOptionStrategy): Option = None def __init__(self, algo, option): BaseOptionStrategy.__init__(self, algo, "Call", [option]) self.Option = option class BullPutSpread(BaseOptionStrategy): shortPut = None longPut = None def __init__(self, algo, shortPut, longPut): BaseOptionStrategy.__init__(self, algo, "Bull Put Spread", [shortPut, longPut]) self.longPut = longPut self.shortPut = shortPut self.__StrikeCheck() if self.SameExpiration() == False: raise Exception("The expiration should be the same for all options.") def __StrikeCheck(self): # is this an option holding? if self.IsHolding(): shortStrike = self.shortPut.Security.StrikePrice longStrike = self.longPut.Security.StrikePrice # is this a contract Symbol? elif self.IsContract(): shortStrike = self.shortPut.ID.StrikePrice longStrike = self.longPut.ID.StrikePrice # is this a OptionChain Symbol? else: shortStrike = self.shortPut.Strike longStrike = self.longPut.Strike if longStrike > shortStrike: raise Exception("The longPut strike has to be lower than the shortPut strike.") class BearCallSpread(BaseOptionStrategy): shortCall = None longCall = None def __init__(self, algo, shortCall, longCall): BaseOptionStrategy.__init__(self, algo, "Bear Call Spread", [shortCall, longCall]) self.longCall = longCall self.shortCall = shortCall self.__StrikeCheck() if self.SameExpiration() == False: raise Exception("The expiration should be the same for all options.") def __StrikeCheck(self): # is this an option holding? if self.IsHolding(): shortStrike = self.shortCall.Security.StrikePrice longStrike = self.longCall.Security.StrikePrice # is this a contract Symbol? elif self.IsContract(): shortStrike = self.shortCall.ID.StrikePrice longStrike = self.longCall.ID.StrikePrice # is this a OptionChain Symbol? else: shortStrike = self.shortCall.Strike longStrike = self.longCall.Strike if longStrike < shortStrike: raise Exception("The longCall strike has to be higher than the shortCall strike.") # An iron condor is an options strategy consisting of two puts (one long and one short) and two calls (one long and one short), and four strike prices, all with the same expiration date. # The iron condor earns the maximum profit when the underlying asset closes between the middle strike prices at expiration. class IronCondor(BaseOptionStrategy): bullPutSpread = None bearCallSpread = None def __init__(self, algo, longCall, shortCall, longPut, shortPut): BaseOptionStrategy.__init__(self, algo, "Iron Condor", [longCall, shortCall, longPut, shortPut]) self.bullPutSpread = BullPutSpread(shortPut, longPut) self.bearCallSpread = BearCallSpread(shortCall, longCall)
#region imports from AlgorithmImports import * from .Handler import Handler from .OStrategies import * #endregion # Your New Python File
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. # Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from AlgorithmImports import * from Selection.OptionUniverseSelectionModel import OptionUniverseSelectionModel class SafeCallOptionSelectionModel(OptionUniverseSelectionModel): '''Creates option chain universes that select only the two week earliest expiry call contract and runs a user defined optionChainSymbolSelector every day to enable choosing different option chains''' def __init__(self, select_option_chain_symbols, targetExpiration = 14): super().__init__(timedelta(1), select_option_chain_symbols) self.targetExpiration = targetExpiration def Filter(self, filter): '''Defines the option chain universe filter''' return (filter.Strikes(+3, +5) .Expiration(self.targetExpiration, self.targetExpiration * 2) .IncludeWeeklys() .OnlyApplyFilterAtMarketOpen())
#region imports from AlgorithmImports import * from QuantConnect.Logging import * from MarketHours import MarketHours from PortfolioHandler import Handler from PortfolioHandler.OStrategies import SoldCall from CustomIndicators import RipsterClouds from CustomIndicators import TTMSqueezePro from Tools import ExpirationRange, MainLogger, TradeCredit #endregion class SafeSoldCallAlphaModel(AlphaModel): options = {} algorithm = None sliceData = None def __init__(self, algorithm, ticker): self.ticker = ticker if self.ticker in ['SPX', 'SPY']: self.option = Symbol.Create(self.ticker, SecurityType.IndexOption, Market.USA, f"?{self.ticker}") else: self.option = Symbol.Create(self.ticker, SecurityType.Option, Market.USA, f"?{self.ticker}") self.tolerance = 0.1 self.algorithm = algorithm self.symbol = algorithm.AddEquity(self.ticker) self.marketHours = MarketHours(algorithm, self.ticker) self.portfolio = Handler(self.algorithm) self.targetExpiration = ExpirationRange(7, 30) self.Log = MainLogger(self.algorithm) self.Credit = TradeCredit() # INDICATORS version: self.ATRSlow = self.algorithm.ATR(self.ticker, 15, resolution = Resolution.Daily) self.ATRFast = self.algorithm.ATR(self.ticker, 6, resolution = Resolution.Daily) self.EMASlow = self.algorithm.EMA(self.ticker, 20, resolution = Resolution.Daily) self.EMAFast = self.algorithm.EMA(self.ticker, 6, resolution = Resolution.Daily) self.aroon = self.algorithm.AROON(self.ticker, 5, resolution = Resolution.Daily) # Consider changing this to TradeBar in order to do the model for multiple symbols?!! self.LastPrice = RollingWindow[float](5) # RollingWindow[TradeBar](5) self.LastRoll = 0 self.LastDay = 0 self.vixSymbol = self.algorithm.AddIndex("VIX", Resolution.Minute).Symbol self.VIX = 0 # Set up default Indicators, these indicators are defined on the Value property of incoming data (except ATR and AROON which use the full TradeBar object) # self.indicators = { # # 'ATRSlow' : self.ATRSlow, # # 'ATRFast' : self.ATRFast, # # 'EMASlow' : self.EMASlow, # # 'EMAFast' : self.EMAFast, # # 'AROON' : self.aroon # } # self.algorithm.benchmark.AddIndicators(self.indicators) # TODO: - draw a line on the trade plot that shows the constant strike price and try and make it not touch the stock price. def Update(self, algorithm, data): if algorithm.IsWarmingUp: return [] if self.ticker not in data.Keys: return [] if data.ContainsKey(self.vixSymbol): self.VIX = data[self.vixSymbol].Close # TODO print the RollPrice to see how the algo determines the proper strike to pick and make sure we are looking further based on that for safety self.algorithm.benchmark.SetBeta(self.__RollStrike()) self.algorithm.benchmark.PrintBenchmark() weekday = self.algorithm.Time.isoweekday() insights = [] # IMPORTANT!!: In order to fix the cancelled closing order instance only check to close an order after 12. The reason is the market orders that happen # before market open are converted to MarketOnOpen orders and it seems this does not get resolved. # THIS DOES A NEGATIVE RESULT # if self.algorithm.Time.hour > self.marketHours.get_CurrentOpen().hour + 2: # # Monitor every minute # if self.marketHours.get_CurrentClose().hour - 1 > self.algorithm.Time.hour: # insights.extend(self.MonitorCoveredCall(data)) # # SCAN just once an hour # if self.algorithm.Time.minute == 5 and self.marketHours.get_CurrentClose().hour - 2 > self.algorithm.Time.hour: # insights.extend(self.Scanner(data)) # if self.algorithm.Time.hour > self.marketHours.get_CurrentOpen().hour + 4: # insights.extend(self.MonitorHedgePut(data)) # if weekday == DayOfWeek.Thursday or weekday == DayOfWeek.Tuesday: # if 11 > self.algorithm.Time.hour >= 10 and (self.algorithm.Time.minute == 15 or self.algorithm.Time.minute == 45): # insights.extend(self.Scanner(data)) # POSITIVE RESULTS ($2158 / 0.87%) # TODO think about: # - if VIX is high check multiple times a day # - if VIX is low check once a day # slightly better the above. It's 50% less than checking every hour. # if self.VIX > 30: # if self.marketHours.get_CurrentClose().hour - 2 > self.algorithm.Time.hour > self.marketHours.get_CurrentOpen().hour + 2: # if self.algorithm.Time.minute in list(range(10, 60))[0::10]: # insights.extend(self.MonitorCoveredCall(data)) # if self.algorithm.Time.minute == 5: # insights.extend(self.Scanner(data)) # else: # if self.LastDay != self.algorithm.Time.day and self.marketHours.get_CurrentClose().hour - 2 > self.algorithm.Time.hour > self.marketHours.get_CurrentOpen().hour + 2: # self.LastDay = self.algorithm.Time.day # insights.extend(self.Scanner(data)) # insights.extend(self.MonitorCoveredCall(data)) if self.marketHours.get_CurrentClose().hour - 1 > self.algorithm.Time.hour > self.marketHours.get_CurrentOpen().hour + 2: if self.algorithm.Time.minute in list(range(10, 60))[0::10]: insights.extend(self.MonitorCoveredCall(data)) if self.algorithm.Time.minute == 5: insights.extend(self.Scanner(data)) return insights def MonitorHedgePut(self, data): insights = [] stockPrice = self.algorithm.Securities[self.ticker].Price for eput in self.portfolio.UnderlyingSoldOptions(self.ticker, OptionRight.Put): close = False expiresIn = self.__ExpiresIn(eput.Security) if eput.UnrealizedProfitPercent * 100 > 90: close = True # if already invested in this position then check if it expires today elif expiresIn == 0 and self.algorithm.Time.hour == self.marketHours.get_CurrentClose().hour - 4: close = True # elif stockPrice < eput.Security.StrikePrice: # close = True if close: insights.append( Insight.Price(eput.Symbol, Resolution.Minute, 1, InsightDirection.Flat) ) return insights def MonitorCoveredCall(self, data): insights = [] # v1 # - profit <= -100 # - profit > 95 # - ExpiresIn == 0 and Time.hour > 15 # v2 # - ExpiresIn <= 2 # - profit > 80 # - profit <= -100 # hedged = self.__IsHedged() stockPrice = self.algorithm.Securities[self.ticker].Price for call in self.portfolio.SoldCalls(self.ticker, expiration = [0, self.targetExpiration.Stop]): close = False hedge = False # if ((stockPrice * 1.1 > call.Strike()) and call.UnrealizedProfit() > -50): # close = True # self.Log.Add("ROLL - call {call}: stockPrice({stockPrice}) * 1.1 > strike{strike} and profit{profit} > -50".format( # call = call.ToString(), profit = call.UnrealizedProfit(), stockPrice = stockPrice, strike = call.Strike() ) # ) # if ((stockPrice * 1.2 > call.Strike()) and call.UnrealizedProfit() > -25): # close = True # self.Log.Add("ROLL - call {call}: stockPrice({stockPrice}) * 1.2 > strike{strike} and profit{profit} > -25".format( # call = call.ToString(), profit = call.UnrealizedProfit(), stockPrice = stockPrice, strike = call.Strike() ) # ) # v2 ? if call.ExpiresIn() <= 2: close = True self.Log.Add("ROLL - call {}: expiresIn{} <= 2".format(call.ToString(), call.ExpiresIn() )) # v2 ? # if call.UnrealizedProfit() > 80: # close = True # self.Log.Add("ROLL - call {}: unrealized profit {} > 80".format(call.ToString(), call.UnrealizedProfit() )) # v1 GOOD RESULT! if call.UnrealizedProfit() > 95: close = True self.Log.Add("ROLL - call {}: unrealized profit {} > 95".format(call.ToString(), call.UnrealizedProfit() )) # v1 GOOD RESULT! if call.UnrealizedProfit() <= -100: close = True self.Log.Add("ROLL - call {}: unrealized profit {} <= -100".format(call.ToString(), call.UnrealizedProfit() )) # if stockPrice >= call.Security.StrikePrice / 1.1: # close = True # hedge = True # if stockPrice > call.Strike() and self.algorithm.Time.hour >= 12 and call.ExpiresIn() < round(self.targetExpiration * 0.8): # close = True # hedge = True # if already invested in this position then check if it expires today # v1 GOOD RESULT! if call.ExpiresIn() == 0 and self.algorithm.Time.hour > 15: close = True self.Log.Add("ROLL - call {}: expiresIn == 0 and hour {} > 15".format(call.ToString(), self.algorithm.Time.hour )) # if self.indicators['MACD'].Signal.Current.Value >= 10 and self.indicators['RSI'].Current.Value >= 70 and stockPrice >= call.Security.StrikePrice / 1.5: # close = True # elif (self.__SignalDeltaPercent() < -self.tolerance and self.indicators['RSI'].Current.Value > 70): # close = True # hedge = True # if hedge and not hedged: # hedgeContract = self.__FindPut(data) # insights.append( # Insight.Price(hedgeContract.Symbol, Resolution.Minute, 1, InsightDirection.Down) # ) if close: self.Credit.Buy(call.AskPrice()) self.Log.Add("ROLL - buying with credit {} / ${}".format(self.Credit.ToString(), call.AskPrice())) insights.append( Insight.Price(call.Call.Symbol, Resolution.Minute, 15, InsightDirection.Flat) ) self.Log.Print() return insights def Scanner(self, data): insights = [] ## Buying conditions # if self.indicators['RSI'].Current.Value < 40: return insights # IF RSI is > 40 -- NO # | YES # if self.indicators['MACD'].Signal.Current.Value >= 10 and self.indicators['RSI'].Current.Value >= 70: # return insights # 0 positions covered calls or hedge -- NO # | YES # lenSoldCalls = len(self.portfolio.UnderlyingSoldOptions(self.ticker, OptionRight.Call)) lenSoldCalls = len(self.portfolio.SoldCalls(self.ticker, expiration = [0, self.targetExpiration.Stop])) # if lenSoldCalls > 0 or self.__IsHedged(): # return insights if lenSoldCalls == 0: # call = self.__FindCall(data) call = self.__IndicatorFindCall(data) self.Log.Add("SCANNER - Trying to find a call") if call: self.Credit.Sell(call.AskPrice()) self.Log.Add("SCANNER - selling {} with credit {} / ${}".format(call.ToString(), self.Credit.ToString(), call.AskPrice()) ) insights.append( Insight.Price(call.Call, Resolution.Minute, 15, InsightDirection.Down) ) self.Log.Print() return insights def __FindPut(self, data, delta = -0.6): chain = data.OptionChains.GetValue(self.option) if not chain: return None # The way we are defining expiration here is by taking an absolute value. So it might just be __ExpiresIn(x) > expiration contracts = [x for x in chain if self.__ExpiresIn(x) >= self.targetExpiration and x.Right == OptionRight.Put] # # only tradable contracts # # !!IMPORTANT!!: to escape the error `Backtest Handled Error: The security with symbol 'SPY 220216P00425000' is marked as non-tradable.` contracts = [x for x in contracts if self.algorithm.Securities[x.Symbol].IsTradable] if not contracts: return None return min(contracts, key=lambda x: abs(x.Greeks.Delta - delta)) def __FindCall(self, data): call = None stockPrice = self.__StockPrice() minCredit = 1 # default min credit of 100$ # in case we are rolling and we had a loss then roll for a bigger premium if self.Credit.LastValue > 1: minCredit = self.Credit.LastValue * 1.1 contracts = self.algorithm.OptionChainProvider.GetOptionContractList(self.ticker, self.algorithm.Time.date()) if len(contracts) == 0 : return None # # only tradable contracts # # !!IMPORTANT!!: to escape the error `Backtest Handled Error: The security with symbol 'SPY 220216P00425000' is marked as non-tradable.` contracts = [x for x in contracts if self.algorithm.Securities[x.ID.Symbol].IsTradable] # pick withing expiration range contracts = [i for i in contracts if self.targetExpiration.Start <= (i.ID.Date.date() - self.algorithm.Time.date()).days <= self.targetExpiration.Stop] if not contracts: return None # select only calls contracts = [x for x in contracts if x.ID.OptionRight == OptionRight.Call] # sort by date contracts = sorted(contracts, key = lambda x: x.ID.Date, reverse = False) # TODO make this work based on EMA and some formula below self.__RollStrike # pick contracts with strikes between 15% and 20% of spot price contracts = [x for x in contracts if stockPrice * 1.1 < x.ID.StrikePrice < stockPrice * 1.20] # contracts = ATMcontracts # add all the option contracts so we can access all the data. for c in contracts: self.algorithm.AddOptionContract(c, Resolution.Minute) calls = [[self.algorithm.Securities[c.Value].AskPrice, c] for c in contracts] call = min(calls, key=lambda x: abs(x[0] - minCredit))[1] if not call: return None return SoldCall(call) def __ChainFindCall(self, data, delta = 0.01): chain = data.OptionChains.GetValue(self.option) if not chain: return None # The way we are defining expiration here is by taking an absolute value. So it might just be __ExpiresIn(x) > expiration contracts = [x for x in chain if self.__ExpiresIn(x) >= self.targetExpiration and x.Right == OptionRight.Call] # # only tradable contracts # # !!IMPORTANT!!: to escape the error `Backtest Handled Error: The security with symbol 'SPY 220216P00425000' is marked as non-tradable.` contracts = [x for x in contracts if self.algorithm.Securities[x.Symbol].IsTradable] if not contracts: return None return min(contracts, key=lambda x: abs(x.Greeks.Delta - delta)) def __IsHedged(self): return len(self.portfolio.SoldPuts(self.ticker, expiration = [0, self.targetExpiration.Stop])) > 0 # Method that returns a boolean if the security expires in the given days # @param security [Security] the option contract def __ExpiresIn(self, security): return (security.Expiry.date() - self.algorithm.Time.date()).days def __SignalDeltaPercent(self): return (self.indicators['MACD'].Current.Value - self.indicators['MACD'].Signal.Current.Value) / self.indicators['MACD'].Fast.Current.Value def __StockPrice(self): return self.algorithm.Securities[self.ticker].Price def __IndicatorFindCall(self, data): call = None stockPrice = self.__StockPrice() minCredit = 1 # default min credit of 100$ # in case we are rolling and we had a loss then roll for a bigger premium if self.Credit.LastValue > 1: minCredit = self.Credit.LastValue * 1.1 contracts = self.algorithm.OptionChainProvider.GetOptionContractList(self.ticker, self.algorithm.Time.date()) if len(contracts) == 0 : return None # # only tradable contracts # # !!IMPORTANT!!: to escape the error `Backtest Handled Error: The security with symbol 'SPY 220216P00425000' is marked as non-tradable.` contracts = [x for x in contracts if self.algorithm.Securities[x.ID.Symbol].IsTradable] # pick within expiration range contracts = [i for i in contracts if self.targetExpiration.Start <= (i.ID.Date.date() - self.algorithm.Time.date()).days <= self.targetExpiration.Stop] if not contracts: return None # select only calls contracts = [x for x in contracts if x.ID.OptionRight == OptionRight.Call] # sort by date contracts = sorted(contracts, key = lambda x: x.ID.Date, reverse = False) # in order to incorporate minCredit we can pick a range of contracts around the rollPrice. rollStrike = self.__RollStrike() # pick 3 contracts lower and 3 contracts up from rollStrike # contracts = [x for x in contracts if 15 < abs(x.ID.StrikePrice - rollStrike) < 15] # for c in contracts: # self.algorithm.AddOptionContract(c, Resolution.Minute) # calls = [[self.algorithm.Securities[c.Value].AskPrice, c] for c in contracts] # call = min(calls, key=lambda x: abs(x[0] - minCredit))[1] # pick the contract at the defined RollPrice call = min(contracts, key = lambda x: abs(x.ID.StrikePrice - rollStrike)) # add all the option contracts so we can access all the data. self.algorithm.AddOptionContract(call, Resolution.Minute) if not call: return None return SoldCall(call) # Roll to a strike a certain percentage in price up def __RollStrike(self): ATRSlow = self.ATRSlow.Current.Value ATRFast = self.ATRFast.Current.Value EMASlow = self.EMASlow.Current.Value EMAFast = self.EMAFast.Current.Value stockPrice = self.__StockPrice() ema_ratio = EMAFast / EMASlow self.LastPrice.Add(stockPrice) # oldestPrice - currentPrice over the 5 day window move = self.LastPrice[self.LastPrice.Count-1] - self.LastPrice[0] #newStrike = (( # max(ATRSlow, ATRFast) / max(EMAFast, stockPrice) + 1 #) ** 3) * max(EMAFast, EMASlow, stockPrice) #* ema_ratio # TODO replace the 1.4 by EMA ratio #newStrike = (max(EMAFast, EMASlow, stockPrice) + max(ATRSlow, ATRFast)) * ema_ratio #newStrike = (max(EMAFast, stockPrice) + max(ATRSlow, ATRFast)) * (ema_ratio ** 1.5) # TODO because of these given ratios 1.25, 1.3 the testing starting with 2017 does not give good results. The swings in price were too big back then. newStrike = max(EMASlow, EMAFast, stockPrice)*1.25 * (ema_ratio ** 2) + max(ATRSlow, ATRFast) * 1 #max(EMAFast, stockPrice)*1.05 * (ema_ratio ** 2) + move * 0.8, newStrike = max( newStrike, max(EMAFast, stockPrice) * 1.25 #stockPrice * 1.03 #EMAFast ) # TODO instead of ATR lets just check how big the move was in the past 5 days and project it into the future newStrike = min( newStrike, max(EMAFast, stockPrice)*1.3 ) #newStrike = max(EMAFast, stockPrice) * 1.02 + move * 0.8 # dampen falloff if self.LastRoll > newStrike: newStrike = (self.LastRoll + newStrike*2) / 3 self.LastRoll = newStrike # in case of rolling we try to make up for the loss by not moving the strike too much #if stockPrice > strike: # newStrike = strike * 1.05 return newStrike
#region imports from AlgorithmImports import * from QuantConnect.Logging import * from collections import deque from MarketHours import MarketHours from PortfolioHandler import Handler from PortfolioHandler.OStrategies import OCall, OPut, SoldCall, SoldPut from CustomIndicators import RipsterClouds, TTMSqueezePro from Tools import ConsolidateIndicator, ExpirationRange, MainLogger, TradeCredit #endregion # TODO: # - open trades smaller like start with 5 contracts and add more until we get to 15 if the move keeps going # - add a check for profit on the calls if it falls to -20% then close it. # - add a loop so we can open trades on multiple stocks. This works great on large cap stocks so doing it on 200B+ works well. # - FIGURE out the best way to pick the strike for sold calls based on momentum (ATR weekly with some EMA component?! similar to what the old method is) # [this should be profitable before doing the rest of the sold version] # - ADD sold puts when bullish similar to sold calls. # - IMPLEMENT SELLING OF OPTIONS VIA SPREADS INSTEAD OF NAKED OPTIONS. class SqueezeNakedAlphaModel(AlphaModel): algorithm = None def __init__(self, algorithm, ticker): self.ticker = ticker self.algorithm = algorithm self.symbol = algorithm.AddEquity(self.ticker, resolution = Resolution.Minute) self.marketHours = MarketHours(algorithm, self.ticker) self.portfolio = Handler(self.algorithm) self.targetExpiration = ExpirationRange(25, 45) self.neutralExpiration = ExpirationRange(7, 14) self.Log = MainLogger(self.algorithm) self.Credit = TradeCredit() # indicators self.dataQueue = deque(maxlen=14) # region using ATR Levels(!! from Saty as a guide) # Create weekly calendar consolidator # self.ATR = AverageTrueRange(14) # self.tickerIdentity = Identity('PrevClose') # self.High = Identity('High') # self.Low = Identity('Low') # ConsolidateIndicator(self.algorithm, self.ticker, self.ATR, 'W').Call() # weekly consolidation for ATR # ConsolidateIndicator(self.algorithm, self.ticker, self.tickerIdentity, 'W').Call(Field.Close) # weekly consolidation for Close # ConsolidateIndicator(self.algorithm, self.ticker, self.High, 'W').Call(Field.High) # weekly consolidation for High # ConsolidateIndicator(self.algorithm, self.ticker, self.Low, 'W').Call(Field.Low) # weekly consolidation for Low # endregion # Indicator to get the previous close value self.tickerIdentity = self.algorithm.Identity(self.ticker, Resolution.Daily, Field.Close) # region using EMAS+ATR for call strike self.ATRSlow = AverageTrueRange(15) self.ATRFast = AverageTrueRange(6) ConsolidateIndicator(self.algorithm, self.ticker, self.ATRSlow, 'W').Call() # weekly consolidation for ATR ConsolidateIndicator(self.algorithm, self.ticker, self.ATRFast, 'W').Call() # weekly consolidation for ATR self.EMASlow = self.algorithm.EMA(self.ticker, 20, resolution = Resolution.Daily) self.EMAFast = self.algorithm.EMA(self.ticker, 5, resolution = Resolution.Daily) self.LastPrice = RollingWindow[float](5) # RollingWindow[TradeBar](5) self.LastRoll = 0 # endregion self.squeeze = TTMSqueezePro("Squeeze", length = 20) algorithm.RegisterIndicator(self.ticker, self.squeeze, Resolution.Daily) # history = self.algorithm.History(self.symbol.Symbol, 21, Resolution.Daily) # self.squeeze.Warmup(history.loc[self.ticker]) # TODO: think about warming up the indicator: https://www.quantconnect.com/docs/v2/writing-algorithms/indicators/manual-indicators # self.algorithm.SetWarmUp(TimeSpan.FromDays(60)) self.algorithm.WarmUpIndicator(self.ticker, self.squeeze, Resolution.Daily) self.indicators = { # 'Squeeze' : self.squeeze, # 'Trade Plot': ["Call", "Put"] 'Trade Plot': ["BullPutSpread", "BearCallSpread"] } self.algorithm.benchmark.AddIndicators(self.indicators) # In order to update the current day until a certain point (in our testing 5 mins before market close) we need to build a consolidator and update it # self.symbol = self.AddEquity("SPY", Resolution.Minute).Symbol self.consolidator = TradeBarConsolidator(timedelta(1)) self.algorithm.SubscriptionManager.AddConsolidator(self.symbol.Symbol, self.consolidator) # self.consolidator = TradeBarConsolidator(timedelta(days=1)) # # Update the Squeeze indicator with the consolidated bar # self.consolidator.DataConsolidated += (lambda _, bar : self.squeeze.Update(bar)) # self.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator) self.algorithm.Schedule.On(self.algorithm.DateRules.EveryDay(self.ticker), self.algorithm.TimeRules.BeforeMarketClose(self.ticker, 5), self.UpdateSqueeze) # self.algorithm.Schedule.On(self.algorithm.DateRules.EveryDay(self.ticker), self.algorithm.TimeRules.BeforeMarketClose(self.ticker, 5), self.UpdateSqueeze) def UpdateSqueeze(self): data = self.algorithm.CurrentSlice if data.ContainsKey(self.ticker): # dataPoint = IndicatorDataPoint(self.symbol, data[self.ticker].EndTime, data[self.ticker].Close) # self.squeeze.ManualUpdate(data[self.ticker]) # self.algorithm.Plot('Daily Value', 'CLOSE', data[self.ticker].Close) # self.algorithm.Plot('Daily Value', 'HIGH', data[self.ticker].High) # self.algorithm.Plot('Daily Value', 'LOW', data[self.ticker].Low) if self.squeeze.IsReady: # self.algorithm.benchmark.SetBeta(self.__FindATRPrice(OptionRight.Call)) self.algorithm.benchmark.SetBeta(self.__RollStrike()) self.algorithm.benchmark.PrintBenchmark() def Update(self, algorithm, data): insights = [] # TODO: all works good it's just one day delay. Consider removing the registerIndicator line and update the indicator manually here before day end. # one problem might be with warmup. Consider this for warmup where we do that manually also: https://www.quantconnect.com/forum/discussion/1810/indicator-updates-after-hours-trading/p1/comment-5524 if self.ticker not in data.Keys: return insights # Update the squeeze with the data on the current day before market close. # if self.marketHours.get_CurrentClose().hour - 1 == self.algorithm.Time.hour and self.algorithm.Time.minute == 45: # bar = data.Bars[self.ticker] # self.squeeze.Update(bar) if algorithm.IsWarmingUp: return insights if not self.squeeze.IsReady: return insights # reset your indicators when splits and dividends occur. # If a split or dividend occurs, the data in your indicators becomes invalid because it doesn't account for the price adjustments that the split or dividend causes. if data.Splits.ContainsKey(self.ticker) or data.Dividends.ContainsKey(self.ticker): # Reset the indicator self.squeeze.Reset() if self.marketHours.get_CurrentClose().hour - 1 > self.algorithm.Time.hour > self.marketHours.get_CurrentOpen().hour + 2: if self.algorithm.Time.minute in list(range(10, 60))[0::10]: insights.extend(self.MonitorBull(data)) insights.extend(self.MonitorBear(data)) if self.marketHours.get_CurrentClose().hour - 1 == self.algorithm.Time.hour: # Update squeeze indicator for today. # TODO: manually update the squeeze indicator for the today value. I'm thinking it should be temporary and replaced once the regular update comes the next day. # if self.algorithm.Time.minute == 25: # self.squeeze.Update(self.consolidator.WorkingBar) # Check the existing position # if self.algorithm.Time.minute == 30: # insights.extend(self.MonitorBull(data)) # insights.extend(self.MonitorBear(data)) # Scan for a new position. if self.algorithm.Time.minute == 45: insights.extend(self.ScannerBull(data)) insights.extend(self.ScannerBear(data)) return insights # REGION: naked buy PUT/CALL def MonitorBear(self, data): insights = [] # - ExpiresIn <= 2 # - profit > 80 # - profit <= -100 for put in self.portfolio.Puts(self.ticker, expiration = [0, self.targetExpiration.Stop]): close = False if put.ExpiresIn() <= 7: close = True self.Log.Add("SELL put {}: expiresIn{} <= 7".format(put.ToString(), put.ExpiresIn() )) if put.UnrealizedProfit() < -10: close = True self.Log.Add("SELL put {}: {}% < -10%".format(put.ToString(), put.UnrealizedProfit()) ) # if put.UnrealizedProfit() >= 100: # close = True # self.Log.Add("SELL put {}: {} >= 100%".format(put.ToString(), put.UnrealizedProfit()) ) # as soon as we gain momentum over 2 days we sell. if self.squeeze.GainingMomentum(maxDays = 2): close = True self.Log.Add("SELL put {}: bullish momentum".format(put.ToString() )) if close: self.Credit.Sell(put.AskPrice()) self.Log.Add("SELL with credit {} / ${} / {}%".format(self.Credit.ToString(), put.AskPrice(), put.UnrealizedProfit()) ) insights.append( Insight.Price(put.Option.Symbol, Resolution.Minute, 15, InsightDirection.Flat) ) self.Log.Print() return insights def ScannerBear(self, data): insights = [] ## Buying conditions lenPuts = len(self.portfolio.Puts(self.ticker, expiration = [0, self.targetExpiration.Stop])) # We have Puts already then just skip for now. if lenPuts > 0: return insights # if we did not change from a squeeze to no sqz `green` then skip if not self.squeeze.SqueezeChange(toColor='green'): return insights # if the squeeze that happened since the change to `green` was not longer than 2 days skip if not self.squeeze.SqueezeDuration(over=2): return insights # only buy puts if we are losing momentum if not self.squeeze.LosingMomentum(): return insights # only buy if we are bearish if not self.squeeze.Bearish(): return insights # call = self.__FindCall(data) put = self.__IndicatorFindOption(data, optionType = OptionRight.Put) self.Log.Add("SCANNER - Trying to find a put") if put: self.Credit.Buy(put.AskPrice()) self.Log.Add("SCANNER - buying {} with credit {} / ${}".format(put.ToString(), self.Credit.ToString(), put.AskPrice()) ) insights.append( Insight.Price(put.Option, Resolution.Minute, 15, InsightDirection.Up) ) self.Log.Print() return insights def MonitorBull(self, data): insights = [] # - ExpiresIn <= 2 # - profit > 80 # - profit <= -100 for call in self.portfolio.Calls(self.ticker, expiration = [0, self.targetExpiration.Stop]): close = False if call.ExpiresIn() <= 7: close = True self.Log.Add("SELL call {}: expiresIn{} <= 7".format(call.ToString(), call.ExpiresIn() )) if call.UnrealizedProfit() < -10: close = True self.Log.Add("SELL call {}: {}% < -10%".format(call.ToString(), call.UnrealizedProfit()) ) # if call.UnrealizedProfit() >= 100: # close = True # self.Log.Add("SELL call {}: {} >= 100%".format(call.ToString(), call.UnrealizedProfit()) ) # as soon as we lose momentum over 2 days we sell. if self.squeeze.LosingMomentum(maxDays = 2): close = True self.Log.Add("SELL call {}: bearish momentum".format(call.ToString() )) if close: self.Credit.Sell(call.AskPrice()) self.Log.Add("SELL with credit {} / ${} / {}%".format(self.Credit.ToString(), call.AskPrice(), call.UnrealizedProfit()) ) insights.append( Insight.Price(call.Option.Symbol, Resolution.Minute, 15, InsightDirection.Flat) ) self.Log.Print() return insights def ScannerBull(self, data): insights = [] ## Buying conditions lenCalls = len(self.portfolio.Calls(self.ticker, expiration = [0, self.targetExpiration.Stop])) # We have calls already then just skip for now. if lenCalls > 0: return insights # if we did not change from a squeeze to no sqz `green` then skip if not self.squeeze.SqueezeChange(toColor='green'): return insights # if the squeeze that happened since the change to `green` was not longer than 2 days skip if not self.squeeze.SqueezeDuration(over=2): return insights # only buy calls if we are gaining momentum if not self.squeeze.GainingMomentum(): return insights # only buy if we are bullish if not self.squeeze.Bullish(): return insights # call = self.__FindCall(data) call = self.__IndicatorFindOption(data, optionType = OptionRight.Call) self.Log.Add("SCANNER - Trying to find a call") if call: self.Credit.Buy(call.AskPrice()) self.Log.Add("SCANNER - buying {} with credit {} / ${}".format(call.ToString(), self.Credit.ToString(), call.AskPrice()) ) insights.append( Insight.Price(call.Option, Resolution.Minute, 15, InsightDirection.Up) ) self.Log.Print() return insights # ENDREGION: monitor buy naked CALL/PUT def __StockPrice(self): return self.algorithm.Securities[self.ticker].Price def __GetContracts(self, data, expiration = None): if not expiration: return None contracts = self.algorithm.OptionChainProvider.GetOptionContractList(self.ticker, self.algorithm.Time.date()) if len(contracts) == 0 : return None # # only tradable contracts # # !!IMPORTANT!!: to escape the error `Backtest Handled Error: The security with symbol 'SPY 220216P00425000' is marked as non-tradable.` contracts = [x for x in contracts if self.algorithm.Securities[x.ID.Symbol].IsTradable] # pick within expiration range contracts = [i for i in contracts if expiration.Start <= (i.ID.Date.date() - self.algorithm.Time.date()).days <= expiration.Stop] return contracts def __FindATRPrice(self, optionType = OptionRight.Call): trigger_percentage = 0.236 middle_percentage = 0.618 atr = self.ATR.Current.Value previous_close = self.tickerIdentity.Current.Value lower_trigger = previous_close - (trigger_percentage * atr) # biggest value 1ATR upper_trigger = previous_close + (trigger_percentage * atr) # biggest value 1ATR lower_middle = previous_close - (middle_percentage * atr) # middle value 1ATR upper_middle = previous_close + (middle_percentage * atr) # middle value 1ATR lower_atr = previous_close - atr # lowest value 1ATR upper_atr = previous_close + atr # lowest value 1ATR # FOR EXTENSION VALUES!? lower_extension = (lower_atr) - (trigger_percentage * atr) # biggest value 2ATR upper_extension = (upper_atr) + (trigger_percentage * atr) # biggest value 2ATR lower_middle_extension = (lower_atr) - (middle_percentage * atr) # middle value 2ATR upper_middle_extension = (upper_atr) + (middle_percentage * atr) # middle value 2ATR lower_2atr_extension = lower_atr - atr # lowest value 2ATR upper_2atr_extension = upper_atr + atr # lowest value 2ATR if optionType == OptionRight.Call: # CALL price return upper_atr elif optionType == OptionRight.Put: # PUT price return lower_atr def __RollStrike(self): if not self.ATRSlow.IsReady or not self.ATRFast.IsReady or not self.EMASlow.IsReady or not self.EMAFast.IsReady: return 0 ATRSlow = self.ATRSlow.Current.Value ATRFast = self.ATRFast.Current.Value EMASlow = self.EMASlow.Current.Value EMAFast = self.EMAFast.Current.Value stockPrice = self.__StockPrice() ema_ratio = EMAFast / EMASlow self.LastPrice.Add(stockPrice) # oldestPrice - currentPrice over the 5 day window move = self.LastPrice[self.LastPrice.Count-1] - self.LastPrice[0] newStrike = max(EMASlow, EMAFast, stockPrice)*1.25 * (ema_ratio ** 2) + max(ATRSlow, ATRFast) * 1 newStrike = max( newStrike, max(EMAFast, stockPrice) * 1.25 ) newStrike = min( newStrike, max(EMAFast, stockPrice)*1.3 ) if self.LastRoll > newStrike: newStrike = (self.LastRoll + newStrike*2) / 3 self.LastRoll = newStrike return newStrike def __ATRFindOption(self, data, optionType = OptionRight.Call): call = None stockPrice = self.__StockPrice() minCredit = 1 # default min credit of 100$ # in case we are BUYing and we had a loss then roll for a bigger premium if self.Credit.LastValue > 1: minCredit = self.Credit.LastValue * 1.1 contracts = self.__GetContracts(data, expiration = self.neutralExpiration) if not contracts: return None # select only calls contracts = [x for x in contracts if x.ID.OptionRight == OptionRight.Call] # sort by date contracts = sorted(contracts, key = lambda x: x.ID.Date, reverse = False) # pick the contract by price # atrPrice = self.__FindATRPrice(optionType=optionType) atrPrice = self.__RollStrike() contract = min(contracts, key = lambda x: abs(x.ID.StrikePrice - atrPrice)) # add all the option contracts so we can access all the data. self.algorithm.AddOptionContract(contract, Resolution.Minute) if not contract: return None if optionType == OptionRight.Call: return SoldCall(contract) elif optionType == OptionRight.Put: return SoldPut(contract) # @param data [Array] # @param type [OptionRight] def __IndicatorFindOption(self, data, optionType = OptionRight.Call): contract = None stockPrice = self.__StockPrice() minCredit = 1 # default min credit of 100$ # in case we are rolling and we had a loss then roll for a bigger premium if self.Credit.LastValue > 1: minCredit = self.Credit.LastValue * 1.1 contracts = self.__GetContracts(data, expiration = self.targetExpiration) if not contracts: return None # select only optionType contracts = [x for x in contracts if x.ID.OptionRight == optionType] # sort by date contracts = sorted(contracts, key = lambda x: x.ID.Date, reverse = False) # pick the contract ATM contract = min(contracts, key = lambda x: abs(x.ID.StrikePrice - stockPrice)) # add all the option contracts so we can access all the data. self.algorithm.AddOptionContract(contract, Resolution.Minute) if not contract: return None if optionType == OptionRight.Call: return OCall(contract) elif optionType == OptionRight.Put: return OPut(contract)
#region imports from AlgorithmImports import * from QuantConnect.Logging import * from collections import deque from MarketHours import MarketHours from PortfolioHandler import Handler from PortfolioHandler.OStrategies import OCall, OPut, SoldCall, SoldPut from CustomIndicators import RipsterClouds, TTMSqueezePro from Tools import ConsolidateIndicator, ExpirationRange, MainLogger, TradeCredit #endregion # TODO: # - open trades smaller like start with 5 contracts and add more until we get to 15 if the move keeps going # - add a check for profit on the calls if it falls to -20% then close it. # - add a loop so we can open trades on multiple stocks. This works great on large cap stocks so doing it on 200B+ works well. # - FIGURE out the best way to pick the strike for sold calls based on momentum (ATR weekly with some EMA component?! similar to what the old method is) # [this should be profitable before doing the rest of the sold version] # - ADD sold puts when bullish similar to sold calls. # - IMPLEMENT SELLING OF OPTIONS VIA SPREADS INSTEAD OF NAKED OPTIONS. class SqueezeSoldNakedAlphaModel(AlphaModel): algorithm = None def __init__(self, algorithm, ticker, option): self.ticker = ticker self.option = option self.algorithm = algorithm self.symbol = algorithm.AddEquity(self.ticker, resolution = Resolution.Minute) self.marketHours = MarketHours(algorithm, self.ticker) self.portfolio = Handler(self.algorithm) self.targetExpiration = ExpirationRange(25, 45) self.neutralExpiration = ExpirationRange(7, 14) self.Log = MainLogger(self.algorithm) self.Credit = TradeCredit() # indicators self.dataQueue = deque(maxlen=14) # Indicator to get the previous close value self.tickerIdentity = self.algorithm.Identity(self.ticker, Resolution.Daily, Field.Close) # region using EMAS+ATR for call strike self.ATRSlow = AverageTrueRange(15) self.ATRFast = AverageTrueRange(6) ConsolidateIndicator(self.algorithm, self.ticker, self.ATRSlow, 'W').Call() # weekly consolidation for ATR ConsolidateIndicator(self.algorithm, self.ticker, self.ATRFast, 'W').Call() # weekly consolidation for ATR self.EMASlow = self.algorithm.EMA(self.ticker, 20, resolution = Resolution.Daily) self.EMAFast = self.algorithm.EMA(self.ticker, 5, resolution = Resolution.Daily) self.LastPrice = RollingWindow[float](5) # RollingWindow[TradeBar](5) self.LastRoll = 0 # endregion self.squeeze = TTMSqueezePro("Squeeze", length = 20) algorithm.RegisterIndicator(self.ticker, self.squeeze, Resolution.Daily) # history = self.algorithm.History(self.symbol.Symbol, 21, Resolution.Daily) # self.squeeze.Warmup(history.loc[self.ticker]) # TODO: think about warming up the indicator: https://www.quantconnect.com/docs/v2/writing-algorithms/indicators/manual-indicators # self.algorithm.SetWarmUp(TimeSpan.FromDays(60)) self.algorithm.WarmUpIndicator(self.ticker, self.squeeze, Resolution.Daily) self.indicators = { # 'Squeeze' : self.squeeze, 'Trade Plot': ["Call", "Put"] } self.algorithm.benchmark.AddIndicators(self.indicators) # In order to update the current day until a certain point (in our testing 5 mins before market close) we need to build a consolidator and update it # self.symbol = self.AddEquity("SPY", Resolution.Minute).Symbol self.consolidator = TradeBarConsolidator(timedelta(1)) self.algorithm.SubscriptionManager.AddConsolidator(self.symbol.Symbol, self.consolidator) # self.consolidator = TradeBarConsolidator(timedelta(days=1)) # # Update the Squeeze indicator with the consolidated bar # self.consolidator.DataConsolidated += (lambda _, bar : self.squeeze.Update(bar)) # self.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator) self.algorithm.Schedule.On(self.algorithm.DateRules.EveryDay(self.ticker), self.algorithm.TimeRules.BeforeMarketClose(self.ticker, 5), self.UpdateSqueeze) # self.algorithm.Schedule.On(self.algorithm.DateRules.EveryDay(self.ticker), self.algorithm.TimeRules.BeforeMarketClose(self.ticker, 5), self.UpdateSqueeze) def UpdateSqueeze(self): data = self.algorithm.CurrentSlice if data.ContainsKey(self.ticker): # dataPoint = IndicatorDataPoint(self.symbol, data[self.ticker].EndTime, data[self.ticker].Close) # self.squeeze.ManualUpdate(data[self.ticker]) if self.squeeze.IsReady: # self.algorithm.benchmark.SetBeta(self.__FindATRPrice(OptionRight.Call)) self.algorithm.benchmark.SetBeta(self.__RollStrike()) self.algorithm.benchmark.PrintBenchmark() def Update(self, algorithm, data): insights = [] # TODO: all works good it's just one day delay. Consider removing the registerIndicator line and update the indicator manually here before day end. # one problem might be with warmup. Consider this for warmup where we do that manually also: https://www.quantconnect.com/forum/discussion/1810/indicator-updates-after-hours-trading/p1/comment-5524 if self.ticker not in data.Keys: return insights # Update the squeeze with the data on the current day before market close. # if self.marketHours.get_CurrentClose().hour - 1 == self.algorithm.Time.hour and self.algorithm.Time.minute == 45: # bar = data.Bars[self.ticker] # self.squeeze.Update(bar) if algorithm.IsWarmingUp: return insights if not self.squeeze.IsReady: return insights # reset your indicators when splits and dividends occur. # If a split or dividend occurs, the data in your indicators becomes invalid because it doesn't account for the price adjustments that the split or dividend causes. if data.Splits.ContainsKey(self.ticker) or data.Dividends.ContainsKey(self.ticker): # Reset the indicator self.squeeze.Reset() if self.marketHours.get_CurrentClose().hour - 1 > self.algorithm.Time.hour > self.marketHours.get_CurrentOpen().hour + 2: if self.algorithm.Time.minute in list(range(10, 60))[0::10]: insights.extend(self.MonitorSoldCall(data)) insights.extend(self.MonitorSoldPut(data)) if self.marketHours.get_CurrentClose().hour - 1 == self.algorithm.Time.hour: # Update squeeze indicator for today. # TODO: manually update the squeeze indicator for the today value. I'm thinking it should be temporary and replaced once the regular update comes the next day. # if self.algorithm.Time.minute == 25: # self.squeeze.Update(self.consolidator.WorkingBar) # Check the existing position # if self.algorithm.Time.minute == 30: # insights.extend(self.MonitorBull(data)) # insights.extend(self.MonitorBear(data)) # Scan for a new position. if self.algorithm.Time.minute == 45: insights.extend(self.ScannerSoldCall(data)) insights.extend(self.ScannerSoldPut(data)) return insights def MonitorSoldCall(self, data): insights = [] # - ExpiresIn <= 2 # - profit > 80 # - profit <= -100 for call in self.portfolio.SoldCalls(self.ticker, expiration = [0, self.neutralExpiration.Stop]): close = False if call.ExpiresIn() <= 2: close = True self.Log.Add("BUY - call {}: expiresIn{} <= 2".format(call.ToString(), call.ExpiresIn() )) # v1 GOOD RESULT! # if call.UnrealizedProfit() > 95: # close = True # self.Log.Add("BUY - call {}: unrealized profit {} > 95".format(call.ToString(), call.UnrealizedProfit() )) # v1 GOOD RESULT! if call.UnrealizedProfit() <= -100: close = True self.Log.Add("BUY - call {}: unrealized profit {} <= -100".format(call.ToString(), call.UnrealizedProfit() )) # v1 GOOD RESULT! if call.ExpiresIn() == 0 and self.algorithm.Time.hour > 15: close = True self.Log.Add("BUY - call {}: expiresIn == 0 and hour {} > 15".format(call.ToString(), self.algorithm.Time.hour )) # as soon as we get our squeeze started and it's a bullish one we close if self.squeeze.Bullish(): close = True self.Log.Add("BUY call {}: bullish".format(call.ToString() )) if self.squeeze.GainingMomentum(): close = True self.Log.Add("BUY call {}: gaining momentum".format(call.ToString() )) if close: self.Credit.Buy(call.AskPrice()) self.Log.Add("BUY with credit {} / ${} / {}%".format(self.Credit.ToString(), call.AskPrice(), call.UnrealizedProfit()) ) insights.append( Insight.Price(call.Call.Symbol, Resolution.Minute, 15, InsightDirection.Flat) ) self.Log.Print() return insights def ScannerSoldCall(self, data): insights = [] lenCalls = len(self.portfolio.SoldCalls(self.ticker, expiration = [0, self.neutralExpiration.Stop])) # We have Calls already then skip for now. if lenCalls > 0: return insights # We are on a green dot cycle so the squeeze released so skip! DANGER! # if not self.squeeze.Squeezing(): return insights if not self.squeeze.Bearish(): return insights if not self.squeeze.LosingMomentum(): return insights call = self.__ATRFindOption(data, optionType = OptionRight.Call) self.Log.Add("SCANNER - Trying to find a sold call") if call: self.Credit.Sell(call.AskPrice()) self.Log.Add("SCANNER - selling {} with credit {} / ${}".format(call.ToString(), self.Credit.ToString(), call.AskPrice()) ) insights.append( Insight.Price(call.Call, Resolution.Minute, 15, InsightDirection.Down) ) self.Log.Print() return insights def ScannerSoldPut(self, data): insights = [] ## Buying conditions lenPuts = len(self.portfolio.SoldPuts(self.ticker, expiration = [0, self.neutralExpiration.Stop])) # We have Puts already then just skip for now. if lenPuts > 0: return insights # We are on a green dot cycle so the squeeze released so skip! DANGER! if not self.squeeze.Squeezing(): return insights # call = self.__FindCall(data) put = self.__ATRFindOption(data, optionType = OptionRight.Put) self.Log.Add("SCANNER - Trying to find a sold put") if put: self.Credit.Sell(put.AskPrice()) self.Log.Add("SCANNER - selling {} with credit {} / ${}".format(put.ToString(), self.Credit.ToString(), put.AskPrice()) ) insights.append( Insight.Price(put.Put, Resolution.Minute, 15, InsightDirection.Down) ) self.Log.Print() return insights def __StockPrice(self): return self.algorithm.Securities[self.ticker].Price def __GetContracts(self, data, expiration = None): if not expiration: return None contracts = self.algorithm.OptionChainProvider.GetOptionContractList(self.ticker, self.algorithm.Time.date()) if len(contracts) == 0 : return None # # only tradable contracts # # !!IMPORTANT!!: to escape the error `Backtest Handled Error: The security with symbol 'SPY 220216P00425000' is marked as non-tradable.` contracts = [x for x in contracts if self.algorithm.Securities[x.ID.Symbol].IsTradable] # pick within expiration range contracts = [i for i in contracts if expiration.Start <= (i.ID.Date.date() - self.algorithm.Time.date()).days <= expiration.Stop] return contracts def __FindATRPrice(self, optionType = OptionRight.Call): trigger_percentage = 0.236 middle_percentage = 0.618 atr = self.ATR.Current.Value previous_close = self.tickerIdentity.Current.Value lower_trigger = previous_close - (trigger_percentage * atr) # biggest value 1ATR upper_trigger = previous_close + (trigger_percentage * atr) # biggest value 1ATR lower_middle = previous_close - (middle_percentage * atr) # middle value 1ATR upper_middle = previous_close + (middle_percentage * atr) # middle value 1ATR lower_atr = previous_close - atr # lowest value 1ATR upper_atr = previous_close + atr # lowest value 1ATR # FOR EXTENSION VALUES!? lower_extension = (lower_atr) - (trigger_percentage * atr) # biggest value 2ATR upper_extension = (upper_atr) + (trigger_percentage * atr) # biggest value 2ATR lower_middle_extension = (lower_atr) - (middle_percentage * atr) # middle value 2ATR upper_middle_extension = (upper_atr) + (middle_percentage * atr) # middle value 2ATR lower_2atr_extension = lower_atr - atr # lowest value 2ATR upper_2atr_extension = upper_atr + atr # lowest value 2ATR if optionType == OptionRight.Call: # CALL price return upper_atr elif optionType == OptionRight.Put: # PUT price return lower_atr def __RollStrike(self): if not self.ATRSlow.IsReady or not self.ATRFast.IsReady or not self.EMASlow.IsReady or not self.EMAFast.IsReady: return 0 ATRSlow = self.ATRSlow.Current.Value ATRFast = self.ATRFast.Current.Value EMASlow = self.EMASlow.Current.Value EMAFast = self.EMAFast.Current.Value stockPrice = self.__StockPrice() ema_ratio = EMAFast / EMASlow self.LastPrice.Add(stockPrice) # oldestPrice - currentPrice over the 5 day window move = self.LastPrice[self.LastPrice.Count-1] - self.LastPrice[0] newStrike = max(EMASlow, EMAFast, stockPrice)*1.25 * (ema_ratio ** 2) + max(ATRSlow, ATRFast) * 1 newStrike = max( newStrike, max(EMAFast, stockPrice) * 1.25 ) newStrike = min( newStrike, max(EMAFast, stockPrice)*1.3 ) if self.LastRoll > newStrike: newStrike = (self.LastRoll + newStrike*2) / 3 self.LastRoll = newStrike return newStrike def __ATRFindOption(self, data, optionType = OptionRight.Call): call = None stockPrice = self.__StockPrice() minCredit = 1 # default min credit of 100$ # in case we are BUYing and we had a loss then roll for a bigger premium if self.Credit.LastValue > 1: minCredit = self.Credit.LastValue * 1.1 contracts = self.__GetContracts(data, expiration = self.neutralExpiration) if not contracts: return None # select only calls contracts = [x for x in contracts if x.ID.OptionRight == OptionRight.Call] # sort by date contracts = sorted(contracts, key = lambda x: x.ID.Date, reverse = False) # pick the contract by price # atrPrice = self.__FindATRPrice(optionType=optionType) atrPrice = self.__RollStrike() contract = min(contracts, key = lambda x: abs(x.ID.StrikePrice - atrPrice)) # add all the option contracts so we can access all the data. self.algorithm.AddOptionContract(contract, Resolution.Minute) if not contract: return None if optionType == OptionRight.Call: return SoldCall(contract) elif optionType == OptionRight.Put: return SoldPut(contract)
#region imports from AlgorithmImports import * #endregion class ConsolidateIndicator: def __init__(self, algorithm, ticker, indicator, resolution): self.consolidator = TradeBarConsolidator(self.__Timeframe(resolution)) self.indicator = indicator self.ticker = ticker self.algorithm = algorithm def Call(self, field = None): # self.algorithm.SubscriptionManager.AddConsolidator(self.ticker, self.consolidator) if field: self.algorithm.RegisterIndicator(self.ticker, self.indicator, self.consolidator, field) else: self.algorithm.RegisterIndicator(self.ticker, self.indicator, self.consolidator) def __Timeframe(self, resolution): r = Resolution.Daily if resolution == 'W': r = Calendar.Weekly elif resolution == 'M': r = Calendar.Monthly elif resolultion == 'Q': r = Calendar.Quarterly elif resolution == 'Y': r = Calendar.Yearly return r
#region imports from AlgorithmImports import * #endregion class DataHandler: # The supported cash indices by QC https://www.quantconnect.com/docs/v2/writing-algorithms/datasets/tickdata/us-cash-indices#05-Supported-Indices # These need to be added using AddIndex instead of AddEquity CashIndices = ['VIX','SPX','NDX'] def __init__(self, algorithm, ticker): self.ticker = ticker self.algorithm = algorithm # Method that returns the Option contracts for the ticker/provided within the expiration set. # @param expiration [ExpirationRange] # @return [Array] def OptionContracts(self, expiration, types = [OptionRight.Call, OptionRight.Put]): contracts = self.algorithm.OptionChainProvider.GetOptionContractList(self.ticker, self.algorithm.Time) if len(contracts) == 0 : return None # # only tradable contracts # # !!IMPORTANT!!: to escape the error `Backtest Handled Error: The security with symbol 'SPY 220216P00425000' is marked as non-tradable.` if not self.__CashTicker(): contracts = [x for x in contracts if self.algorithm.Securities[x.ID.Symbol].IsTradable] # Select only provided types contracts = [x for x in contracts if x.ID.OptionRight in types] # Make sure we have the contracts sorted by Strike and Expiry contracts = sorted(contracts, key = lambda x: (x.ID.Date, x.ID.StrikePrice)) # pick within expiration range contracts = [i for i in contracts if expiration.Start <= (i.ID.Date.date() - self.algorithm.Time.date()).days <= expiration.Stop] return contracts # Method to add the ticker[String] data to the algorithm. # @param resolution [Resolution] # @return [Symbol] def AddSymbol(self, resolution = Resolution.Minute): if self.__CashTicker(): return self.algorithm.AddIndex(self.ticker, resolution = resolution) else: return self.algorithm.AddEquity(self.ticker, resolution = resolution) # Method to add option contracts data to the algorithm. # @param contracts [Array] # @param resolution [Resolution] # @return [Symbol] def AddOptions(self, contracts, resolution = Resolution.Minute): if self.__CashTicker(): for c in contracts: self.algorithm.AddIndexOptionContract(c, resolution) else: for c in contracts: self.algorithm.AddOptionContract(c, resolution) # Internal method to determine if we are using a cashticker to add the data. # @returns [Boolean] def __CashTicker(self): return self.ticker in self.CashIndices
#region imports from AlgorithmImports import * #endregion class ExpirationRange: def __init__(self, start, stop): self.Start = start self.Stop = stop def ToArr(self): return [self.Start, self.Stop]
#region imports from AlgorithmImports import * #endregion # Make this logger class so we can print logging messages in section for easier read. class MainLogger: Messages = [] Algorithm = None OptionClasses = [SecurityType.Option, SecurityType.FutureOption, SecurityType.IndexOption] def __init__(self, algorithm): self.Algorithm = algorithm def Add(self, message): self.Messages.append(message) def Print(self): if len(self.Messages) == 0: return self.Algorithm.Log("------***------") for m in self.Messages: self.Algorithm.Log(m) self.Algorithm.Log("------|||------") self.Messages = [] # Just prints a nice JSON for the orderEvent passed to it. It checks for options as well. # @param orderEvent [OrderEvent] def PrintOrderJson(self, orderEvent): symbol = orderEvent.Symbol json = { "to": "open", "time": self.Algorithm.Time.strftime('%Y-%m-%d %H:%M:%S'), "type": 'stock', "contract": symbol.Value, "expires_at": None, "quantity": orderEvent.AbsoluteFillQuantity, "price": orderEvent.FillPrice, "fees": orderEvent.OrderFee.Value.Amount, } json["action"] = 'buy' if orderEvent.FillQuantity > 0 else 'sell' # these attributes only make sense if we are trading an option if symbol.SecurityType in self.OptionClasses: json["expires_at"] = symbol.ID.Date.strftime("%Y-%m-%d") json["type"] = "option" json["strike"] = symbol.ID.StrikePrice # When we send an insight with flat that means we want to get out of the trade. portfolioHolding = self.Algorithm.Portfolio[symbol.Value] if portfolioHolding.Quantity == 0: json["to"] = "close" self.Algorithm.Log(json) def PrintTradeJson(self, trade, orderEvent): json = { "to": None, "time": self.Algorithm.Time.strftime('%Y-%m-%d %H:%M:%S'), "type": 'trade', "contract": trade.StrategyKey(), "expires_at": trade.Expiration().strftime('%Y-%m-%d'), "quantity": orderEvent.AbsoluteFillQuantity, "fees": trade.TotalFees(), } # check the trade state we are in. quantities = trade.QuantityList() # if all the trade options are 0 in quantity then we are closing it. # if all are not 0 then we are opening the trade. if all(c == 0 for c in quantities): json["to"] = "close" elif all(c != 0 for c in quantities): json["to"] = "open" # this check makes sure we only print when we open and close and don't get a message # when one of the sides of the trades are executed but the full trade. if not json["to"]: return None if json["to"] == 'close': json["price"] = trade.SaleAmount() elif json["to"] == 'open': json["price"] = trade.HoldingCost() # if we have a strategy then we will set the action as buy and sell based on to data. # if we don't have a strategy then we base the action on the quantity if len(quantities) > 1: json["action"] = 'buy' if json["to"] == "open" else 'sell' else: json["action"] = 'buy' if quantities[0] > 0 else 'sell' # the strike is the minimum strike in a strategy or the sole strike tradeStrike = trade.Strike() if isinstance(tradeStrike, list): json["strike"] = min(tradeStrike) else: json["strike"] = tradeStrike self.Algorithm.Log(json) def PrintTrade(self, order, trade, quantity): plotTradeName = '' tradeStrike = trade.Strike() if isinstance(tradeStrike, list): if order.Symbol.ID.OptionRight == OptionRight.Put: tradeName = "{0} Put".format(trade.name) elif order.Symbol.ID.OptionRight == OptionRight.Call: tradeName = "{0} Call".format(trade.name) tradeStrike = order.Symbol.ID.StrikePrice else: tradeName = trade.name if quantity < 0: plotTradeName = 'Sell {}'.format(tradeName) elif quantity == 0: plotTradeName = 'Flat {}'.format(tradeName) else: plotTradeName = 'Buy {}'.format(tradeName) self.Algorithm.Log("{} @ {}".format(plotTradeName, tradeStrike)) def PrintPortfolio(self): allContracts = [] # select all the options and equity in our portfolio for contract in self.Algorithm.Portfolio.Values: security = contract.Security if contract.Quantity != 0: allContracts.append(contract.ToString()) settledCash = self.Algorithm.Portfolio.CashBook["USD"].Amount unsettledCash = self.Algorithm.Portfolio.UnsettledCashBook["USD"].Amount self.Algorithm.Log("--Holdings--") self.Algorithm.Log(allContracts) self.Algorithm.Log("Settled Cash: ${}; Unsettled Cash: ${}".format(settledCash, unsettledCash)) self.Algorithm.Log("--|||--")
#region imports from AlgorithmImports import * #endregion # Class to hold the credit of our trade so we can separate LEAP profit/credit from short profit but also combine them when needed. class TradeCredit: Value = 0 LastValue = 0 # When we buy something we remove from credit as we are paying from balance to get the options. def Buy(self, value = 0): self.LastValue = value self.Value -= value return self.Value # When we are selling we add to credit as we are receiving premium for selling them. def Sell(self, value = 0): self.Value += value return self.Value def ToString(self): return "Total: ${} (last ${})".format(round(self.Value, 2) * 100, round(self.LastValue, 2))
#region imports from AlgorithmImports import * from .ConsolidateIndicator import ConsolidateIndicator from .MainLogger import MainLogger from .TradeCredit import TradeCredit from .ExpirationRange import ExpirationRange from .DataHandler import DataHandler #endregion # Your New Python File
#region imports from AlgorithmImports import * from Risk.MaximumDrawdownPercentPerSecurity import MaximumDrawdownPercentPerSecurity import numpy as np from PortfolioHandler import Handler #endregion class TrailingStopRisk(RiskManagementModel): '''Provides an implementation of IRiskManagementModel that limits the drawdown per holding to the specified percentage''' def __init__(self, maximumDrawdownPercent = 5, profitTarget = None, ticker = None, strategies = [], algo = None): '''Initializes a new instance of the MaximumDrawdownPercentPerSecurity class Args: maximumDrawdownPercent: The maximum percentage drawdown allowed for any single security holding''' self.maximumDrawdownPercent = -abs(maximumDrawdownPercent) self.profitTarget = profitTarget self.strategies = strategies self.algo = algo self.symbol = Symbol.Create(ticker, SecurityType.Equity, Market.USA) self.assetBestPnl = {} def ManageRisk(self, algorithm, targets): '''Manages the algorithm's risk at each time step Args: algorithm: The algorithm instance targets: The current portfolio targets to be assessed for risk''' targets = [] # portfolio = self.algo.portfolio portfolio = Handler(algorithm) # TODO: this works but it's clear it does not get the ObjectStore from our main algo?!! WHY? # TODO: fix this code to work with any strategy including SoldCalls. It might actually work! portfolio.SyncStored(self.symbol, self.strategies) for strategy in self.strategies: for contract in getattr(portfolio, strategy)(self.symbol): key = contract.StrategyKey() if key not in self.assetBestPnl.keys(): self.assetBestPnl[key] = contract.UnrealizedProfit() self.assetBestPnl[key] = np.maximum(self.assetBestPnl[key], contract.UnrealizedProfit()) pnl = contract.UnrealizedProfit() - self.assetBestPnl[key] # To handle profitTarget like 50% from when bought think of checking for if self.profitTarget is not None: if self.assetBestPnl[key] >= self.profitTarget and pnl < self.maximumDrawdownPercent: for c in contract.optionLegs: targets.append(PortfolioTarget(c.Symbol, InsightDirection.Flat)) else: if pnl < self.maximumDrawdownPercent: for c in contract.optionLegs: targets.append(PortfolioTarget(c.Symbol, InsightDirection.Flat)) return targets
# region imports from AlgorithmImports import * # from Alphas.ConstantAlphaModel import ConstantAlphaModel # from Selection.OptionUniverseSelectionModel import OptionUniverseSelectionModel # from Execution.ImmediateExecutionModel import ImmediateExecutionModel from TrailingStopRisk import TrailingStopRisk from Benchmark import Benchmark from PortfolioHandler import Handler from Tools import MainLogger # from UniverseSelection import OptionUniverseSelectionModel2 from OptionsSpreadExecution import OptionsSpreadExecution from Risk.NullRiskManagementModel import NullRiskManagementModel # Sold calls algo from SafeSoldCallAlphaModel import SafeSoldCallAlphaModel from SafeCallOptionSelectionModel import SafeCallOptionSelectionModel # MilkTheCow algo from MilkTheCowAlphaModel import MilkTheCowAlphaModel from MilkTheCowOptionSelectionModel import MilkTheCowOptionSelectionModel # Squeeze algo from SqueezeNakedAlphaModel import SqueezeNakedAlphaModel from SqueezeSoldNakedAlphaModel import SqueezeSoldNakedAlphaModel # ATR levels algo from ATRSpreadsAlphaModel import ATRSpreadsAlphaModel # endregion class AddAlphaModelAlgorithm(QCAlgorithm): def Initialize(self): ''' Initialise the data and resolution required, as well as the cash and start-end dates for your algorithm. All algorithms must initialized.''' # NOTE: [Sep 2020] After the first split for TSLA self.SetStartDate(2021, 1, 1) # Set Start Date self.SetEndDate(2021, 3, 1) # Set End Date self.SetCash(100_000) # Set Strategy Cash # Set settings and account setup self.UniverseSettings.Resolution = Resolution.Minute self.UniverseSettings.FillForward = False self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw # self.Portfolio.SetMarginCallModel(DefaultMarginCallModel(self.Portfolio, self.DefaultOrderProperties)) # THIS ALSO DOES NOT SEEM TO FIX THE MARGIN ISSUE # self.Portfolio.MarginCallModel = MarginCallModel.Null # Main variables self.ticker = self.GetParameter("ticker") self.benchmark = Benchmark(self, self.ticker) self.portfolio = Handler(self) # equity = Symbol.Create(self.ticker, SecurityType.Equity, Market.USA) self.SetSecurityInitializer(self.CustomSecurityInitializer) # self.SetSecurityInitializer(MySecurityInitializer(self.BrokerageModel, FuncSecuritySeeder(self.GetLastKnownPrices))) self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin) # Set InteractiveBrokers Brokerage model # Milk the cow models # self.SetUniverseSelection(MilkTheCowOptionSelectionModel(self.SelectOptionChainSymbols, expirationRange=[7, 30])) # Options for short Straddle # self.SetUniverseSelection(MilkTheCowOptionSelectionModel(self.SelectOptionChainSymbols, expirationRange=[365, 395])) # Options for long Straddle # self.SetAlpha(MilkTheCowAlphaModel(self, self.ticker)) self.SetExecution(OptionsSpreadExecution(acceptingSpreadPercent=0.050)) # Squeeze model # self.SetAlpha(SqueezeNakedAlphaModel(self, self.ticker)) self.SetAlpha(ATRSpreadsAlphaModel(self, self.ticker)) # self.SetRiskManagement(TrailingStopRisk(algo = self, maximumDrawdownPercent = 20, profitTarget = 90, ticker = self.ticker, strategies = ["SoldCalls", "SoldPuts"])) self.SetPortfolioConstruction(SingleSharePortfolioConstructionModel()) # self.SetExecution(ImmediateExecutionModel()) self.SetRiskManagement(NullRiskManagementModel()) # Safe call models # self.SetUniverseSelection(SafeCallOptionSelectionModel(self.SelectOptionChainSymbols, targetExpiration=14)) # self.SetAlpha(SafeSoldCallAlphaModel(self, self.ticker)) # self.SetExecution(OptionsSpreadExecution(acceptingSpreadPercent=0.050)) # self.SetRiskManagement(TrailingStopRisk(algo = self, maximumDrawdownPercent = 100, profitTarget = 95, ticker = self.ticker, strategies = ["SoldCalls", "SoldPuts"])) # self.SetRiskManagement(TrailingStopRisk(algo = self, maximumDrawdownPercent = 20, profitTarget = 90, ticker = self.ticker, strategies = ["SoldCalls", "SoldPuts"])) # OTHER models from learning # self.SetUniverseSelection(OptionUniverseSelectionModel2(timedelta(1), self.SelectOptionChainSymbols)) # self.SetAlpha(ConstantOptionContractAlphaModel(InsightType.Price, InsightDirection.Up, timedelta(hours = 0.5))) # self.SetExecution(SpreadExecutionModel()) # self.SetExecution(MarketOrderExecutionModel()) # set the buying power model # security = self.AddEquity(self.ticker) # security.SetBuyingPowerModel(CustomBuyingPowerModel()) def SelectOptionChainSymbols(self, utcTime): return [ self.option ] def OnOrderEvent(self, orderEvent): # MILK THE COW ALGO! FROM: https://optionstradingiq.com/calendar-straddle/ # As long as we’re on the topic of rolling issues; what if you get assigned? # First, keep calm since it’s no big deal. # Simply close out the shares assigned, close out the other option in the short straddle and just reopen your short straddle using the guidelines. order = self.Transactions.GetOrderById(orderEvent.OrderId) # strategies = ["Calls", "Puts", "SoldCalls", "SoldPuts"] # Eg: ["Straddles", "SoldPuts"] strategies = ["BullPutSpreads", "BearCallSpreads"] if orderEvent.Status == OrderStatus.Filled: logger = MainLogger(self) logger.PrintPortfolio() # Print all actions/orders as json logger.PrintOrderJson(orderEvent = orderEvent) # if the order is an assignment then fix the problems caused. if orderEvent.IsAssignment and order.Type == OrderType.OptionExercise: # - after each event that assigns an option find out if the option was in a strategy and close it all (sell stock sell the other option/s) trade = self.portfolio.FixAssignment(order, strategies = strategies) else: # get the trade if stored first. trade = self.portfolio.GetStoredTrade(order, strategies = strategies) # if we can't find the trade stored then sync the object store with the trade. if not trade: self.portfolio.SyncStored(self.ticker, strategies = strategies) trade = self.portfolio.GetCurrentTrade(order, strategies = strategies) if trade: # Print the trades as stored in our DB as determined/wanted actions # logger.PrintTradeJson(trade = trade, orderEvent = orderEvent) self.benchmark.PrintTrade(order = order, trade = trade, quantity = order.Quantity) # REGION Margin call handling from https://github.com/QuantConnect/Lean/blob/master/Algorithm.Python/MarginCallEventsAlgorithm.py # def OnMarginCall(self, requests): # # Margin call event handler. This method is called right before the margin call orders are placed in the market. # # <param name="requests">The orders to be executed to bring this algorithm within margin limits</param> # # this code gets called BEFORE the orders are placed, so we can try to liquidate some of our positions # # before we get the margin call orders executed. We could also modify these orders by changing their quantities # for order in requests: # # liquidate an extra 10% each time we get a margin call to give us more padding # newQuantity = int(np.sign(order.Quantity) * order.Quantity * 1.1) # requests.remove(order) # requests.append(SubmitOrderRequest(order.OrderType, order.SecurityType, order.Symbol, newQuantity, order.StopPrice, order.LimitPrice, self.Time, "OnMarginCall")) # return requests # def OnMarginCallWarning(self): # # Margin call warning event handler. # # This method is called when Portfolio.MarginRemaining is under 5% of your Portfolio.TotalPortfolioValue # # a chance to prevent a margin call from occurring # spyHoldings = self.Securities["SPY"].Holdings.Quantity # shares = int(-spyHoldings * 0.005) # self.Error("{0} - OnMarginCallWarning(): Liquidating {1} shares of SPY to avoid margin call.".format(self.Time, shares)) # self.MarketOrder("SPY", shares) # def OnMarginCall(self, requests: List[SubmitOrderRequest]) -> None: # for order in requests: # # liquidate an extra 10% each time we get a margin call to give us more padding # newQuantity = int(np.sign(order.Quantity) * order.Quantity * 1.1) # requests.remove(order) # requests.append(SubmitOrderRequest(order.OrderType, order.SecurityType, # order.Symbol, newQuantity, order.StopPrice, # order.LimitPrice, 0, self.Time, "OnMarginCall")) # END REGION Margin call # Call this like this so it works always: ## In Initialize # self.SetSecurityInitializer(self.CustomSecurityInitializer) # AddEquity("SPY") # etc. # # https://www.quantconnect.com/forum/discussion/13199/greeks-with-optionchainprovider/p1/comment-38906 # https://www.quantconnect.com/forum/discussion/10236/options-delta-always-zero/p1/comment-29181 # https://www.quantconnect.com/docs/v2/writing-algorithms/reality-modeling/options-models/volatility/key-concepts def CustomSecurityInitializer(self, security: Security) -> None: '''Initialize the security with raw prices''' security.SetDataNormalizationMode(DataNormalizationMode.Raw) security.SetMarketPrice(self.GetLastKnownPrice(security)) # security.SetBuyingPowerModel(OptionStrategyPositionGroupBuyingPowerModel) security.SetBuyingPowerModel(CustomBuyingPowerModel()) if security.Type == SecurityType.Equity: security.VolatilityModel = StandardDeviationOfReturnsVolatilityModel(30) history = self.History(security.Symbol, 31, Resolution.Daily) if history.empty or 'close' not in history.columns: return for time, row in history.loc[security.Symbol].iterrows(): trade_bar = TradeBar(time, security.Symbol, row.open, row.high, row.low, row.close, row.volume) security.VolatilityModel.Update(security, trade_bar) elif security.Type == SecurityType.Option or security.Type == SecurityType.IndexOption: security.PriceModel = OptionPriceModels.CrankNicolsonFD() # BlackScholes() class CustomBuyingPowerModel(BuyingPowerModel): def GetMaximumOrderQuantityForTargetBuyingPower(self, parameters): quantity = super().GetMaximumOrderQuantityForTargetBuyingPower(parameters).Quantity quantity = np.floor(quantity / 100) * 100 return GetMaximumOrderQuantityResult(quantity) def HasSufficientBuyingPowerForOrder(self, parameters): return HasSufficientBuyingPowerForOrderResult(True) # class MySecurityInitializer(BrokerageModelSecurityInitializer): # def __init__(self, brokerage_model: IBrokerageModel, security_seeder: ISecuritySeeder) -> None: # super().__init__(brokerage_model, security_seeder) # def Initialize(self, security: Security) -> None: # # First, call the superclass definition # # This method sets the reality models of each security using the default reality models of the brokerage model # super().Initialize(security) # security.SetDataNormalizationMode(DataNormalizationMode.Raw) # security.SetMarketPrice(self.GetLastKnownPrice(security)) # # Next, overwrite some of the reality models # security.SetBuyingPowerModel(OptionStrategyPositionGroupBuyingPowerModel()) # if security.Type == SecurityType.Equity: # security.VolatilityModel = StandardDeviationOfReturnsVolatilityModel(30) # history = self.History(security.Symbol, 31, Resolution.Daily) # if history.empty or 'close' not in history.columns: # return # for time, row in history.loc[security.Symbol].iterrows(): # trade_bar = TradeBar(time, security.Symbol, row.open, row.high, row.low, row.close, row.volume) # security.VolatilityModel.Update(security, trade_bar) # elif security.Type == SecurityType.Option or security.Type == SecurityType.IndexOption: # security.PriceModel = OptionPriceModels.CrankNicolsonFD() # BlackScholes() class SingleSharePortfolioConstructionModel(PortfolioConstructionModel): '''Portfolio construction model that sets target quantities to 1 for up insights and -1 for down insights''' def CreateTargets(self, algorithm, insights): targets = [] for insight in insights: targets.append(PortfolioTarget(insight.Symbol, insight.Direction * self.TargetQuantity(algorithm, insight.Symbol))) return targets # Method that defines how many option contracts to sell or buy by symbol. # Here we can expand this to be variable by Symbol or defined by a parameter or by portfolio alocation based on margin available. def TargetQuantity(self, algo, symbol): # algo.CalculateOrderQuantity(symbol, 1) # TODO calculate the quantity based on target allocation. Like if we set the allocation to 50% we should get the cashValue / 2 and try and # get the margin requirement based on that. Not sure how?!? return 5