from Algorithms.StandardAlgorithm import StandardAlgorithm
from Alphas.RsiAlphaModel import RsiAlphaModel
from Execution.StandardDeviationExecutionModel import StandardDeviationExecutionModel
class MainAlgorithm(StandardAlgorithm):
    """Concrete algorithm: RSI alpha on a single manually selected equity."""

    # Consolidation periods in minutes; the base class turns each entry into
    # a TradeBarConsolidator.
    Timeframes = [1, 5, 15, 30, 60, 240, 390, 1950]

    def Initialize(self):
        """Wire up the execution model, the alpha model and the universe."""
        execution_model = StandardDeviationExecutionModel(60, 2, Resolution.Minute)
        self.SetExecution(execution_model)

        # The alpha model class (not an instance) is handed over; SetAlphas
        # instantiates it.
        self.SetAlphas(RsiAlphaModel)

        # Securities traded
        traded = [Symbol.Create("CRON", SecurityType.Equity, Market.USA)]
        universe = ManualUniverseSelectionModel(traded)
        self.SetUniverseSelection(universe)
import math
import numpy as np
from Indicators.StandardIndicator import StandardIndicator
# def EhlerRSI(symbol, **kwargs):
# _symbol = symbol if isinstance(symbol, str) else symbol.Value
# RegisterIndicator(_symbol, EhlerRsiDiscriminator(**kwargs), kwargs['consolidator'].Period);
# return indicator
class EhlerRsiDiscriminator(StandardIndicator):
    """RSI of a roofing-filtered close with a discriminator-driven length.

    Each call to ``Update`` pushes one bar through three stages:

    1. a homodyne discriminator, run on the smoothed (High + Low) / 2 series,
       produces a period estimate which is then smoothed;
    2. the close goes through the roofing filter (a high-pass stage,
       optionally followed by the super-smoother stage);
    3. an RSI built on Wilder-style averages is evaluated on the filtered
       close, with a length derived from the smoothed period.

    Every series is kept in a ``RollingWindow``, where index 0 is the most
    recently added sample.  A recursive term ("previous output") is therefore
    read at index 0 *before* the new output is added; once the new sample has
    been added, that same value lives at index 1.
    """

    Warmup = 7
    Name = "Ehler's RSI + Discriminator"

    def __init__(self, double_smoothing=True, roofing_upper=48, roofing_lower=10, overbought=70, oversold=30, **kwargs):
        """Create the indicator and pre-fill its internal series.

        Args:
            double_smoothing: When true the roofing filter returns the
                super-smoothed high-pass output, otherwise the raw high-pass.
            roofing_upper: Divisor used for the high-pass stage coefficient.
            roofing_lower: Divisor used for the super-smoother coefficients.
            overbought: Stored on the instance; not read by this class.
            oversold: Stored on the instance; not read by this class.
            **kwargs: Accepted and ignored.
        """
        super().__init__()
        self.DoubleSmoothing = double_smoothing
        self.RoofingUpper = roofing_upper
        self.RoofingLower = roofing_lower
        self.Overbought = overbought
        self.Oversold = oversold

        # Raw inputs. These two are NOT pre-filled, so IsReady (see Update)
        # only becomes true once enough real bars have arrived.
        self.Source = RollingWindow[Decimal](3)
        self.Filter = RollingWindow[float](3)
        self.Highpass = RollingWindow[float](3)
        self.Period = RollingWindow[Decimal](2)
        self.SmoothedPeriod = RollingWindow[float](2)
        self.Smooth = RollingWindow[float](7)
        self.Detrend = RollingWindow[float](7)
        self.HL2 = RollingWindow[Decimal](4)
        self.I1 = RollingWindow[float](7)
        self.I2 = RollingWindow[float](7)
        self.Q1 = RollingWindow[float](7)
        self.Q2 = RollingWindow[float](7)
        self.Re = RollingWindow[float](2)
        self.Im = RollingWindow[float](2)
        self.MA = RollingWindow[float](2)
        self.RoofedSource = RollingWindow[float](2)

        # Start with history: every derived series begins at 0, except the
        # smoothed period which begins at 6.
        self.FillWindows(self.Filter, self.Highpass, self.Period, self.Smooth, self.Detrend, self.I1, self.I2, self.Q1, self.Q2, self.Re, self.Im, self.MA, self.RoofedSource)
        self.FillWindows(self.SmoothedPeriod, value=6)

    def __repr__(self):
        """Return ``"name: value, "`` for every instance attribute.

        Attributes whose type matches the Source or Filter window type are
        shown by their newest sample (index 0) instead of the window object.
        """
        window_types = (self.Source.__class__, self.Filter.__class__)
        return "".join(
            "{}: {}, ".format(attribute, data[0] if isinstance(data, window_types) else data)
            for attribute, data in vars(self).items()
        )

    # Rolling Moving Average (or Wells Wilders MA)
    def WWma(self, src, l):
        """Push one Wilder-style average step for ``src`` with length ``l``.

        ``self.MA`` holds exactly two values and is shared by the two calls
        that ``EhRSI`` makes per bar (gains first, then losses).  Because the
        two series alternate through the window, index 1 *before* the add is
        the previous value of the series currently being updated.  Do not
        change the index, the window size or the call order independently.
        """
        self.MA.Add(np.nan_to_num((self.MA[1] * (l - 1) + src) / l))
        return self.MA[0]

    # RSI function.
    def EhRSI(self, l):
        """Return the RSI (0-100) of the roofed source for length ``l``."""
        change = self.RoofedSource[0] - self.RoofedSource[1]
        # Order matters: see WWma for how the shared window is interleaved.
        up = self.WWma(max(change, 0), l)
        down = self.WWma(-min(change, 0), l)
        if down == 0:
            return 100
        if up == 0:
            return 0
        return 100 - (100 / (1 + up / down))

    def EhlersSuperSmootherFilter(self):
        """Add the next super-smoother output to ``Filter`` and return it.

        Must be called after the current bar has been added to ``Highpass``.
        """
        f = (math.pi * math.sqrt(2)) / self.RoofingLower
        a = math.exp(-f)
        c3 = -a * a
        c2 = 2 * a * math.cos(f)
        c1 = 1 - c2 - c3
        # Highpass already contains the current bar (index 0/1 = current and
        # previous).  Filter does not yet, so its previous two outputs are at
        # index 0 and 1, not 1 and 2.
        self.Filter.Add(c1 * (self.Highpass[0] + self.Highpass[1]) / 2 + c2 * self.Filter[0] + c3 * self.Filter[1])
        return self.Filter[0]

    def EhlersRoofingFilter(self):
        """Add the next high-pass output and return the roofed value.

        Returns the super-smoothed value when ``DoubleSmoothing`` is set,
        otherwise the raw high-pass output.
        """
        f = math.sqrt(2) * math.pi / self.RoofingUpper
        alpha1 = (math.cos(f) + math.sin(f) - 1) / math.cos(f)
        second_difference = self.Source[0] - 2 * self.Source[1] + self.Source[2]
        # Source already contains the current bar; Highpass does not, so its
        # previous two outputs are read at index 0 and 1.
        highpass = (
            pow(1 - alpha1 / 2, 2) * second_difference
            + 2 * (1 - alpha1) * self.Highpass[0]
            - pow(1 - alpha1, 2) * self.Highpass[1]
        )
        self.Highpass.Add(np.nan_to_num(highpass))
        return self.EhlersSuperSmootherFilter() if self.DoubleSmoothing else self.Highpass[0]

    def Quadrature(self, src):
        """Return the weighted 7-sample difference of ``src`` scaled by period.

        ``src`` must already contain the current bar at index 0.  The scale
        factor uses the previous bar's period, which is index 0 of ``Period``
        because every caller runs before the current period is added.
        """
        C1 = 0.0962
        C2 = 0.5769
        C3 = (self.Period[0] * 0.075 + 0.54)
        return (src[0] * C1 + src[2] * C2 - src[4] * C2 - src[6] * C1) * C3

    def Update(self, data):
        """Consume one bar and return the new indicator value.

        Args:
            data: Bar exposing ``Close``, ``High`` and ``Low``.

        Returns:
            The RSI rounded to 4 decimals, or ``None`` while the Source and
            HL2 windows are still filling.
        """
        self.Source.Add(data.Close)
        self.HL2.Add((data.High + data.Low) / 2)
        self.IsReady = self.Source.IsReady and self.HL2.IsReady
        if not self.IsReady:
            return

        #
        # --- Start the Homodyne Discriminator Calculations
        #
        self.Smooth.Add((self.HL2[0] * 4.0 + self.HL2[1] * 3.0 + self.HL2[2] * 2.0 + self.HL2[3]) / 10.0)
        self.Detrend.Add(self.Quadrature(self.Smooth))

        # Compute InPhase and Quadrature components
        self.Q1.Add(self.Quadrature(self.Detrend))
        self.I1.Add(self.Detrend[3])

        # Advance Phase of I1 and Q1 by 90 degrees
        jI = self.Quadrature(self.I1)
        jQ = self.Quadrature(self.Q1)

        # Phaser addition for 3 bar averaging and
        # Smooth i and q components before applying discriminator.
        # The feedback term is the previous output, i.e. index 0 before Add.
        self.I2.Add(0.2 * (self.I1[0] - jQ) + 0.8 * self.I2[0])
        self.Q2.Add(0.2 * (self.Q1[0] + jI) + 0.8 * self.Q2[0])

        # Extract Homodyne Discriminator.  I2/Q2 now contain the current bar,
        # so [0] is current and [1] is previous; Re/Im feedback is still the
        # pre-Add index 0.
        self.Re.Add(0.2 * (self.I2[0] * self.I2[1] + self.Q2[0] * self.Q2[1]) + 0.8 * self.Re[0])
        self.Im.Add(0.2 * (self.I2[0] * self.Q2[1] - self.Q2[0] * self.I2[1]) + 0.8 * self.Im[0])

        Dp = ((2 * math.pi) / math.atan(self.Im[0] / self.Re[0])) if (self.Re[0] != 0 and self.Im[0] != 0) else 0

        # Clamp the raw estimate relative to the previous bar's period, then
        # to the absolute range [6, 50].
        previous_period = self.Period[0]
        Dp = max(max(min(min(Dp, 1.5 * previous_period), 50), 0.6667 * previous_period), 6)
        self.Period.Add(Dp * 0.2 + previous_period * 0.8)
        self.SmoothedPeriod.Add(0.33 * self.Period[0] + self.SmoothedPeriod[0] * 0.67)

        self.RoofedSource.Add(self.EhlersRoofingFilter())
        self.Value = round(self.EhRSI(round(self.SmoothedPeriod[0] * 0.5 - 1)), 4)
        # self.Value = self.RoofedSource[0]
        # self.Value = IndicatorExtensions.Of(RelativeStrengthIndex(self.SmoothedPeriod[0] * 0.5 - 1, MovingAverageType.Wilders), self).Value
        return self.Value
