from AlgorithmImports import *
import datetime as dt
class RsiMacdAlphaModel(AlphaModel):
'''Alpha model that uses an STO_RSI and MACD values to create insights'''
last_consolidation_time = None
minute = 0
def __init__(self,
algo,
fastPeriod = 14,
slowPeriod = 34,
signalPeriod = 9,
rsiPeriod = 14,
movingAverageType = MovingAverageType.Exponential,
resolution = Resolution.Minute):
''' Initializes a new instance of the MacdAlphaModel class
Args:
fastPeriod: The MACD fast period
slowPeriod: The MACD slow period</param>
signalPeriod: The smoothing period for the MACD signal
rsiPeriod: period for STO_RSI calculation
movingAverageType: The type of moving average to use in the MACD'''
self.algo = algo
self.fastPeriod = fastPeriod
self.slowPeriod = slowPeriod
self.signalPeriod = signalPeriod
self.rsiPeriod = rsiPeriod
self.movingAverageType = movingAverageType
self.resolution = resolution
self.insightPeriod = timedelta(days=10)
self.bounceThresholdPercent = 0
self.symbolData = {}
self.Fminute = None
resolutionString = Extensions.GetEnumString(resolution, Resolution)
movingAverageTypeString = Extensions.GetEnumString(movingAverageType, MovingAverageType)
self.Name = '{}({},{},{},{},{})'.format(self.__class__.__name__, fastPeriod, slowPeriod, signalPeriod, movingAverageTypeString, resolutionString)
def Update(self, algorithm, data):
''' Determines an insight for each security based on it's current MACD signal
Args:
algorithm: The algorithm instance
data: The new data available
Returns:
The new insights generated'''
insights = []
if self.algo.enable_reverse_shift:
self.if_reverse_shift(algorithm)
if self.last_consolidation_time is not None and self.minute != self.last_consolidation_time.minute:
self.minute = self.last_consolidation_time.minute
for key, sd in self.symbolData.items():
self.key = key
if (sd.Security.Price == 0) or (not sd.MACD.IsReady) or (algorithm.IsWarmingUp):# or \
# (not algorithm.IsMarketOpen(key)):
continue
sd.UpdateData()
# Check previous position and determine the trade
if self.algo.Portfolio[self.algo.long_symbol].Invested:
if sd.should_exit_tqqq:
insights = self.go_flat(insights)
elif self.algo.Portfolio[self.algo.short_symbol].Invested:
if sd.should_exit_sqqq:
insights = self.go_flat(insights)
else:
if sd.should_enter_tqqq:
insights = self.buy_tqqq(insights)
self.algo.rm_model.maximumDrawdownPercent = 0.05
elif sd.should_enter_sqqq:
insights = self.buy_sqqq(insights)
self.algo.rm_model.maximumDrawdownPercent = 0.05
elif self.algo.rm_liquidation: #For Friday Exit: ((algorithm.Time.weekday() == 4) and (algorithm.Time.time()>dt.time(15,45)))
insights.append(Insight.Price(self.algo.short_symbol, self.insightPeriod, InsightDirection.Flat))
insights.append(Insight.Price(self.algo.long_symbol, self.insightPeriod, InsightDirection.Flat))
self.algo.rm_liquidation = False
return insights
def go_flat(self,insights):
insights.append(Insight.Price(self.algo.short_symbol, self.insightPeriod, InsightDirection.Flat))
insights.append(Insight.Price(self.algo.long_symbol, self.insightPeriod, InsightDirection.Flat))
return insights
def buy_tqqq(self,insights):
insights.append(Insight.Price(self.algo.long_symbol, self.insightPeriod, InsightDirection.Up))
insights.append(Insight.Price(self.algo.short_symbol, self.insightPeriod, InsightDirection.Flat))
return insights
def buy_sqqq(self,insights):
insights.append(Insight.Price(self.algo.short_symbol, self.insightPeriod, InsightDirection.Up))
insights.append(Insight.Price(self.algo.long_symbol, self.insightPeriod, InsightDirection.Flat))
return insights
def if_reverse_shift(self,algorithm):
if algorithm.Portfolio.Invested:
if self.last_fconsolidation_time is not None and self.Fminute != self.last_fconsolidation_time.minute:
self.Fminute = self.last_fconsolidation_time.minute
for key, sd in self.symbolData.items():
normalized_signal = sd.MACD_15.Signal.Current.Value / sd.Security.Price
is_macd_bullish = normalized_signal > self.bounceThresholdPercent
is_rsi_bullish = sd.STO_RSI_15.Current.Value >= 80
is_rsi_bearish = sd.STO_RSI_15.Current.Value <= 20
if algorithm.Portfolio[self.algo.long_symbol].Invested:
if not is_macd_bullish and is_rsi_bearish:
self.algo.rm_model.maximumDrawdownPercent = 0.12
elif algorithm.Portfolio[self.algo.short_symbol].Invested:
if is_macd_bullish and is_rsi_bullish:
self.algo.rm_model.maximumDrawdownPercent = 0.12
def consolidation_handler(self, sender, bar):
self.last_consolidation_time = bar.Time
def fifteenconsolidation_handler(self, sender, bar):
self.last_fconsolidation_time = bar.Time
def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed.
This initializes the MACD for each added security and cleans up the indicator for each removed security.
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
for added in changes.AddedSecurities:
if added.Symbol.Value == 'QQQ':
self.symbolData[added.Symbol] = SymbolData(algorithm, added,
self.fastPeriod, self.slowPeriod, self.signalPeriod, self.rsiPeriod, self.movingAverageType, self.resolution)
self.consolidator = TradeBarConsolidator(timedelta(minutes=30))
self.consolidator.DataConsolidated += self.consolidation_handler
algorithm.SubscriptionManager.AddConsolidator(added.Symbol, self.consolidator)
self.fifteenconsolidator = TradeBarConsolidator(timedelta(minutes=15))
self.fifteenconsolidator.DataConsolidated += self.fifteenconsolidation_handler
algorithm.SubscriptionManager.AddConsolidator(added.Symbol, self.fifteenconsolidator)
for removed in changes.RemovedSecurities:
if added.Symbol.Value == 'QQQ':
data = self.symbolData.pop(removed.Symbol, None)
if data is not None:
# clean up our consolidator
algorithm.SubscriptionManager.RemoveConsolidator(removed.Symbol, data.Consolidator)
class SymbolData:
def __init__(self, algorithm, security, fastPeriod, slowPeriod, signalPeriod, rsiPeriod, movingAverageType, resolution):
self.Security = security
self.MACD = MovingAverageConvergenceDivergence(fastPeriod, slowPeriod, signalPeriod, movingAverageType)
self.MACD_15 = MovingAverageConvergenceDivergence(fastPeriod, slowPeriod, signalPeriod, movingAverageType)
self.RSI_input = RelativeStrengthIndex(rsiPeriod, MovingAverageType.Wilders)
self.RSI = RelativeStrengthIndex(rsiPeriod, MovingAverageType.Wilders)
self.Stochastic = Stochastic(period=14, kPeriod=3, dPeriod=3)
self.STO_RSI = IndicatorExtensions.Of(self.Stochastic,self.RSI_input)
# self.STO_RSI_15 = IndicatorExtensions.Of(self.Stochastic,self.RSI_input)
self.EMA20 = ExponentialMovingAverage(period=20)
# Construct standard deviation of price using STD Indicator for added securities
self.STD = StandardDeviation(10)
# Construct simple moving average (SMA) of the standard deviation of price
self.STD_AVG = IndicatorExtensions.SMA(self.STD, 20)
self.Consolidator = algorithm.ResolveConsolidator(security.Symbol, timedelta(minutes=30))
self.FifteenConsolidator = algorithm.ResolveConsolidator(security.Symbol, timedelta(minutes=15))
# 30 minute indicators
for indicator in [self.MACD,self.STO_RSI,self.EMA20,self.RSI, self.STD]:
algorithm.RegisterIndicator(security.Symbol, indicator, self.Consolidator)
algorithm.WarmUpIndicator(security.Symbol, indicator, timedelta(minutes=30))
# 15 minute indicators
for indicator in [self.MACD_15]:
algorithm.RegisterIndicator(security.Symbol, indicator, self.FifteenConsolidator)
algorithm.WarmUpIndicator(security.Symbol, indicator)
# Rolling Windows
self.MACD_Window = RollingWindow[float](2)
self.STO_RSI_Window = RollingWindow[float](2)
self.RSI_Window = RollingWindow[float](2)
# algorithm.PlotIndicator('STORSI',self.STO_RSI)
# algorithm.PlotIndicator('RSI',self.RSI)
# algorithm.PlotIndicator('MACD',self.MACD)
# algorithm.PlotIndicator('STORSI15',self.STO_RSI_15)
# algorithm.PlotIndicator('MACD_15',self.MACD_15)
algorithm.PlotIndicator('STD',self.STD)
self.PreviousDirection = None
self.should_enter_tqqq = False
self.should_exit_tqqq = False
self.should_enter_sqqq = False
self.should_exit_sqqq = False
def UpdateData(self):
self.MACD_Window.Add(self.MACD.Signal.Current.Value)
self.STO_RSI_Window.Add(self.STO_RSI.Current.Value)
self.RSI_Window.Add(self.RSI.Current.Value)
if (self.MACD_Window.Count < 2) or (self.STO_RSI_Window.Count < 2) or (self.RSI_Window.Count < 2):
return
is_macd_turned_bullish = (self.MACD_Window[1] < 0) and (self.MACD_Window[0] > 0)
is_macd_turned_bearish = (self.MACD_Window[1] > 0) and (self.MACD_Window[0] < 0)
is_storsi_overbought = self.STO_RSI.Current.Value > 80
is_storsi_oversold = self.STO_RSI.Current.Value < 20
is_rsi_overbought = self.RSI.Current.Value > 80
is_rsi_oversold = self.RSI.Current.Value < 20
is_price_above_ema = self.Security.Price > self.EMA20.Current.Value
is_std_above_avg = True #self.STD.Current.Value > self.STD_AVG.Current.Value
# Entry and Exit Conditions
self.should_enter_tqqq = is_macd_turned_bullish and (not is_storsi_oversold) and is_price_above_ema and is_std_above_avg
self.should_exit_tqqq = not is_price_above_ema
self.should_enter_sqqq = is_macd_turned_bearish and (not is_storsi_overbought) and (not is_price_above_ema) and is_std_above_avg
self.should_exit_sqqq = is_price_above_ema
from AlgorithmImports import *
# Import from files
from constants import *
from alpha import *
from riskmanagement import *
################################################################################
class MACDRSIQQQ(QCAlgorithm):
def Initialize(self):
"""Initialize algorithm."""
# Set backtest details
self.AddInstruments()
self.SetBacktestDetails()
self.AddModels()
self.AddOperationalParameters()
# self.AddSchedules()
# self.AddPlots()
#-------------------------------------------------------------------------------
def SetBacktestDetails(self):
"""Set the backtest details."""
self.SetStartDate(START_DATE)
if END_DATE:
self.SetEndDate(END_DATE)
self.SetCash(CASH)
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage,
AccountType.Margin)
self.SetBenchmark(self.underlying)
self.SetWarmUp(2000, Resolution.Minute)
# Adjust the cash buffer from the default 2.5% to custom setting
self.Settings.FreePortfolioValuePercentage = FREE_PORTFOLIO_VALUE_PCT
self.Settings.DataSubscriptionLimit = 500
self.Settings.RebalancePortfolioOnInsightChanges = True
self.Settings.RebalancePortfolioOnSecurityChanges = False
# self.UniverseSettings.ExtendedMarketHours = INCLUDE_EXTENDED_HOURS
def AddInstruments(self):
self.underlying = self.AddEquity('QQQ',Resolution.Minute, extendedMarketHours=INCLUDE_EXTENDED_HOURS).Symbol
self.long_symbol = self.AddEquity('TQQQ',Resolution.Minute, extendedMarketHours=INCLUDE_EXTENDED_HOURS).Symbol
self.short_symbol = self.AddEquity('SQQQ',Resolution.Minute, extendedMarketHours=INCLUDE_EXTENDED_HOURS).Symbol
self.previous_profits = {
self.long_symbol : 0,
self.short_symbol : 0
}
def AddModels(self):
self.al_model = RsiMacdAlphaModel(self)
self.SetAlpha(self.al_model)
self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel(self.Rebalance))
self.rm_model = TrailingStopRiskManagementModel(self,maximumDrawdownPercent=STOP_LOSS_PCT)
self.AddRiskManagement(self.rm_model)
def Rebalance(self, time):
return None
def AddOperationalParameters(self):
self.rm_liquidation = False
self.enable_reverse_shift = False
def OnOrderEvent(self, orderEvent):
if orderEvent.Status == OrderStatus.Filled:
order = self.Transactions.GetOrderById(orderEvent.OrderId)
if orderEvent.Direction == OrderDirection.Buy:
sd = self.al_model.symbolData[self.underlying]
self.MyLog(f'Bought {orderEvent.Symbol} with QQQ indicators -> \
QQQ Price: {sd.Security.Price}, \
MACD: {sd.MACD.Signal.Current.Value}, \
STO_RSI: {sd.STO_RSI.Current.Value}, \
RSI: {sd.RSI.Current.Value}, \
EMA20: {sd.EMA20.Current.Value}, \
{orderEvent.Symbol} Price: {orderEvent.FillPrice}')
elif orderEvent.Direction == OrderDirection.Sell:
trade_pnl = self.Portfolio[orderEvent.Symbol].NetProfit - self.previous_profits[orderEvent.Symbol]
sd = self.al_model.symbolData[self.underlying]
self.MyLog(f'Sold {orderEvent.Symbol} with QQQ indicators -> \
QQQ Price: {sd.Security.Price}, \
EMA20: {sd.EMA20.Current.Value}, \
TradeProfit: {trade_pnl} \
{orderEvent.Symbol} Price: {orderEvent.FillPrice}')
self.previous_profits[orderEvent.Symbol] = self.Portfolio[orderEvent.Symbol].NetProfit
def MyLog(self,msg):
self.Log(f'{self.Time}: {msg}')