Overall Statistics |
Total Trades 4170 Average Win 0.47% Average Loss -0.59% Compounding Annual Return 8.033% Drawdown 58.900% Expectancy 0.119 Net Profit 409.738% Sharpe Ratio 0.449 Probabilistic Sharpe Ratio 0.224% Loss Rate 38% Win Rate 62% Profit-Loss Ratio 0.80 Alpha 0 Beta 0 Annual Standard Deviation 0.192 Annual Variance 0.037 Information Ratio 0.449 Tracking Error 0.192 Treynor Ratio 0 Total Fees $18134.81 |
from QuantConnect.Data.UniverseSelection import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
from itertools import groupby
from math import ceil


class QC500UniverseSelectionModel(FundamentalUniverseSelectionModel):
    '''Defines the QC500 universe as a universe selection model for framework algorithm
    For details: https://github.com/QuantConnect/Lean/pull/1663'''

    def __init__(self, filterFineData = True, universeSettings = None, securityInitializer = None):
        '''Initializes a new default instance of the QC500UniverseSelectionModel'''
        super().__init__(filterFineData, universeSettings, securityInitializer)
        # Size caps applied by the coarse and fine stages respectively
        self.numberOfSymbolsCoarse = 1000
        self.numberOfSymbolsFine = 500
        # Filled by SelectCoarse, read by SelectFine to rank by dollar volume
        self.dollarVolumeBySymbol = {}
        # Month of the last successful fine selection (-1 = none yet)
        self.lastMonth = -1

    def SelectCoarse(self, algorithm, coarse):
        '''Performs coarse selection: the most liquid tradable stocks with fundamental data.
        Returns Universe.Unchanged when the month was already processed or nothing qualifies.'''
        if algorithm.Time.month == self.lastMonth:
            return Universe.Unchanged

        # Do not invest in stocks generating errors messages in logs
        excludedTickers = ("VIAC", "BEEM", "LSI", "IQV", "GBT",
                           "VTRS", "FUBO", "SPCE", "TFC", "PEAK")

        candidates = [
            c for c in coarse
            if c.Symbol.Value not in excludedTickers
            and c.HasFundamentalData
            and c.Volume > 0
            and c.Price > 0
        ]
        candidates.sort(key = lambda c: c.DollarVolume, reverse = True)
        mostLiquid = candidates[:self.numberOfSymbolsCoarse]

        self.dollarVolumeBySymbol = {c.Symbol: c.DollarVolume for c in mostLiquid}

        # If no security has met the QC500 criteria, the universe is unchanged.
        # A new selection will be attempted on the next trading day as self.lastMonth is not updated
        if not self.dollarVolumeBySymbol:
            return Universe.Unchanged

        # Hand back the symbols in descending dollar-volume order
        return list(self.dollarVolumeBySymbol)

    def SelectFine(self, algorithm, fine):
        '''Performs fine selection for the QC500 constituents
        The company's headquarter must be in the U.S.
        The stock must be traded on either the NYSE or NASDAQ
        At least half a year since its initial public offering
        The stock's market cap must be greater than 500 million'''

        def sectorOf(security):
            return security.CompanyReference.IndustryTemplateCode

        def dollarVolumeOf(security):
            return self.dollarVolumeBySymbol[security.Symbol]

        eligible = [
            f for f in fine
            if f.CompanyReference.CountryId == "USA"
            and f.CompanyReference.PrimaryExchangeID in ["NYS", "NAS"]
            and (algorithm.Time - f.SecurityReference.IPODate).days > 180
            and f.MarketCap > 5e8
        ]
        # groupby below needs the input ordered by the grouping key
        eligible.sort(key = sectorOf)

        # If no security has met the QC500 criteria, the universe is unchanged.
        # A new selection will be attempted on the next trading day as self.lastMonth is not updated
        if not eligible:
            return Universe.Unchanged

        # Update self.lastMonth after all QC500 criteria checks passed
        self.lastMonth = algorithm.Time.month

        # Share of each sector to keep so the sectors sum to roughly numberOfSymbolsFine
        keepFraction = self.numberOfSymbolsFine / len(eligible)

        # select stocks with top dollar volume in every single sector
        selected = []
        for _, members in groupby(eligible, sectorOf):
            ranked = sorted(members, key = dollarVolumeOf, reverse = True)
            quota = ceil(len(ranked) * keepFraction)
            selected.extend(ranked[:quota])

        selected.sort(key = dollarVolumeOf, reverse = True)
        return [f.Symbol for f in selected[:self.numberOfSymbolsFine]]
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")