from Utilities.Unsorted import *
class StandardAlphaModel(AlphaModel):
    """Base alpha model that keeps and plots one indicator per symbol/consolidator.

    Subclasses set ``Indicator`` to an indicator class and ``Name`` to the
    chart title.  This base class never emits insights: ``Update`` only plots
    indicator values and returns an empty list.
    """

    Name = "Null Alpha"
    Indicator = None

    def __init__(self, algorithm):
        """Bind to ``algorithm``, create the chart and hook every consolidator.

        Args:
            algorithm: Object exposing ``Consolidators`` (iterated here) and
                ``Plot``/``RegisterIndicator`` (used later).
        """
        self.symbolData = {}
        self.Algorithm = algorithm
        self.Chart = Chart(self.Name, ChartType.Stacked)
        # Update doubles as the handler for each consolidator's
        # DataConsolidated event.
        for consolidator in self.Algorithm.Consolidators:
            consolidator.DataConsolidated += self.Update

    # # S curve starting at 1 heading towards 0.
    # # This means when the oscillator is at 0, trend
    # # confidence is at its highest (most likely to reverse)
    # def TrendConfidence(self, x):
    #     return -0.5 * (1 + math.sin((math.pi * x) - (math.pi / 2))) + 1
    # # Equidistant thresholds within 0-1 range, ex; 3 means
    # # (1 at 16.7%, 50% and 83.3%)
    # # (0 at 0%, 33%, 66%, 100%)
    # def ThresholdConfidence(self, x, t=3, low=0):
    #     if low > 1: lowest_confidence = 1
    #     if low < 0: lowest_confidence = 0
    #     return abs(math.sin(t * x * math.pi)) + lowest_confidence
    # # Limit the threshold confidence to the trend confidence
    # def TrendLimitedThresholdConfidence(self, x, t=3):
    #     trend_confidence = this.TrendConfidence(x)
    #     threshold_confidence = this.ThresholdConfidence(x, t)
    #     return threshold_confidence if threshold_confidence <= trend_confidence else trend_confidence

    def Update(self, algorithm, data):
        """Plot every ready indicator and return an empty insight list.

        This method is also subscribed to ``DataConsolidated`` in
        ``__init__``.  When it fires as an event handler the two positional
        arguments are whatever the event supplies (presumably the sender and
        the consolidated bar -- verify against the consolidator API), so the
        body deliberately uses ``self.Algorithm`` and ignores ``algorithm``.
        """
        insights = []
        for symbol, sd in self.symbolData.items():
            for consolidator, indicator in sd.Indicators.items():
                if not indicator.IsReady:
                    continue
                self.Algorithm.Plot(self.Name, ConsolidatedSymbolLabel(symbol, consolidator), indicator.Value)
                # sd.Algorithm.Debug("{} {}: {}".format(data.Time, ConsolidatedSymbolLabel(symbol, consolidator), indicator.Source[0]))
        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        """Create per-symbol data for added securities and drop it for removed ones."""
        # Diagnostic output of the consolidator periods, via both references
        # to the algorithm.  NOTE(review): these loops print nothing if
        # Consolidators is a one-shot iterator that was already consumed
        # (for example by __init__ above).
        for consolidator in algorithm.Consolidators:
            algorithm.Debug(consolidator.Period)
        for consolidator in self.Algorithm.Consolidators:
            algorithm.Debug(consolidator.Period)

        for added in changes.AddedSecurities:
            self.symbolData[added.Symbol] = SymbolData(self, added)
        for removed in changes.RemovedSecurities:
            # A removed security may never have been tracked by this model;
            # pop with a default so that case does not raise KeyError.
            self.symbolData.pop(removed.Symbol, None)
