Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 0 Tracking Error 0 Treynor Ratio 0 Total Fees $0.00 |
from Algorithms.StandardAlgorithm import StandardAlgorithm
from Alphas.RsiAlphaModel import RsiAlphaModel
from Execution.StandardDeviationExecutionModel import StandardDeviationExecutionModel


class MainAlgorithm(StandardAlgorithm):
    """Entry-point algorithm: RSI alpha on a single manually selected equity."""

    def Initialize(self):
        """Apply the shared StandardAlgorithm setup, then wire execution, alpha and universe."""
        # The shared setup lives in StandardAlgorithm.__init__ and is (re)applied here.
        super().__init__()

        execution_model = StandardDeviationExecutionModel(60, 2, Resolution.Minute)
        self.SetExecution(execution_model)

        rsi_alpha = RsiAlphaModel()
        self.SetAlphas(rsi_alpha)

        # Securities traded
        traded = [Symbol.Create("CRON", SecurityType.Equity, Market.USA)]
        universe = ManualUniverseSelectionModel(traded)
        self.SetUniverseSelection(universe)
import math
import numpy as np
from Indicators.StandardIndicator import StandardIndicator


def EhlerRSI(symbol, **kwargs):
    """Create an EhlerRsiDiscriminator for *symbol* and register it.

    symbol -- a ticker string, or an object exposing ``.Value``.
    kwargs -- must contain ``resolution`` (KeyError otherwise); every other
              entry is forwarded to ``EhlerRsiDiscriminator.__init__``.

    NOTE(review): ``CreateIndicatorName`` and ``RegisterIndicator`` are not
    defined or imported in this module - confirm where they are expected to
    come from before relying on this helper.
    """
    symbol_id = symbol if isinstance(symbol, str) else symbol.Value
    # 'resolution' is consumed here: the indicator constructor does not accept
    # it, so forwarding it untouched raised a TypeError.
    settings = dict(kwargs)
    resolution = settings.pop('resolution')
    name = CreateIndicatorName(symbol, "[{0}] {1}".format(symbol_id, EhlerRsiDiscriminator.Name), resolution)
    indicator = EhlerRsiDiscriminator(name, **settings)
    RegisterIndicator(symbol_id, indicator, resolution)
    return indicator