from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
# from QC500UniverseSelectionModel import QC500UniverseSelectionModel
from UniverseSelectionModel import UniverseSelectionModel
from MomentumAlphaModel import MomentumAlphaModel
from Execution.ImmediateExecutionModel import ImmediateExecutionModel
from Risk.NullRiskManagementModel import NullRiskManagementModel
from QuantConnect.Indicators import *


class A0002(QCAlgorithm):
    '''Framework algorithm wiring a monthly momentum alpha model to a
    dollar-volume universe, insight-weighted portfolio construction and
    immediate execution.'''

    def Initialize(self):
        '''Configures the backtest, the reference security and the framework models.'''
        # --- Backtest window and account ---
        # Original note on the start date: "needs to be 1st of the month"
        self.SetStartDate(2000, 1, 28)
        # self.SetEndDate(2021, 1, 10)  # Set End Date
        self.SetCash(100000)
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)

        # --- Strategy parameters ---
        self.referenceTicker = "SPY"
        self.referenceSMAperiod = 200
        self.momentumScorePeriod = 125
        self.selectionSize = 30
        self.selectionThreshold = 40
        self.STDperiod = 20

        # --- Benchmark / market reference ---
        # NOTE(review): self.benchmark is stored but SetBenchmark is not called here — confirm intent
        self.benchmark = Symbol.Create('SPY', SecurityType.Equity, Market.USA)
        self.AddEquity('SPY', Resolution.Daily)
        self.referenceSMA = self.SMA(self.referenceTicker, self.referenceSMAperiod, Resolution.Daily)

        # --- Data resolution and rebalancing switches ---
        self.UniverseSettings.Resolution = Resolution.Daily
        self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.SplitAdjusted
        self.Settings.RebalancePortfolioOnInsightChanges = False
        self.Settings.RebalancePortfolioOnSecurityChanges = False

        # --- Framework models ---
        self.SetUniverseSelection(UniverseSelectionModel())

        alphaModel = MomentumAlphaModel(
            Resolution.Daily,
            self.momentumScorePeriod,
            self.selectionSize,
            self.selectionThreshold,
            self.STDperiod,
            self.referenceSMAperiod,
            self.referenceTicker,
            self.referenceSMA,
        )
        self.AddAlpha(alphaModel)

        # Portfolio targets are recomputed at the start of each month
        monthlyRebalance = self.DateRules.MonthStart()
        self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel(monthlyRebalance))
        self.SetExecution(ImmediateExecutionModel())
        self.SetRiskManagement(NullRiskManagementModel())
from collections import deque
from datetime import datetime, timedelta
import numpy as np
from scipy import stats
import pandas as pd
from QuantConnect.Algorithm.Framework.Alphas import *