class SymbolData:
    """Holds, for one security, an indicator instance per consolidator."""

    def __init__(self, alpha, security):
        """Build, register and chart one indicator for each consolidator.

        Args:
            alpha: Alpha model exposing ``Algorithm``, ``Indicator`` (a
                zero-argument factory) and ``Chart``.
            security: Object exposing ``Symbol``.
        """
        # Maps consolidator -> indicator fed by that consolidator.
        self.Indicators = {}
        # So as a result, none of my indicators are created!
        # (original author's note: the loop body never ran for them)
        for bar_consolidator in alpha.Algorithm.Consolidators:
            created = alpha.Indicator()
            alpha.Algorithm.RegisterIndicator(security.Symbol, created, bar_consolidator)
            series_name = ConsolidatedSymbolLabel(security.Symbol, bar_consolidator)
            alpha.Chart.AddSeries(Series(series_name, SeriesType.Line))
            self.Indicators[bar_consolidator] = created
from Alphas.StandardAlphaModel import *
from Indicators.EhlerRsiDiscriminator import EhlerRsiDiscriminator
class RsiAlphaModel(StandardAlphaModel):
    """StandardAlphaModel specialised to the EhlerRsiDiscriminator indicator."""

    Indicator = EhlerRsiDiscriminator
    Name = EhlerRsiDiscriminator.Name

    # See this Update method accepts algorithm and data
    def Update(self, algorithm, data):
        """Return the base model's insights unchanged.

        No RSI-specific insight logic exists yet; the notes below are the
        author's open design questions and draft.
        """
        # data.Period - contains the timestamp representing the consolidated timeframe the data applies to.
        # We need to introduce MTF logic in aggregating confidence. Should we be keeping RollingWindow data
        # in the AlphaModel
        # for underlying in data:
        #     c = self.TrendLimitedThresholdConfidence(data[underlying], 3)
        #     time_delta = minutes * self.TrendConfidence()
        #     insights.append( Insight(data[underlying], time_delta, InsightType.Price, InsightDirection.Up, confidence=c) )
        #     insights.append( Insight(data[underlying], time_delta, InsightType.Price, InsightDirection.Down, confidence=1-c) )
        return super().Update(algorithm, data)