class EhlerRsiDiscriminator(StandardIndicator):
    """RSI of a roofing-filtered close, with a cycle-adaptive RSI length.

    Each ``Update`` estimates a smoothed cycle period from the (high+low)/2
    series (homodyne discriminator), derives the RSI length from it, passes the
    close through a roofing filter and stores the resulting RSI in ``Value``.

    All rolling windows are indexed with 0 = most recently added value, so a
    read made *before* ``Add`` sees the previous bar at index 0.
    """

    Warmup = 7
    Name = "Ehler's RSI + Discriminator"

    def __init__(self, name, double_smoothing=True, roofing_upper=48, roofing_lower=10, overbought=70, oversold=30):
        """Store the settings and zero-fill the recursive state windows.

        name             -- instance name (shadows the class-level ``Name``).
        double_smoothing -- when True the high-pass output is additionally run
                            through the super smoother.
        roofing_upper    -- period used for the high-pass coefficient.
        roofing_lower    -- period used for the super smoother coefficients.
        overbought / oversold -- stored only; not read inside this class.
        """
        super(EhlerRsiDiscriminator, self).__init__()
        self.Name = name
        self.DoubleSmoothing = double_smoothing
        self.RoofingUpper = roofing_upper
        self.RoofingLower = roofing_lower
        self.Overbought = overbought
        self.Oversold = oversold

        self.Source = RollingWindow[Decimal](self.Warmup)
        self.Filtr = RollingWindow[float](3)
        self.Highpass = RollingWindow[float](3)
        self.Period = RollingWindow[Decimal](2)
        self.SmoothedPeriod = RollingWindow[Decimal](2)
        self.Smooth = RollingWindow[Decimal](7)
        self.Detrend = RollingWindow[Decimal](7)
        self.HL2 = RollingWindow[Decimal](4)
        self.I1 = RollingWindow[Decimal](7)
        self.I2 = RollingWindow[Decimal](7)
        self.Q1 = RollingWindow[Decimal](7)
        self.Q2 = RollingWindow[Decimal](7)
        self.Re = RollingWindow[Decimal](2)
        self.Im = RollingWindow[Decimal](2)
        # Accumulator used by WWma()
        self.Irma = 0
        # Independent accumulators for the RSI gain / loss averages
        self.UpRma = 0
        self.DownRma = 0
        self.RoofedSource = RollingWindow[float](2)

        # Start with history: zero-fill every window that is read recursively.
        # Source and HL2 are deliberately left empty; Source.IsReady gates Update().
        self.FillWindows(self.Filtr, self.Highpass, self.Period, self.SmoothedPeriod, self.Smooth, self.Detrend,
                         self.I1, self.I2, self.Q1, self.Q2, self.Re, self.Im, self.RoofedSource)

    def __repr__(self):
        return "{0} -> IsReady: {1}. Value: {2}".format(self.Name, self.IsReady, self.Value)

    # Rolling Moving Average (or Wells Wilders MA)
    def WWma(self, src, l):
        """Fold *src* into the single running average ``self.Irma`` and return it.

        Lengths below 1 are treated as 1 (previously only 0 was guarded, so a
        negative length produced a meaningless value).
        """
        l = max(l, 1)
        self.Irma = (self.Irma * (l - 1) + src) / l
        return self.Irma

    # RSI function.
    def EhRSI(self, l):
        """Return the RSI (0..100) of the latest change in ``RoofedSource``.

        Gains and losses are averaged in separate accumulators. They used to be
        pushed through the same ``self.Irma`` state, so the loss average started
        from the freshly updated gain average and vice versa.
        """
        l = max(l, 1)  # the adaptive length can round to <= 0 while the period ramps up
        change = self.RoofedSource[0] - self.RoofedSource[1]
        self.UpRma = (self.UpRma * (l - 1) + max(change, 0)) / l
        self.DownRma = (self.DownRma * (l - 1) - min(change, 0)) / l
        if self.DownRma == 0:
            return 100
        if self.UpRma == 0:
            return 0
        return 100 - (100 / (1 + self.UpRma / self.DownRma))

    def EhlersSuperSmootherFilter(self, lower):
        """Push the next super-smoothed high-pass value into ``Filtr`` and return it."""
        a1 = math.exp(-math.pi * math.sqrt(2) / lower)
        coeff3 = -pow(a1, 2)
        coeff2 = 2 * a1 * math.cos(math.sqrt(2) * math.pi / lower)
        coeff1 = 1 - coeff2 - coeff3
        # Highpass already holds the current bar; Filtr does not yet, so its
        # previous two outputs sit at indices 0 and 1.
        self.Filtr.Add(coeff1 * (self.Highpass[0] + self.Highpass[1]) / 2
                       + coeff2 * self.Filtr[0]
                       + coeff3 * self.Filtr[1])
        return self.Filtr[0]

    def EhlersRoofingFilter(self, upper, lower):
        """High-pass the source and, when ``DoubleSmoothing`` is set, super-smooth the result."""
        angle = math.sqrt(2) * math.pi / upper
        alpha1 = (math.cos(angle) + math.sin(angle) - 1) / math.cos(angle)
        # Highpass has not been advanced yet: its previous two outputs are at 0 and 1.
        self.Highpass.Add(pow(1 - alpha1 / 2, 2) * (self.Source[0] - 2 * self.Source[1] + self.Source[2])
                          + 2 * (1 - alpha1) * self.Highpass[0]
                          - pow(1 - alpha1, 2) * self.Highpass[1])
        return self.EhlersSuperSmootherFilter(lower) if self.DoubleSmoothing else self.Highpass[0]

    def Update(self, data):
        """Consume one bar (``Close``, ``High``, ``Low``) and refresh ``Value``.

        Nothing is computed until ``Source`` holds ``Warmup`` closes. Returns None.
        """
        self.Source.Add(data.Close)
        self.HL2.Add((data.High + data.Low) / 2)
        self.IsReady = self.Source.IsReady
        if not self.IsReady:
            return

        #
        # --- Start the Homodyne Discriminator Calculations
        #
        # Mutable Variables (non-series)
        C1 = 0.0962
        C2 = 0.5769
        Df = 0.5
        # Period has not been advanced for this bar yet, so the previous bar's
        # estimate is at index 0 (index 1 would be two bars back).
        previous_period = self.Period[0]
        C3 = (previous_period * 0.075 + 0.54)

        self.Smooth.Add(((self.HL2[0] * 4.0) + (self.HL2[1] * 3.0) + (self.HL2[2] * 2.0) + (self.HL2[3])) / 10.0)
        self.Detrend.Add((self.Smooth[0] * C1 + self.Smooth[2] * C2 - self.Smooth[4] * C2 - self.Smooth[6] * C1) * C3)

        # Compute InPhase and Quadrature components
        self.Q1.Add((self.Detrend[0] * C1 + self.Detrend[2] * C2 - self.Detrend[4] * C2 - self.Detrend[6] * C1) * C3)
        self.I1.Add(self.Detrend[3])

        # Advance Phase of I1 and Q1 by 90 degrees
        jI = (self.I1[0] * C1 + self.I1[2] * C2 - self.I1[4] * C2 - self.I1[6] * C1) * C3
        jQ = (self.Q1[0] * C1 + self.Q1[2] * C2 - self.Q1[4] * C2 - self.Q1[6] * C1) * C3

        # Smooth i and q components before applying discriminator.
        # The recursive term is the previous output, i.e. index 0 before Add().
        self.I2.Add(0.2 * (self.I1[0] - jQ) + 0.8 * self.I2[0])
        self.Q2.Add(0.2 * (self.Q1[0] + jI) + 0.8 * self.Q2[0])

        # Extract Homodyne Discriminator (I2/Q2 now hold current at 0, previous at 1)
        Re = self.I2[0] * self.I2[1] + self.Q2[0] * self.Q2[1]
        Im = self.I2[0] * self.Q2[1] - self.Q2[0] * self.I2[1]
        self.Re.Add(0.2 * Re + 0.8 * self.Re[0])
        self.Im.Add(0.2 * Im + 0.8 * self.Im[0])

        Dp = 6.28318 / math.atan(self.Im[0] / self.Re[0]) if (self.Re[0] != 0 and self.Im[0] != 0) else 0
        # Limit the change relative to the previous period, then clamp to [6, 50]
        Dp = max(max(min(min(Dp, 1.5 * previous_period), 50), 0.6667 * previous_period), 6)
        self.Period.Add(Dp * 0.2 + previous_period * 0.8)
        self.SmoothedPeriod.Add(0.33 * self.Period[0] + self.SmoothedPeriod[0] * 0.67)

        # May be <= 0 while the period estimate ramps up from zero; EhRSI clamps it to 1.
        RsiPeriod = round((self.SmoothedPeriod[0] * Df) - 1)
        self.RoofedSource.Add(self.EhlersRoofingFilter(self.RoofingUpper, self.RoofingLower))
        self.Value = self.EhRSI(RsiPeriod)
