Overall Statistics |
Total Trades 7 Average Win 0% Average Loss -74.00% Compounding Annual Return 0% Drawdown 101.100% Expectancy -1 Net Profit -101.140% Sharpe Ratio -0.519 Sortino Ratio -0.207 Probabilistic Sharpe Ratio 0.000% Loss Rate 100% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 1.926 Annual Variance 3.71 Information Ratio -0.515 Tracking Error 1.926 Treynor Ratio 0 Total Fees $1104.75 Estimated Strategy Capacity $48000.00 Lowest Capacity Asset AMD 3025CPPRRW1C6|AMD R735QTJ8XC9X Portfolio Turnover -0.20% |
# region imports
from AlgorithmImports import *
# endregion


class CreativeBlackMule(QCAlgorithm):
    """Donchian-channel breakout strategy traded through long options.

    For every equity in ``self.symbols`` a 20-period Donchian channel is
    maintained.  While the VIX rank (position of the current VIX price inside
    its 90-day high/low range, see :meth:`VIX`) is above ``self.IVlvl``:

    * a daily close more than 2.5% above the previous upper band buys an
      out-of-the-money call, and
    * a daily close more than 1% below the previous lower band buys an
      out-of-the-money put.

    Each filled option purchase is followed by a take-profit limit order for
    half the position (see :meth:`OnOrderEvent`); positions still open within
    ``EXIT_DAYS_TO_EXPIRY`` days of expiry are liquidated by
    :meth:`AfterMarketOpen`.
    """

    # Breakout thresholds, expressed as a fraction of the closing price.
    CALL_BREAKOUT = 0.025
    PUT_BREAKDOWN = -0.01
    # Contract selection window, in calendar days to expiry.
    MIN_DAYS_TO_EXPIRY = 30
    MAX_PUT_DAYS_TO_EXPIRY = 40
    # Positions this close to expiry (in days) are closed by AfterMarketOpen.
    EXIT_DAYS_TO_EXPIRY = 10
    # Portfolio fraction allocated to each option purchase.
    POSITION_SIZE = 0.1
    # Take-profit targets as multiples of the average fill price.
    FIRST_TARGET = 1.3
    SECOND_TARGET = 1.5

    def Initialize(self):
        """Configure dates, cash, universe, indicators and scheduled events."""
        self.SetStartDate(2015, 1, 1)
        self.SetEndDate(2015, 12, 1)
        self.Balance = 100000
        # The attribute alone does not fund the account; apply it explicitly.
        self.SetCash(self.Balance)

        tickers = ["SPY", "QQQ", "IWM", "NVDA", "AAPL", "AMD", "AMZN", "MSFT"]
        self.symbols = []
        for ticker in tickers:
            equity = self.AddEquity(ticker, Resolution.Daily)
            # Option strikes are quoted in raw prices, so the underlying must
            # be raw *before* its close is compared with a strike.
            equity.SetDataNormalizationMode(DataNormalizationMode.Raw)
            self.symbols.append(equity.Symbol)

        self.SetWarmUp(timedelta(100))
        self.EnableAutomaticIndicatorWarmUp = True
        self.indicator = {symbol: self.DCH(symbol, 20, 20) for symbol in self.symbols}
        # NOTE(review): registered after AddEquity, so it only affects
        # securities added later (the option contracts) - confirm intended.
        self.SetSecurityInitializer(self.customSecurityInitializer)

        # Channel values as of the previous processed bar, per symbol.
        self.DCH_previous_Up = {symbol: None for symbol in self.symbols}
        self.DCH_previous_Down = {symbol: None for symbol in self.symbols}
        self.DCH_previous_Middle = {symbol: None for symbol in self.symbols}

        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Cash)

        self.vix = self.AddData(CBOE, "VIX").Symbol
        self.IVlvl = 0.5
        # Unknown until the scheduled VIX() callback has produced a value;
        # OnData skips signal evaluation while this is None.
        self.rank = None
        # Tickers (Symbol.Value) of option contracts that have been bought.
        self.option_positions = []

        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                         self.TimeRules.AfterMarketOpen("SPY", 60),
                         self.AfterMarketOpen)
        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                         self.TimeRules.AfterMarketOpen("SPY", 30),
                         self.VIX)

    def OnData(self, data: Slice):
        """Evaluate breakout signals and roll the stored channel values.

        The signal compares the current close with the channel values stored
        on the previous bar, then stores the current channel values for the
        next bar.
        """
        if self.IsWarmingUp:
            return

        # No signals until a VIX rank exists and exceeds the threshold.
        signals_enabled = self.rank is not None and self.rank > self.IVlvl

        for symbol in self.symbols:
            if symbol not in data.Bars:
                continue
            channel = self.indicator[symbol]
            if not channel.IsReady:
                continue

            bar = data.Bars[symbol]
            previous_up = self.DCH_previous_Up[symbol]
            previous_down = self.DCH_previous_Down[symbol]

            if signals_enabled and previous_up is not None and previous_down is not None:
                if (bar.Close - previous_up) / bar.Close > self.CALL_BREAKOUT:
                    self.buyoptions(self.Securities[symbol])
                if (bar.Close - previous_down) / bar.Close < self.PUT_BREAKDOWN:
                    self.buyputs(self.Securities[symbol])

            self.DCH_previous_Up[symbol] = channel.UpperBand.Current.Value
            self.DCH_previous_Down[symbol] = channel.LowerBand.Current.Value
            self.DCH_previous_Middle[symbol] = channel.Current.Value

    def AfterMarketOpen(self):
        """Liquidate held options that are within EXIT_DAYS_TO_EXPIRY days of expiry."""
        exit_horizon = timedelta(self.EXIT_DAYS_TO_EXPIRY)
        held_options = [
            kvp.Key for kvp in self.Portfolio
            if kvp.Value.Invested and kvp.Value.Type == SecurityType.Option
        ]

        for contract in held_options:
            if self.Time + exit_horizon <= contract.ID.Date:
                continue

            holding = self.Portfolio[contract]
            self.Liquidate(contract, tag=f"Expiration - Profit/Loss @ {str(holding.LastTradeProfit)}")
            # Guarded: list.remove raises ValueError when the contract was
            # never recorded by buyoptions/buyputs.
            if contract.Value in self.option_positions:
                self.option_positions.remove(contract.Value)

            self.Debug(f"| {self.Time} [+]--- Liquidate Option @ {str(holding.Symbol.Value)} || Stock @ {str(self.Securities[contract.Underlying].Price)}|| Profit/Loss @ {str(holding.LastTradeProfit)}")
            self.Log(f"| {self.Time} [-]--- REASON: || <{self.EXIT_DAYS_TO_EXPIRY} DTE | {(contract.ID.Date - self.Time).days} DTE")

    def customSecurityInitializer(self, security):
        """Seed a newly added security with its last known price."""
        bar = self.GetLastKnownPrice(security)
        security.SetMarketPrice(bar)

    def buyoptions(self, security):
        """Buy the nearest-expiry, lowest-strike OTM call on ``security``.

        Candidates are calls with a strike above the underlying close and at
        least MIN_DAYS_TO_EXPIRY days to expiry.
        """
        security.SetDataNormalizationMode(DataNormalizationMode.Raw)
        contracts = self.OptionChainProvider.GetOptionContractList(security.Symbol, self.Time)
        otm_calls = sorted(
            (
                contract for contract in contracts
                if contract.ID.OptionRight == OptionRight.Call
                and contract.ID.StrikePrice > security.Close
                and (contract.ID.Date - self.Time).days >= self.MIN_DAYS_TO_EXPIRY
            ),
            key=self._expiry_then_strike,
        )
        if not otm_calls:
            return

        self.call = otm_calls[0]
        self._open_option_position(self.call, "call")

    def buyputs(self, security):
        """Buy the latest-expiry, highest-strike OTM put on ``security``.

        Candidates are puts with a strike below the underlying close and
        between MIN_DAYS_TO_EXPIRY (inclusive) and MAX_PUT_DAYS_TO_EXPIRY
        (exclusive) days to expiry.
        """
        security.SetDataNormalizationMode(DataNormalizationMode.Raw)
        contracts = self.OptionChainProvider.GetOptionContractList(security.Symbol, self.Time)
        otm_puts = sorted(
            (
                contract for contract in contracts
                if contract.ID.OptionRight == OptionRight.Put
                and contract.ID.StrikePrice < security.Close
                and self.MIN_DAYS_TO_EXPIRY
                <= (contract.ID.Date - self.Time).days
                < self.MAX_PUT_DAYS_TO_EXPIRY
            ),
            key=self._expiry_then_strike,
        )
        if not otm_puts:
            return

        self.put = otm_puts[-1]
        self._open_option_position(self.put, "put")

    @staticmethod
    def _expiry_then_strike(contract):
        """Sort key: order option contracts by expiry, then by strike."""
        return (contract.ID.Date, contract.ID.StrikePrice)

    def _open_option_position(self, contract, label):
        """Subscribe to ``contract`` and allocate POSITION_SIZE to it.

        Does nothing when the contract was already bought, or when the newly
        added contract has no ask price yet.
        """
        if contract.Value in self.option_positions:
            return

        # The contract must be subscribed before it can be traded.
        option = self.AddOptionContract(contract, Resolution.Minute)
        if option.AskPrice == 0:
            self.Debug("No prices")
            return

        self.SetHoldings(option.Symbol, self.POSITION_SIZE)
        self.option_positions.append(contract.Value)
        self.Debug(f" {self.Time} -- -- Bought {label} option {option} for {option.AskPrice} // Underlying stock @ {option.Underlying.Close} // Limit {option.AskPrice * 1.3} ")

    def OnOrderEvent(self, orderEvent):
        """Place take-profit limit orders after fills.

        * A filled option buy places a limit sell for half of the position at
          FIRST_TARGET times the average price.
        * Any filled limit order that leaves a long position places a limit
          sell for the remainder at SECOND_TARGET times the average price.
        """
        if orderEvent.Status != OrderStatus.Filled:
            return

        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        holding = self.Portfolio[order.Symbol]

        if order.SecurityType == SecurityType.Option and order.Direction == OrderDirection.Buy:
            half_position = int(holding.Quantity // 2)
            # A one-contract position floors to zero; never submit an empty order.
            if half_position > 0:
                # The multiplier is applied once (the original applied 1.3 twice).
                self.LimitOrder(order.Symbol, -half_position, holding.AveragePrice * self.FIRST_TARGET)

        if order.Type == OrderType.Limit:
            remaining = holding.Quantity
            if remaining > 0:
                self.LimitOrder(order.Symbol, -remaining, holding.AveragePrice * self.SECOND_TARGET)

    def VIX(self):
        """Update ``self.rank``: (current - min low) / (max high - min low) over 90 daily VIX bars.

        The previous rank is kept when no history is returned or when the
        high/low range is zero.
        """
        history = self.History(CBOE, self.vix, 90, Resolution.Daily)
        if history is None or history.empty:
            return
        if "low" not in history.columns or "high" not in history.columns:
            return

        lowest = min(history["low"])
        highest = max(history["high"])
        span = highest - lowest
        if span <= 0:
            return

        self.rank = (self.Securities[self.vix].Price - lowest) / span