class ClenowMomentumAlphaModel(AlphaModel):
    '''Alpha model that ranks the tracked symbols weekly by the custom
    ClenowMomentum indicator and emits Up insights weighted by
    riskFactor * Close / ATR.'''

    def __init__(self, resolution, linRegPeriod, SMAperiod, ATRperiod, referenceSMAperiod, referenceTicker, referenceSMA, riskFactor):
        '''Stores the model parameters and initialises the per-symbol state.'''
        self.resolution = resolution
        self.symbolDataBySymbol = {}
        self.linRegPeriod = linRegPeriod
        self.SMAperiod = SMAperiod
        self.ATRperiod = ATRperiod
        self.referenceSMAperiod = referenceSMAperiod
        self.referenceTicker = referenceTicker
        self.referenceSMA = referenceSMA
        # SymbolData of the reference ticker; assigned in OnSecuritiesChanged
        self.referenceSymbolData = None
        self.riskFactor = riskFactor
        # Toggled on every weekly update: weights are refreshed every other week
        self.isPositionsRebalancingWeek = True
        # [symbolData, momentum, weight] entries emitted on the previous update
        self.previousStocksBought = []
        self.referenceSMAisSet = False

    def Update(self, algorithm, data):
        '''Generates the weekly insights.

        Returns an empty list on every day except Wednesday (isoweekday 3).
        On Wednesdays it returns one Up insight per stock kept or bought.'''
        insights = []
        if algorithm.Time.isoweekday() != 3:
            return insights

        # Weekly update of insights every Wednesday
        topSelection = []
        stocksToBuy = []
        for symbolData in self.symbolDataBySymbol.values():
            if not data.ContainsKey(symbolData.Symbol):
                continue
            bar = data[symbolData.Symbol]
            if bar is None:
                continue
            atr = symbolData.ATR.Current.Value
            if atr == 0:
                # ATR not populated yet: the weight would divide by zero
                continue
            topSelection.append([symbolData, symbolData.ClenowMomentum.Value, self.riskFactor * bar.Close / atr])

        # Rank and keep 20% of stock based on momentum indicator
        topSelection = sorted(topSelection, key=lambda x: x[1], reverse=True)
        topSelection = topSelection[:int(20 * len(topSelection) / 100)]

        # Setup portfolio available %
        availablePortfolioPercent = 1

        # Buy the previous stocks first or sell according to conditions.
        # NOTE(review): the SMA filter below only runs when there are previous
        # holdings, as in the original — confirm that is intended.
        for previousStockBought in self.previousStocksBought:
            # Iterate over a snapshot: entries are removed from topSelection
            # inside the loop, which would otherwise skip the following element.
            for selectedStock in list(topSelection):
                symbolData = selectedStock[0]
                close = data[symbolData.Symbol].Close
                sma = symbolData.SMA.Current.Value
                if previousStockBought[0] == symbolData and close > sma:
                    # Every positions rebalancing week use the fresh weight,
                    # every other week keep the weight of the previous update
                    kept = selectedStock if self.isPositionsRebalancingWeek else previousStockBought
                    if kept[2] < availablePortfolioPercent:
                        availablePortfolioPercent = availablePortfolioPercent - kept[2]
                        stocksToBuy.append(kept)
                    topSelection.remove(selectedStock)
                elif close < sma:
                    topSelection.remove(selectedStock)

        # If market reference > referenceSMA -> bullish market -> buy more stock
        reference = self.referenceSymbolData
        if reference is not None and data.ContainsKey(reference.Symbol):
            referenceBar = data[reference.Symbol]
            if referenceBar is not None and referenceBar.Close > reference.referenceSMA.Current.Value:
                for selectedStock in topSelection:
                    if selectedStock[2] < availablePortfolioPercent:
                        stocksToBuy.append(selectedStock)
                        availablePortfolioPercent = availablePortfolioPercent - selectedStock[2]
                    else:
                        break

        for stockToBuy in stocksToBuy:
            insights.append(Insight.Price(stockToBuy[0].Symbol, timedelta(days=1), InsightDirection.Up, None, 1.00, None, stockToBuy[2]))

        self.previousStocksBought = stocksToBuy
        self.isPositionsRebalancingWeek = not self.isPositionsRebalancingWeek
        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Creates and warms up SymbolData for added securities and releases
        the state of removed ones.'''
        # Initialize referenceTicker SMA once
        if not self.referenceSMAisSet:
            history = algorithm.History([self.referenceTicker], max(self.SMAperiod, self.ATRperiod, self.linRegPeriod, self.referenceSMAperiod), self.resolution)
            self._addSymbolData(algorithm, history, isReference=True)
            # Retry on the next change if no history was available yet
            self.referenceSMAisSet = self.referenceSymbolData is not None

        # initialize data for added securities
        addedSymbols = [x.Symbol for x in changes.AddedSecurities]
        if addedSymbols:
            history = algorithm.History(addedSymbols, max(self.SMAperiod, self.ATRperiod, self.linRegPeriod), self.resolution)
            self._addSymbolData(algorithm, history)

        # clean up data for removed securities (must run even when nothing was added)
        for removed in changes.RemovedSecurities:
            symbolData = self.symbolDataBySymbol.pop(removed.Symbol, None)
            if symbolData is not None:
                symbolData.RemoveConsolidators(algorithm)

    def _addSymbolData(self, algorithm, history, isReference=False):
        '''Creates, registers and warms up SymbolData for each new ticker found in history.'''
        if history.empty:
            # An empty frame has no index levels to enumerate
            return
        for ticker in history.index.levels[0]:
            symbol = SymbolCache.GetSymbol(ticker)
            if symbol in self.symbolDataBySymbol:
                continue
            symbolData = SymbolData(symbol, self.SMAperiod, self.ATRperiod, self.linRegPeriod, self.referenceSMAperiod, self.referenceTicker)
            self.symbolDataBySymbol[symbol] = symbolData
            if isReference:
                self.referenceSymbolData = symbolData
            symbolData.RegisterIndicators(algorithm, self.resolution)
            symbolData.WarmUpIndicators(history, ticker, symbol)


class SymbolData:
    '''Contains data specific to a symbol required by this model'''

    def __init__(self, symbol, SMAperiod, ATRperiod, linRegPeriod, referenceSMAperiod, referenceTicker):
        '''Creates the indicators for symbol; consolidators are attached in RegisterIndicators.'''
        self.Symbol = symbol
        self.SMA = SimpleMovingAverage(SMAperiod)
        self.ATR = AverageTrueRange(ATRperiod, MovingAverageType.Simple)
        self.ClenowMomentum = ClenowMomentum('ClenowMomentum', symbol, linRegPeriod)
        self.referenceSMA = SimpleMovingAverage(referenceSMAperiod)
        self.referenceTicker = referenceTicker
        self.SMAConsolidator = None
        self.ATRConsolidator = None
        self.ClenowMomentumConsolidator = None
        self.referenceSMAConsolidator = None
        self.previous = 0
        # Sample counts seen by the last successful CanEmit check
        self.previousSMA = 0
        self.previousATR = 0

    def RegisterIndicators(self, algorithm, resolution):
        '''Registers every indicator with its own consolidator for automatic updates.'''
        self.SMAConsolidator = algorithm.ResolveConsolidator(self.Symbol, resolution)
        algorithm.RegisterIndicator(self.Symbol, self.SMA, self.SMAConsolidator, Field.Close)

        self.ATRConsolidator = algorithm.ResolveConsolidator(self.Symbol, resolution)
        algorithm.RegisterIndicator(self.Symbol, self.ATR, self.ATRConsolidator)

        self.ClenowMomentumConsolidator = algorithm.ResolveConsolidator(self.Symbol, resolution)
        algorithm.RegisterIndicator(self.Symbol, self.ClenowMomentum, self.ClenowMomentumConsolidator)

        # Only the reference ticker feeds the reference SMA
        if self.Symbol.Value == self.referenceTicker:
            self.referenceSMAConsolidator = algorithm.ResolveConsolidator(self.Symbol, resolution)
            algorithm.RegisterIndicator(self.Symbol, self.referenceSMA, self.referenceSMAConsolidator, Field.Close)

    def WarmUpIndicators(self, history, ticker, symbol):
        '''Feeds the rows of history for ticker into the indicators.'''
        isReference = self.Symbol.Value == self.referenceTicker
        for row in history.loc[ticker].itertuples():
            self.SMA.Update(row.Index, row.close)
            bar = TradeBar(row.Index, symbol, row.open, row.high, row.low, row.close, row.volume)
            self.ATR.Update(bar)
            if isReference:
                self.referenceSMA.Update(row.Index, row.close)
        self.ClenowMomentum.Warmup(history)

    def RemoveConsolidators(self, algorithm):
        '''Detaches every consolidator created by RegisterIndicators.'''
        consolidators = (
            self.SMAConsolidator,
            self.ATRConsolidator,
            self.ClenowMomentumConsolidator,
            self.referenceSMAConsolidator,
        )
        for consolidator in consolidators:
            if consolidator is not None:
                algorithm.SubscriptionManager.RemoveConsolidator(self.Symbol, consolidator)

    @property
    def CanEmit(self):
        '''True when both SMA and ATR received new samples since the last
        successful check and the SMA is ready.'''
        if self.previousSMA == self.SMA.Samples:
            return False
        if self.previousATR == self.ATR.Samples:
            return False
        self.previousSMA = self.SMA.Samples
        self.previousATR = self.ATR.Samples
        return self.SMA.IsReady


# Define custom indicator
class ClenowMomentum:
    '''Custom indicator: annualised slope of a linear regression on log
    prices over period samples, multiplied by the regression r-squared.'''

    def __init__(self, name, symbol, period):
        self.symbol = symbol
        self.Name = name
        self.Time = datetime.min
        self.Value = 0
        self.IsReady = False
        # Rolling windows, newest sample on the left
        self.queue = deque(maxlen=period)
        self.queueTime = deque(maxlen=period)
        self.CurrentReturn = 0

    # required update method
    def Update(self, input):
        '''Updates the indicator from a data point exposing Time and Close.'''
        return self.UpdateFromParameters(input.Time, input.Close)

    # update called for history warmup
    def UpdateFromParameters(self, time, value):
        '''Adds one price sample; returns True once the window is full and Value was recomputed.'''
        self.queue.appendleft(value)
        if len(self.queue) != self.queue.maxlen:
            return False

        self.queueTime.appendleft(time)
        self.Time = time
        self.IsReady = True

        # Regress log prices (oldest first) on the sample index
        y = np.flipud(np.log(self.queue))
        x = np.arange(len(y))
        slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
        # Annualized exponential regression (ratio^252) x correlation
        # NOTE(review): slope is on log prices, so exp(slope) would be the exact
        # daily ratio; (1 + slope) is kept unchanged here — confirm intent.
        self.Value = ((1 + slope) ** 252) * (r_value ** 2)
        return self.IsReady

    def Warmup(self, history):
        '''Feeds the close column of history for this symbol into the indicator.'''
        for index, row in history.loc[self.symbol].iterrows():
            self.UpdateFromParameters(index, row['close'])
from collections import deque
from datetime import datetime, timedelta
import numpy as np
from scipy import stats
import pandas as pd
from QuantConnect.Algorithm.Framework.Alphas import *


class MomentumAlphaModel(AlphaModel):
    '''Alpha model that, once per calendar month, keeps held stocks whose
    MomentumScore is still above a threshold, tops the selection up with the
    best-ranked stocks when the reference ticker trades above its SMA, and
    emits Up insights weighted by inverse relative volatility.'''

    def __init__(self, resolution, momentumScorePeriod, selectionSize, selectionThreshold, STDperiod, referenceSMAperiod, referenceTicker, referenceSMA):
        '''Stores the model parameters and initialises the per-symbol state.'''
        self.resolution = resolution
        self.symbolDataBySymbol = {}
        # Month of the last processed update (-1 = none yet)
        self.monthTracking = -1
        self.momentumScorePeriod = momentumScorePeriod
        self.selectionSize = selectionSize
        self.selectionThreshold = selectionThreshold
        # [symbolData, score, weight] entries emitted on the previous update
        self.lastInsightsSelection = []
        self.STDperiod = STDperiod
        self.referenceSMAperiod = referenceSMAperiod
        self.referenceTicker = referenceTicker
        self.referenceSMA = referenceSMA
        # SymbolData of the reference ticker; assigned in OnSecuritiesChanged
        self.referenceSymbolData = None
        self.referenceSMAisSet = False

    def Update(self, algorithm, data):
        '''Generates the monthly insights.

        Returns an empty list when the current month was already processed,
        otherwise one Up insight per stock kept or newly selected.'''
        insights = []
        if algorithm.Time.month == self.monthTracking:
            return insights

        # Monthly trading rules: evaluate every tracked symbol that has a bar
        weightBySymbolData = {}
        momentumSelection = []
        for symbolData in self.symbolDataBySymbol.values():
            if not data.ContainsKey(symbolData.Symbol):
                continue
            bar = data[symbolData.Symbol]
            if bar is None:
                continue
            weight = self._inverseVolatilityWeight(symbolData, bar)
            weightBySymbolData[symbolData] = weight
            if weight is not None:
                momentumSelection.append([symbolData, symbolData.MomentumScore.Value, weight])

        # Keep previous selections whose momentum score is still above the
        # threshold. A new list is built instead of removing entries from the
        # list being iterated.
        # NOTE(review): entries without a bar this update (e.g. securities no
        # longer tracked) are carried over unchanged, as in the original.
        insightsSelection = []
        for lastSelection in self.lastInsightsSelection:
            heldData = lastSelection[0]
            if heldData in weightBySymbolData:
                if heldData.MomentumScore.Value < self.selectionThreshold:
                    continue
                # Update lastSelection weighting
                weight = weightBySymbolData[heldData]
                if weight is not None:
                    lastSelection[2] = weight
            insightsSelection.append(lastSelection)

        # Rank and keep only the selection size
        momentumSelection = sorted(momentumSelection, key=lambda x: x[1], reverse=True)[:self.selectionSize]

        # Buy new stocks if reference close > reference SMA
        if self._isReferenceAboveSMA(data):
            # Skip stocks that are already held so a symbol never takes two slots
            heldSymbols = {selection[0].Symbol for selection in insightsSelection}
            for selectedStock in momentumSelection:
                if len(insightsSelection) >= self.selectionSize:
                    break
                if selectedStock[0].Symbol in heldSymbols:
                    continue
                insightsSelection.append(selectedStock)
                heldSymbols.add(selectedStock[0].Symbol)

        for insightSelection in insightsSelection:
            insights.append(Insight.Price(insightSelection[0].Symbol, timedelta(days=1), InsightDirection.Up, None, 1.00, None, insightSelection[2]))

        self.lastInsightsSelection = insightsSelection
        # update month tracking with the current month
        self.monthTracking = algorithm.Time.month
        return insights

    def _inverseVolatilityWeight(self, symbolData, bar):
        '''Returns 1 / (STD / Close) for the bar, or None when it is undefined.'''
        close = bar.Close
        if close == 0:
            return None
        relativeVolatility = symbolData.STD.Current.Value / close
        if relativeVolatility == 0:
            return None
        return 1 / relativeVolatility

    def _isReferenceAboveSMA(self, data):
        '''True when the reference ticker has a bar closing above its reference SMA.'''
        reference = self.referenceSymbolData
        if reference is None or not data.ContainsKey(reference.Symbol):
            return False
        referenceBar = data[reference.Symbol]
        if referenceBar is None:
            return False
        return referenceBar.Close > reference.referenceSMA.Current.Value

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Creates and warms up SymbolData for added securities and releases
        the state of removed ones.'''
        # Initialize referenceTicker SMA once
        if not self.referenceSMAisSet:
            history = algorithm.History([self.referenceTicker], self.referenceSMAperiod, self.resolution)
            self._addSymbolData(algorithm, history, isReference=True)
            # Retry on the next change if no history was available yet
            self.referenceSMAisSet = self.referenceSymbolData is not None

        # Initialize data for added securities
        addedSymbols = [x.Symbol for x in changes.AddedSecurities]
        if addedSymbols:
            history = algorithm.History(addedSymbols, max(self.momentumScorePeriod, self.STDperiod), self.resolution)
            self._addSymbolData(algorithm, history)

        # clean up data for removed securities (must run even when nothing was added)
        for removed in changes.RemovedSecurities:
            symbolData = self.symbolDataBySymbol.pop(removed.Symbol, None)
            if symbolData is not None:
                symbolData.RemoveConsolidators(algorithm)

    def _addSymbolData(self, algorithm, history, isReference=False):
        '''Creates, registers and warms up SymbolData for each new ticker found in history.'''
        if history.empty:
            # An empty frame has no index levels to enumerate
            return
        for ticker in history.index.levels[0]:
            symbol = SymbolCache.GetSymbol(ticker)
            if symbol in self.symbolDataBySymbol:
                continue
            symbolData = SymbolData(symbol, self.momentumScorePeriod, self.STDperiod, self.referenceSMAperiod, self.referenceTicker)
            self.symbolDataBySymbol[symbol] = symbolData
            if isReference:
                self.referenceSymbolData = symbolData
            symbolData.RegisterIndicators(algorithm, self.resolution)
            symbolData.WarmUpIndicators(history, ticker, symbol)