import math

from Timeframes.Multiple import MultipleTimeframes


class StandardAlphaModel(AlphaModel):
    """Base alpha model providing chart naming and confidence-shaping helpers.

    Subclasses are expected to set ``self.Indicator`` (read by ``ChartName``).
    """

    def ChartName(self, symbol):
        """Return the chart title ``"[<symbol>] <indicator name>"``."""
        return "[{0}] {1}".format(symbol, self.Indicator.Name)

    # S curve starting at 1 heading towards 0
    def TrendConfidence(self, x):
        """Return 1 at x = 0, falling along a sine-shaped S curve to 0 at x = 1."""
        return -0.5 * (1 + math.sin((math.pi * x) - (math.pi / 2))) + 1

    # Equidistant thresholds within 0-1 range
    # (ex: 3 means one at 16.7%, 50% and 83.3%)
    def ThresholdConfidence(self, x, t=3):
        """Return ``1 - |sin(t * pi * x)|``.

        With t thresholds in the 0-1 range the result drops to 0 at each
        threshold (x = (2k + 1) / (2t)) and is 1 midway between them.
        """
        return -abs(math.sin(t * x * math.pi)) + 1

    # Limit the threshold confidence to the trend confidence
    def TrendLimitedThresholdConfidence(self, x, t=3):
        """Return the threshold confidence, capped by the trend confidence."""
        # Was written against ``this`` (undefined in Python), raising NameError.
        trend_confidence = self.TrendConfidence(x)
        threshold_confidence = self.ThresholdConfidence(x, t)
        return min(threshold_confidence, trend_confidence)
