Overall Statistics |
Total Trades 6 Average Win 0.00% Average Loss 0.00% Compounding Annual Return -78.366% Drawdown 3.600% Expectancy 0.185 Net Profit -2.092% Sharpe Ratio -3.729 Probabilistic Sharpe Ratio 6.582% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 1.37 Alpha -0.462 Beta 0.903 Annual Standard Deviation 0.196 Annual Variance 0.039 Information Ratio -9.202 Tracking Error 0.047 Treynor Ratio -0.811 Total Fees $83.08 Estimated Strategy Capacity $53000.00 Lowest Capacity Asset RFV TGRALZT9E5ID |
#region imports from AlgorithmImports import * #endregion class ConstantEtfAlphaModel(AlphaModel): # Constant Alpha Model for ETFs def __init__(self, etfTickers): self.Name = 'ETF' self.etfSymbols = [Symbol.Create(ticker, SecurityType.Equity, Market.USA) for ticker in etfTickers] self.nextUpdateTime = datetime.min def Update(self, algorithm, data): insights = [] if algorithm.Time < self.nextUpdateTime: return insights expiry = Expiry.EndOfYear(algorithm.Time) for symbol in self.etfSymbols: if symbol not in data.Bars: continue insights.append(Insight.Price(symbol, expiry, InsightDirection.Up)) if all([algorithm.Portfolio[symbol].TotalSaleVolume > 0 for symbol in self.etfSymbols]): self.nextUpdateTime = expiry return insights def OnSecuritiesChanged(self, algorithm, changes): pass
#region imports from AlgorithmImports import * #endregion import operator import numpy as np from collections import namedtuple import functools class MomentumAlphaModel(AlphaModel): def __init__(self, algorithm, numberOfMonthsUntilFullyInvested, maxNumberOfStocksToHold_Momentum, maxNumberOfStocksToHold_Momentum_VolatilityCondition, numberOfTradingDaysInvestedIfInProfit, numberOfTradingDaysInvestedIfInLoss, minMomentumPercent, riskManagementEnabled): self.Name = "Momentum" self.symbolDataDict = {} self.nextUpdateTime = datetime.min self.numberOfMonthsUntilFullyInvested = numberOfMonthsUntilFullyInvested self.maxNumberOfStocksToHold = maxNumberOfStocksToHold_Momentum self.numberOfStocksToAddPerMonth = int(maxNumberOfStocksToHold_Momentum/numberOfMonthsUntilFullyInvested) insightDuration = namedtuple('InsightDuration', 'Profit, Loss') self.insightDuration = insightDuration(Profit=int(365/252*numberOfTradingDaysInvestedIfInProfit), Loss=int(365/252*numberOfTradingDaysInvestedIfInLoss)) self.minMomentumPercent = minMomentumPercent self.IsInitRun = True self.monthsSinceStart = 0 self.month = None self.scheduledEvent = None self.pcm = algorithm.pcm self.riskModel = None self.riskManagementEnabled = riskManagementEnabled self.insightCollection = InsightCollection() def Update(self, algorithm, data): insights = [] # check if it's time to emit insights if algorithm.Time < self.nextUpdateTime: return insights # if this is the first run and we're in live mode, we must load the insights from the object store if self.IsInitRun: insightsFromObjectStore = self.LoadInsightsFromObjectStore(algorithm) insights.extend(insightsFromObjectStore) self.monthsSinceStart += int(len(insightsFromObjectStore)/self.numberOfStocksToAddPerMonth) self.IsInitRun = False if self.riskModel is not None and self.riskModel.safeModeExpiryTime > algorithm.Time: insights.extend(self.AdjustInsightExpiryTimesForActiveMomentumHoldings(algorithm)) return insights # check if we can add more holdings for this strategy targetCount = min(self.maxNumberOfStocksToHold, self.monthsSinceStart*self.numberOfStocksToAddPerMonth) count = len([symbol for symbol, symbolData in self.symbolDataDict.items() if symbolData.Holdings.Invested and symbolData.InsightExpiry >= algorithm.Time]) if count > targetCount - self.numberOfStocksToAddPerMonth: self.nextUpdateTime = self.GetNextUpdateTime(algorithm) insights.extend(self.AdjustInsightExpiryTimesForActiveMomentumHoldings(algorithm)) return insights if self.month != algorithm.Time.month: # create the ranking (Momentum & Kaufman Efficiency Ratio) and select the best N stocks where we're not invested yet; default is N=2 filteredSymbolDataDict = {symbol : symbolData for symbol, symbolData in self.symbolDataDict.items() if (symbol in data.Bars and symbolData.IsReady and symbolData.Momentum > self.minMomentumPercent and (not symbolData.Holdings.Invested))} sortedByMomentum = [*map(operator.itemgetter(0), sorted(filteredSymbolDataDict.items(), key = lambda x: x[1].Momentum, reverse=True))] sortedByKER = [*map(operator.itemgetter(0), sorted(filteredSymbolDataDict.items(), key = lambda x: x[1].KER ))] totalRank = {symbol : sortedByMomentum.index(symbol) + sortedByKER.index(symbol) for symbol, _ in filteredSymbolDataDict.items()} sortedByTotalRank = sorted(totalRank.items(), key = lambda x: x[1]) selection = [x[0] for x in sortedByTotalRank if not algorithm.Portfolio[x[0]].Invested][:self.numberOfStocksToAddPerMonth] if len(selection) >= self.numberOfStocksToAddPerMonth and count >= targetCount: self.month = algorithm.Time.month elif len(selection) == 0: insights.extend(self.AdjustInsightExpiryTimesForActiveMomentumHoldings(algorithm)) return insights # create insights for the selected stocks for symbol in selection: symbolData = self.symbolDataDict[symbol] if not algorithm.Portfolio[symbol].Invested: symbolData.InsightExpiry = algorithm.Time + timedelta(days=min(self.insightDuration)) insights.append(Insight.Price(symbol, symbolData.InsightExpiry, InsightDirection.Up)) count += 1 # adjust the time for nextInsightUpdate to avoid unnecessary computations if len(insights) >= self.numberOfStocksToAddPerMonth or (count >=targetCount): self.nextUpdateTime = self.GetNextUpdateTime(algorithm) insights.extend(self.AdjustInsightExpiryTimesForActiveMomentumHoldings(algorithm)) if algorithm.LiveMode: self.insightCollection.AddRange(insights) return insights def AdjustInsightExpiryTimesForActiveMomentumHoldings(self, algorithm): # adjust insight expiry times for symbols with active insights if necessary insights = [] for symbol, symbolData in self.symbolDataDict.items(): if not symbolData.Holdings.Invested or symbolData.InsightExpiry < algorithm.Time: continue if symbolData.InsightExpiry > self.nextUpdateTime + timedelta(1): continue if (not symbolData.Flag and symbolData.Holdings.UnrealizedProfit > 0): symbolData.Flag = True td = timedelta(days = self.insightDuration.Profit - self.insightDuration.Loss) if td.days > 0: symbolData.InsightExpiry += td insights.append(Insight.Price(symbol, symbolData.InsightExpiry, InsightDirection.Up)) return insights def GetNextUpdateTime(self, algorithm): nextInsightExpiry = min([symbolData.InsightExpiry for _, symbolData in self.symbolDataDict.items() if symbolData.Holdings.Invested and symbolData.InsightExpiry > algorithm.Time], default=datetime.max) return min([Expiry.EndOfMonth(algorithm.Time) - timedelta(days=3), nextInsightExpiry]) def OnSecuritiesChanged(self, algorithm, changes): if self.riskManagementEnabled and self.riskModel is None and self.pcm.riskModel is not None: self.riskModel = self.pcm.riskModel # tracks the changes in our universe of securities if self.scheduledEvent is None and algorithm.Benchmark is not None: symbol = algorithm.Benchmark.Security.Symbol self.scheduledEvent = algorithm.Schedule.On(algorithm.DateRules.MonthEnd(symbol, 1), algorithm.TimeRules.Midnight, functools.partial(self.PersistInsightCollection, algorithm) ) for security in changes.RemovedSecurities: symbolData = self.symbolDataDict.pop(security.Symbol, None) if symbolData is not None: symbolData.Dispose(algorithm) for security in changes.AddedSecurities: if not security.IsTradable or security.Fundamentals is None: continue symbol = security.Symbol if symbol not in self.symbolDataDict: self.symbolDataDict[symbol] = SymbolData(algorithm, security) def PersistInsightCollection(self, algorithm): # we also use this scheduled event handler to adjust self.monthsSinceStart self.monthsSinceStart += 1 # stores all active insights from this alpha model in an object store if not algorithm.LiveMode: return activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime) insightDict = {insight.Id : [insight.Symbol, insight.Period, insight.Direction, insight.SourceModel, insight.GeneratedTimeUtc, insight.CloseTimeUtc] for insight in activeInsights} df = pd.DataFrame(insightDict.values(), index=insightDict.keys(), columns=['Symbol','Period','Direction','SourceModel','GeneratedTimeUtc','CloseTimeUtc']) algorithm.ObjectStore.Save(str(algorithm.ProjectId) + "/" + self.Name + "InsightCollection", df.to_json(orient='split', default_handler = str)) def LoadInsightsFromObjectStore(self, algorithm): # loads all insights from the object store if we are in live mode # ensures we track the insight timings correctly between redeployments if not algorithm.LiveMode: return [] insights = [] insightCollection = InsightCollection() key = str(algorithm.ProjectId) + "/" + self.Name + "InsightCollection" if not algorithm.ObjectStore.ContainsKey(key): return insights storedInsightsDf = pd.read_json(algorithm.ObjectStore.Read(key), orient='split') storedInsightsDf.loc[:, 'Period'] = pd.to_timedelta(storedInsightsDf.Period, unit='ms') storedInsightsDf.loc[:, 'GeneratedTimeUtc'] = pd.to_datetime(storedInsightsDf.GeneratedTimeUtc, unit='ms', utc=True) storedInsightsDf.loc[:, 'CloseTimeUtc'] = pd.to_datetime(storedInsightsDf.CloseTimeUtc, unit='ms', utc=True) filteredInsightsDf = storedInsightsDf.loc[(storedInsightsDf.SourceModel.eq(self.Name) & storedInsightsDf.CloseTimeUtc.gt(algorithm.UtcTime) & storedInsightsDf.Direction.eq(1) & storedInsightsDf.GeneratedTimeUtc.lt(algorithm.UtcTime))] for row in filteredInsightsDf.itertuples(): symbol = SymbolCache.GetSymbol(row.Symbol) if symbol not in self.symbolDataDict: if symbol not in algorithm.ActiveSecurities.Keys: continue security = algorithm.Securities[symbol] if not (security.IsTradable and symbol in algorithm.CurrentSlice.Bars): continue self.symbolDataDict[symbol] = SymbolData(algorithm, security) self.symbolDataDict[symbol].InsightExpiry = row.CloseTimeUtc.to_pydatetime() insight = Insight.Price(symbol, row.CloseTimeUtc.to_pydatetime(), row.Direction) insights.append(insight) return insights class SymbolData: def __init__(self, algorithm, security): self.algorithm = algorithm self.Security = security self.Symbol = security.Symbol self.Holdings = security.Holdings self.InsightExpiry = datetime.min self.Flag = False self.SetupIndicators(algorithm) def SetupIndicators(self, algorithm): resolution = Resolution.Daily period = self.Security.Exchange.TradingDaysPerYear delay_period = int(period/12) self.momp = MomentumPercent(period) self.momentum = IndicatorExtensions.Of(Delay(delay_period), self.momp) algorithm.RegisterIndicator(self.Symbol, self.momp, resolution) algorithm.WarmUpIndicator(self.Symbol, self.momp, resolution) self.ker = KaufmanEfficiencyRatio(period) self.ker = IndicatorExtensions.Of(Delay(delay_period), self.ker) algorithm.RegisterIndicator(self.Symbol, self.ker, resolution) algorithm.WarmUpIndicator(self.Symbol, self.ker, resolution) self.Consolidator = algorithm.ResolveConsolidator(self.Symbol, resolution) def Dispose(self, algorithm): algorithm.SubscriptionManager.RemoveConsolidator(self.Symbol, self.Consolidator) @property def Momentum(self): return self.momentum.Current.Value @property def KER(self): return self.ker.Current.Value @property def IsReady(self): return self.momp.IsReady and self.ker.IsReady class RiskModel: def __init__(self, algorithm, security, momentumBudget, momentumBudgetVolCond, valueBudget, valueBudgetVolCond, riskManagementEnabled, maxDrawdown, maxVIX, numberOfTradingDaysBeforeReset): self.algorithm = algorithm self.Security = security self.Symbol = security.Symbol self.portfolioHigh = algorithm.Portfolio.TotalPortfolioValue self.SafeMode = False self.safeModeExpiryTime = datetime.min self.momentumBudget = momentumBudget self.momentumBudgetDefault = momentumBudget self.momentumBudgetVolCond = momentumBudgetVolCond self.valueBudget = valueBudget self.valueBudgetDefault = valueBudget self.valueBudgetVolCond = valueBudgetVolCond self.riskManagementEnabled = riskManagementEnabled self.maxDrawdown = -abs(maxDrawdown) self.maxVIX = maxVIX self.numberOfTradingDaysBeforeReset = timedelta(days=numberOfTradingDaysBeforeReset) self.budgetBySourceModel = defaultdict(float) self.budgetBySourceModel.update({'Momentum':momentumBudget, 'Value':valueBudget}) self.hour = None algorithm.Schedule.On(algorithm.DateRules.EveryDay(self.Symbol), algorithm.TimeRules.Every(timedelta(hours=1)), self.Update) @property def BudgetBySourceModel(self): if not self.riskManagementEnabled: return self.budgetBySourceModel self.budgetBySourceModel.update({"Momentum":self.momentumBudget, "Value":self.valueBudget}) return self.budgetBySourceModel def Update(self): if not self.SafeMode and self.algorithm.Time > self.safeModeExpiryTime: self.momentumBudget = self.momentumBudgetDefault self.valueBudget = self.valueBudgetDefault self.SafeMode = self.Security.Price > self.maxVIX and self.CurrentDrawdown < self.maxDrawdown if self.SafeMode: self.momentumBudget = self.momentumBudgetVolCond self.valueBudget = self.valueBudgetVolCond self.algorithm.Debug(f"{self.algorithm.Time} | ACHTUNG !!! Volatility-Condition erfüllt!") else: if self.SafeMode and self.Security.Price < self.maxVIX and self.CurrentDrawdown > self.maxDrawdown: self.safeModeExpiryTime = self.algorithm.Time + self.numberOfTradingDaysBeforeReset self.SafeMode = False self.algorithm.Debug(f"{self.algorithm.Time} | Volatility-Condition wieder aufgehoben!") self.algorithm.Debug(f"Safe-Mode verfällt am {self.safeModeExpiryTime}.") @property def CurrentDrawdown(self): currentTPV = self.algorithm.Portfolio.TotalPortfolioValue if currentTPV > self.portfolioHigh: self.portfolioHigh = currentTPV return currentTPV/self.portfolioHigh - 1
#region imports from AlgorithmImports import * #endregion from collections import namedtuple import operator import functools class ValueAlphaModel(AlphaModel): def __init__(self, algorithm, numberOfMonthsUntilFullyInvested, maxNumberOfStocksToHold_Value, maxNumberOfStocksToHold_Value_VolatilityCondition, min_ROA, min_PE_Ratio, numberOfTradingDaysInvestedIfInProfit, numberOfTradingDaysInvestedIfInLoss, riskManagementEnabled): self.Name = "Value" self.symbolDataDict = {} self.nextUpdateTime = datetime.min self.numberOfMonthsUntilFullyInvested = numberOfMonthsUntilFullyInvested self.maxNumberOfStocksToHold = maxNumberOfStocksToHold_Value self.maxNumberOfStocksToHold_Value_VolatilityCondition = maxNumberOfStocksToHold_Value_VolatilityCondition self.numberOfStocksToAddPerMonth = int(maxNumberOfStocksToHold_Value/numberOfMonthsUntilFullyInvested) self.min_ROA = min_ROA self.min_PE_Ratio = min_PE_Ratio insightDuration = namedtuple('InsightDuration', 'Profit, Loss') self.insightDuration = insightDuration(Profit=int(365/252*numberOfTradingDaysInvestedIfInProfit), Loss=int(365/252*numberOfTradingDaysInvestedIfInLoss)) self.monthsSinceStart = 0 self.month = None self.IsInitRun = True self.scheduledEvent = None self.insightCollection = InsightCollection() self.pcm = algorithm.pcm self.riskModel = None self.riskManagementEnabled = riskManagementEnabled def Update(self, algorithm, data): insights = [] # check if it's time to emit insights if algorithm.Time < self.nextUpdateTime: return insights # if this is the first run and we're in live mode, we must load the insights from the object store if self.IsInitRun: insightsFromObjectStore = self.LoadInsightsFromObjectStore(algorithm) insights.extend(insightsFromObjectStore) self.monthsSinceStart += int(len(insightsFromObjectStore)/self.numberOfStocksToAddPerMonth) self.IsInitRun = False targetCount = min(self.maxNumberOfStocksToHold, self.monthsSinceStart*self.numberOfStocksToAddPerMonth) # adjust the target holdings count if volatility condition is met if self.riskManagementEnabled and self.riskModel is not None and (self.riskModel.SafeMode or self.riskModel.safeModeExpiryTime > algorithm.Time): targetCount = self.riskModel.NumberOfStocksBySourceModel[self.Name] count = len([symbol for symbol, symbolData in self.symbolDataDict.items() if symbolData.Holdings.Invested and symbolData.InsightExpiry >= algorithm.Time]) # check if we can add more holdings for this strategy if count > targetCount - self.numberOfStocksToAddPerMonth: self.nextUpdateTime = self.GetNextUpdateTime(algorithm) insights.extend(self.AdjustInsightExpiryTimesForActiveValueHoldings(algorithm)) return insights if self.month != algorithm.Time.month: # create ranking for this strategy (based on EVtoEBIT) filteredSymbolDataDict = {symbol : symbolData for symbol, symbolData in self.symbolDataDict.items() if (symbol in data.Bars and symbolData.ROA > self.min_ROA and symbolData.PE_Ratio > self.min_PE_Ratio and (not symbolData.Holdings.Invested))} sortedByEVToEBIT = sorted(filteredSymbolDataDict.items(), key = lambda x: x[1].EVtoEBIT) selection = [*map(operator.itemgetter(0), sortedByEVToEBIT)][:self.numberOfStocksToAddPerMonth] if len(selection) >= self.numberOfStocksToAddPerMonth and count >= targetCount: self.month = algorithm.Time.month elif len(selection) == 0: insights.extend(self.AdjustInsightExpiryTimesForActiveValueHoldings(algorithm)) return insights # create insights for the selected stocks for symbol in selection: symbolData = self.symbolDataDict[symbol] if not algorithm.Portfolio[symbol].Invested: symbolData.InsightTime = algorithm.Time symbolData.InsightExpiry = algorithm.Time + timedelta(days=min(self.insightDuration)) insights.append(Insight.Price(symbol, symbolData.InsightExpiry, InsightDirection.Up)) count +=1 # adjust the time for nextInsightUpdate to avoid unnecessary computations if len(insights) >= self.numberOfStocksToAddPerMonth or (count >=targetCount): self.nextUpdateTime = self.GetNextUpdateTime(algorithm) insights.extend(self.AdjustInsightExpiryTimesForActiveValueHoldings(algorithm)) if algorithm.LiveMode: self.insightCollection.AddRange(insights) return insights def AdjustInsightExpiryTimesForActiveValueHoldings(self, algorithm): # adjust insight expiry times for symbols with active insights if necessary insights = [] for symbol, symbolData in self.symbolDataDict.items(): if not symbolData.Holdings.Invested or symbolData.InsightExpiry < algorithm.Time: continue if symbolData.InsightExpiry > self.nextUpdateTime + timedelta(1): continue if (not symbolData.Flag and symbolData.Holdings.UnrealizedProfit > 0): symbolData.Flag = True td = timedelta(days = self.insightDuration.Profit - self.insightDuration.Loss) if td.days > 0: symbolData.InsightExpiry += td insights.append(Insight.Price(symbol, symbolData.InsightExpiry, InsightDirection.Up)) return insights def GetNextUpdateTime(self, algorithm): nextInsightExpiry = min([symbolData.InsightExpiry for _, symbolData in self.symbolDataDict.items() if symbolData.Holdings.Invested and symbolData.InsightExpiry > algorithm.Time], default=datetime.max) return min([Expiry.EndOfMonth(algorithm.Time) - timedelta(days=3), nextInsightExpiry]) def OnSecuritiesChanged(self, algorithm, changes): if self.riskManagementEnabled and self.riskModel is None and self.pcm.riskModel is not None: self.riskModel = self.pcm.riskModel if self.scheduledEvent is None and algorithm.Benchmark is not None: symbol = algorithm.Benchmark.Security.Symbol self.scheduledEvent = algorithm.Schedule.On(algorithm.DateRules.MonthEnd(symbol, 1), algorithm.TimeRules.Midnight, functools.partial(self.PersistInsightCollection, algorithm) ) for security in changes.RemovedSecurities: symbolData = self.symbolDataDict.pop(security.Symbol, None) if symbolData is not None: symbolData.Dispose(algorithm) for security in changes.AddedSecurities: if not security.IsTradable or security.Fundamentals is None: continue symbol = security.Symbol if symbol not in self.symbolDataDict: self.symbolDataDict[symbol] = SymbolData(algorithm, security) def PersistInsightCollection(self, algorithm): # we also use this scheduled event handler to adjust self.monthsSinceStart self.monthsSinceStart += 1 # stores all active insights from this alpha model in an object store if not algorithm.LiveMode: return activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime) insightDict = {insight.Id : [insight.Symbol, insight.Period, insight.Direction, insight.SourceModel, insight.GeneratedTimeUtc, insight.CloseTimeUtc] for insight in activeInsights} df = pd.DataFrame(insightDict.values(), index=insightDict.keys(), columns=['Symbol','Period','Direction','SourceModel','GeneratedTimeUtc','CloseTimeUtc']) algorithm.ObjectStore.Save(str(algorithm.ProjectId) + "/" + self.Name + "InsightCollection", df.to_json(orient='split', default_handler = str)) def LoadInsightsFromObjectStore(self, algorithm): if not algorithm.LiveMode: return [] insights = [] insightCollection = InsightCollection() key = str(algorithm.ProjectId) + "/" + self.Name + "InsightCollection" if not algorithm.ObjectStore.ContainsKey(key): return [] storedInsightsDf = pd.read_json(algorithm.ObjectStore.Read(key), orient='split') storedInsightsDf.loc[:, 'Period'] = pd.to_timedelta(storedInsightsDf.Period, unit='ms') storedInsightsDf.loc[:, 'GeneratedTimeUtc'] = pd.to_datetime(storedInsightsDf.GeneratedTimeUtc, unit='ms', utc=True) storedInsightsDf.loc[:, 'CloseTimeUtc'] = pd.to_datetime(storedInsightsDf.CloseTimeUtc, unit='ms', utc=True) filteredInsightsDf = storedInsightsDf.loc[(storedInsightsDf.SourceModel.eq(self.Name) & storedInsightsDf.CloseTimeUtc.gt(algorithm.UtcTime) & storedInsightsDf.Direction.eq(1) & storedInsightsDf.GeneratedTimeUtc.lt(algorithm.UtcTime))] for row in filteredInsightsDf.itertuples(): symbol = SymbolCache.GetSymbol(row.Symbol) if symbol not in self.symbolDataDict: security = algorithm.Securities[symbol] if not (security.IsTradable and symbol in algorithm.CurrentSlice.Bars): continue self.symbolDataDict[symbol] = SymbolData(algorithm, security) self.symbolDataDict[symbol].InsightExpiry = row.CloseTimeUtc.to_pydatetime() insight = Insight.Price(symbol, row.CloseTimeUtc.to_pydatetime(), row.Direction) insights.append(insight) return insights class SymbolData: def __init__(self, algorithm, security): self.algorithm = algorithm self.Security = security self.Symbol = security.Symbol self.Holdings = security.Holdings self.InsightExpiry = datetime.min self.Flag = False self.InsightTime = datetime.min self.Consolidator = algorithm.ResolveConsolidator(self.Symbol, Resolution.Daily) def Dispose(self, algorithm): algorithm.SubscriptionManager.RemoveConsolidator(self.Symbol, self.Consolidator) @property def ROA(self): return self.Security.Fundamentals.OperationRatios.ROA.OneYear @property def PE_Ratio(self): return self.Security.Fundamentals.ValuationRatios.PERatio @property def EVtoEBIT(self): return self.Security.Fundamentals.ValuationRatios.EVtoEBIT class RiskModel: def __init__(self, algorithm, security, momentumBudget, momentumBudgetVolCond, valueBudget, valueBudgetVolCond, riskManagementEnabled, maxDrawdown, maxVIX, numberOfTradingDaysBeforeReset): self.algorithm = algorithm self.Security = security self.Symbol = security.Symbol self.portfolioHigh = algorithm.Portfolio.TotalPortfolioValue self.SafeMode = False self.safeModeExpiryTime = datetime.min self.momentumBudget = momentumBudget self.momentumBudgetDefault = momentumBudget self.momentumBudgetVolCond = momentumBudgetVolCond self.valueBudget = valueBudget self.valueBudgetDefault = valueBudget self.valueBudgetVolCond = valueBudgetVolCond self.riskManagementEnabled = riskManagementEnabled self.maxDrawdown = -abs(maxDrawdown) self.maxVIX = maxVIX self.numberOfTradingDaysBeforeReset = timedelta(days=numberOfTradingDaysBeforeReset) self.budgetBySourceModel = defaultdict(float) self.budgetBySourceModel.update({'Momentum':momentumBudget, 'Value':valueBudget}) self.hour = None algorithm.Schedule.On(algorithm.DateRules.EveryDay(self.Symbol), algorithm.TimeRules.Every(timedelta(hours=1)), self.Update) @property def BudgetBySourceModel(self): if not self.riskManagementEnabled: return self.budgetBySourceModel self.budgetBySourceModel.update({"Momentum":self.momentumBudget, "Value":self.valueBudget}) return self.budgetBySourceModel def Update(self): if not self.SafeMode and self.algorithm.Time > self.safeModeExpiryTime: self.momentumBudget = self.momentumBudgetDefault self.valueBudget = self.valueBudgetDefault self.SafeMode = self.Security.Price > self.maxVIX and self.CurrentDrawdown < self.maxDrawdown if self.SafeMode: self.momentumBudget = self.momentumBudgetVolCond self.valueBudget = self.valueBudgetVolCond self.algorithm.Debug(f"{self.algorithm.Time} | ACHTUNG !!! Volatility-Condition erfüllt!") else: if self.SafeMode and self.Security.Price < self.maxVIX and self.CurrentDrawdown > self.maxDrawdown: self.safeModeExpiryTime = self.algorithm.Time + self.numberOfTradingDaysBeforeReset self.SafeMode = False self.algorithm.Debug(f"{self.algorithm.Time} | Volatility-Condition wieder aufgehoben!") self.algorithm.Debug(f"Safe-Mode verfällt am {self.safeModeExpiryTime}.") @property def CurrentDrawdown(self): currentTPV = self.algorithm.Portfolio.TotalPortfolioValue if currentTPV > self.portfolioHigh: self.portfolioHigh = currentTPV return currentTPV/self.portfolioHigh - 1
#region imports from AlgorithmImports import * #endregion ############ # def GetLatestLivePortfolioStateFromObjectStore(self, algorithm): # if not ( self.IsInitRun and algorithm.ObjectStore.ContainsKey('PortfolioState')): #algorithm.LiveMode and ## <--- wieder hinzufügen!!! # return [] # portfolioState = pd.read_json(algorithm.ObjectStore.Read('PortfolioState')) # if portfolioState is None or portfolioState.empty: # return [] # portfolioState.loc[:,'Entry'] = pd.to_datetime(portfolioState.Entry.clip(lower=0).replace(0, np.nan), unit='ms') # portfolioState.loc[:, 'Exit'] = pd.to_datetime(portfolioState.Exit.clip(lower=0).replace(0, np.nan), unit='ms') # previousLiveHoldings = portfolioState.loc[portfolioState.Exit.isnull()].index.map(SymbolCache.GetSymbol).tolist() # self.IsInitRun = False # return previousLiveHoldings # self.positionChangesDict = defaultdict(PortfolioSnapshot) # self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(0,0), self.UpdateCharts) # def UpdateCharts(self): # numHoldings = len([holding for _, holding in self.Portfolio.items() if holding.Invested]) # self.Plot('Portfolio Holdings', 'Number of Holdings', numHoldings) # def OnOrderEvent(self, orderEvent): # # if not self.LiveMode: # # return # if orderEvent.Status == OrderStatus.Filled: # symbol = orderEvent.Symbol # if not self.Portfolio[symbol].Invested and symbol in self.positionChangesDict: # # das heißt es handelt sich hierbei um einen Exit # self.positionChangesDict.update({symbol : self.positionChangesDict[symbol]._replace(Exit = self.Time, Quantity = 0)}) # return # self.positionChangesDict.update({symbol : self.positionChangesDict[symbol]._replace(Entry = self.Time, Quantity = orderEvent.Quantity)}) # def OnEndOfAlgorithm(self): # if not self.LiveMode: # return # for key in list([x.Key for x in self.ObjectStore.GetEnumerator()]): # if key.endswith('InsightCollection'): # self.ObjectStore.Delete(key) # hier die positionChangedDict im ObjectStore speichern, falls LiveMode # if not self.LiveMode: # return # if self.ObjectStore.ContainsKey('PortfolioState'): # self.ObjectStore.Delete('PortfolioState') # self.ObjectStore.Save('PortfolioState', pd.DataFrame(self.positionChangesDict.values(), index=self.positionChangesDict.keys()).to_json(default_handler=str)) ############# # def GetObjectStoreSymbols(self, algorithm): # SymbolsFromObjectStore(self, algorithm): # if not (self.IsInitRun and algorithm.ObjectStore.ContainsKey('PortfolioState')): # return 0 # portfolioState = pd.read_json(algorithm.ObjectStore.Read('PortfolioState')) # if portfolioState is None or portfolioState.empty: # return 0 # portfolioState.loc[:, 'Entry'] = pd.to_datetime(portfolioState.Entry.clip(lower=0).replace(0, np.nan), unit='ms') # portfolioState.loc[:, 'Exit'] = pd.to_datetime(portfolioState.Exit.clip(lower=0).replace(0, np.nan), unit='ms') # latestPortfolioState = portfolioState.loc[portfolioState.Exit.isnull() & portfolioState.Quantity.gt(0)] \ # .index.map(SymbolCache.GetSymbol).tolist() # # .apply(lambda x: PortfolioSnapshot(Entry = x.Entry, Exit = x.Exit, Quantity = x.Quantity), axis=1) \ # # .rename(lambda x: SymbolCache.GetSymbol(x)) \ # # .to_dict() # self.IsInitRun = False # return latestPortfolioState # return len([symbol for symbol in latestPortfolioState if algorithm.Portfolio[symbol].Invested]) ##################### ######### # currentTPV = self.algorithm.Portfolio.TotalPortfolioValue # if currentTPV > self.portfolioHigh: # self.portfolioHigh = currentTPV # drawdown = currentTPV/self.portfolioHigh - 1 # # targets.extend(self.CreateTargetsForLivePortfolioStateRecovery(algorithm)) # if not self.latestLivePortfolioState: # self.latestLivePortfolioState = self.GetLatestLivePortfolioStateFromObjectStore(algorithm) # for symbol, portfolioSnapshot in list(self.latestLivePortfolioState.items()): # if algorithm.Time - portfolioSnapshot.Entry > timedelta(days=265): # target = PortfolioTarget(symbol, 0) # targets.append(target) # del self.latestLivePortfolioState[symbol] # # self.latestLivePortfolioState.pop(symbol) # continue # else: # if not algorithm.Portfolio[symbol].Invested: # target = PortfolioTarget(symbol, portfolioSnapshot.Quantity) # residualBuyingPower -= abs(target.Quantity*algorithm.Securities[symbol].AskPrice) # targets.append(target) # # if algorithm.Portfolio[symbol].Invested and (algorithm.Time - portfolioSnapshot.Entry < timedelta(days=265)): # # continue # # if not algorithm.Portfolio[symbol].Invested and (algorithm.Time - portfolioSnapshot.Entry < timedelta(days=265)): # # target = PortfolioTarget(symbol, portfolioSnapshot.Quantity) # # residualBuyingPower -= abs(target.Quantity*algorithm.Securities[symbol].Price) # # if target is None: # # self.symbolsToIgnore.add(symbol) # # continue # # targets.append(target) ### # def GetLatestLivePortfolioStateFromObjectStore(self, algorithm): # if not ( self.IsInitRun and algorithm.ObjectStore.ContainsKey('PortfolioState')): #algorithm.LiveMode and # return {} # portfolioState = pd.read_json(algorithm.ObjectStore.Read('PortfolioState')) # if portfolioState is None or portfolioState.empty: # return {} # portfolioState.loc[:, 'Entry'] = pd.to_datetime(portfolioState.Entry.clip(lower=0).replace(0, np.nan), unit='ms') # portfolioState.loc[:, 'Exit'] = pd.to_datetime(portfolioState.Exit.clip(lower=0).replace(0, np.nan), unit='ms') # # previousLiveHoldings = portfolioState.loc[portfolioState.Exit.isnull()].index.map(SymbolCache.GetSymbol).tolist() # # previousLiveHoldings = portfolioState.Quantity.rename(lambda x: SymbolCache.GetSymbol(x)).to_dict() # # exit = null means there wasn't an exit yet, i.e. we have an active position for that security # latestPortfolioState = portfolioState.loc[portfolioState.Exit.isnull() & portfolioState.Quantity.gt(0)] \ # .apply(lambda x: PortfolioSnapshot(Entry = x.Entry, Exit = x.Exit, Quantity = x.Quantity), axis=1) \ # .rename(lambda x: SymbolCache.GetSymbol(x)) \ # .to_dict() # # algorithm.ObjectStore.Delete('PortfolioState') # self.IsInitRun = False # return latestPortfolioState ############ # def GetObjectStoreRelatedSymbols(self, algorithm): # SymbolsFromObjectStore(self, algorithm): # if not (self.IsInitRun and algorithm.ObjectStore.ContainsKey('PortfolioState')): # LIVE_MODE ! # return 0 # portfolioState = pd.read_json(algorithm.ObjectStore.Read('PortfolioState')) # if portfolioState is None or portfolioState.empty: # return 0 # portfolioState.loc[:, 'Entry'] = pd.to_datetime(portfolioState.Entry.clip(lower=0).replace(0, np.nan), unit='ms') # portfolioState.loc[:, 'Exit'] = pd.to_datetime(portfolioState.Exit.clip(lower=0).replace(0, np.nan), unit='ms') # latestPortfolioState = portfolioState.loc[portfolioState.Exit.isnull() & portfolioState.Quantity.gt(0)] \ # .index.map(SymbolCache.GetSymbol).tolist() # # .apply(lambda x: PortfolioSnapshot(Entry = x.Entry, Exit = x.Exit, Quantity = x.Quantity), axis=1) \ # # .rename(lambda x: SymbolCache.GetSymbol(x)) \ # # .to_dict() # self.IsInitRun = False # return latestPortfolioState # # CHECK = len([symbol for symbol in latestPortfolioState if algorithm.Portfolio[symbol].Invested]) ###DEBUG # # return len([symbol for symbol in latestPortfolioState if algorithm.Portfolio[symbol].Invested])
#region imports from AlgorithmImports import * #endregion import math from helper_tools import sign from collections import defaultdict class CustomExecutionModel(ExecutionModel): def __init__(self): self.targetCollection = PortfolioTargetCollection() self.tol = .1 def Execute(self, algorithm, targets): self.targetCollection.AddRange(targets) if self.targetCollection.Count > 0: for target in self.targetCollection.OrderByMarginImpact(algorithm): orderQuantity = OrderSizing.GetUnorderedQuantity(algorithm, target) if orderQuantity == 0: continue symbol = target.Symbol security = algorithm.Securities[symbol] holding = algorithm.Portfolio[symbol] if security.IsTradable: marginRemaining = algorithm.Portfolio.MarginRemaining # we only need to check the remaining buying power if the order execution would increase the margin usage if not holding.Invested and holding.Quantity*orderQuantity >= 0 and marginRemaining < security.Price*abs(orderQuantity): buyingPower = min(1, security.Leverage)*marginRemaining*(1 - self.tol) orderQuantity = min(math.floor(buyingPower/security.Price), abs(orderQuantity))*sign(orderQuantity) if orderQuantity != 0 and abs(orderQuantity) >= security.SymbolProperties.LotSize: algorithm.MarketOrder(symbol, orderQuantity) self.targetCollection.ClearFulfilled(algorithm)
#region imports from AlgorithmImports import * #endregion from collections import defaultdict, namedtuple PortfolioSnapshot = namedtuple('PortfolioSnapshot', 'Entry, Exit, Quantity') PortfolioSnapshot.__new__.__defaults__ = (datetime.min, )*(len(PortfolioSnapshot._fields) - 1) + (0, ) # faster alternative for itertools.groupby def GroupBy(iterable, key = lambda x: x): d = defaultdict(list) for item in iterable: d[key(item)].append(item) return d.items() def sign(x): return 1 if x > 0 else(-1 if x < 0 else 0)
#region imports from AlgorithmImports import * #endregion from helper_tools import PortfolioSnapshot from datetime import datetime, timedelta, timezone from collections import namedtuple, defaultdict # ---------------------------------------------------------- # Import the Framework Modules for this strategy from selection import CustomUniverseSelectionModel from alpha_mom import MomentumAlphaModel from alpha_value import ValueAlphaModel from alpha_etf import ConstantEtfAlphaModel from portfolio import MultiSourcesPortfolioConstructionModel from execution import CustomExecutionModel # ---------------------------------------------------------- class MomentumValueStrategy(QCAlgorithm): def Initialize(self): ######################################################################## # -------------------------------------------------------------------- # Modifiable code block # -------------------------------------------------------------------- ######################################################################## # General Settings # -------------------- self.SetStartDate(2022, 11, 5) # self.SetEndDate(2021, 1, 1) self.SetCash(1_000_000) numberOfSymbolsCoarse = 2000 numberOfSymbolsFine = 1000 numberOfMonthsUntilFullyInvested = 8 numberOfTradingDaysInvestedIfInProfit = 260 numberOfTradingDaysInvestedIfInLoss = 260 self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage) self.UniverseSettings.Resolution = Resolution.Minute self.Settings.FreePortfolioValuePercentage = 0.0 self.DebugMode = False # Momentum Strategy Params # ------------------------- maxNumberOfStocksToHold_Momentum = 24 maxNumberOfStocksToHold_Momentum_VolatilityCondition = 24 minMomentumPercent = 0.1 maxBudget_Momentum = 0.5 maxBudget_Momentum_VolatilityCondition = 0.5 # Value Strategy Params # ---------------------- maxNumberOfStocksToHold_Value = 24 maxNumberOfStocksToHold_Value_VolatilityCondition = 48 min_ROA = 0.15 min_PE_Ratio = 5 maxBudget_Value = 0.5 maxBudget_Value_VolatilityCondition = 0.5 # Risk Management / Volatility Condition Params # ---------------------------------------------- riskManagementEnabled = False maxDrawdown = 0.2 maxVIX = 35 numberOfTradingDaysBeforeReset = 265 # ETF Investing Params # ------------------------ etfTickers = ["XSVM", "RFV"] minPortfolioBufferToEnableEtfInvesting = 0.1 ######################################################################## # ---------------------------------------------------------------------- # End of modifiable code block # ---------------------------------------------------------------------- ######################################################################## self.InitCharts() self.UniverseSettings.Leverage = 1 self.pcm = MultiSourcesPortfolioConstructionModel(maxBudget_Momentum, maxBudget_Momentum_VolatilityCondition, maxNumberOfStocksToHold_Momentum, maxNumberOfStocksToHold_Momentum_VolatilityCondition, maxBudget_Value, maxBudget_Value_VolatilityCondition, maxNumberOfStocksToHold_Value, maxNumberOfStocksToHold_Value_VolatilityCondition, numberOfTradingDaysBeforeReset, riskManagementEnabled, maxDrawdown, maxVIX, minPortfolioBufferToEnableEtfInvesting) self.AddUniverseSelection(CustomUniverseSelectionModel(numberOfSymbolsCoarse, numberOfSymbolsFine, etfTickers)) self.AddAlpha(MomentumAlphaModel(self, numberOfMonthsUntilFullyInvested, maxNumberOfStocksToHold_Momentum, maxNumberOfStocksToHold_Momentum_VolatilityCondition, numberOfTradingDaysInvestedIfInProfit, numberOfTradingDaysInvestedIfInLoss, minMomentumPercent, riskManagementEnabled)) self.AddAlpha(ValueAlphaModel(self, numberOfMonthsUntilFullyInvested, maxNumberOfStocksToHold_Value, maxNumberOfStocksToHold_Value_VolatilityCondition, min_ROA, min_PE_Ratio, numberOfTradingDaysInvestedIfInProfit, numberOfTradingDaysInvestedIfInLoss, riskManagementEnabled)) self.AddAlpha(ConstantEtfAlphaModel(etfTickers)) self.SetPortfolioConstruction(self.pcm) self.SetExecution(CustomExecutionModel()) def InitCharts(self): spy = self.AddEquity("SPY", Resolution.Daily).Symbol self.SetBenchmark(spy) chart = Chart('Number of Stocks') chart.AddSeries(Series('Total', SeriesType.Line, 0, "")) chart.AddSeries(Series('ETF', SeriesType.Line, 0, "")) chart.AddSeries(Series('Momentum', SeriesType.Line, 0, "")) chart.AddSeries(Series('Value', SeriesType.Line, 0, "")) self.AddChart(chart)
#region imports from AlgorithmImports import * #endregion from collections import defaultdict from datetime import datetime, timedelta, timezone import math import functools from helper_tools import GroupBy, PortfolioSnapshot class MultiSourcesPortfolioConstructionModel(PortfolioConstructionModel): def __init__(self, momentumBudget, momentumBudgetVolCond, momentumNumHoldings, momentumNumHoldingsVolCond, valueBudget, valueBudgetVolCond, valueNumHoldings, valueNumHoldingsVolCond, numberOfTradingDaysBeforeReset, riskManagementEnabled, maxDrawdown, maxVIX, minPortfolioBufferToEnableEtfInvesting): self.insightCollection = InsightCollection() self.removedSymbols = [] self.nextExpiryTime = datetime.min.replace(tzinfo=timezone.utc) self.updateFreq = timedelta(1) self.symbolsToIgnore = set() self.riskModel = None self.vix = Symbol.Create("VIX", SecurityType.Index, Market.USA) self.momentumBudget = momentumBudget self.momentumBudgetVolCond = momentumBudgetVolCond self.numberOfStocksMomentum = momentumNumHoldings self.numberOfStocksMomentumVolCond = momentumNumHoldingsVolCond self.valueBudget = valueBudget self.valueBudgetVolCond = valueBudgetVolCond self.numberOfStocksValue = valueNumHoldings self.numberOfStocksValueVolCond = valueNumHoldingsVolCond self.numberOfTradingDaysBeforeReset = numberOfTradingDaysBeforeReset self.riskManagementEnabled = riskManagementEnabled self.maxDrawdown = maxDrawdown self.maxVIX = maxVIX self.etfInvestingEnabled = True self.minPortfolioBufferToEnableEtfInvesting = minPortfolioBufferToEnableEtfInvesting self.scheduledEvent = None self.IsInitRun = True self.prevRiskMode = False def CreateTargets(self, algorithm, insights): targets = [] if not self.CanUpdateTargets(algorithm, insights): return targets self.insightCollection.AddRange(insights) targets.extend(self.CreateFlatTargetsForRemovedSecurities()) targets.extend(self.CreateFlatTargetsForExpiredInsights(algorithm)) lastActiveInsights = self.GetLastActiveInsights(algorithm) if self.ShouldUpdateTargets(algorithm, lastActiveInsights): targets.extend(self.UpdatePortfolioTargets(algorithm, lastActiveInsights)) self.UpdateNextExpiryTime(algorithm) return targets def CanUpdateTargets(self, algorithm, insights): return len(insights) > 0 or algorithm.UtcTime > self.nextExpiryTime def CreateFlatTargetsForRemovedSecurities(self): if len(self.removedSymbols) == 0: return [] flatTargets = [PortfolioTarget(symbol, 0) for symbol in self.removedSymbols] self.insightCollection.Clear(self.removedSymbols) self.removedSymbols = [] return flatTargets def CreateFlatTargetsForExpiredInsights(self, algorithm): flatTargets = [] expiredInsights = self.insightCollection.RemoveExpiredInsights(algorithm.UtcTime) if len(expiredInsights) == 0: return flatTargets for symbol, _ in GroupBy(expiredInsights, key = lambda insight: insight.Symbol): if not self.insightCollection.HasActiveInsights(symbol, algorithm.UtcTime): flatTargets.append(PortfolioTarget(symbol, 0)) continue return flatTargets def GetLastActiveInsights(self, algorithm): activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime) lastActiveInsights = [] for k,g in GroupBy(activeInsights, key = lambda insight: (insight.Symbol, insight.SourceModel)): lastActiveInsights.append(sorted(g, key = lambda insight: insight.GeneratedTimeUtc)[-1]) return lastActiveInsights def ShouldUpdateTargets(self, algorithm, lastActiveInsights): if algorithm.UtcTime > self.nextExpiryTime: return True for insight in lastActiveInsights: if insight.Symbol in self.symbolsToIgnore: continue holding = algorithm.Portfolio[insight.Symbol] if insight.Direction != InsightDirection.Flat and (not holding.Invested): return True if insight.Direction == InsightDirection.Up and (not holding.IsLong): return True if insight.Direction == InsightDirection.Down and (not holding.IsShort): return True if insight.Direction == InsightDirection.Flat and holding.Invested: return True else: continue return False def UpdateNextExpiryTime(self, algorithm): self.nextExpiryTime = self.insightCollection.GetNextExpiryTime() if self.nextExpiryTime is None: self.nextExpiryTime = algorithm.UtcTime + self.updateFreq self.nextExpiryTime = min(self.nextExpiryTime, algorithm.UtcTime + self.updateFreq) def UpdatePortfolioTargets(self, algorithm, lastActiveInsights): targets = [] buffer = algorithm.Settings.FreePortfolioValuePercentage residualBuyingPower = algorithm.Portfolio.MarginRemaining/algorithm.Portfolio.TotalPortfolioValue/algorithm.UniverseSettings.Leverage - buffer # create PortfolioTargets for Momentum and Value Alpha Model Insights, taking into account Volatility Condition for insight in lastActiveInsights: if (insight.SourceModel == 'ETF') or (insight.Symbol in self.symbolsToIgnore): continue if algorithm.Portfolio[insight.Symbol].Invested and insight.Direction != InsightDirection.Flat: continue count = self.riskModel.NumberOfStocksBySourceModel[insight.SourceModel] budget = self.riskModel.BudgetBySourceModel[insight.SourceModel] if count*budget == 0: target = PortfolioTarget(insight.Symbol, 0) targets.append(target) continue weight = (budget * insight.Direction - buffer)/ count target = self.CreatePortfolioTargetSafely(algorithm, insight.Symbol, weight) if target is None: continue # target = PortfolioTarget.Percent(algorithm, insight.Symbol, weight) # if target is None: # self.symbolsToIgnore.add(insight.Symbol) # continue targets.append(target) residualBuyingPower -= abs(weight) self.prevRiskMode = self.riskModel.SafeMode # create portfolio targets for ETFs, if necessary if self.etfInvestingEnabled: count = len([symbol for symbol, holding in algorithm.Portfolio.items() if holding.Invested]) etfInsightSymbols = [insight.Symbol for insight in lastActiveInsights if insight.SourceModel == 'ETF' and insight.Direction != InsightDirection.Flat] count -= len(etfInsightSymbols) if count >= self.numberOfStocksValue + self.numberOfStocksMomentum: etfWeight = 0 self.etfInvestingEnabled = False if len(etfInsightSymbols) == 0: return targets if self.etfInvestingEnabled: leverage = max(1, algorithm.UniverseSettings.Leverage) currentEtfAllocation = sum([holding.AbsoluteHoldingsValue for symbol,holding in algorithm.Portfolio.items() if symbol in etfInsightSymbols]) \ /algorithm.Portfolio.TotalPortfolioValue if (currentEtfAllocation + residualBuyingPower > self.minPortfolioBufferToEnableEtfInvesting): etfWeight = (currentEtfAllocation/leverage + residualBuyingPower)/len(etfInsightSymbols) else: etfWeight = 0 self.etfInvestingEnabled = False TPV = algorithm.Portfolio.TotalPortfolioValue for insight in lastActiveInsights: if insight.SourceModel == 'ETF' and insight.Symbol not in self.symbolsToIgnore: target = self.CreatePortfolioTargetSafely(algorithm, insight.Symbol, etfWeight * insight.Direction) if target is None: continue targets.append(target) # security = algorithm.Securities[insight.Symbol] # if security.AskPrice * security.BidPrice == 0: # self.symbolsToIgnore.add(insight.Symbol) # continue # if abs(security.AskPrice - security.BidPrice) / security.AskPrice <= .01: # target = PortfolioTarget.Percent(algorithm, insight.Symbol, etfWeight*insight.Direction) # else: # current_weight = security.Holdings.HoldingsValue / TPV # market_price = security.AskPrice if etfWeight * insight.Direction >= current_weight else security.BidPrice # quantity = math.floor(TPV * etfWeight * insightDirection / market_price) # quantity -= quantity % security.SymbolProperties.LotSize # if quantity == 0: # self.symbolsToIgnore.add(insight.Symbol) # target = PortfolioTarget(insight.Symbol, quantity) # if target is None: # self.symbolsToIgnore.add(insight.Symbol) # continue # targets.append(target) return targets def CreatePortfolioTargetSafely(self, algorithm, symbol, target_weight): security = algorithm.Securities[symbol] if security.AskPrice * security.BidPrice == 0: self.symbolsToIgnore.add(symbol) return if abs(security.AskPrice - security.BidPrice) / security.AskPrice <= .01: target = PortfolioTarget.Percent(algorithm, symbol, target_weight) else: TPV = algorithm.Portfolio.TotalPortfolioValue current_weight = security.Holdings.HoldingsValue / TPV market_price = security.AskPrice if target_weight >= current_weight else security.BidPrice quantity = math.floor(TPV * target_weight / market_price) quantity -= quantity % security.SymbolProperties.LotSize quantity = int(quantity) target = PortfolioTarget(symbol, quantity) if target is None: self.symbolsToIgnore.add(symbol) return target def OnSecuritiesChanged(self, algorithm, changes): if self.scheduledEvent is None: self.scheduledEvent = algorithm.Schedule.On(algorithm.DateRules.EveryDay(), algorithm.TimeRules.Midnight, functools.partial(self.UpdateCharts, algorithm)) for security in changes.RemovedSecurities: symbol = security.Symbol if symbol.Equals(self.vix): continue self.removedSymbols.append(symbol) if symbol in self.symbolsToIgnore: self.symbolsToIgnore.remove(symbol) for security in changes.AddedSecurities: if security.Symbol.Equals(self.vix) and self.riskModel is None: self.riskModel = RiskModel(algorithm, security, self.momentumBudget, self.momentumBudgetVolCond, self.numberOfStocksMomentum, self.numberOfStocksMomentumVolCond, self.valueBudget, self.valueBudgetVolCond, self.numberOfStocksValue, self.numberOfStocksValueVolCond, self.riskManagementEnabled, self.maxDrawdown, self.maxVIX, self.numberOfTradingDaysBeforeReset) def UpdateCharts(self, algorithm): activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime) holdingsCountBySourceModel = defaultdict(int) for k,g in GroupBy(activeInsights, key = lambda insight: insight.SourceModel): holdingsCountBySourceModel[k] = len({x.Symbol for x in g if x.Direction != InsightDirection.Flat and algorithm.Portfolio[x.Symbol].Invested}) for sourceModel in ['ETF', 'Momentum', 'Value']: algorithm.Plot('Number of Stocks', sourceModel, int(holdingsCountBySourceModel[sourceModel])) totalCount = len([holding for _,holding in algorithm.Portfolio.items() if holding.Invested]) algorithm.Plot('Number of Stocks', 'Total', totalCount) class RiskModel: def __init__(self, algorithm, security, momentumBudget, momentumBudgetVolCond, numberOfStocksMomentum, numberOfStocksMomentumVolCond, valueBudget, valueBudgetVolCond, numberOfStocksValue, numberOfStocksValueVolCond, riskManagementEnabled, maxDrawdown, maxVIX, numberOfTradingDaysBeforeReset): self.algorithm = algorithm self.Security = security self.Symbol = security.Symbol self.portfolioHigh = algorithm.Portfolio.TotalPortfolioValue self.SafeMode = False self.safeModeExpiryTime = datetime.min self.momentumBudget = momentumBudget self.momentumBudgetDefault = momentumBudget self.momentumBudgetVolCond = momentumBudgetVolCond self.numberOfStocksMomentum = numberOfStocksMomentum self.numberOfStocksMomentumDefault = numberOfStocksMomentum self.numberOfStocksMomentumVolCond = numberOfStocksMomentumVolCond self.valueBudget = valueBudget self.valueBudgetDefault = valueBudget self.valueBudgetVolCond = valueBudgetVolCond self.numberOfStocksValue = numberOfStocksValue self.numberOfStocksValueDefault = numberOfStocksValue self.numberOfStocksValueVolCond = numberOfStocksValueVolCond self.numberOfStocksBySourceModel = {'Momentum': numberOfStocksMomentum, 'Value': numberOfStocksValue} self.riskManagementEnabled = riskManagementEnabled self.maxDrawdown = -abs(maxDrawdown) self.maxVIX = maxVIX self.numberOfTradingDaysBeforeReset = timedelta(days=numberOfTradingDaysBeforeReset) self.budgetBySourceModel = defaultdict(float) self.budgetBySourceModel.update({'Momentum':momentumBudget, 'Value':valueBudget}) self.hour = None algorithm.Schedule.On(algorithm.DateRules.EveryDay(self.Symbol), algorithm.TimeRules.Every(timedelta(hours=1)), self.Update) @property def BudgetBySourceModel(self): if not self.riskManagementEnabled: return self.budgetBySourceModel self.budgetBySourceModel.update({"Momentum":self.momentumBudget, "Value":self.valueBudget}) return self.budgetBySourceModel @property def NumberOfStocksBySourceModel(self): if not self.riskManagementEnabled: return self.numberOfStocksBySourceModel self.numberOfStocksBySourceModel.update({'Momentum':self.numberOfStocksMomentum, 'Value':self.numberOfStocksValue}) return self.numberOfStocksBySourceModel def Update(self): if not self.SafeMode and self.algorithm.Time > self.safeModeExpiryTime: self.momentumBudget = self.momentumBudgetDefault self.valueBudget = self.valueBudgetDefault self.numberOfStocksMomentum = self.numberOfStocksMomentumDefault self.numberOfStocksValue = self.numberOfStocksValueDefault self.SafeMode = self.Security.Price > self.maxVIX and self.CurrentDrawdown < self.maxDrawdown if self.SafeMode: self.momentumBudget = self.momentumBudgetVolCond self.valueBudget = self.valueBudgetVolCond self.numberOfStocksMomentum = self.numberOfStocksMomentumVolCond self.numberOfStocksValue = self.numberOfStocksValueVolCond self.algorithm.Debug(f"{self.algorithm.Time} | Volatility Condition triggered!\nAdjusting Portfolio Targets and permitted Number of Stocks for each Alpha Model.") else: if self.SafeMode and self.Security.Price < self.maxVIX and self.CurrentDrawdown > self.maxDrawdown: self.safeModeExpiryTime = self.algorithm.Time + self.numberOfTradingDaysBeforeReset self.SafeMode = False self.algorithm.Debug(f"{self.algorithm.Time} | Volatility Condition removed!\nKeep Risk Management Settings unchanged until {self.safeModeExpiryTime}.") @property def CurrentDrawdown(self): currentTPV = self.algorithm.Portfolio.TotalPortfolioValue if currentTPV > self.portfolioHigh: self.portfolioHigh = currentTPV return currentTPV/self.portfolioHigh - 1
#region imports from AlgorithmImports import * #endregion from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel class CustomUniverseSelectionModel(FundamentalUniverseSelectionModel): def __init__(self, maxNumberOfSymbolsCoarse, maxNumberOfSymbolsFine, etfTickers): self.nextSelectionUpdate = datetime.min self.maxNumberOfSymbolsCoarse = maxNumberOfSymbolsCoarse self.maxNumberOfSymbolsFine = maxNumberOfSymbolsFine self.etfSymbols = [Symbol.Create(ticker, SecurityType.Equity, Market.USA) for ticker in etfTickers] self.vix = Symbol.Create("VIX", SecurityType.Index, Market.USA) self.IsInitRun = True super().__init__(True, None) def SelectCoarse(self, algorithm, coarse): if algorithm.Time < self.nextSelectionUpdate: return Universe.Unchanged TPV = algorithm.Portfolio.TotalPortfolioValue coarseFiltered = [c for c in coarse if (c.HasFundamentalData and (c.Price < TPV/100) and (algorithm.Time - c.Symbol.ID.Date).days > 365)] sortedByDollarVolume = sorted(coarseFiltered, key = lambda c: c.DollarVolume, reverse = False) return [c.Symbol for c in sortedByDollarVolume][:self.maxNumberOfSymbolsCoarse] def SelectFine(self, algorithm, fine): filteredFine = [f for f in fine if (f.CompanyReference.CountryId in {"USA"} and f.CompanyReference.PrimaryExchangeID in {"NYS", "NAS"} and (algorithm.Time - f.SecurityReference.IPODate).days > 360 and f.CompanyReference.IndustryTemplateCode not in {"I", "B"} and 1e8 < f.MarketCap #< 1e10 and (f.FinancialStatements.BalanceSheet.TotalLiabilitiesAsReported.Value - f.FinancialStatements.BalanceSheet.CashAndCashEquivalents.Value < f.FinancialStatements.BalanceSheet.TangibleBookValue.Value*1.25 ))] # Note: .Value refers to the default period field which is "TwelveMonths". You can also use "ThreeMonths". sortedByMarketCap = sorted(filteredFine, key = lambda f: f.MarketCap, reverse=False) self.nextSelectionUpdate = Expiry.EndOfMonth(algorithm.Time) - timedelta(days=3) selection = [f.Symbol for f in sortedByMarketCap][:self.maxNumberOfSymbolsFine] + self.etfSymbols + [self.vix] + self.CurrentHoldings(algorithm) selection.extend(self.LoadSymbolsFromObjectStore(algorithm)) return selection def LoadSymbolsFromObjectStore(self, algorithm): # load symbols from the object store which insights haven't expired yet to ensure we have a data feed for those symbols if not (algorithm.LiveMode and self.IsInitRun): return [] suffix = 'InsightCollection' keys = [x.Key for x in algorithm.ObjectStore if x.Key.endswith(suffix)] dfList = [] symbols = [] for key in keys: storedInsightsDf = pd.read_json(algorithm.ObjectStore.Read(key), orient='split') storedInsightsDf.loc[:, 'Period'] = pd.to_timedelta(storedInsightsDf.Period, unit='ms') storedInsightsDf.loc[:, 'GeneratedTimeUtc'] = pd.to_datetime(storedInsightsDf.GeneratedTimeUtc, unit='ms', utc=True) storedInsightsDf.loc[:, 'CloseTimeUtc'] = pd.to_datetime(storedInsightsDf.CloseTimeUtc, unit='ms', utc=True) filteredInsightsDf = storedInsightsDf.loc[(storedInsightsDf.SourceModel.eq(key.split(suffix)[0]) & storedInsightsDf.CloseTimeUtc.gt(algorithm.UtcTime) & storedInsightsDf.Direction.eq(1) & storedInsightsDf.GeneratedTimeUtc.lt(algorithm.UtcTime))] if not filteredInsightsDf.empty: dfList.append(filteredInsightsDf) if len(dfList) > 0: df = pd.concat(dfList) symbols = df.Symbol.apply(SymbolCache.GetSymbol).unique().tolist() self.IsInitRun = False return symbols def CurrentHoldings(self, algorithm): return [security.Key for security in algorithm.ActiveSecurities if security.Value.Holdings.Invested]