Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 0 Tracking Error 0 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset |
from SymbolData import SymbolData from TradeManagement import TradeManagement import QuantConnect as qc class CryingBlueFlamingo(QCAlgorithm): def Initialize(self): self.SetStartDate(2020, 7, 1) # Set Start Date self.SetEndDate(2020, 7, 3) # Set End Date self.SetCash(1000000) # Set Strategy Cash self.symbol_data = {} self.trade_managers = {} self.benchmark = "SPY" self.AddEquity(self.benchmark) tickers = ["SPY", "TLT", "AAPL" , "AMZN", "MSFT"] for ticker in tickers: symbol = self.AddEquity(ticker).Symbol option = self.AddOption(ticker) option.SetFilter(-15, +15, timedelta(0), timedelta(10)) option.PriceModel = qc.Securities.Option.OptionPriceModels.BjerksundStensland() option.PriceModel.EnableGreekApproximation = True self.symbol_data[symbol] = SymbolData(self, symbol, option.Symbol) self.trade_managers[symbol] = TradeManagement(self, symbol) # Want to open a short position before the market closes self.Schedule.On(self.DateRules.EveryDay(self.benchmark), self.TimeRules.BeforeMarketClose(self.benchmark, 1), self.Rebalance) def Rebalance(self): '''Fires everyday 1 minute before market close''' for symbol, symbol_data in self.symbol_data.items(): if not symbol_data.IsReady: continue signal = self.CalculateSignal(symbol_data) #hammer_signal = self.CalculateSignal(symbol_data) trade_manager = self.trade_managers[symbol] # Go short if there is a Hanging Man Signal if signal and not self.Portfolio[symbol].Invested: # trade_manager = self.trade_managers[symbol] #trade_manager.CreateEntry(-10) trade_manager.CreateEntry(-.1) if self.Portfolio[symbol].Invested: trade_manager.days_active += 1 ''' # Go long if there is a hammer signal if hammer_signal: trade_manager = self.trade_managers[symbol] trade_manager.CreateEntry(10) ''' def OnData(self, data): for symbol, trade_manager in self.trade_managers.items(): if not self.Portfolio[symbol].Invested: continue current_price = self.Securities[symbol].Price stop_loss = trade_manager.stop_loss take_profit = trade_manager.take_profit days_active = trade_manager.days_active if current_price > stop_loss or current_price < take_profit: trade_manager.Liquidate() if days_active > 2: trade_manager.Liquidate() self.Debug(f"{symbol} -- held for {days_active}...Liquidating") #Finds a Red Hanging Man candle whose High is higher than the prior 5 days highs, and above the 20SMA. def CalculateSignal(self, symbol_data): # Daily bars bars = symbol_data.bar_window # Minute bars #self.Debug(f"Rolling window for {symbol_data.symbol} on {self.Time} is size {symbol_data.minute_bar_window.Count}") symbol_data.CalculateOHLC() max_price = max([x.High for x in symbol_data.minute_bar_window]) #self.Debug(f"Max price over minute consolidators for {symbol_data.symbol} on {self.Time} is {max_price}") low_price = min([x.Low for x in symbol_data.minute_bar_window]) #self.Debug(f"Low price over minute consolidators for {symbol_data.symbol} on {self.Time} is {low_price}") latest_daily_bar = symbol_data.summary_bar latest_consolidator = symbol_data.todays_minute_bars[0] first_consolidator = symbol_data.todays_minute_bars[-1] number_of_bars_today = len(symbol_data.todays_minute_bars) self.Debug(f"{symbol_data.symbol} - {latest_daily_bar.Time} -> {latest_daily_bar.EndTime} Todays' daily bar: {latest_daily_bar}") # checking if the high of the latest daily bar is greater than the high of all following bars uptrend = all([latest_daily_bar.High > bar.High for bar in list(bars)[:6]]) downtrend = all([latest_daily_bar.Low < bar.Low for bar in list(bars)[:6]]) red_bar = latest_daily_bar.Close < latest_daily_bar.Open #green_bar = latest_daily_bar.Close > latest_daily_bar.Open if red_bar: body = abs(latest_daily_bar.Open - latest_daily_bar.Close) shadow = abs(latest_daily_bar.Close - latest_daily_bar.Low) wick = abs(latest_daily_bar.High - latest_daily_bar.Open) #dayATR = abs(latest_daily_bar.High - latest_daily_bar.Low) hanging_man = (shadow > 2 * body) and (wick < 0.3 * body) ''' if green_bar: body = abs(latest_daily_bar.Close - latest_daily_bar.Open) shadow = abs(latest_daily_bar.Open - latest_daily_bar.Low) wick = abs(latest_daily_bar.High - latest_daily_bar.Close) dayATR = abs(latest_daily_bar.High - latest_daily_bar.Low) green_hammer = (shadow > 2 * body) and (wick < 0.3 * body) ''' sma = (sum([b.Close for b in list(bars)[:-1]]) + latest_daily_bar.Close) / 10 # latest_market_price price = self.Securities[symbol_data.symbol].Price above_sma = latest_daily_bar.Close > sma below_sma = latest_daily_bar.Close < sma #Hanging Man Signal signal = red_bar and uptrend and hanging_man and above_sma #Hammer Signal #hammer_signal = green_bar and downtrend and green_hammer and below_sma if signal: self.Debug(f" Signal Candle for {symbol_data.symbol} on {self.Time} is - Body: {body} , Wick: {wick} , shadow: {shadow}") self.Debug(f"Minute Bar Consolidator OHLC for Signal Day {symbol_data.symbol} on {self.Time} is {latest_daily_bar}") return signal ''' if hammer_signal: return hammer_signal ''' def CoarseSelection(self, coarse): # list of ~8500 stocks (coarse data) # coarse is a list of CoarseFundamental objects # Descending order sorted_by_liquidity = sorted(coarse, key=lambda c:c.DollarVolume, reverse=True) most_liquid_coarse = sorted_by_liquidity[:self.universe_size] # needs to return a list of Symbol object most_liquid_symbols = [c.Symbol for c in most_liquid_coarse] return most_liquid_symbols
from SymbolData import SymbolData class TradeManagement: def __init__(self, algorithm, symbol): self.algorithm = algorithm self.symbol = symbol self.days_active = 0 self.entry_price = None self.stop_loss = None self.take_profit = None def CreateEntry(self, quantity): # initial entry market order #self.algorithm.MarketOrder(self.symbol, quantity) current_price = self.algorithm.Securities[self.symbol].Price contract = self.select_option(OptionRight.Call, current_price * 1.10, 7) symbol_data = self.algorithm.symbol_data[self.symbol] # Update our 1 period ATR with latest bar, so we have today's range symbol_data.atr.Update(symbol_data.summary_bar) atr = symbol_data.atr.Current.Value self.entry_price = current_price self.stop_loss = self.entry_price + 0.5 * atr # Use High from the current day self.take_profit = self.entry_price - 1 * atr if contract is not None: self.algorithm.MarketOrder(contract.Symbol, -1) self.algorithm.Debug(f"Entering {self.symbol} on {{self.Time}}...Entry Price: {current_price}, Take Profit: {self.take_profit}, StopLoss: {self.stop_loss}") else: self.Debug(f"No contracts found for {self.symbol}") def Liquidate(self): self.algorithm.Debug(f"Liquidating.. {self.symbol}....{self.algorithm.Securities[self.symbol].Price}") self.algorithm.Liquidate(self.symbol) self.entry_price = None self.stop_loss = None self.take_profit = None self.days_active = 0 def select_option(self, right, expiry, strike): ''' right: OptionRight.Call or Put expiry: in dte (ex. 10) strike = 110, etc.. ''' option_chain_symbol = self.algorithm.symbol_data[self.symbol].option_chain_symbol option_chains = self.algorithm.CurrentSlice.OptionChains if option_chains.ContainsKey(option_chain_symbol): option_chain = option_chains[option_chain_symbol] else: self.Debug(f"{self.symbol} - NO OPTION CHAIN DATA...") contracts = [contract for contract in option_chain.Contracts.Values] # filtering for calls or puts filter_for_right = [s for s in contracts if s.Right == right] # sorted by expiry expiry_date = self.algorithm.Time + timedelta(days=expiry) sorted_by_expiry = sorted(filter_for_right, key=lambda c:abs(c.Expiry - expiry_date), reverse=False) if len(sorted_by_expiry) > 0: closest_expiry = sorted_by_expiry[0].Expiry contracts_with_desired_expiry = [c for c in filter_for_right if c.Expiry == closest_expiry] else: return None sorted_by_strike = sorted(contracts_with_desired_expiry, key=lambda c:abs(c.Strike - strike), reverse=False) if len(sorted_by_strike) > 0: selected_contract = sorted_by_strike[0] return selected_contract else: return None
class SymbolData: '''Containers to hold relevant data for each symbol''' def __init__(self, algorithm, symbol, option_chain): self.algorithm = algorithm self.symbol = symbol self.option_chain_symbol = option_chain # self.minute_consolidator = self.algorithm.SubscriptionManager.ResolveConsolidator(Resolution.Minute) self.minute_consolidator = TradeBarConsolidator(timedelta(minutes=1)) self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.minute_consolidator) self.minute_consolidator.DataConsolidated += self.OnMinuteBar # defines daily consolidator and then registers to receive data self.daily_consolidator = TradeBarConsolidator(timedelta(days=1)) self.algorithm.SubscriptionManager.AddConsolidator(symbol, self.daily_consolidator) self.daily_consolidator.DataConsolidated += self.OnDailyBar # 1. instantiantes a SimpleMovingAverage object # 2. subscribes it to receive data self.sma = SimpleMovingAverage(10) # Test 10 vs 20 self.algorithm.RegisterIndicator(symbol, self.sma, self.daily_consolidator) self.atr = AverageTrueRange(1) self.algorithm.RegisterIndicator(symbol, self.atr, self.daily_consolidator) # holds recent bars self.bar_window = RollingWindow[TradeBar](10) self.minute_bar_window = RollingWindow[TradeBar](500) self.WarmUpIndicators() def WarmUpIndicators(self): # returns a dataframe history = self.algorithm.History(self.symbol, 20, Resolution.Daily) for bar in history.itertuples(): time = bar.Index[1] open = bar.open high = bar.high low = bar.low close = bar.close volume = bar.volume trade_bar = TradeBar(time, self.symbol, open, high, low, close, volume) self.sma.Update(time, close) self.atr.Update(trade_bar) self.bar_window.Add(trade_bar) def OnDailyBar(self, sender, bar): '''Fires each time our daily_consolidator produces a bar that bar is passed in through the bar parameter''' # save that bar to our rolling window self.bar_window.Add(bar) def OnMinuteBar(self, sender, bar): '''Fires each time our minute_consolidator produces a bar that bar is passed in through the bar parameter''' # save that bar to our rolling window self.minute_bar_window.Add(bar) # sorted(self.minute_bar_window, key = lambda thing: thing.Time) def KillDailyConsolidator(self): self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.daily_consolidator) def KillMinuteConsolidator(self): self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.minute_consolidator) def IsReady(self): return self.sma.IsReady and self.atr.IsReady and self.bar_window.IsReady and self.minute_bar_window.IsReady def CalculateOHLC(self): # Rolling window open bars = list(self.minute_bar_window) todays_bars = [bar for bar in bars if bar.Time.day == self.algorithm.Time.day] # desecending in time, larger indices -> further in past todays_bars_sorted = sorted(todays_bars, key=lambda b:b.Time, reverse=True) opening_bar = todays_bars_sorted[-1] open = opening_bar.Open # Rolling window close closing_bar = todays_bars_sorted[0] close = closing_bar.Close # High and low over period high = max([x.High for x in todays_bars_sorted]) low = min([x.Low for x in todays_bars_sorted]) # Calculate volume volume = sum([x.Volume for x in todays_bars_sorted]) # Time time = opening_bar.Time period = TimeSpan.FromMinutes((self.algorithm.Time - time).seconds // 60) # Create a summary trade bar self.summary_bar = TradeBar(time, self.symbol, open, high, low, close, volume, period) @property def todays_minute_bars(self): bars = list(self.minute_bar_window) # self.Debug(f"Filtering bars for {self.symbol} ON....{self.algorithm.Time.day}") todays_bars = [bar for bar in bars if bar.Time.day == self.algorithm.Time.day] # desecending in time, larger indices -> further in past todays_bars_sorted = sorted(todays_bars, key=lambda b:b.Time, reverse=True) return todays_bars_sorted