from Timeframes.Multiple import MultipleTimeframes
from Alphas.StandardAlphaModel import StandardAlphaModel
from Indicators.EhlerRsiDiscriminator import EhlerRsiDiscriminator


class RsiAlphaModel(StandardAlphaModel):
    """Alpha model built around the EhlerRsiDiscriminator indicator.

    Insight generation is not implemented yet: ``Update`` always returns an
    empty list.
    """

    def __init__(self):
        self.Name = "RSI Alpha"
        # The indicator *class* (not an instance) is stored here.
        self.Indicator = EhlerRsiDiscriminator

    def Update(self, algorithm, slice):
        """Called with the latest data for the subscribed securities.

        Returns the list of generated insights, which is currently always empty.
        """
        # Length of the bar in minutes; only the planned insight logic below needs it.
        if hasattr(slice, 'Period'):
            minutes = slice.Period.seconds / 60
        else:
            minutes = 1

        for _symbol in slice.Keys:
            # algorithm.Debug(str(algorithm.Indicators[symbol][self.Indicator.Name][1].IsReady))
            pass

        generated = []

        # data.Period - contains the timestamp representing the consolidated timeframe the data applies to.
        # We need to introduce MTF logic in aggregating confidence. Should we be keeping RollingWindow data
        # in the AlphaModel
        # for underlying in data:
        #     c = self.TrendLimitedThresholdConfidence(data[underlying], 3)
        #     time_delta = minutes * self.TrendConfidence()
        #     insights.append( Insight(data[underlying], time_delta, InsightType.Price, InsightDirection.Up, confidence=c) )
        #     insights.append( Insight(data[underlying], time_delta, InsightType.Price, InsightDirection.Down, confidence=1-c) )
        return generated
from Timeframes.Multiple import MultipleTimeframes
from Risk.CompositeRiskManagementModel import CompositeRiskManagementModel
from Risk.TrailingStopRiskManagementModel import TrailingStopRiskManagementModel
from Risk.MaximumDrawdownPercentPerSecurity import MaximumDrawdownPercentPerSecurity
from Portfolio.MeanVarianceOptimizationPortfolioConstructionModel import MeanVarianceOptimizationPortfolioConstructionModel