import itertools
from datetime import timedelta
from collections import defaultdict
from Utilities.Unsorted import *
from Risk.CompositeRiskManagementModel import CompositeRiskManagementModel
from Risk.TrailingStopRiskManagementModel import TrailingStopRiskManagementModel
from Risk.MaximumDrawdownPercentPerSecurity import MaximumDrawdownPercentPerSecurity
from Portfolio.MeanVarianceOptimizationPortfolioConstructionModel import MeanVarianceOptimizationPortfolioConstructionModel
class StandardAlgorithm(QCAlgorithmFramework):
    """Shared base algorithm: brokerage, risk, timing and consolidator setup.

    Subclasses list their consolidation periods (in minutes) in
    ``Timeframes`` and register alpha model classes through ``SetAlphas``.
    """

    Timeframes = []

    def __init__(self):
        """Apply the common configuration in a fixed order."""
        # TODO: Remove self.Alphas?
        self.Alphas = []
        self.SetBroker()
        self.SetRisk()
        self.SetTiming()
        self.SetSecurityInitializer(self.SetSecurities)

    # Each alpha model is registered once per consolidated resolution
    def SetAlphas(self, *alpha_models):
        """Instantiate and register the given alpha model classes.

        Args:
            *alpha_models: Callables taking this algorithm and returning an
                alpha with ``Chart`` and ``Indicator.Warmup`` attributes.
        """
        for alpha_model in alpha_models:
            alpha = alpha_model(self)
            self.AddChart(alpha.Chart)
            self.Alphas.append(alpha)
        # Warm up for as long as the most demanding indicator requires.
        self.SetWarmUp(max(alpha.Indicator.Warmup for alpha in self.Alphas))
        self.SetAlpha(CompositeAlphaModel(*self.Alphas))

    def SetBroker(self):
        """Select the Interactive Brokers margin brokerage model."""
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)

    def SetSecurities(self, security):
        """Security initializer: switch the security to raw price data."""
        security.SetDataNormalizationMode(DataNormalizationMode.Raw)

    def OnSecuritiesChanged(self, changes):
        """Detach every consolidator from securities that left the universe."""
        for consolidator in self.Consolidators:
            # Added securities are intentionally not handled here; the
            # explicit registration below was disabled by the author:
            # self.SubscriptionManager.AddConsolidator(security.Symbol.Value, consolidator)
            for security in changes.RemovedSecurities:
                self.SubscriptionManager.RemoveConsolidator(security.Symbol.Value, consolidator)

    def SetRisk(self):
        """Set starting cash, risk management and portfolio construction."""
        self.SetCash(20000)
        trailing_stop = 0.10
        max_drawdown = 0.05
        self.SetRiskManagement(CompositeRiskManagementModel(TrailingStopRiskManagementModel(trailing_stop), MaximumDrawdownPercentPerSecurity(max_drawdown)))
        self.SetPortfolioConstruction(MeanVarianceOptimizationPortfolioConstructionModel())
        self.Allocate = 0.25

    def SetTiming(self):
        """Set the backtest window, time zone, resolution and consolidators."""
        self.SetStartDate(2019, 2, 11)
        self.SetEndDate(2019, 2, 15)
        self.SetTimeZone(TimeZones.Toronto)
        self.UniverseSettings.Resolution = Resolution.Minute
        # Must be a real list: Consolidators is iterated many times (alpha
        # construction, every universe change, every SymbolData).  A bare
        # map() object is a single-pass iterator in Python 3 and would be
        # empty for every consumer after the first.
        self.Consolidators = [TradeBarConsolidator(timedelta(minutes=minutes)) for minutes in self.Timeframes]
def ConsolidatedSymbolLabel(symbol, consolidator):
    """Return the chart label ``"<symbol> (<consolidator period>)"``."""
    period = consolidator.Period
    return f"{symbol} ({period})"
from abc import ABC, abstractmethod
class StandardIndicator:
    """Minimal indicator base: a current value, a readiness flag and a helper."""

    Name = "Null Indicator"

    def __init__(self):
        """Start at value 0 and not ready."""
        self.Value = 0
        self.IsReady = False

    def FillWindows(self, *args, value=0):
        """Fill each given window to capacity with ``value``.

        This is a concrete helper that subclasses call as-is, so it is not
        marked abstract: the marker would make every subclass uninstantiable
        if this class ever gained an ABC metaclass.

        Args:
            *args: Windows exposing ``Size`` and ``Add``.
            value: Value added ``Size`` times to each window.
        """
        for window in args:
            for _ in range(window.Size):
                window.Add(value)