class SymbolData:
    '''Contains data specific to a symbol required by this model'''

    def __init__(self, symbol, momentumScorePeriod, STDperiod, referenceSMAperiod, referenceTicker):
        '''Creates the indicators for symbol; consolidators are attached in RegisterIndicators.'''
        self.Symbol = symbol
        self.MomentumScore = MomentumScore('MomentumScore', symbol, momentumScorePeriod)
        self.MomentumScoreConsolidator = None
        self.STD = StandardDeviation(STDperiod)
        self.STDConsolidator = None
        self.referenceSMA = SimpleMovingAverage(referenceSMAperiod)
        self.referenceTicker = referenceTicker
        self.referenceSMAConsolidator = None

    def RegisterIndicators(self, algorithm, resolution):
        '''Registers every indicator with its own consolidator for automatic updates.'''
        self.MomentumScoreConsolidator = algorithm.ResolveConsolidator(self.Symbol, resolution)
        algorithm.RegisterIndicator(self.Symbol, self.MomentumScore, self.MomentumScoreConsolidator)

        self.STDConsolidator = algorithm.ResolveConsolidator(self.Symbol, resolution)
        algorithm.RegisterIndicator(self.Symbol, self.STD, self.STDConsolidator)

        # Only the reference ticker feeds the reference SMA
        if self.Symbol.Value == self.referenceTicker:
            self.referenceSMAConsolidator = algorithm.ResolveConsolidator(self.Symbol, resolution)
            algorithm.RegisterIndicator(self.Symbol, self.referenceSMA, self.referenceSMAConsolidator, Field.Close)

    def WarmUpIndicators(self, history, ticker, symbol):
        '''Feeds the rows of history for ticker into the indicators.'''
        self.MomentumScore.Warmup(history)
        isReference = self.Symbol.Value == self.referenceTicker
        for row in history.loc[ticker].itertuples():
            self.STD.Update(row.Index, row.close)
            if isReference:
                self.referenceSMA.Update(row.Index, row.close)

    def RemoveConsolidators(self, algorithm):
        '''Detaches every consolidator created by RegisterIndicators.'''
        consolidators = (
            self.MomentumScoreConsolidator,
            self.STDConsolidator,
            self.referenceSMAConsolidator,
        )
        for consolidator in consolidators:
            if consolidator is not None:
                algorithm.SubscriptionManager.RemoveConsolidator(self.Symbol, consolidator)