class StandardAlgorithm(QCAlgorithmFramework):
    """Shared algorithm setup: broker, risk, time settings, backtest window and
    per-symbol multi-timeframe consolidators, indicators and charts.

    State kept per ticker string (``security.Symbol.Value``):
      Consolidators[symbol]            -> MultipleTimeframes of consolidators
      Indicators[symbol][indicator id] -> MultipleTimeframes of indicator instances
      Charts[symbol][indicator id]     -> Chart with one series per timeframe
    """

    def __init__(self):
        self.Indicators = dict()
        self.Charts = dict()
        self.Consolidators = dict()
        self.Alphas = []

        self.SetBroker()
        self.SetRisk()
        self.SetTimeSettings()
        self.SetBacktesting()
        self.SetSecurityInitializer(self.SetSecurities)

    def SetAlphas(self, *alphas):
        """Install the alpha models and warm up for the longest indicator warmup."""
        self.Alphas = alphas
        self.SetAlpha(CompositeAlphaModel(*self.Alphas))
        self.SetWarmUp(max(alpha.Indicator.Warmup for alpha in self.Alphas))

    def SetBroker(self):
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)

    def SetSecurities(self, security):
        """Security initializer: use raw (unadjusted) data for every security."""
        security.SetDataNormalizationMode(DataNormalizationMode.Raw)

    def OnSecuritiesChanged(self, changes):
        """Create/remove consolidators, indicators and charts as the universe changes."""
        for security in changes.AddedSecurities:
            symbol = security.Symbol.Value

            # Within each timeframe
            consolidators = MultipleTimeframes(lambda minutes: TradeBarConsolidator(TimeSpan.FromMinutes(minutes)))
            self.Consolidators[symbol] = consolidators

            # Build the per-alpha containers exactly once per symbol. They used
            # to be re-created inside the timeframe/alpha loops, which threw away
            # every chart series and registered indicator except the last one.
            charts = self.Charts[symbol] = dict()
            indicators = self.Indicators[symbol] = dict()
            for alpha in self.Alphas:
                indicator_id = alpha.Indicator.Name
                alpha_chart_name = alpha.ChartName(symbol)
                indicator_timeframe_name = self.CreateIndicatorName(security.Symbol, alpha_chart_name, security.Resolution)

                # One chart per symbol/alpha, one indicator instance per timeframe.
                # The lambda is invoked immediately by MultipleTimeframes.
                charts[indicator_id] = Chart(alpha_chart_name, ChartType.Stacked)
                indicators[indicator_id] = MultipleTimeframes(lambda minutes: alpha.Indicator(indicator_timeframe_name))

            for minutes, timeframe in consolidators.items():
                # Register consolidators with subscription data
                timeframe.DataConsolidated += self.OnDataConsolidated
                self.SubscriptionManager.AddConsolidator(symbol, timeframe)

                # Register indicators with timeframe consolidators and plot series data
                for alpha in self.Alphas:
                    indicator_id = alpha.Indicator.Name
                    self.RegisterIndicator(symbol, indicators[indicator_id][minutes], timeframe)
                    charts[indicator_id].AddSeries(Series("{0}m".format(minutes), SeriesType.Line))

            # Plot the chart
            for chart in charts.values():
                self.AddChart(chart)

        for security in changes.RemovedSecurities:
            symbol = security.Symbol.Value
            # pop(): tolerate symbols that were never added and do not keep
            # references to consolidators that are no longer subscribed.
            for timeframe in self.Consolidators.pop(symbol, {}).values():
                self.SubscriptionManager.RemoveConsolidator(symbol, timeframe)

    def SetRisk(self):
        trailing_stop = 0.10
        max_drawdown = 0.05
        self.SetRiskManagement(CompositeRiskManagementModel(TrailingStopRiskManagementModel(trailing_stop),
                                                            MaximumDrawdownPercentPerSecurity(max_drawdown)))
        self.SetPortfolioConstruction(MeanVarianceOptimizationPortfolioConstructionModel())
        self.Allocate = 0.25

    def SetTimeSettings(self):
        self.SetTimeZone(TimeZones.Toronto)
        self.UniverseSettings.Resolution = Resolution.Minute

    def OnDataConsolidated(self, sender, slice):
        """Plot every timeframe's indicator value whenever any consolidator emits a bar."""
        if self.IsWarmingUp:
            return

        symbol = str(slice.get_Symbol())

        for alpha in self.Alphas:
            # Update the chart series
            indicator_name = alpha.Indicator.Name
            chart_name = alpha.ChartName(symbol)
            for minutes, indicator in self.Indicators[symbol][indicator_name].items():
                # self.Debug(indicator)
                self.Plot(chart_name, "{0}m".format(minutes), indicator.Value)

    def SetBacktesting(self):
        self.SetStartDate(2019, 2, 11)
        self.SetEndDate(2019, 2, 15)
        self.SetCash(20000)
from collections import OrderedDict


class MultipleTimeframes(OrderedDict):
    """Ordered mapping ``{timeframe in minutes: proc(timeframe)}``.

    ``proc`` is called once per timeframe, in ascending order, with the
    timeframe length in minutes; its return value becomes the entry's value.
    Without ``proc`` every value is ``None``.
    """

    # Timeframe lengths in minutes (presumably 390 = one US equity session and
    # 1950 = five sessions - confirm against the intended market hours).
    Timeframes = (1, 5, 15, 30, 60, 240, 390, 1950)

    def __init__(self, proc=lambda minutes: None):
        # The default used to be a zero-argument lambda, so MultipleTimeframes()
        # raised TypeError. Pairs are passed directly (not through a plain dict)
        # so the order does not depend on dict ordering guarantees.
        super(MultipleTimeframes, self).__init__(
            (minutes, proc(minutes)) for minutes in self.Timeframes
        )
class StandardIndicator:
    """Minimal indicator base: a current ``Value`` and an ``IsReady`` flag."""

    def __init__(self):
        self.Value = 0
        self.IsReady = False

    def FillWindows(self, *args):
        """Add ``Size`` zeros to each given window (objects exposing ``Size`` and ``Add``)."""
        for rolling_window in args:
            capacity = rolling_window.Size
            for _ in range(capacity):
                rolling_window.Add(0)