from Algorithms.StandardAlgorithm import StandardAlgorithm
from Alphas.RsiAlphaModel import RsiAlphaModel
from Execution.StandardDeviationExecutionModel import StandardDeviationExecutionModel
class MainAlgorithm(StandardAlgorithm):
def Initialize(self):
# super(MainAlgorithm, self).__init__()
self.SetExecution(StandardDeviationExecutionModel(60, 2, Resolution.Minute))
self.SetAlphas(RsiAlphaModel())
# Securities traded
symbols = [ Symbol.Create("CRON", SecurityType.Equity, Market.USA) ]
self.SetUniverseSelection( ManualUniverseSelectionModel(symbols) )
import math
import numpy as np
from Indicators.StandardIndicator import StandardIndicator
def EhlerRSI(symbol, **kwargs):
_symbol = symbol if isinstance(symbol, str) else symbol.Value
RegisterIndicator(_symbol, EhlerRsiDiscriminator(**kwargs), kwargs['resolution']);
return indicator
class EhlerRsiDiscriminator(StandardIndicator):
Warmup = 7
Name = "Ehler's RSI + Discriminator"
def __init__(self, **kwargs):
super(EhlerRsiDiscriminator, self).__init__(**kwargs)
self.DoubleSmoothing = kwargs['double_smoothing'] or True
self.RoofingUpper = kwargs['roofing_upper'] or 48
self.RoofingLower = kwargs['roofing_lower'] or 10
self.Overbought = kwargs['overbought'] or 70
self.Oversold = kwargs['oversold'] or 30
self.Source = RollingWindow[Decimal](self.Warmup)
self.Filtr = RollingWindow[float](3)
self.Highpass = RollingWindow[float](3)
self.Period = RollingWindow[Decimal](2)
self.SmoothedPeriod = RollingWindow[Decimal](2)
self.Smooth = RollingWindow[Decimal](7)
self.Detrend = RollingWindow[Decimal](7)
self.HL2 = RollingWindow[Decimal](4)
self.I1 = RollingWindow[Decimal](7)
self.I2 = RollingWindow[Decimal](7)
self.Q1 = RollingWindow[Decimal](7)
self.Q2 = RollingWindow[Decimal](7)
self.Re = RollingWindow[Decimal](2)
self.Im = RollingWindow[Decimal](2)
self.Irma = 0
self.RoofedSource = RollingWindow[float](2)
# Start with history
self.FillWindows(self.Filtr, self.Highpass, self.Period, self.SmoothedPeriod, self.Smooth, self.Detrend, self.I1, self.I2, self.Q1, self.Q2, self.Re, self.Im, self.RoofedSource)
def __repr__(self):
return "{0} -> IsReady: {1}. Value: {2}".format(self.Name, self.IsReady, self.Value)
# Rolling Moving Average (or Wells Wilders MA)
def WWma(self, src, l):
l = 1 if l == 0 else l
self.Irma = (self.Irma * (l - 1) + src) / l
return self.Irma
# RSI function.
def EhRSI(self, l):
change = self.RoofedSource[0] - self.RoofedSource[1]
up = self.WWma(max(change, 0), l)
down = self.WWma(-min(change, 0), l)
return 100 if down == 0 else 0 if up == 0 else 100 - (100 / (1 + up / down))
def EhlersSuperSmootherFilter(self, lower):
a1 = math.exp(-math.pi * math.sqrt(2) / lower)
coeff3 = -pow(a1, 2)
coeff2 = 2 * a1 * math.cos(math.sqrt(2) * math.pi / lower)
coeff1 = 1 - coeff2 - coeff3
self.Filtr.Add(coeff1 * (self.Highpass[0] + self.Highpass[1]) / 2 + coeff2 * self.Filtr[1] + coeff3 * self.Filtr[2])
return self.Filtr[0]
def EhlersRoofingFilter(self, upper, lower):
alpha1 = (math.cos(math.sqrt(2) * math.pi / upper) + math.sin(math.sqrt(2) * math.pi / upper) - 1) / math.cos(math.sqrt(2) * math.pi / upper)
self.Highpass.Add(pow(1 - alpha1 / 2, 2) * (self.Source[0] - 2 * self.Source[1] + self.Source[2]) + 2 * (1 - alpha1) * self.Highpass[1] - pow(1 - alpha1, 2) * self.Highpass[2])
return self.EhlersSuperSmootherFilter(lower) if self.DoubleSmoothing else self.Highpass[0]
def ComputeNextValue(self, data):
self.Source.Add(data.Close)
self.HL2.Add((data.High + data.Low) / 2)
self.IsReady = self.Source.IsReady
if not self.IsReady: return
#
# --- Start the Homodyne Discriminator Caculations
#
# Mutable Variables (non-series)
C1 = 0.0962
C2 = 0.5769
Df = 0.5
C3 = (self.Period[1] * 0.075 + 0.54)
self.Smooth.Add(((self.HL2[0] * 4.0) + (self.HL2[1] * 3.0) + (self.HL2[2] * 2.0) + (self.HL2[3])) / 10.0)
self.Detrend.Add((self.Smooth[0] * C1 + self.Smooth[2] * C2 - self.Smooth[4] * C2 - self.Smooth[6] * C1) * C3)
# Compute InPhase and Quadrature components
self.Q1.Add((self.Detrend[0] * C1 + self.Detrend[2] * C2 - self.Detrend[4] * C2 - self.Detrend[6] * C1) * C3)
self.I1.Add(self.Detrend[3])
# Advance Phase of I1 and Q1 by 90 degrees
jI = (self.I1[0] * C1 + self.I1[2] * C2 - self.I1[4] * C2 - self.I1[6] * C1) * C3
jQ = (self.Q1[0] * C1 + self.Q1[2] * C2 - self.Q1[4] * C2 - self.Q1[6] * C1) * C3
# Smooth i and q components before applying discriminator
self.I2.Add(0.2 * (self.I1[0] - jQ) + 0.8 * self.I2[1])
self.Q2.Add(0.2 * (self.Q1[0] + jI) + 0.8 * self.Q2[1])
# Extract Homodyne Discriminator
Re = self.I2[0] * self.I2[1] + self.Q2[0] * self.Q2[1]
Im = self.I2[0] * self.Q2[1] - self.Q2[0] * self.I2[1]
self.Re.Add(0.2 * Re + 0.8 * self.Re[1])
self.Im.Add(0.2 * Im + 0.8 * self.Im[1])
Dp = 6.28318 / math.atan(self.Im[0] / self.Re[0]) if (self.Re[0] != 0 and self.Im[0] != 0) else 0
II = self.Period[1]
Dp = max(max(min(min(Dp, 1.5 * II), 50), 0.6667 * II), 6)
self.Period.Add(Dp * 0.2 + self.Period[1] * 0.8)
self.SmoothedPeriod.Add(0.33 * self.Period[0] + self.SmoothedPeriod[1] * 0.67)
RsiPeriod = round((self.SmoothedPeriod[0] * Df) - 1)
self.RoofedSource.Add(self.EhlersRoofingFilter(self.RoofingUpper, self.RoofingLower))
self.Value = self.EhRSI(RsiPeriod)
return self.Value
class StandardAlphaModel(AlphaModel):
def __init__(self):
self.symbolData = {}
self.Charts = {}
# S curve starting at 1 heading towards 0
def TrendConfidence(self, x):
return -0.5 * (1 + math.sin((math.pi * x) - (math.pi / 2))) + 1
# Equidistant thresholds within 0-1 range
# (ex: 3 means one at 16.7%, 50% and )
def ThresholdConfidence(self, x, t=3):
return -abs(math.sin(t * x * math.pi)) + 1
# Limit the threshold confidence to the trend confidence
def TrendLimitedThresholdConfidence(self, x, t=3):
trend_confidence = this.TrendConfidence(x)
threshold_confidence = this.ThresholdConfidence(x, t)
return threshold_confidence if threshold_confidence <= trend_confidence else trend_confidence
def Update(self, algorithm, data):
insights = []
for symbol, sd in self.symbolData.items():
for resolution, indicator in sd.Indicators.items():
algorithm.Plot(sd.Chart.Name, indicator.SeriesName, indicator.Value)
return insights
def OnSecuritiesChanged(self, algorithm, changes):
for added in changes.AddedSecurities:
self.symbolData[added.Symbol] = SymbolData(self, algorithm, added)
for removed in changes.RemovedSecurities:
del self.symbolData[removed.Symbol]
class SymbolData:
def __init__(self, alpha, algorithm, security):
self.Security = security
self.Indicators = {}
self.Chart = Chart(alpha.Indicator.ChartName(security.Symbol), ChartType.Stacked)
for minutes, resolution in algorithm.Consolidators.items():
indicator = alpha.Indicator(resolution = resolution)
self.Chart.AddSeries(Series(indicator.SeriesName, SeriesType.Line))
algorithm.RegisterIndicator(security.Symbol, indicator, resolution)
self.Indicators[resolution] = indicator
algorithm.AddChart(self.Chart)
from Alphas.StandardAlphaModel import *
from Indicators.EhlerRsiDiscriminator import EhlerRsiDiscriminator
class RsiAlphaModel(StandardAlphaModel):
Name = "RSI Alpha"
Indicator = EhlerRsiDiscriminator
def Update(self, algorithm, data):
insights = super(RsiAlphaModel, self).Update(algorithm, data)
minutes = data.Period.seconds / 60 if hasattr(data, 'Period') else 1
for symbol in data.Keys:
# algorithm.Debug(str(algorithm.Indicators[symbol][self.Indicator.Name][1].IsReady))
pass
# data.Period - contains the timestamp representing the consolidated timeframe the data applies to.
# We need to introduce MTF logic in aggregating confidence. Should we be keeping RollingWindow data
# in the AlphaModel
# for underlying in data:
# c = self.TrendLimitedThresholdConfidence(data[underlying], 3)
# time_delta = minutes * self.TrendConfidence()
# insights.append( Insight(data[underlying], time_delta, InsightType.Price, InsightDirection.Up, confidence=c) )
# insights.append( Insight(data[underlying], time_delta, InsightType.Price, InsightDirection.Down, confidence=1-c) )
return insights
from Timeframes.Multiple import MultipleTimeframes
from Risk.CompositeRiskManagementModel import CompositeRiskManagementModel
from Risk.TrailingStopRiskManagementModel import TrailingStopRiskManagementModel
from Risk.MaximumDrawdownPercentPerSecurity import MaximumDrawdownPercentPerSecurity
from Portfolio.MeanVarianceOptimizationPortfolioConstructionModel import MeanVarianceOptimizationPortfolioConstructionModel
class StandardAlgorithm(QCAlgorithmFramework):
def __init__(self):
self.Indicators = dict()
self.Charts = dict()
self.Consolidators = dict()
self.Alphas = []
self.SetBroker()
self.SetRisk()
self.SetTiming()
self.SetBacktesting()
self.SetSecurityInitializer(self.SetSecurities)
def SetAlphas(self, *alphas):
self.SetAlpha(CompositeAlphaModel(*alphas))
self.SetWarmUp(max(map(lambda alpha: alpha.Indicator.Warmup, alphas)))
for timeframe in self.Consolidators.values():
for alpha in alphas:
timeframe.DataConsolidated += alpha.Update
def SetBroker(self):
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
def SetSecurities(self, security):
security.SetDataNormalizationMode(DataNormalizationMode.Raw)
def OnSecuritiesChanged(self, changes):
for resolution in self.Consolidators.values():
for security in changes.AddedSecurities:
self.SubscriptionManager.AddConsolidator(security.Symbol.Value, resolution)
for security in changes.RemovedSecurities:
self.SubscriptionManager.RemoveConsolidator(security.Symbol.Value, resolution)
def SetRisk(self):
trailing_stop = 0.10
max_drawdown = 0.05
self.SetRiskManagement(CompositeRiskManagementModel(TrailingStopRiskManagementModel(trailing_stop), MaximumDrawdownPercentPerSecurity(max_drawdown)))
self.SetPortfolioConstruction(MeanVarianceOptimizationPortfolioConstructionModel())
self.Allocate = 0.25
def SetTiming(self):
self.SetTimeZone(TimeZones.Toronto)
self.UniverseSettings.Resolution = Resolution.Minute
self.Consolidators = MultipleTimeframes(lambda minutes: TradeBarConsolidator(TimeSpan.FromMinutes(minutes)))
# def OnDataConsolidated(self, sender, data):
# if self.IsWarmingUp: return
# symbol = str(data.get_Symbol())
# # self.Data[bar.Symbol.Value].SMA.Update(bar.Time, bar.Close)
def SetBacktesting(self):
self.SetStartDate(2019, 2, 11)
self.SetEndDate(2019, 2, 15)
self.SetCash(20000)
from collections import OrderedDict
def MultipleTimeframes(proc = lambda:None):
timeframes = [1, 5, 15, 30, 60, 240, 390, 1950]
return dict(zip(timeframes, [proc(x) for x in timeframes]))
class StandardIndicator:
Name = "Null Indicator"
def __init__(self, **kwargs):
self.Value = 0
self.IsReady = False
self.Resolution = kwargs['resolution'] or Resolution.Daily
def FillWindows(self, *args):
for window in args:
for i in range(window.Size):
window.Add(0)
@classmethod
def ChartName(self, symbol):
return "[{}] {}".format(symbol, getattr(self, 'Name', self.__class__.__name__))
def SeriesName(self, symbol):
return Extensions.GetEnumString(self.Resolution, Resolution)