# Define custom indicator
class MomentumScore:
    '''Custom indicator: annualised percentage return implied by a linear
    regression on log prices over period samples, multiplied by the
    regression r-squared.'''

    def __init__(self, name, symbol, period):
        self.symbol = symbol
        self.Name = name
        self.Time = datetime.min
        self.Value = 0
        self.IsReady = False
        # Rolling windows, newest sample on the left
        self.queue = deque(maxlen=period)
        self.queueTime = deque(maxlen=period)
        self.CurrentReturn = 0

    # required update method
    def Update(self, input):
        '''Updates the indicator from a data point exposing Time and Close.'''
        return self.UpdateFromParameters(input.Time, input.Close)

    # update called for history warmup
    def UpdateFromParameters(self, time, value):
        '''Adds one price sample; returns True once the window is full and Value was recomputed.'''
        self.queue.appendleft(value)
        if len(self.queue) != self.queue.maxlen:
            return False

        self.queueTime.appendleft(time)
        self.Time = time
        self.IsReady = True

        # Regress log prices (oldest first) on the sample index
        y = np.flipud(np.log(self.queue))
        x = np.arange(len(y))
        slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
        # Annualized exponential regression (ratio^252) x correlation
        self.Value = (np.power(np.exp(slope), 252) - 1) * 100 * (r_value ** 2)
        return self.IsReady

    def Warmup(self, history):
        '''Feeds the close column of history for this symbol into the indicator.'''
        for index, row in history.loc[self.symbol].iterrows():
            self.UpdateFromParameters(index, row['close'])
from QuantConnect.Data.UniverseSelection import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel


class UniverseSelectionModel(FundamentalUniverseSelectionModel):
    '''Universe of the highest dollar-volume stocks priced above a minimum,
    re-selected when the calendar month changes.'''

    def __init__(self, filterFineData = True, universeSettings = None, securityInitializer = None):
        '''Initializes the model with its price floor and coarse size cap.'''
        super().__init__(filterFineData, universeSettings, securityInitializer)
        self.minimumPrice = 5
        self.numberOfSymbolsCoarse = 1000
        # Month of the last coarse selection (-1 = none yet)
        self.monthTracking = -1

    def SelectCoarse(self, algorithm, coarse):
        '''Returns the symbols of the top dollar-volume stocks that have
        fundamental data and a price above minimumPrice, or
        Universe.Unchanged when the current month was already processed.'''
        if algorithm.Time.month == self.monthTracking:
            return Universe.Unchanged

        # One pass: fundamental data available and price above the floor
        candidates = [
            c for c in coarse
            if c.HasFundamentalData and c.Price > self.minimumPrice
        ]
        candidates.sort(key=lambda c: c.DollarVolume, reverse=True)

        self.monthTracking = algorithm.Time.month
        mostLiquid = candidates[:self.numberOfSymbolsCoarse]
        return [c.Symbol for c in mostLiquid]

    def SelectFine(self, algorithm, fine):
        '''Passes every fine-fundamental security through unchanged.'''
        return [security.Symbol for security in fine]