Overall Statistics |
Total Trades 8390 Average Win 0.22% Average Loss -0.18% Compounding Annual Return 60.365% Drawdown 17.400% Expectancy 0.648 Net Profit 11295.206% Sharpe Ratio 2.132 Probabilistic Sharpe Ratio 99.962% Loss Rate 27% Win Rate 73% Profit-Loss Ratio 1.25 Alpha 0.36 Beta 0.425 Annual Standard Deviation 0.193 Annual Variance 0.037 Information Ratio 1.462 Tracking Error 0.2 Treynor Ratio 0.966 Total Fees $568237.29 Estimated Strategy Capacity $1400000.00 Lowest Capacity Asset TMF UBTUG7D0B7TX |
class MovingAverageAlphaCreationModel(AlphaModel): def __init__(self, algorithm, name, activateAlpha, parametersDict, canarySymbol, periodShortSMA, periodLongSMA, movingAverageThresholds, rebalancingFrequency): self.algo = algorithm self.Name = name self.activateAlpha = activateAlpha self.parametersDict = parametersDict self.movingAverageThresholds = movingAverageThresholds self.rebalancingFrequency = rebalancingFrequency self.weightsDict = {} self.timeToEmitInsights = False self.smaRatio = None if self.activateAlpha: shortSMA = algorithm.SMA(canarySymbol, periodShortSMA, Resolution.Daily) longSMA = algorithm.SMA(canarySymbol, periodLongSMA, Resolution.Daily) self.smaRatio = IndicatorExtensions.Over(shortSMA, longSMA) def Update(self, algorithm, data): insights = [] if not self.timeToEmitInsights: return insights insightExpiry = Expiry.EndOfDay(algorithm.Time) # loop through insights and send portfolio targets for symbol, weight in self.weightsDict.items(): if symbol.Value.split('_')[0] != 'CUSTOM' and not data.ContainsKey(symbol): continue if weight > 0: insightDirection = InsightDirection.Up elif weight < 0: insightDirection = InsightDirection.Down else: insightDirection = InsightDirection.Flat insights.append(Insight.Price(symbol, insightExpiry, insightDirection, None, None, None, abs(weight))) self.timeToEmitInsights = False return insights def GenerateSignals(self): """ Create new signals """ if not self.activateAlpha or not self.smaRatio.IsReady: return if self.smaRatio.Current.Value >= self.movingAverageThresholds[0]: self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][0] for security in self.algo.ActiveSecurities.Values if security.Symbol.Value in self.parametersDict} elif self.smaRatio.Current.Value >= self.movingAverageThresholds[1]: self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][1] for security in self.algo.ActiveSecurities.Values if security.Symbol.Value in self.parametersDict} else: self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][2] for security in self.algo.ActiveSecurities.Values if security.Symbol.Value in self.parametersDict} self.timeToEmitInsights = True def OnSecuritiesChanged(self, algorithm, changes): """ Actions to take every time the universe changes """ for security in changes.AddedSecurities: if security.Symbol.Value == 'SPY': symbol = security.Symbol algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]), self.GenerateSignals)
import pandas as pd import general_utils as utils import json class BuyAndHoldAlphaCreationModel(AlphaModel): def __init__(self, algorithm, name, activateAlpha, parametersDict, rebalancingFrequency): self.algo = algorithm self.Name = name self.activateAlpha = activateAlpha self.parametersDict = parametersDict self.rebalancingFrequency = rebalancingFrequency self.weightsDict = {} self.timeToEmitInsights = False self.weightsRetrieved = False self.monthStartIsHere = False # retrieve information from object store in live mode if self.algo.LiveMode: dictFromOS = utils.ReadFromObjectStore(self.algo, 'BuyAndHoldSVOL') if dictFromOS: self.algo.Log('BuyAndHoldSVOL; retrieving last optimal weights from object store: ' + str(dictFromOS)) self.weightsDict = {self.algo.Symbol(ticker): weight for ticker, weight in dictFromOS.items()} self.weightsRetrieved = True def Update(self, algorithm, data): insights = [] if not self.timeToEmitInsights: return insights insightExpiry = Expiry.EndOfDay(algorithm.Time) # loop through insights and send portfolio targets for symbol, weight in self.weightsDict.items(): if symbol.Value.split('_')[0] != 'CUSTOM' and not data.ContainsKey(symbol): continue if weight > 0: insightDirection = InsightDirection.Up elif weight < 0: insightDirection = InsightDirection.Down else: insightDirection = InsightDirection.Flat insights.append(Insight.Price(symbol, insightExpiry, insightDirection, None, None, None, abs(weight))) self.timeToEmitInsights = False return insights def GenerateSignals(self): """ Create new signals """ if not self.activateAlpha: return if self.monthStartIsHere or self.weightsRetrieved: self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value] for security in self.algo.ActiveSecurities.Values if security.Symbol.Value in self.parametersDict} self.weightsRetrieved = False self.timeToEmitInsights = True if self.algo.LiveMode or self.algo.BackTestWarmUp == True: dictToOS = {symbol.Value: weight for symbol, weight in self.weightsDict.items()} utils.SaveToObjectStore(self.algo, 'BuyAndHoldSVOL', dictToOS) self.monthStartIsHere = False def MonthStart(self): self.monthStartIsHere = True def OnSecuritiesChanged(self, algorithm, changes): """ Actions to take every time the universe changes """ for security in changes.AddedSecurities: if security.Symbol.Value == 'SPY': symbol = security.Symbol algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]), self.MonthStart) algorithm.Schedule.On(algorithm.DateRules.EveryDay(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]), self.GenerateSignals)
def GetAlphaParameters(self, alphaName=None, parameter=None): stocksTicker = 'TQQQ' spyTicker = 'SPXL' bondsTicker = 'TMF' cashTicker = 'IEF' inflTicker = 'TIP' goldTicker = 'IAU' volatilityTicker = 'UVXY' #volatilityFactor = 1 minutesAfterStart = 90 alphaParametersDict = {'RotationalOptimizer': {'activate': True, 'riskyAssetsParameters': { 'SPY': {'addTicker': [True, spyTicker], 'sma': [200, (-0.10, 0.10), 0.0], 'macd': [(231, 567, 168), 0, 0.3], 'yield': [True, 0.2], 'optimizationBounds': (0, 1)}, 'QQQ': {'addTicker': [True, stocksTicker], 'sma': [200, (-0.20, 0.20), 0.0], 'macd': [(231, 567, 168), 0, 0.3], 'yield': [True, 0.2], 'optimizationBounds': (0, 1)}, 'TLT': {'addTicker': [True, bondsTicker], 'sma': [300, (-0.15, 0.15), 0.0], 'macd': [(50, 150, 30), 0, 0.0], 'yield': [False, 0], 'optimizationBounds': (0, 1)}, #'GLD': {'addTicker': [True, 'DGP'], # 'sma': [100, (-0.10, 0.10), 0.0], # 'macd': [(50, 150, 30), 0, 0.3], # 'yield': [False, 0], # 'optimizationBounds': (0, 1)}, #'BITO': {'addTicker': [True, 'BLOK'], #Change to BITO # 'sma': [200, (-0.20, 0.20), 0.0], # 'macd': [(50, 150, 30), 0, 0.0], # 'yield': [False, 0], # 'optimizationBounds': (0, 1)} }, 'lookbackOptimization': 63, 'objFunction': 'minVariance', 'tickerCash': inflTicker, 'rebalancingFrequency': (self.DateRules.MonthStart, minutesAfterStart), #'optimizationBounds': (0.01, 0.01)}, 'optimizationBounds': (0.10, 1.00)}, 'RotationalOptimizer1x': {'activate': False, 'riskyAssetsParameters': { 'SPY': {'addTicker': [True, 'SPY'], #Change to actual tickers 'sma': [200, (-0.10, 0.10), 0], 'macd': [(231, 567, 168), 0, 0.3], 'yield': [False, 0], 'optimizationBounds': (0, 1)}, 'TLT': {'addTicker': [True, 'TLT'], 'sma': [600, (-0.2, 0.2), 0], 'macd': [(63, 168, 42), 0, 0], 'yield': [False, 0], 'optimizationBounds': (0, 1)}, 'GLD': {'addTicker': [True, goldTicker], 'sma': [100, (-0.15, 0.15), 0], 'macd': [(50, 150, 30), 0, 0.3], 'yield': [False, 0], 'optimizationBounds': (0, 1)}, }, 'lookbackOptimization': 21, 'objFunction': 'minVariance', 'tickerCash': inflTicker, 'rebalancingFrequency': (self.DateRules.WeekStart, minutesAfterStart), 'optimizationBounds': (0.01, 0.20)}, 'BuyAndHoldSVOL': {'activate': False, 'tickersWeights': {'CUSTOM_SVOL': 0.0, 'CUSTOM_SPD': 0.0,'TQQQ': 0.13, 'TMF': 0.13, 'BITO': 0.00, 'IAU': 0.00, 'DGP': 0, 'BND': 0.74, 'SVXY': 0, 'IEF': 0.00, 'TIP': 0.0, 'TLT': 0, 'TYD': 0}, 'rebalancingFrequency': (self.DateRules.MonthStart, minutesAfterStart), 'optimizationBounds': (0.30, 0.40)}, 'VixTermStructure': {'activate': True, #'tickersWeights': { volatilityTicker: [0.0, 0.0, 0.0, 0.1, 0.6], stocksTicker: [0.5, 0.6, 0.6, 0.5, 0.0], bondsTicker: [0.1, 0.0, 0.0, 0.0, 0.0], inflTicker: [0.4, 0.4, 0.4, 0.4, 0.4]}, 'tickersWeights': { volatilityTicker: [0.00, 0.05, 0.1, 0.4, 1.0, 0.02], stocksTicker: [0.80, 0.85, 0.9, 0.6, 0.0, 0.80], bondsTicker: [0.20, 0.10, 0.0, 0.0, 0.0, 0.18], inflTicker: [0.00, 0.00, 0.0, 0.0, 0.0, 0.00]}, 'canaryTicker': 'SPY', 'periodShortSMA': 3, 'periodLongSMA': 15, 'contangoThresholds': [1.25, 0.923], 'rebalancingFrequency': (self.DateRules.EveryDay, minutesAfterStart), #'optimizationBounds': (0.99, 1.00)}, 'optimizationBounds': (0.20, 1.00)}, 'MovingAverageSPY': {'activate': False, 'tickersWeights': { 'TQQQ': [0.14, 0.14, 0.00], 'TMF': [0.07, 0.07, 0.07], 'IAU': [0.04, 0.04, 0.04], 'IEF': [0.35, 0.35, 0.40], 'TIP': [0.40, 0.40, 0.49] }, 'canaryTicker': 'SPY', 'periodShortSMA': 1, 'periodLongSMA': 168, 'movingAverageThresholds': [1.1, 1.0], 'rebalancingFrequency': (self.DateRules.MonthStart, minutesAfterStart), 'optimizationBounds': (0.01, 1)}, 'InAndOut': {'activate': True, 'assetsWeights': {'stocksProxy': [stocksTicker, 0.6], 'bondsProxy': [bondsTicker, 0.4], 'cashProxy': [cashTicker, 0], 'inflProxy': [inflTicker, 0], 'goldProxy': [goldTicker, 0], 'volatilityProxy': [volatilityTicker, 0]}, 'rebalancingFrequency': (self.DateRules.EveryDay, minutesAfterStart), 'optimizationBounds': (0.20, 1.00)}, #'optimizationBounds': (0.99, 1.00)}, } # if no filtering elements provided, return the whole dictionary if alphaName is None and parameter is None: return alphaParametersDict return alphaParametersDict[alphaName][parameter]
from OptimizerModules.OptimizerClass import PortfolioOptimizer import pandas as pd import general_utils as utils class RotationalOptimizerAlphaCreationModel(AlphaModel): def __init__(self, algorithm, name, activateAlpha, parametersDict, rebalancingFrequency, lookbackOptimization, objFunction, tickerCash, treasuryRatesSymbol): self.algo = algorithm self.Name = name self.activateAlpha = activateAlpha self.parametersDict = parametersDict self.rebalancingFrequency = rebalancingFrequency self.lookbackOptimization = lookbackOptimization self.objFunction = objFunction self.tickerCash = tickerCash self.treasuryRatesSymbol = treasuryRatesSymbol self.insuranceMonitor = False # for yield signal crisis self.lookbackNegativeYield = 147 # number of days to lookback for negative values self.startCrisisYieldValue = 0.3 # the yield value above which we apply the yield weight condition # initialize the optimizer self.optimizer = PortfolioOptimizer() # get all the parameters for the indicators valuesList = [] self.alphaTickers = [] for ticker in parametersDict.keys(): if parametersDict[ticker]['addTicker'][0]: valuesList.append(parametersDict[ticker]['sma'][0]) valuesList.append(sum(parametersDict[ticker]['macd'][0])) self.alphaTickers.append(ticker) tradableTicker = parametersDict[ticker]['addTicker'][1] if tradableTicker != ticker: self.alphaTickers.append(tradableTicker) self.alphaTickers.append(self.tickerCash) self.alphaTickers.append("UVXY") # keep the highest parameter provided to call history self.lookbackHistory = max(lookbackOptimization, max(valuesList)) self.weightsDict = {} self.timeToEmitInsights = False self.atrStocksCrossover = None self.atrBondsCrossover = None self.weightsRetrieved = False self.weekStart = False # Sudhir changes to code to optimize on start if nothing found in object store. if self.algo.LiveMode: dictFromOS = utils.ReadFromObjectStore(self.algo, 'RO') if dictFromOS: self.algo.Log('RO; retrieving last optimal weights from object store: ' + str(dictFromOS)) self.weightsDict = {self.algo.Symbol(ticker): weight for ticker, weight in dictFromOS.items()} self.weightsRetrieved = True def Update(self, algorithm, data): insights = [] if not self.timeToEmitInsights: return insights insightExpiry = Expiry.EndOfDay(algorithm.Time) # loop through insights and send portfolio targets for symbol, weight in self.weightsDict.items(): if symbol.Value.split('_')[0] != 'CUSTOM' and not data.ContainsKey(symbol): continue if weight > 0: insightDirection = InsightDirection.Up elif weight < 0: insightDirection = InsightDirection.Down else: insightDirection = InsightDirection.Flat insights.append(Insight.Price(symbol, insightExpiry, insightDirection, None, None, None, abs(weight))) self.timeToEmitInsights = False return insights def GenerateSignals(self): """ Create new signals """ if not self.activateAlpha: return # get active symbols for this alpha symbols = [x.Symbol for x in self.algo.ActiveSecurities.Values if x.Symbol.Value in self.alphaTickers] # determine target percent for the given insights (check function DetermineTargetPercent for details) self.weightsDict = self.DetermineTargetPercent(self.algo, symbols) #self.AdjustWeightsForInsurance() self.timeToEmitInsights = True if self.algo.LiveMode or self.algo.BackTestWarmUp == True: dictToOS = {symbol.Value: weight for symbol, weight in self.weightsDict.items()} utils.SaveToObjectStore(self.algo, 'RO', dictToOS) def CheckCrossoverATR(self): if self.weightsRetrieved == True: self.weightsRetrieved = False self.timeToEmitInsights = True """ Check if there was an ATR Crossover to trigger GenerateSignals """ if (self.atrStocksCrossover is not None and self.atrBondsCrossover is not None and (self.atrStocksCrossover.Current.Value > 1.5 or self.atrBondsCrossover.Current.Value > 1.5) # ): and (self.weekStart)): self.algo.Log('RO; triggering GenerateSignals due to ATR Crossover') self.GenerateSignals() if self.algo.isRiskHighest or self.algo.isRiskHigh or self.insuranceMonitor == True: self.algo.Log('RO; triggering GenerateSignals due to High Risk flag') #self.GenerateSignals() self.AdjustWeightsForInsurance() self.weekStart = False def AdjustWeightsForInsurance(self): cashWeight = 0 if self.algo.isRiskHighest: insurancePercentage = 0.20 self.insuranceMonitor = True elif self.algo.isRiskHigh: insurancePercentage = 0.02 self.insuranceMonitor = True else: insurancePercentage = 0 self.insuranceMonitor = False #if self.algo.isRiskHighest or self.algo.isRiskHigh: # for symbol, weight in self.weightsDict.items(): # if symbol.Value == self.tickerCash or symbol.Value == "UVXY": # cashWeight = cashWeight + self.weightsDict[symbol] # for symbol, weight in self.weightsDict.items(): # if symbol.Value == self.tickerCash: # self.weightsDict[symbol] = cashWeight*(1 - insurancePercentage) # if symbol.Value == "UVXY": # self.weightsDict[symbol] = cashWeight*insurancePercentage # self.timeToEmitInsights = True if self.algo.isRiskHighest or self.algo.isRiskHigh: for symbol, weight in self.weightsDict.items(): if symbol.Value == "UVXY": self.weightsDict[symbol] = insurancePercentage else: self.weightsDict[symbol] = self.weightsDict[symbol]*(1-insurancePercentage) self.timeToEmitInsights = True def OnWeekStart(self): self.weekStart = True def OnSecuritiesChanged(self, algorithm, changes): """ Actions to take every time the universe changes """ for security in changes.AddedSecurities: if security.Symbol.Value == 'SPY': symbol = security.Symbol algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]), self.GenerateSignals) algorithm.Schedule.On(algorithm.DateRules.EveryDay(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]), self.CheckCrossoverATR) algorithm.Schedule.On(algorithm.DateRules.WeekStart(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, 1), self.OnWeekStart) # initialize atr indicators if self.activateAlpha and security.Symbol.Value in ['SPY', 'TLT']: shortATR = algorithm.ATR(security.Symbol, 10, Resolution.Hour) longATR = algorithm.ATR(security.Symbol, 63, Resolution.Hour) if security.Symbol.Value == 'SPY': self.atrStocksCrossover = IndicatorExtensions.Over(shortATR, longATR) elif security.Symbol.Value == 'TLT': self.atrBondsCrossover = IndicatorExtensions.Over(shortATR, longATR) def DetermineTargetPercent(self, algorithm, symbols): """ Description: Determine the target percent for each insight Args: algorithm: The algorithm instance symbols: The symbols to generate an insight for """ # empty dictionary to store portfolio targets by symbol finalWeights = {} # get symbols for calculations calculationSymbols = [x for x in symbols if x.Value in self.parametersDict.keys()] # get historical data for calculationSymbols for the last n trading days history = algorithm.History(calculationSymbols, self.lookbackHistory, Resolution.Daily) # empty dictionary for calculations calculations = {} # iterate over all symbols and perform calculations for symbol in calculationSymbols: # check if we have enough historical data, otherwise just skip this security if not utils.CheckHistory(symbol, history): algorithm.Log(str(symbol.Value) + ': skipping security due to no/not enough historical data') continue else: # add symbol to calculations calculations[symbol] = SymbolData(symbol, parametersDict=self.parametersDict) try: # get series of daily returns calculations[symbol].CalculateDailyReturnSeries(history, self.lookbackOptimization) # update technical indicators calculations[symbol].UpdateIndicators(history) except BaseException as e: algorithm.Log('returning empty weights for now for: ' + str(symbol.Value) + 'due to ' + str(e) + '; we will try again at the next iteration') return finalWeights # calculate optimal weights optWeights = self.CalculateOptimalWeights(algorithm, calculations) algorithm.Log('optimal weights for the period: ' + str(optWeights)) if optWeights is None: algorithm.Log('returning empty weights due to optWeights being None; we will try again at the next iteration') return finalWeights # modify optimal weights using specific criteria finalWeights = self.FilterOptimalWeights(algorithm, calculations, optWeights) algorithm.Log('filtered optimal weights for the period: ' + str({symbol.Value: weight for symbol, weight in finalWeights.items()})) # check if we can trade the tradable symbol, otherwise we trade the original ticker replaceSymbol = {} for symbol in finalWeights: tradableTicker = self.parametersDict[symbol.Value]['addTicker'][1] if symbol.Value != tradableTicker: tradableSymbol = algorithm.Symbol(tradableTicker) # check if the price of tradableSymbol is not zero if algorithm.Securities[tradableSymbol].Price != 0: # we trade the original ticker replaceSymbol[tradableSymbol] = symbol for newSymbol, replacedSymbol in replaceSymbol.items(): finalWeights[newSymbol] = finalWeights[replacedSymbol] finalWeights.pop(replacedSymbol) # check how much we have allocated so far to risky assets totalWeight = sum(finalWeights.values()) if totalWeight >= 1: totalWeight = 1 algorithm.Log('total allocation after weight filtering: ' + str(totalWeight)) if self.algo.isRiskHighest: volatilityWeight = 0.0 else: volatilityWeight = 0.0 # allocate remaining cash to tickerCash for symbol in symbols: if symbol.Value == 'UVXY': finalWeights[symbol] = volatilityWeight if symbol.Value == self.tickerCash: if symbol in finalWeights.keys(): finalWeights[symbol] = finalWeights[symbol] + (1 - totalWeight - volatilityWeight) else: finalWeights[symbol] = 1 - totalWeight - volatilityWeight algorithm.Log(str(self.tickerCash) + '; final allocation for tickerCash: ' + str(finalWeights[symbol])) # avoid very small numbers and make them 0 for symbol, weight in finalWeights.items(): if weight <= 1e-10: finalWeights[symbol] = 0 return finalWeights def CalculateOptimalWeights(self, algorithm, calculations): """ Description: Calculate the individual weights for each symbol that optimize some given objective function Args: algorithm: The algorithm instance calculations: Dictionary containing calculations for each symbol """ # create a dictionary keyed by the symbols in calculations with a pandas.Series as value # to create a dataframe of returns dailyReturnsDict = {symbol.Value: symbolData.dailyReturnsSeries for symbol, symbolData in calculations.items()} dailyReturnsDf = pd.DataFrame(dailyReturnsDict) listTickers = list(dailyReturnsDf.columns) try: # portfolio optimizer finds the optimal weights for the given data bounds = [self.parametersDict[x]['optimizationBounds'] for x in dailyReturnsDf.columns] listOptWeights = self.optimizer.Optimize(objFunction=self.objFunction, dailyReturnsDf=dailyReturnsDf, bounds=bounds) except BaseException as e: algorithm.Log('Optimize failed due to ' + str(e)) return None # create dictionary with the optimal weights by symbol weights = {listTickers[i]: listOptWeights[i] for i in range(len(listTickers))} # avoid very small numbers and make them 0 for ticker, weight in weights.items(): if weight <= 1e-10: weights[ticker] = 0 return weights def CheckYieldSignalCrisis(self): """ Check the Yield condition """ yieldSignalCrisis = False # get the last six months of historical USTREASURY/YIELD values histYield = self.algo.History([self.treasuryRatesSymbol], self.lookbackNegativeYield + 1, Resolution.Daily).loc[self.treasuryRatesSymbol] tenYr = histYield['10 yr'] # get the 10-year yield threeMo = histYield['3 mo'] # get the 3-month yield tenYrMinusThreeMo = tenYr - threeMo # calculate the difference between the two indexNegative = tenYrMinusThreeMo[tenYrMinusThreeMo < 0].head(1).index # check if there was actually some negative yield values if len(indexNegative) > 0: cutOff = indexNegative[0] # filter the series for days after that day with negative value afterNegative = tenYrMinusThreeMo[tenYrMinusThreeMo.index > cutOff] # check if at some point it reached our startCrisisYieldValue if len(afterNegative) > 0 and max(afterNegative) > self.startCrisisYieldValue: yieldSignalCrisis = True return yieldSignalCrisis def FilterOptimalWeights(self, algorithm, calculations, optWeights): """ Description: Filter and modify the optimal weights using a combination of technical indicators Args: algorithm: The algorithm instance calculations: Dictionary containing calculations for each symbol optWeights: Dictionary with the optimal weights by symbol """ # check for yield signal crisis yieldSignalCrisis = self.CheckYieldSignalCrisis() # empty dictionary to store weights weights = {} # loop through calculations and check conditions for weight filtering ------------------------ for symbol, symbolData in calculations.items(): if symbolData.SMA.IsReady and symbolData.MACD.IsReady: currentPrice = algorithm.ActiveSecurities[symbol].Price # check if sma condition is met and act accordingly ---------------------------------- smaLowerBoundCondition = self.parametersDict[symbol.Value]['sma'][1][0] smaUpperBoundCondition = self.parametersDict[symbol.Value]['sma'][1][1] smaConditionWeight = self.parametersDict[symbol.Value]['sma'][2] algorithm.Log(str(symbol.Value) + '; current price: ' + str(round(currentPrice, 2)) + '; SMA: ' + str(round(symbolData.SMA.Current.Value, 2)) + '; Price vs SMA: ' + str(round((currentPrice / symbolData.SMA.Current.Value) - 1, 2))) if (currentPrice <= symbolData.SMA.Current.Value * (1 + smaLowerBoundCondition) or currentPrice >= symbolData.SMA.Current.Value * (1 + smaUpperBoundCondition)): weights[symbol] = min(optWeights[symbol.Value], smaConditionWeight) algorithm.Log(str(symbol.Value) + '; modifying weight due to sma filtering from ' + str(optWeights[symbol.Value]) + ' to ' + str(weights[symbol])) else: weights[symbol] = optWeights[symbol.Value] smaModifiedWeight = weights[symbol] # check if macd condition is met and act accordingly ---------------------------------- macdCondition = self.parametersDict[symbol.Value]['macd'][1] macdConditionWeight = self.parametersDict[symbol.Value]['macd'][2] # calculate our macd vs signal score between -1 and 1 macdMinusSignal = symbolData.MACD.Current.Value - symbolData.MACD.Signal.Current.Value macdVsSignalScore = macdMinusSignal / (1 + abs(macdMinusSignal)) algorithm.Log(str(symbol.Value) + '; MACD: ' + str(round(symbolData.MACD.Current.Value, 2)) + '; MACD Signal: ' + str(round(symbolData.MACD.Signal.Current.Value, 2)) + '; MACD vs Signal Score: ' + str(round(macdVsSignalScore, 2))) if macdVsSignalScore <= macdCondition: weights[symbol] = min(smaModifiedWeight, macdConditionWeight) algorithm.Log(str(symbol.Value) + '; modifying weight due to macd filtering from ' + str(smaModifiedWeight) + ' to ' + str(weights[symbol])) else: weights[symbol] = smaModifiedWeight macdModifiedWeight = weights[symbol] # check if yield condition is met and act accordingly ---------------------------------- activateYield = self.parametersDict[symbol.Value]['yield'][0] yieldConditionWeight = self.parametersDict[symbol.Value]['yield'][1] if yieldSignalCrisis and activateYield: weights[symbol] = min(macdModifiedWeight, yieldConditionWeight) else: weights[symbol] = macdModifiedWeight else: weights[symbol] = optWeights[symbol.Value] algorithm.Log(str(symbol.Value) + '; indicators are not ready so assign optimal weight') return weights class SymbolData: def __init__(self, symbol, parametersDict): self.Symbol = symbol self.dailyReturnsSeries = None smaPeriod = parametersDict[symbol.Value]['sma'][0] self.SMA = SimpleMovingAverage(smaPeriod) macdFastPeriod = parametersDict[self.Symbol.Value]['macd'][0][0] macdSlowPeriod = parametersDict[self.Symbol.Value]['macd'][0][1] macdSignalPeriod = parametersDict[self.Symbol.Value]['macd'][0][2] self.MACD = MovingAverageConvergenceDivergence(macdFastPeriod, macdSlowPeriod, macdSignalPeriod, MovingAverageType.Exponential) def CalculateDailyReturnSeries(self, history, lookbackOptimization): """ Calculate the daily returns series for each security """ tempReturnsSeries = history.loc[str(self.Symbol)]['close'].pct_change(periods=2).dropna() # 2-day returns self.dailyReturnsSeries = tempReturnsSeries[-lookbackOptimization:] def UpdateIndicators(self, history): """ Update the indicators with historical data """ for index, row in history.loc[str(self.Symbol)].iterrows(): self.SMA.Update(index, row['close']) self.MACD.Update(index, row['close'])
from itertools import groupby from OptimizerModules.OptimizerClass import PortfolioOptimizer import numpy as np import pandas as pd import general_utils as utils class CompositePortfolioConstructionModel(PortfolioConstructionModel): def __init__(self, algorithm, alphaModels, optimizationBounds, alphaPortfolioOptLookback=21, alphaPortfolioOptObjFunction='minVariance'): self.algo = algorithm self.alphasInsightReturns = {alphaModel: {} for alphaModel in alphaModels} # nested dict for insight returns self.alphasEqualWeighting = 1 / len(self.alphasInsightReturns.keys()) self.alphasDailyReturns = {alphaModel: {} for alphaModel in alphaModels} # nested dict for daily returns self.alphasCumReturns = {alphaModel: 1 for alphaModel in alphaModels} # store alphas cumulative returns self.alphasLastPeak = {alphaModel: 1 for alphaModel in alphaModels} # store alphas last peak for DD calc self.alphasDD = {alphaModel: 0 for alphaModel in alphaModels} # store alphas DD self.alphasOptWeights = {alphaModel: 0 for alphaModel in alphaModels} # store alphas optimal allocations self.alphasAccuracy = {alphaModel: 0 for alphaModel in alphaModels} # store alphas accuracy self.entryDirection = {alphaModel: {} for alphaModel in alphaModels} # store alphas insights entry direction self.previousPrice = {alphaModel: {} for alphaModel in alphaModels} # store alphas insights previous price # variables for alpha weight optimization self.alphaPortfolioOptLookback = alphaPortfolioOptLookback # number of days for rolling window of returns self.alphaPortfolioOptObjFunction = alphaPortfolioOptObjFunction self.allowAlphaWeightOptimization = True self.optAlphaAllocationReady = False self.optimizer = PortfolioOptimizer() self.optimizationBounds = optimizationBounds self.insightCollection = InsightCollection() self.lastActiveInsights = [] self.removedSymbols = [] self.portfolioLastValue = 0 self.portfolioCumReturns = 1 self.portfolioLastPeak = 1 self.symbolsWeights = {} # retrieve information from object store in live mode if self.algo.LiveMode: alphasOptWeightsOS = utils.ReadFromObjectStore(self.algo, 'AlphasOptWeights') if alphasOptWeightsOS: self.alphasOptWeights = alphasOptWeightsOS self.optAlphaAllocationReady = True self.algo.Log('Portfolio; retrieving alphasOptWeights from object store: ' + str(self.alphasOptWeights)) alphasDdOS = utils.ReadFromObjectStore(self.algo, 'AlphasDD') if alphasDdOS: self.alphasDD = alphasDdOS self.algo.Log('Portfolio; retrieving alphasDD from object store: ' + str(self.alphasDD)) algorithm.Schedule.On(algorithm.DateRules.EveryDay(), algorithm.TimeRules.At(0, 1), self.GetTodayDate) self.GetTodayDate() def CreateTargets(self, algorithm, insights): """ Description: Create portfolio targets from the specified insights Args: algorithm: The algorithm instance insights: The insights to create portfolio targets from Returns: An enumerable of portfolio targets to be sent to the execution model """ targets = [] # skip if no action to be taken if not self.ShouldCreateTargets(algorithm, insights) and self.removedSymbols is None: return targets # create flatten target for each security that was removed from the universe if self.removedSymbols is not None: universeDeselectionTargets = [PortfolioTarget(symbol, 0) for symbol in self.removedSymbols] algorithm.Log('liquidate any open positions for removed symbols: ' + str([x.Value for x in self.removedSymbols])) targets.extend(universeDeselectionTargets) self.removedSymbols = None # add new insights to insight collection self.insightCollection.AddRange(insights) # more than one alpha model can generate insights for the same symbol # we group by alpha model and symbol, and keep the most recent insight self.lastActiveInsights = [] for sourceModel, f in groupby(sorted(self.insightCollection, key=lambda ff: ff.SourceModel), lambda fff: fff.SourceModel): for symbol, g in groupby(sorted(list(f), key=lambda gg: gg.Symbol), lambda ggg: ggg.Symbol): self.lastActiveInsights.append(sorted(g, key=lambda x: x.GeneratedTimeUtc)[-1]) # calculate targets ---------------------------------------------------- symbolsNewWeights = {} for insight in self.lastActiveInsights: if insight.Symbol not in algorithm.ActiveSecurities.Keys: continue # check if we can do optimal alpha allocation and if we have enough data for it if self.optAlphaAllocationReady: # insight weight * the optimal allocation for the model weight = insight.Direction * insight.Weight * self.alphasOptWeights[insight.SourceModel] else: weight = insight.Direction * insight.Weight * self.alphasEqualWeighting # apply leverage weight *= self.algo.accountLeverage # update returns if we close the position with flat insight if (insight.Direction == InsightDirection.Flat and self.entryDirection[insight.SourceModel].get(insight.Symbol) is not None and self.entryDirection[insight.SourceModel].get(insight.Symbol) != 0): self.UpdateReturnsDictionaries(algorithm, insight) self.entryDirection[insight.SourceModel].pop(insight.Symbol) self.previousPrice[insight.SourceModel].pop(insight.Symbol) elif insight.Direction != InsightDirection.Flat: if (self.entryDirection[insight.SourceModel].get(insight.Symbol) is None and self.previousPrice[insight.SourceModel].get(insight.Symbol) is None): self.entryDirection[insight.SourceModel][insight.Symbol] = insight.Direction # simulate entry price self.previousPrice[insight.SourceModel][insight.Symbol] = algorithm.ActiveSecurities[insight.Symbol].Price # add weight to weightBySymbol if insight.Symbol not in symbolsNewWeights: symbolsNewWeights[insight.Symbol] = weight else: symbolsNewWeights[insight.Symbol] += weight algorithm.Log(str(insight.SourceModel) + '; ' + str(insight.Symbol.Value) + '; ' + str(insight.Direction) + '; ' + str(round(weight, 2))) countWeightChangesAbovethreshold = self.CountWeightChangesAboveThreshold(symbolsNewWeights) if countWeightChangesAbovethreshold > 0 or not self.symbolsWeights or self.algo.stopLossTriggered == True: self.symbolsWeights = symbolsNewWeights self.algo.stopLossTriggered == False else: return targets # we add together all the weights for each symbol for symbol, finalWeight in self.symbolsWeights.items(): # calculate the percent target target = PortfolioTarget.Percent(algorithm, symbol, finalWeight) targets.append(target) algorithm.Log('final weights; ' + str(symbol.Value) + '; ' + str(round(finalWeight, 2))) return targets def EveryDayAtOpen(self): """ Update dictionaries every day at the open """ # update and plot portfolio cumulative returns self.PlotPortfolioInfo() # update daily returns ------------------------------------------------- for insight in self.lastActiveInsights: if (insight.Symbol not in self.algo.ActiveSecurities.Keys or not self.algo.ActiveSecurities[insight.Symbol].Invested or insight.Direction == InsightDirection.Flat): continue self.UpdateReturnsDictionaries(self.algo, insight) # update previous price self.previousPrice[insight.SourceModel][insight.Symbol] = self.algo.ActiveSecurities[insight.Symbol].Price # calculate the optimal weights ---------------------------------------- # create a dataframe of returns of last n days from the dictionary alphasDailyReturns dailyReturnsDf = pd.DataFrame(self.alphasDailyReturns)[-self.alphaPortfolioOptLookback:] dailyReturnsDf = dailyReturnsDf.dropna() # run optimization if needed, otherwise keep using the alphasOptWeights from last optimization if self.allowAlphaWeightOptimization: # wait until we have at least a week of returns if len(dailyReturnsDf) >= 5: try: bounds = [self.optimizationBounds[x] for x in dailyReturnsDf.columns] newAlphasOptWeights = self.optimizer.Optimize(objFunction=self.alphaPortfolioOptObjFunction, dailyReturnsDf=dailyReturnsDf, bounds=bounds) self.alphasOptWeights = pd.Series(newAlphasOptWeights, index=dailyReturnsDf.columns).to_dict() # Add left-over allocation to default alpha leftoverallocation = max(0,1 - sum(self.alphasOptWeights.values())) if leftoverallocation !=0: self.alphasOptWeights['RotationalOptimizer1x'] = self.alphasOptWeights['RotationalOptimizer1x'] + leftoverallocation self.allowAlphaWeightOptimization = False self.algo.Log('optimal weights for alphas: ' + '\n' + str(newAlphasOptWeights)) except BaseException as e: self.algo.Log('Optimize failed due to ' + str(e)) # check if all alphas have been optimized and have positive weights self.optAlphaAllocationReady = False if all(x > 0 for x in self.alphasOptWeights.values()): self.optAlphaAllocationReady = True # for each alpha model ------------------------------------------------- for key in self.alphasInsightReturns: # update the cumulative returns for each alpha model and plot it filteredDailyReturns = pd.DataFrame.from_dict({key: value for key, value in self.alphasDailyReturns[key].items() if not np.isnan(value)}, orient='index') self.alphasCumReturns[key] = (filteredDailyReturns.add(1).cumprod().iloc[-1] if not filteredDailyReturns.empty else pd.Series(1))[0] self.algo.Plot('Alphas Cumulative Returns (%)', key, float((self.alphasCumReturns[key] - 1) * 100)) # update DD for each alpha and plot it if self.alphasCumReturns[key] > self.alphasLastPeak[key]: self.alphasLastPeak[key] = self.alphasCumReturns[key] self.alphasDD[key] = (self.alphasCumReturns[key] / self.alphasLastPeak[key]) - 1 self.algo.Plot('Alphas DD (%)', key, float(self.alphasDD[key]) * 100) # get the optimal allocation per alpha and plot it if self.optAlphaAllocationReady: self.algo.Plot('Alphas Allocation (%)', key, float(self.alphasOptWeights[key] * 100)) else: self.algo.Plot('Alphas Allocation (%)', key, float(self.alphasEqualWeighting * 100)) # calculate the accuracy per model for last n insights and plot it # these are the individual returns of last n insights # (we're using lookback times X, assuming X insights per day) #flatReturns = [val for sublist in self.alphasInsightReturns[key].values() # for val in sublist][-(self.alphaPortfolioOptLookback * 5):] #if len(flatReturns) > 0: # self.alphasAccuracy[key] = sum(x > 0 for x in flatReturns) / len(flatReturns) # self.algo.Plot('Alphas Accuracy (% of Positive Days)', key, float(self.alphasAccuracy[key] * 100)) if self.algo.LiveMode or self.algo.BackTestWarmUp == True: utils.SaveToObjectStore(self.algo, 'AlphasOptWeights', self.alphasOptWeights) utils.SaveToObjectStore(self.algo, 'AlphasDD', self.alphasDD) def TriggerAlphaWeightOptimization(self): """ Allow alpha weight optimization to happen """ self.allowAlphaWeightOptimization = True def OnSecuritiesChanged(self, algorithm, changes): """ Actions to take every time the universe changes """ # get removed symbol and invalidate them in the insight collection self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities] self.insightCollection.Clear(self.removedSymbols) for security in changes.AddedSecurities: if security.Symbol.Value == 'SPY': symbol = security.Symbol algorithm.Schedule.On(algorithm.DateRules.EveryDay(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, 120), self.EveryDayAtOpen) algorithm.Schedule.On(algorithm.DateRules.MonthStart(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, 1), self.TriggerAlphaWeightOptimization) def ShouldCreateTargets(self, algorithm, insights): """ Description: Determine whether we should rebalance the portfolio when: - We want to include some new security in the portfolio - We want to modify the direction of some existing security Args: insights: The last set of insights sent """ for insight in insights: if not algorithm.Portfolio[insight.Symbol].Invested and insight.Direction != InsightDirection.Flat: return True elif algorithm.Portfolio[insight.Symbol].Invested: return True return False def CalculateInsightReturn(self, algorithm, insight): """ Calculate the returns from the insights of each model as if 100% was invested in the model, with daily rebalancing """ insightReturn = 0 if (algorithm.ActiveSecurities[insight.Symbol].Price != 0 and self.entryDirection[insight.SourceModel].get(insight.Symbol) is not None and self.previousPrice[insight.SourceModel].get(insight.Symbol) is not None): previousPrice = self.previousPrice[insight.SourceModel][insight.Symbol] currentPrice = algorithm.ActiveSecurities[insight.Symbol].Price entryDirection = self.entryDirection[insight.SourceModel][insight.Symbol] weight = insight.Weight * self.algo.accountLeverage # return the insight return (weight-adjusted) insightReturn = entryDirection * (currentPrice / previousPrice - 1) * weight return insightReturn def UpdateReturnsDictionaries(self, algorithm, insight): """ Update alphasInsightReturns and alphasDailyReturns """ # calculate the insight return insightReturn = self.CalculateInsightReturn(algorithm, insight) # append the dictionary of insight returns by model self.alphasInsightReturns[insight.SourceModel][self.todayDate].append(insightReturn) # update the dictionary of daily returns by model self.alphasDailyReturns[insight.SourceModel][self.todayDate] = sum(self.alphasInsightReturns[insight.SourceModel][self.todayDate]) def CountWeightChangesAboveThreshold(self, symbolsNewWeights, threshold=0.03): """ Check how many new targets deviate enough from current targets """ # calculate the change in new weights vs current weights weightChanges = [abs(symbolsNewWeights[symbol] - weight) for symbol, weight in self.symbolsWeights.items()] # count how many are above a certain threshold countWeightChangesAbovethreshold = sum(x for x in weightChanges if x > threshold) return countWeightChangesAbovethreshold def PlotPortfolioInfo(self): """ Update and plot the portfolio cumulative returns and DD """ if self.portfolioLastValue == 0: self.portfolioLastValue = self.algo.Portfolio.TotalPortfolioValue # update and plot portfolio cumulative returns currentPortfolioValue = self.algo.Portfolio.TotalPortfolioValue portfolioDailyReturn = currentPortfolioValue / self.portfolioLastValue - 1 self.portfolioCumReturns *= (1 + portfolioDailyReturn) self.algo.Plot('Alphas Cumulative Returns (%)', 'Portfolio', float((self.portfolioCumReturns - 1) * 100)) # update and plot portfolio DD if self.portfolioCumReturns > self.portfolioLastPeak: self.portfolioLastPeak = self.portfolioCumReturns currentPortfolioDD = (self.portfolioCumReturns / self.portfolioLastPeak) - 1 self.algo.Plot('Alphas DD (%)', 'Portfolio', float(currentPortfolioDD) * 100) self.portfolioLastValue = self.algo.Portfolio.TotalPortfolioValue def GetTodayDate(self): """ Get today date and update relevant dictionaries """ # get today date for reference self.todayDate = datetime(self.algo.Time.year, self.algo.Time.month, self.algo.Time.day).date() # initialize today empty list for alphas insight returns and daily returns for key in self.alphasInsightReturns: self.alphasInsightReturns[key][self.todayDate] = [] self.alphasDailyReturns[key][self.todayDate] = np.nan
import numpy as np from scipy.optimize import minimize class PortfolioOptimizer: """ Description: Implementation of a custom optimizer that calculates the weights for each asset to optimize a given objective function Details: Optimization can be: - Equal Weighting - Maximize Portfolio Return - Minimize Portfolio Standard Deviation - Maximize Portfolio Sharpe Ratio - Maximize Portfolio Sortino Ratio - Risk Parity Portfolio - Target Return (minimize Standard Deviation given a min target return) - Target Variance (maximize Return given a max target variance) - Target Variance Negative Returns (maximize Return given a max target variance during negative returns) Constraints: - Weights must be between some given boundaries - Weights must sum to 1 """ def __init__(self): """ Initialize the CustomPortfolioOptimizer """ self.targetReturn = 0.1 self.targetVariance = 0.15 def Optimize(self, objFunction, dailyReturnsDf, bounds): """ Description: Perform portfolio optimization given a series of returns Args: objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance, maxSharpe, maxSortino, riskParity, targetReturn, targetVariance, targetVarianceNegativeReturns) dailyReturnsDf: DataFrame of historical daily arithmetic returns bounds: List of tuples with the min and max weights for each element Returns: Array of double with the portfolio weights (size: K x 1) """ # initial weights: equally weighted size = dailyReturnsDf.columns.size # K x 1 self.initWeights = np.array(size * [1. / size]) # get sample covariance matrix covariance = dailyReturnsDf.cov() # get the sample covariance matrix of only negative returns for sortino ratio negativeReturnsDf = dailyReturnsDf[dailyReturnsDf < 0] covarianceNegativeReturns = negativeReturnsDf.cov() if objFunction == 'equalWeighting': return self.initWeights # create constraints constraints = {'type': 'eq', 'fun': lambda x: np.sum(x) - 1.0} if objFunction == 'targetReturn': constraints.update({'type': 'eq', 'fun': lambda weights: self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights) - self.targetReturn}, {'type': 'eq', 'fun': lambda x: np.sum(x) - 1.0}) if objFunction == 'targetVariance': constraints = ({'type': 'ineq', 'fun': lambda weights: self.targetVariance - self.CalculateAnnualizedPortfolioStd(covariance, weights)}, {'type': 'ineq', 'fun': lambda x: 1.0 - np.sum(x)} ) if objFunction == 'targetVarianceNegativeReturns': constraints = ({'type': 'ineq', 'fun': lambda weights: self.targetVariance - self.CalculateAnnualizedPortfolioNegativeStd(covarianceNegativeReturns, weights)}, {'type': 'ineq', 'fun': lambda x: 1.0 - np.sum(x)} ) opt = minimize(lambda weights: self.ObjectiveFunction(objFunction, dailyReturnsDf, covariance, covarianceNegativeReturns, weights), x0=self.initWeights, bounds=bounds, constraints=constraints, method='SLSQP') return opt['x'] def ObjectiveFunction(self, objFunction, dailyReturnsDf, covariance, covarianceNegativeReturns, weights): """ Description: Compute the objective function Args: objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance, meanVariance, maxSharpe, maxSortino, riskParity) dailyReturnsDf: DataFrame of historical daily returns covariance: Sample covariance covarianceNegativeReturns: Sample covariance matrix of only negative returns weights: Portfolio weights """ if objFunction == 'maxReturn': f = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights) return -f # convert to negative to be maximized elif objFunction == 'minVariance': f = self.CalculateAnnualizedPortfolioStd(covariance, weights) return f elif objFunction == 'maxSharpe': f = self.CalculateAnnualizedPortfolioSharpeRatio(dailyReturnsDf, covariance, weights) return -f # convert to negative to be maximized elif objFunction == 'maxSortino': f = self.CalculateAnnualizedPortfolioSortinoRatio(dailyReturnsDf, covarianceNegativeReturns, weights) return -f # convert to negative to be maximized elif objFunction == 'riskParity': f = self.CalculateRiskParityFunction(covariance, weights) return f elif objFunction == 'targetReturn': f = self.CalculateAnnualizedPortfolioStd(covariance, weights) return f elif objFunction in ['targetVariance', 'targetVarianceNegativeReturns']: f = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights) return -f # convert to negative to be maximized else: raise ValueError(f'PortfolioOptimizer.ObjectiveFunction: objFunction input has to be one of equalWeighting,' + ' maxReturn, minVariance, maxSharpe, maxSortino, riskParity, targetReturn,' + ' targetVariance', 'targetVarianceNegativeReturns') def CalculateAnnualizedPortfolioReturn(self, dailyReturnsDf, weights): annualizedPortfolioReturns = np.sum(((1 + dailyReturnsDf.mean())**252 - 1) * weights) return annualizedPortfolioReturns def CalculateAnnualizedPortfolioStd(self, covariance, weights): annualizedPortfolioStd = np.sqrt( np.dot(weights.T, np.dot(covariance * 252, weights)) ) if annualizedPortfolioStd == 0: raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioStd: annualizedPortfolioStd cannot be zero. Weights: {weights}') return annualizedPortfolioStd def CalculateAnnualizedPortfolioNegativeStd(self, covarianceNegativeReturns, weights): annualizedPortfolioNegativeStd = np.sqrt(np.dot(weights.T, np.dot(covarianceNegativeReturns * 252, weights))) if annualizedPortfolioNegativeStd == 0: raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioNegativeStd: annualizedPortfolioNegativeStd cannot be zero. Weights: {weights}') return annualizedPortfolioNegativeStd def CalculateAnnualizedPortfolioSharpeRatio(self, dailyReturnsDf, covariance, weights): annualizedPortfolioReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights) annualizedPortfolioStd = self.CalculateAnnualizedPortfolioStd(covariance, weights) annualizedPortfolioSharpeRatio = annualizedPortfolioReturn / annualizedPortfolioStd return annualizedPortfolioSharpeRatio def CalculateAnnualizedPortfolioSortinoRatio(self, dailyReturnsDf, covarianceNegativeReturns, weights): annualizedPortfolioReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights) annualizedPortfolioNegativeStd = self.CalculateAnnualizedPortfolioNegativeStd(covarianceNegativeReturns, weights) annualizedPortfolioSortinoRatio = annualizedPortfolioReturn / annualizedPortfolioNegativeStd return annualizedPortfolioSortinoRatio def CalculateRiskParityFunction(self, covariance, weights): """ Spinu formulation for risk parity portfolio """ assetsRiskBudget = self.initWeights portfolioVolatility = self.CalculateAnnualizedPortfolioStd(covariance, weights) x = weights / portfolioVolatility riskParity = (np.dot(x.T, np.dot(covariance, x)) / 2) - np.dot(assetsRiskBudget.T, np.log(x)) return riskParity
from clr import AddReference AddReference("System") AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm.Framework") from System import * from QuantConnect import * from QuantConnect.Algorithm import * from QuantConnect.Algorithm.Framework import * from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel class PriceVolumeUniverseSelectionModel(FundamentalUniverseSelectionModel): def __init__(self, minPrice=20, nStocks=500, filterFineData=False, universeSettings=None): super().__init__(filterFineData, universeSettings) self.minPrice = minPrice # min price for stock selection self.nStocks = nStocks # number of stocks to select in universe def SelectCoarse(self, algorithm, coarse): """ Perform coarse selection based on price and dollar volume """ # filter universe to select only stocks with price above preFiltered = [x for x in coarse if x.HasFundamentalData and x.Price > self.minPrice] # sort the tickers by criteria and take the top candidates by dollar volume topDollarVolume = sorted(preFiltered, key=lambda x: x.DollarVolume, reverse=True)[:self.nStocks] topDollarVolumeSymbols = [x.Symbol for x in topDollarVolume] return topDollarVolumeSymbols
class InAndOutAlphaCreationModel(AlphaModel): def __init__(self, algorithm, name, activateAlpha, assetsWeights, rebalancingFrequency): self.algo = algorithm self.Name = name self.activateAlpha = activateAlpha self.assetsWeights = assetsWeights self.rebalancingFrequency = rebalancingFrequency self.weightsDict = {} self.timeToEmitInsights = False self.WeekEndIsHere = False self.weightsRetrieved = False self.previousVolatilityAllocation = 0 # market and list of signals based on ETFs self.signalInstruments = {'PRDC': ['XLI', None], 'METL': ['DBB', None], 'NRES': ['IGE', None], 'DEBT': ['SHY', None], 'USDX': ['UUP', None], 'GOLD': ['GLD', None], 'SLVA': ['SLV', None], 'UTIL': ['XLU', None], 'SHCU': ['FXF', None], 'RICU': ['FXA', None], 'INDU': ['XLI', None], 'MRKT': ['QQQ', None], 'TLT': ['TLT', None], 'QQQ': ['QQQ', None], 'SPY': ['SPY', None]} for signal, tickerSymbol in self.signalInstruments.items(): self.signalInstruments[signal][1] = self.algo.AddEquity(tickerSymbol[0], Resolution.Hour).Symbol self.smaSPY = self.algo.SMA(self.signalInstruments['SPY'][1], 231, Resolution.Daily) self.smaTLT = self.algo.SMA(self.signalInstruments['TLT'][1], 21, Resolution.Daily) self.smaGLD = self.algo.SMA(self.signalInstruments['GOLD'][1], 21, Resolution.Daily) self.signalSymbols = [self.signalInstruments['PRDC'][1], self.signalInstruments['METL'][1], self.signalInstruments['NRES'][1], self.signalInstruments['USDX'][1], self.signalInstruments['DEBT'][1], self.signalInstruments['MRKT'][1]] self.forPairs = [self.signalInstruments['GOLD'][1], self.signalInstruments['SLVA'][1], self.signalInstruments['UTIL'][1], self.signalInstruments['INDU'][1]] self.pairList = ['G_S', 'U_I'] # initialize constants and variables # [out for 3 trading weeks, set period for returns sample, # 'in'/'out' indicator, count of total days since start, # daysCount when self.beIn=0, portfolio value] self.initWaitDays, self.lookback, self.beIn, self.daysCount, self.outDay = [15, 252 * 5, True, 0, 0] # create symbols list self.symbols = list(set(self.signalSymbols + [self.signalInstruments['MRKT'][1]] + self.forPairs + [self.signalInstruments['QQQ'][1]])) # retrieve information from object store in live mode if self.algo.LiveMode: dictFromOS = utils.ReadFromObjectStore(self.algo, 'IO') if 'daysCount' in dictFromOS and 'outDay' in dictFromOS: self.algo.Log('IO; retrieving variable states from object store; daysCount: ' + str(dictFromOS['daysCount']) + '; outDay: ' + str(dictFromOS['outDay'])) self.daysCount, self.outDay = [dictFromOS['daysCount'], dictFromOS['outDay']] self.weightsRetrieved = True def Update(self, algorithm, data): insights = [] if not self.timeToEmitInsights: return insights insightExpiry = Expiry.EndOfDay(algorithm.Time) # loop through insights and send portfolio targets for symbol, weight in self.weightsDict.items(): if symbol.Value.split('_')[0] != 'CUSTOM' and not data.ContainsKey(symbol): continue if weight > 0: insightDirection = InsightDirection.Up elif weight < 0: insightDirection = InsightDirection.Down else: insightDirection = InsightDirection.Flat insights.append(Insight.Price(symbol, insightExpiry, insightDirection, None, None, None, abs(weight))) self.timeToEmitInsights = False return insights def GenerateSignals(self): """ Create new signals """ if not self.activateAlpha: return # get historical data latestHistory, latestHistoryShifted = self.GetHistoricalData() # returns sample to detect extreme observations returnsSample = (latestHistory / latestHistoryShifted) - 1 # Reverse code USDX: sort largest changes to bottom returnsSample[self.signalInstruments['USDX'][1]] = returnsSample[self.signalInstruments['USDX'][1]] * -1 # for pairs, take returns differential, reverse coded returnsSample['G_S'] = -(returnsSample[self.signalInstruments['GOLD'][1]] - returnsSample[self.signalInstruments['SLVA'][1]]) returnsSample['U_I'] = -(returnsSample[self.signalInstruments['UTIL'][1]] - returnsSample[self.signalInstruments['INDU'][1]]) # extreme observations; statistical significance = 5% extremeB = returnsSample.iloc[-1] < np.nanpercentile(returnsSample, 5, axis=0) # re-assess/disambiguate double-edged signals aboveMedian = returnsSample.iloc[-1] > np.nanmedian(returnsSample, axis=0) # interest rate expectations (cost of debt) may increase because the economic outlook improves # (showing in rising input prices) = actually not a negative signal extremeB.loc[self.signalInstruments['DEBT'][1]] = np.where((extremeB.loc[self.signalInstruments['DEBT'][1]].any()) & (aboveMedian[[self.signalInstruments['METL'][1], self.signalInstruments['NRES'][1]]].any()), False, extremeB.loc[self.signalInstruments['DEBT'][1]]) # determine whether 'in' or 'out' of the market if (extremeB[self.signalSymbols + self.pairList]).any(): self.beIn = False self.outDay = self.daysCount if self.daysCount >= self.outDay + self.initWaitDays: self.beIn = True if self.algo.Securities[self.signalInstruments['GOLD'][1]].Price != 0 and self.smaGLD.Current.Value != 0: self.algo.TLTorGLD = (self.algo.Securities[self.signalInstruments['TLT'][1]].Price / self.algo.Securities[self.signalInstruments['GOLD'][1]].Price) > (self.smaTLT.Current.Value/ self.smaGLD.Current.Value) else: self.algo.TLTorGLD = True if not self.beIn: self.assetsWeights['stocksProxy'][1] = 0 self.assetsWeights['volatilityProxy'][1] = 0.0 if self.algo.TLTorGLD: self.assetsWeights['bondsProxy'][1] = 1.0 self.assetsWeights['cashProxy'][1] = 0 self.assetsWeights['inflProxy'][1] = 0.0 else: self.assetsWeights['bondsProxy'][1] = 0.0 self.assetsWeights['cashProxy'][1] = 0.0 self.assetsWeights['inflProxy'][1] = 1.0 if self.WeekEndIsHere or self.weightsRetrieved == True: # or volatilityAllocationChanged: self.WeekEndIsHere = False self.weightsRetrieved = False if self.beIn: #if self.algo.Securities[self.signalInstruments['SPY'][1]].Price > self.smaSPY.Current.Value: # self.assetsWeights['stocksProxy'][1] = 0.0 + (volatilityMax - self.assetsWeights['volatilityProxy'][1]) - 0.0 #else: self.assetsWeights['stocksProxy'][1] = 0.8 self.assetsWeights['bondsProxy'][1] = 0.0 self.assetsWeights['cashProxy'][1] = 0.0 self.assetsWeights['inflProxy'][1] = 0.2 self.assetsWeights['volatilityProxy'][1] = 0.0 volatilityMax = 0.9 volatilityAllocation = 0 if self.algo.isRiskHighest == True: volatilityAllocation = volatilityMax elif self.algo.isRiskHigh == True: volatilityAllocation = volatilityMax*0.40 else: volatilityAllocation = 0 if self.previousVolatilityAllocation != volatilityAllocation or self.algo.stopLossTriggered: volatilityAllocationChanged = True else: volatilityAllocationChanged = False self.previousVolatilityAllocation = volatilityAllocation if volatilityAllocationChanged: self.assetsWeights['volatilityProxy'][1] = volatilityAllocation self.assetsWeights['bondsProxy'][1] = self.assetsWeights['bondsProxy'][1]*(1- volatilityAllocation) self.assetsWeights['cashProxy'][1] = self.assetsWeights['cashProxy'][1]*(1- volatilityAllocation) self.assetsWeights['stocksProxy'][1] = self.assetsWeights['stocksProxy'][1]*(1- volatilityAllocation) self.assetsWeights['inflProxy'][1] = self.assetsWeights['inflProxy'][1]*(1- volatilityAllocation) # update the weightsDict for proxy, symbolWeight in self.assetsWeights.items(): self.weightsDict[symbolWeight[0]] = symbolWeight[1] self.daysCount += 1 self.timeToEmitInsights = True if self.algo.LiveMode or self.algo.BackTestWarmUp == True: dictToOS = {'daysCount': self.daysCount, 'outDay': self.outDay} utils.SaveToObjectStore(self.algo, 'IO', dictToOS) self.algo.Log('InOut : ' + str(self.beIn)) self.algo.Log('InOut Weights - SBC : ' + str(round(self.assetsWeights['stocksProxy'][1], 2)) + "|" + str(round(self.assetsWeights['bondsProxy'][1], 2)) + "|" + str(round(self.assetsWeights['cashProxy'][1], 2)) + "|" + str(round(self.assetsWeights['volatilityProxy'][1], 2))) def WeekEnd(self): self.WeekEndIsHere = True def OnSecuritiesChanged(self, algorithm, changes): """ Actions to take every time the universe changes """ for security in changes.AddedSecurities: if security.Symbol.Value == 'SPY': symbol = security.Symbol algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]), self.GenerateSignals) algorithm.Schedule.On(algorithm.DateRules.WeekEnd(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, 1), self.WeekEnd) def GetHistoricalData(self): """ Get two dataframes with historical data """ latestHistory = self.algo.History(self.symbols, self.lookback, Resolution.Daily) latestHistory = latestHistory['close'].unstack(level=0).dropna() latestHistoryShifted = latestHistory.rolling(11, center=True).mean().shift(60) return latestHistory, latestHistoryShifted
from Alpha.RotationalOptimizerAlphaCreation import RotationalOptimizerAlphaCreationModel from Alpha.RotationalOptimizer1xAlphaCreation import RotationalOptimizer1xAlphaCreationModel from Alpha.BuyAndHoldAlphaCreation import BuyAndHoldAlphaCreationModel from Alpha.VixTermStructureAlphaCreation import VixTermStructureAlphaCreationModel from Alpha.MovingAverageAlphaCreation import MovingAverageAlphaCreationModel from Alpha.InAndOutAlphaCreation import InAndOutAlphaCreationModel from Portfolio.CompositePortfolioConstruction import CompositePortfolioConstructionModel from Alpha.AlphaParameters import GetAlphaParameters from QuantConnect.DataSource import * from custom_data import CustomData class MultiAlphaFrameworkAlgorithm(QCAlgorithmFramework): def Initialize(self): # delete all data in OS #keys = [str(j).split(',')[0][1:] for _, j in enumerate(self.ObjectStore.GetEnumerator())] #for key in keys: # self.ObjectStore.Delete(key) ### user-defined inputs --------------------------------------------------------------------------- self.SetStartDate(2012, 1, 1) # set start date #self.SetEndDate(2021, 8, 1) # set end date self.SetCash(1000000) # set strategy cash # set the account leverage self.accountLeverage = 1.0 self.BackTestWarmUp = False #REMEMBER TO SWITCH THIS BACK self.stopLossTriggered = False self.isRiskHighest = False self.isRiskHigh = False self.TLTflag = False self.TLTorGLD = True # PORTFOLIO ------------------------------------------------------------ # number of days for rolling window of returns alphaPortfolioOptLookback = 147 # options are: maxReturn, minVariance, maxSharpe, maxSortino, riskParity, targetReturn, # targetVariance, targetVarianceNegativeReturns alphaPortfolioOptObjFunction = 'minVariance' ### ------------------------------------------------------------------------------------------------ # apply CustomSecurityInitializer self.SetSecurityInitializer(lambda x: CustomSecurityInitializer(self, x)) # add universe --------------------------------------------------------- tickersToAdd = [] # RO universe parametersDictRO = GetAlphaParameters(self, 'RotationalOptimizer', 'riskyAssetsParameters') tickerCashRO = GetAlphaParameters(self, 'RotationalOptimizer', 'tickerCash') treasuryRatesSymbol = None if GetAlphaParameters(self, 'RotationalOptimizer', 'activate'): for ticker in parametersDictRO.keys(): if parametersDictRO[ticker]['addTicker'][0]: tickersToAdd.append(ticker) tradableTicker = parametersDictRO[ticker]['addTicker'][1] if tradableTicker not in tickersToAdd: tickersToAdd.append(tradableTicker) if tickerCashRO is not None and tickerCashRO not in tickersToAdd: tickersToAdd.append(tickerCashRO) treasuryRatesSymbol = self.AddData(QuandlTreasuryRates, 'USTREASURY/YIELD', Resolution.Daily).Symbol # RO universe - 1x parametersDictRO1x = GetAlphaParameters(self, 'RotationalOptimizer1x', 'riskyAssetsParameters') tickerCashRO1x = GetAlphaParameters(self, 'RotationalOptimizer1x', 'tickerCash') treasuryRatesSymbol1x = None if GetAlphaParameters(self, 'RotationalOptimizer1x', 'activate'): for ticker in parametersDictRO1x.keys(): if parametersDictRO1x[ticker]['addTicker'][0]: tickersToAdd.append(ticker) tradableTicker = parametersDictRO1x[ticker]['addTicker'][1] if tradableTicker not in tickersToAdd: tickersToAdd.append(tradableTicker) if tickerCashRO1x is not None and tickerCashRO1x not in tickersToAdd: tickersToAdd.append(tickerCashRO1x) treasuryRatesSymbol1x = self.AddData(QuandlTreasuryRates, 'USTREASURY/YIELD', Resolution.Daily).Symbol # BH universe - SVOL parametersDictBuyHoldSVOL = GetAlphaParameters(self, 'BuyAndHoldSVOL', 'tickersWeights') if GetAlphaParameters(self, 'BuyAndHoldSVOL', 'activate'): for ticker in parametersDictBuyHoldSVOL.keys(): if ticker not in tickersToAdd: tickersToAdd.append(ticker) # VTS universe parametersDictVTS = GetAlphaParameters(self, 'VixTermStructure', 'tickersWeights') canarySymbolVTS = None vixSymbol, vxvSymbol = None, None if GetAlphaParameters(self, 'VixTermStructure', 'activate'): for ticker in parametersDictVTS.keys(): if ticker not in tickersToAdd: tickersToAdd.append(ticker) canaryTickerVTS = GetAlphaParameters(self, 'MovingAverageSPY', 'canaryTicker') canarySymbolVTS = self.AddEquity(canaryTickerVTS, Resolution.Minute).Symbol vixSymbol = self.AddData(CBOE, 'VIX', Resolution.Daily).Symbol vxvSymbol = self.AddData(CBOE, 'VIX3M', Resolution.Daily).Symbol # MA universe - SPY parametersDictMovingAverageSPY = GetAlphaParameters(self, 'MovingAverageSPY', 'tickersWeights') canarySymbolMovingAverageSPY = None if GetAlphaParameters(self, 'MovingAverageSPY', 'activate'): for ticker in parametersDictMovingAverageSPY.keys(): if ticker not in tickersToAdd: tickersToAdd.append(ticker) canaryTickerMovingAverageSPY = GetAlphaParameters(self, 'MovingAverageSPY', 'canaryTicker') canarySymbolMovingAverageSPY = self.AddEquity(canaryTickerMovingAverageSPY, Resolution.Minute).Symbol # IO universe assetsWeights = GetAlphaParameters(self, 'InAndOut', 'assetsWeights') if GetAlphaParameters(self, 'InAndOut', 'activate'): for proxy, tickerWeight in assetsWeights.items(): # replace ticker with symbol ticker = tickerWeight[0] assetsWeights[proxy][0] = self.AddEquity(ticker, Resolution.Minute).Symbol if ticker not in tickersToAdd: tickersToAdd.append(ticker) # add all universes for ticker in tickersToAdd: if ticker.split('_')[0] == 'CUSTOM': customSecurity = self.AddData(CustomData, ticker, Resolution.Daily) customSecurity.SetFeeModel(ConstantFeeModel(0)) else: self.AddEquity(ticker, Resolution.Minute) # make sure we add SPY for scheduling if 'SPY' not in tickersToAdd: self.AddEquity('SPY', Resolution.Minute) # get all active alpha models alphaModels = [key for key, value in GetAlphaParameters(self).items() if value['activate']] # add alpha models ----------------------------------------------------- self.SetAlpha( CompositeAlphaModel( RotationalOptimizerAlphaCreationModel(self, name='RotationalOptimizer', activateAlpha=GetAlphaParameters(self, 'RotationalOptimizer', 'activate'), parametersDict=parametersDictRO, rebalancingFrequency=GetAlphaParameters(self, 'RotationalOptimizer', 'rebalancingFrequency'), lookbackOptimization=GetAlphaParameters(self, 'RotationalOptimizer', 'lookbackOptimization'), objFunction=GetAlphaParameters(self, 'RotationalOptimizer', 'objFunction'), tickerCash=tickerCashRO, treasuryRatesSymbol=treasuryRatesSymbol), RotationalOptimizer1xAlphaCreationModel(self, name='RotationalOptimizer1x', activateAlpha=GetAlphaParameters(self, 'RotationalOptimizer1x', 'activate'), parametersDict=parametersDictRO1x, rebalancingFrequency=GetAlphaParameters(self, 'RotationalOptimizer1x', 'rebalancingFrequency'), lookbackOptimization=GetAlphaParameters(self, 'RotationalOptimizer1x', 'lookbackOptimization'), objFunction=GetAlphaParameters(self, 'RotationalOptimizer1x', 'objFunction'), tickerCash=tickerCashRO1x, treasuryRatesSymbol=treasuryRatesSymbol1x), BuyAndHoldAlphaCreationModel(self, name='BuyAndHoldSVOL', activateAlpha=GetAlphaParameters(self, 'BuyAndHoldSVOL', 'activate'), parametersDict=parametersDictBuyHoldSVOL, rebalancingFrequency=GetAlphaParameters(self, 'BuyAndHoldSVOL', 'rebalancingFrequency')), VixTermStructureAlphaCreationModel(self, name='VixTermStructure', activateAlpha=GetAlphaParameters(self, 'VixTermStructure', 'activate'), parametersDict=parametersDictVTS, vtsSymbols=[vixSymbol, vxvSymbol], canarySymbol=canarySymbolVTS, periodShortSMA=GetAlphaParameters(self, 'VixTermStructure', 'periodShortSMA'), periodLongSMA=GetAlphaParameters(self, 'VixTermStructure', 'periodLongSMA'), contangoThresholds=GetAlphaParameters(self, 'VixTermStructure', 'contangoThresholds'), rebalancingFrequency=GetAlphaParameters(self, 'VixTermStructure', 'rebalancingFrequency')), MovingAverageAlphaCreationModel(self, name='MovingAverageSPY', activateAlpha=GetAlphaParameters(self, 'MovingAverageSPY', 'activate'), parametersDict=parametersDictMovingAverageSPY, canarySymbol=canarySymbolMovingAverageSPY, periodShortSMA=GetAlphaParameters(self, 'MovingAverageSPY', 'periodShortSMA'), periodLongSMA=GetAlphaParameters(self, 'MovingAverageSPY', 'periodLongSMA'), movingAverageThresholds=GetAlphaParameters(self, 'MovingAverageSPY', 'movingAverageThresholds'), rebalancingFrequency=GetAlphaParameters(self, 'MovingAverageSPY', 'rebalancingFrequency')), InAndOutAlphaCreationModel(self, name='InAndOut', activateAlpha=GetAlphaParameters(self, 'InAndOut', 'activate'), assetsWeights=assetsWeights, rebalancingFrequency=GetAlphaParameters(self, 'InAndOut', 'rebalancingFrequency')) ) ) # add portfolio model -------------------------------------------------- optimizationBounds = {key: value['optimizationBounds'] for key, value in GetAlphaParameters(self).items() if value['activate']} self.SetPortfolioConstruction( CompositePortfolioConstructionModel(self, alphaModels=alphaModels, optimizationBounds=optimizationBounds, alphaPortfolioOptLookback=alphaPortfolioOptLookback, alphaPortfolioOptObjFunction=alphaPortfolioOptObjFunction)) # add execution model -------------------------------------------------- self.SetExecution(ImmediateExecutionModel()) # add risk model ------------------------------------------------------- self.SetRiskManagement(NullRiskManagementModel()) # warm up algorithm self.SetWarmup(200, Resolution.Daily) def CustomSecurityInitializer(self, security): """ Initialize the security """ security.SetLeverage(self.accountLeverage + 1) class QuandlTreasuryRates(PythonQuandl): def __init__(self): self.ValueColumnName = 'value'
import json def CheckHistory(symbol, history): """ Check if the history dataframe is valid """ if (str(symbol) not in history.index or history.loc[str(symbol)].get('open') is None or history.loc[str(symbol)].get('open').isna().any() or history.loc[str(symbol)].get('high') is None or history.loc[str(symbol)].get('high').isna().any() or history.loc[str(symbol)].get('low') is None or history.loc[str(symbol)].get('low').isna().any() or history.loc[str(symbol)].get('close') is None or history.loc[str(symbol)].get('close').isna().any()): return False else: return True def ShouldEmitInsight(self, algorithm, symbol): """ Check if we should emit new insights for a symbol """ tradingDay = algorithm.Time.day generatedInsight = self.generatedInsightBySymbol.get(symbol) if generatedInsight is not None: if self.generatedInsightBySymbol[symbol] == tradingDay: return False else: self.generatedInsightBySymbol[symbol] = tradingDay return True else: self.generatedInsightBySymbol[symbol] = tradingDay return True def SaveToObjectStore(algorithm, fileName, dictToSave): """ Save information into the Object Store """ jsonFile = json.dumps(dictToSave) algorithm.ObjectStore.Save(fileName, jsonFile) def ReadFromObjectStore(algorithm, fileName): if not algorithm.ObjectStore.ContainsKey(fileName): return {} jsonObj = algorithm.ObjectStore.Read(fileName) dictObj = json.loads(jsonObj) return dictObj
class VixTermStructureAlphaCreationModel(AlphaModel): def __init__(self, algorithm, name, activateAlpha, parametersDict, vtsSymbols, canarySymbol, periodShortSMA, periodLongSMA, contangoThresholds, rebalancingFrequency): self.algo = algorithm self.Name = name self.activateAlpha = activateAlpha self.parametersDict = parametersDict self.canarySymbol = canarySymbol self.contangoThresholds = contangoThresholds self.rebalancingFrequency = rebalancingFrequency self.weightsDict = {} self.timeToEmitInsights = False self.isCanaryGreaterThanMA = False self.isCanaryLessThanMA = False self.isCanaryAboveMA = True self.atrStocksCrossover = None self.vtsRatioDiffMA = SimpleMovingAverage(3) self.waterMark = -0.03 self.waterMarkStocks = -0.03 self.VolatilityStopped = False self.counterdays = 0 #self.gdp = self.algo.AddData(Fred, "FRED/GDP", Resolution.Daily,TimeZones.NewYork, True) #self.mcap = self.algo.AddData(Fred, "FRED/NCBCEL", Resolution.Daily,TimeZones.NewYork, True) if self.activateAlpha: self.vixLongSMA = algorithm.SMA(vtsSymbols[0], periodLongSMA, Resolution.Daily) self.vxvLongSMA = algorithm.SMA(vtsSymbols[1], periodLongSMA, Resolution.Daily) self.vixShortSMA = algorithm.SMA(vtsSymbols[0], periodShortSMA, Resolution.Daily) self.vxvShortSMA = algorithm.SMA(vtsSymbols[1], periodShortSMA, Resolution.Daily) self.canarySymbolSMA = algorithm.SMA(canarySymbol, 231, Resolution.Daily) self.canarySymbolSMA34 = algorithm.SMA(canarySymbol, 42, Resolution.Daily) self.canarySymbolSMA05 = algorithm.SMA(canarySymbol, 3, Resolution.Daily) def Update(self, algorithm, data): insights = [] if not self.timeToEmitInsights: return insights insightExpiry = Expiry.EndOfDay(algorithm.Time) # loop through insights and send portfolio targets for symbol, weight in self.weightsDict.items(): if symbol.Value.split('_')[0] != 'CUSTOM' and not data.ContainsKey(symbol): continue if weight > 0: insightDirection = InsightDirection.Up elif weight < 0: insightDirection = InsightDirection.Down else: insightDirection = InsightDirection.Flat insights.append(Insight.Price(symbol, insightExpiry, insightDirection, None, None, None, abs(weight))) self.timeToEmitInsights = False return insights def EvaluateRisk(self): if (not self.activateAlpha or not self.vixLongSMA.IsReady or not self.vxvLongSMA.IsReady or not self.canarySymbolSMA): return # made it a dynamic calculation to avoid problems with the Indicator Extension vtsRatioShort = self.vxvShortSMA.Current.Value / self.vixShortSMA.Current.Value vtsRatioLong = self.vxvLongSMA.Current.Value / self.vixLongSMA.Current.Value diff = vtsRatioShort - vtsRatioLong self.vtsRatioDiffMA.Update(self.algo.Time, diff) isvtsRatioDiffMANegative = self.vtsRatioDiffMA.Current.Value < 0 canaryPrice = self.algo.Securities[self.canarySymbol].Price canaryMA = self.canarySymbolSMA.Current.Value canaryMA34 = self.canarySymbolSMA34.Current.Value canaryMA05 = self.canarySymbolSMA05.Current.Value self.IsCanaryGreaterThanMA = False #canaryPrice > canaryMA * 1.1 self.isCanaryLessThanMA = canaryPrice < canaryMA isTermStructureNegative = vtsRatioShort < vtsRatioLong #isTermStructureNegative = isvtsRatioDiffMANegative isTermStructureLower = vtsRatioShort < 0.923 isTermStructureGreater = vtsRatioShort > 1.25 isVixGreater = self.vixShortSMA.Current.Value > 25 isATRCrossOverSPY = self.atrStocksCrossover.Current.Value > 1.5 isATRCrossOverSPYLongTerm = canaryMA05 < canaryMA34 #self.algo.isRiskHighest = isTermStructureNegative and isTermStructureLower and isVixGreater #self.algo.isRiskHigh = isTermStructureNegative and isVixGreater self.algo.isRiskHighest = False self.algo.isRiskHigh = False self.algo.Log("Term Structure Negative ? :" + str(isTermStructureNegative)) self.algo.Log("Term Structure Lower ? :" + str(isTermStructureLower)) self.algo.Log("Is VIX Greater ? :" + str(isVixGreater)) self.algo.Log("Has ATR CrossOvered ? :" + str(isATRCrossOverSPY)) self.algo.Log("Is SPY < MA ? :" + str(self.isCanaryLessThanMA)) if isATRCrossOverSPY or self.isCanaryLessThanMA: if isTermStructureNegative and isTermStructureLower: self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][4] for security in self.algo.ActiveSecurities.Values if security.Symbol.Value in self.parametersDict} self.algo.isRiskHighest = True elif isTermStructureNegative: self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][3] for security in self.algo.ActiveSecurities.Values if security.Symbol.Value in self.parametersDict} self.algo.isRiskHigh = True else: if self.isCanaryLessThanMA or isTermStructureGreater: self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][5] for security in self.algo.ActiveSecurities.Values if security.Symbol.Value in self.parametersDict} else: self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][0] for security in self.algo.ActiveSecurities.Values if security.Symbol.Value in self.parametersDict} else: if isTermStructureNegative and isTermStructureLower and isVixGreater: self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][4] for security in self.algo.ActiveSecurities.Values if security.Symbol.Value in self.parametersDict} self.algo.isRiskHighest = True elif isTermStructureNegative and isVixGreater: self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][3] for security in self.algo.ActiveSecurities.Values if security.Symbol.Value in self.parametersDict} self.algo.isRiskHigh = True else: if self.isCanaryLessThanMA or isTermStructureGreater: self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][5] for security in self.algo.ActiveSecurities.Values if security.Symbol.Value in self.parametersDict} else: self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][0] for security in self.algo.ActiveSecurities.Values if security.Symbol.Value in self.parametersDict} if vtsRatioShort < 0.85 and isTermStructureNegative == False: self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][2] for security in self.algo.ActiveSecurities.Values if security.Symbol.Value in self.parametersDict} self.algo.isRiskHighest = False self.algo.isRiskHigh = False if vtsRatioShort > 0.923 and self.vixShortSMA.Current.Value > 27 and isTermStructureNegative == False : self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][2] for security in self.algo.ActiveSecurities.Values if security.Symbol.Value in self.parametersDict} self.algo.isRiskHighest = False self.algo.isRiskHigh = False #if not self.algo.TLTorGLD: # for symbol, weight in self.weightsDict.items(): # if symbol.Value == "TMF": # TMFweight = self.weightsDict[symbol] # self.weightsDict[symbol] = 0 # for symbol, weight in self.weightsDict.items(): # if symbol.Value == "TIP": # self.weightsDict[symbol] = TMFweight def GenerateSignals(self): self.timeToEmitInsights = True def VolatilityStopLoss(self): if self.algo.Time.hour == 9 or self.algo.Time.hour == 10: return stopLossLevel = 0.035 for symbol in self.weightsDict.keys(): if symbol.Value == 'UVXY': pnlVolatility = self.algo.Securities[symbol.Value].Holdings.UnrealizedProfitPercent if pnlVolatility > self.waterMark + stopLossLevel: self.waterMark = pnlVolatility - stopLossLevel if pnlVolatility < self.waterMark: self.VolatilityStopped = True self.waterMark = -stopLossLevel #for symbol in self.weightsDict.keys(): if self.VolatilityStopped == True: # and symbol.Value == 'UVXY': self.weightsDict[symbol] = 0 self.VolatilityStopped = False #self.timeToEmitInsights = True self.algo.SetHoldings(symbol, 0) self.algo.stopLossTriggered = True def StocksStopLoss(self): if self.algo.Time.hour == 9 or self.algo.Time.hour == 10: return StocksStopLossLevel = 0.15 for symbol in self.weightsDict.keys(): if symbol.Value == 'TQQQ': pnlStocks = self.algo.Securities[symbol.Value].Holdings.UnrealizedProfitPercent if pnlStocks > self.waterMarkStocks + StocksStopLossLevel: self.waterMarkStocks = pnlStocks - StocksStopLossLevel if pnlStocks < self.waterMarkStocks: self.waterMarkStocks = -StocksStopLossLevel self.weightsDict[symbol] = 0 self.algo.SetHoldings(symbol, 0) self.algo.stopLossTriggered = True def OnMonthStart(self): if self.algo.Securities[self.canarySymbol].Price >= self.canarySymbolSMA.Current.Value and self.algo.Securities[self.canarySymbol].Price <= self.canarySymbolSMA.Current.Value*1.1: self.isCanaryAboveMA = True else: self.isCanaryAboveMA = False def OnSecuritiesChanged(self, algorithm, changes): """ Actions to take every time the universe changes """ for security in changes.AddedSecurities: if security.Symbol.Value == 'SPY': symbol = security.Symbol algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]-1), self.EvaluateRisk) algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]), self.GenerateSignals) algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.Every(TimeSpan.FromMinutes(60)) , self.VolatilityStopLoss) algorithm.Schedule.On(algorithm.DateRules.MonthStart(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, 1), self.OnMonthStart) if self.activateAlpha and security.Symbol.Value in ['SPY']: shortATR = algorithm.ATR(security.Symbol, 10, Resolution.Hour) longATR = algorithm.ATR(security.Symbol, 63, Resolution.Hour) longerATR = algorithm.ATR(security.Symbol, 210, Resolution.Hour) self.atrStocksCrossover = IndicatorExtensions.Over(shortATR, longATR) self.atrStocksCrossoverLongTerm = IndicatorExtensions.Over(longATR, longerATR) class Fred(PythonQuandl): def __init__(self): self.ValueColumnName = 'value'
class CustomData(PythonData): """ Add custom ticker """ def __init__(self): # dictionary with custom tickers and urls self.urls = {'CUSTOM_SPD': 'https://www.dropbox.com/s/fgx97uwvcdnb3ib/spd_index.csv?dl=1', 'CUSTOM_SVOL': 'https://www.dropbox.com/s/v4aq9l49572gyvr/svol_bsv_3_index.csv?dl=1'} def GetSource(self, config, date, isLive): # retrieve the relevant url source = self.urls.get(config.Symbol.Value, '') return SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile) def Reader(self, config, line, date, isLive): # if first character is not digit, pass if not (line.strip() and line[0].isdigit()): return None data = line.split(',') customTicker = CustomData() customTicker.Symbol = config.Symbol customTicker.Time = datetime.strptime(data[0], '%d/%m/%Y') customTicker.Value = float(data[1]) return customTicker
# Version that works with 3x def GetAlphaParameters(self, alphaName=None, parameter=None): stocksTicker = 'TQQQ' spyTicker = 'SPXL' bondsTicker = 'TMF' cashTicker = 'IEF' inflTicker = 'TIP' goldTicker = 'IAU' volatilityTicker = 'UVXY' volatilityFactor = 1 minutesAfterStart = 90 alphaParametersDict = {'RotationalOptimizer': {'activate': True, 'riskyAssetsParameters': { 'SPY': {'addTicker': [True, spyTicker], 'sma': [200, (-0.10, 0.10), 0.0], 'macd': [(231, 567, 168), 0, 0.3], 'yield': [True, 0], 'optimizationBounds': (0, 1)}, 'TLT': {'addTicker': [True, bondsTicker], 'sma': [600, (-0.2, 0.2), 0.0], 'macd': [(63, 168, 42), 0, 0], 'yield': [False, 0], 'optimizationBounds': (0, 1)}, 'GLD': {'addTicker': [True, goldTicker], 'sma': [100, (-0.15, 0.15), 0.0], 'macd': [(50, 150, 30), 0, 0.3], 'yield': [False, 0], 'optimizationBounds': (0, 1)}, 'BLOK': {'addTicker': [True, 'BLOK'], #Change to BITO 'sma': [100, (-0.15, 0.15), 0.0], 'macd': [(50, 150, 30), 0, 0.3], 'yield': [False, 0], 'optimizationBounds': (0, 1)} }, 'lookbackOptimization': 63, 'objFunction': 'minVariance', 'tickerCash': inflTicker, 'rebalancingFrequency': (self.DateRules.MonthStart, minutesAfterStart), 'optimizationBounds': (0.10, 0.80)}, 'RotationalOptimizer1x': {'activate': True, 'riskyAssetsParameters': { 'CUSTOM_SVOL': {'addTicker': [True, 'CUSTOM_SVOL'], #Change to actual tickers 'sma': [200, (-0.10, 0.10), 1], 'macd': [(231, 567, 168), 0, 1], 'yield': [False, 0], 'optimizationBounds': (0, 1)}, 'CUSTOM_SPD': {'addTicker': [True, 'CUSTOM_SPD'], 'sma': [200, (-0.10, 0.10), 1], 'macd': [(231, 567, 168), 0, 1], 'yield': [False, 0], 'optimizationBounds': (0, 1)}, 'TIP': {'addTicker': [True, 'TIP'], 'sma': [100, (-0.15, 0.15), 1], 'macd': [(50, 150, 30), 0, 1], 'yield': [False, 0], 'optimizationBounds': (0, 1)}, }, 'lookbackOptimization': 21, 'objFunction': 'minVariance', 'tickerCash': inflTicker, 'rebalancingFrequency': (self.DateRules.EveryDay, minutesAfterStart), 'optimizationBounds': (0.01, 0.05)}, 'BuyAndHoldSVOL': {'activate': False, 'tickersWeights': {'CUSTOM_SVOL': 0.33, 'CUSTOM_SPD': 0.33,'TQQQ': 0.00, 'TMF': 0.00, 'BITO': 0.00, 'IAU': 0.00, 'DGP': 0, 'BND': 0, 'SVXY': 0, 'IEF': 0.00, 'TIP': 0.34, 'TLT': 0, 'TYD': 0}, 'rebalancingFrequency': (self.DateRules.MonthStart, minutesAfterStart), 'optimizationBounds': (0.14, 0.15)}, 'VixTermStructure': {'activate': True, 'tickersWeights': { volatilityTicker: [0.0, 0.1, 0.2, 0.4, 1.0], stocksTicker: [0.6, 0.9, 0.8, 0.6, 0.0], bondsTicker: [0.4, 0.0, 0.0, 0.0, 0.0], goldTicker: [0.0, 0.0, 0.0, 0.0, 0.0]}, 'canaryTicker': 'SPY', 'periodShortSMA': 3, 'periodLongSMA': 15, 'contangoThresholds': [1.25, 0.923], 'rebalancingFrequency': (self.DateRules.EveryDay, minutesAfterStart), 'optimizationBounds': (0.30, 0.40)}, 'MovingAverageSPY': {'activate': False, 'tickersWeights': { 'TQQQ': [0.14, 0.14, 0.00], 'TMF': [0.07, 0.07, 0.07], 'IAU': [0.04, 0.04, 0.04], 'IEF': [0.35, 0.35, 0.40], 'TIP': [0.40, 0.40, 0.49] }, 'canaryTicker': 'SPY', 'periodShortSMA': 1, 'periodLongSMA': 168, 'movingAverageThresholds': [1.1, 1.0], 'rebalancingFrequency': (self.DateRules.MonthStart, minutesAfterStart), 'optimizationBounds': (0.01, 1)}, 'InAndOut': {'activate': True, 'assetsWeights': {'stocksProxy': [stocksTicker, 0.6], 'bondsProxy': [bondsTicker, 0.4], 'cashProxy': [cashTicker, 0], 'inflProxy': [inflTicker, 0], 'goldProxy': [goldTicker, 0], 'volatilityProxy': [volatilityTicker, 0]}, 'rebalancingFrequency': (self.DateRules.EveryDay, minutesAfterStart), 'optimizationBounds': (0.20, 0.40)}, } # if no filtering elements provided, return the whole dictionary if alphaName is None and parameter is None: return alphaParametersDict return alphaParametersDict[alphaName][parameter] # With 15% DD def GetAlphaParameters(self, alphaName=None, parameter=None): stocksTicker = 'TQQQ' spyTicker = 'SPXL' bondsTicker = 'TMF' cashTicker = 'IEF' inflTicker = 'TIP' goldTicker = 'IAU' volatilityTicker = 'UVXY' volatilityFactor = 1 minutesAfterStart = 90 alphaParametersDict = {'RotationalOptimizer': {'activate': True, 'riskyAssetsParameters': {'SPY': {'addTicker': [True, spyTicker], 'sma': [200, (-0.10, 0.10), 0.0], 'macd': [(231, 567, 168), 0, 0.3], 'yield': [True, 0], 'optimizationBounds': (0, 1)}, 'TLT': {'addTicker': [True, bondsTicker], 'sma': [600, (-0.2, 0.2), 0.0], 'macd': [(63, 168, 42), 0, 0], 'yield': [False, 0], 'optimizationBounds': (0, 1)}, 'GLD': {'addTicker': [True, goldTicker], 'sma': [100, (-0.15, 0.15), 0.0], 'macd': [(50, 150, 30), 0, 0.3], 'yield': [False, 0], 'optimizationBounds': (0, 1)}, 'BLOK': {'addTicker': [True, 'BLOK'], #Change to BITO 'sma': [100, (-0.15, 0.15), 0.0], 'macd': [(50, 150, 30), 0, 0.3], 'yield': [False, 0], 'optimizationBounds': (0, 1)} }, 'lookbackOptimization': 63, 'objFunction': 'minVariance', 'tickerCash': inflTicker, 'rebalancingFrequency': (self.DateRules.MonthStart, minutesAfterStart), 'optimizationBounds': (0.15, 0.50)}, 'RotationalOptimizer1x': {'activate': True, 'riskyAssetsParameters': { 'CUSTOM_SVOL': {'addTicker': [True, 'CUSTOM_SVOL'], #Change to actual tickers 'sma': [200, (-0.10, 0.10), 1], 'macd': [(231, 567, 168), 0, 1], 'yield': [False, 0], 'optimizationBounds': (0, 1)}, 'CUSTOM_SPD': {'addTicker': [True, 'CUSTOM_SPD'], 'sma': [200, (-0.10, 0.10), 1], 'macd': [(231, 567, 168), 0, 1], 'yield': [False, 0], 'optimizationBounds': (0, 1)}, 'TIP': {'addTicker': [True, 'TIP'], 'sma': [100, (-0.15, 0.15), 1], 'macd': [(50, 150, 30), 0, 1], 'yield': [False, 0], 'optimizationBounds': (0, 1)}, }, 'lookbackOptimization': 21, 'objFunction': 'minVariance', 'tickerCash': inflTicker, 'rebalancingFrequency': (self.DateRules.WeekStart, minutesAfterStart), 'optimizationBounds': (0.14, 0.15)}, 'BuyAndHoldSVOL': {'activate': False, 'tickersWeights': {'CUSTOM_SVOL': 0, 'TQQQ': 0.05, 'TMF': 0.03, 'BITO': 0.07, 'IAU': 0.15, 'DGP': 0, 'BND': 0, 'SVXY': 0, 'IEF': 0.35, 'TIP': 0.35, 'TLT': 0, 'TYD': 0}, 'rebalancingFrequency': (self.DateRules.MonthStart, minutesAfterStart), 'optimizationBounds': (0.01, 1)}, 'VixTermStructure': {'activate': True, 'tickersWeights': {volatilityTicker: [0.0, 0.1, 0.2, 0.5, 1.0], stocksTicker: [0.8, 0.9, 0.8, 0.5, 0.0], bondsTicker: [0.2, 0.0, 0.0, 0.0, 0.0]}, 'canaryTicker': 'SPY', 'periodShortSMA': 3, 'periodLongSMA': 15, 'contangoThresholds': [1.25, 0.923], 'rebalancingFrequency': (self.DateRules.EveryDay, minutesAfterStart), 'optimizationBounds': (0.19, 0.20)}, 'MovingAverageSPY': {'activate': False, 'tickersWeights': { 'TQQQ': [0.14, 0.14, 0.00], 'TMF': [0.07, 0.07, 0.07], 'IAU': [0.04, 0.04, 0.04], 'IEF': [0.35, 0.35, 0.40], 'TIP': [0.40, 0.40, 0.49] }, 'canaryTicker': 'SPY', 'periodShortSMA': 1, 'periodLongSMA': 168, 'movingAverageThresholds': [1.1, 1.0], 'rebalancingFrequency': (self.DateRules.MonthStart, minutesAfterStart), 'optimizationBounds': (0.01, 1)}, 'InAndOut': {'activate': True, 'assetsWeights': {'stocksProxy': [stocksTicker, 0.6], 'bondsProxy': [bondsTicker, 0.4], 'cashProxy': [cashTicker, 0], 'inflProxy': [inflTicker, 0], 'goldProxy': [goldTicker, 0], 'volatilityProxy': [volatilityTicker, 0]}, 'rebalancingFrequency': (self.DateRules.EveryDay, minutesAfterStart), 'optimizationBounds': (0.15, 0.25)}, } # if no filtering elements provided, return the whole dictionary if alphaName is None and parameter is None: return alphaParametersDict return alphaParametersDict[alphaName][parameter]
from OptimizerModules.OptimizerClass import PortfolioOptimizer import pandas as pd import general_utils as utils class RotationalOptimizer1xAlphaCreationModel(AlphaModel): def __init__(self, algorithm, name, activateAlpha, parametersDict, rebalancingFrequency, lookbackOptimization, objFunction, tickerCash, treasuryRatesSymbol): self.algo = algorithm self.Name = name self.activateAlpha = activateAlpha self.parametersDict = parametersDict self.rebalancingFrequency = rebalancingFrequency self.lookbackOptimization = lookbackOptimization self.objFunction = objFunction self.tickerCash = tickerCash self.treasuryRatesSymbol = treasuryRatesSymbol self.leverageFactor = 1 # for yield signal crisis self.lookbackNegativeYield = 147 # number of days to lookback for negative values self.startCrisisYieldValue = 0.3 # the yield value above which we apply the yield weight condition # initialize the optimizer self.optimizer = PortfolioOptimizer() # get all the parameters for the indicators valuesList = [] self.alphaTickers = [] for ticker in parametersDict.keys(): if parametersDict[ticker]['addTicker'][0]: valuesList.append(parametersDict[ticker]['sma'][0]) valuesList.append(sum(parametersDict[ticker]['macd'][0])) self.alphaTickers.append(ticker) tradableTicker = parametersDict[ticker]['addTicker'][1] if tradableTicker != ticker: self.alphaTickers.append(tradableTicker) self.alphaTickers.append(self.tickerCash) # keep the highest parameter provided to call history self.lookbackHistory = max(lookbackOptimization, max(valuesList)) self.weightsDict = {} self.timeToEmitInsights = False self.atrStocksCrossover = None self.atrBondsCrossover = None self.weightsRetrieved = False self.weekStart = False # Sudhir changes to code to optimize on start if nothing found in object store. if self.algo.LiveMode: dictFromOS = utils.ReadFromObjectStore(self.algo, 'RO1x') if dictFromOS: self.algo.Log('RO1x; retrieving last optimal weights from object store: ' + str(dictFromOS)) self.weightsDict = {self.algo.Symbol(ticker): weight for ticker, weight in dictFromOS.items()} self.weightsRetrieved = True def Update(self, algorithm, data): insights = [] if not self.timeToEmitInsights: return insights insightExpiry = Expiry.EndOfDay(algorithm.Time) # loop through insights and send portfolio targets for symbol, weight in self.weightsDict.items(): if symbol.Value.split('_')[0] != 'CUSTOM' and not data.ContainsKey(symbol): continue if weight > 0: insightDirection = InsightDirection.Up elif weight < 0: insightDirection = InsightDirection.Down else: insightDirection = InsightDirection.Flat insights.append(Insight.Price(symbol, insightExpiry, insightDirection, None, None, None, abs(weight))) self.timeToEmitInsights = False return insights def GenerateSignals(self): """ Create new signals """ if not self.activateAlpha: return #if self.algo.TLTorGLD: # self.tickerCash = "IEF" #else: # self.tickerCash = "TIP" # get active symbols for this alpha symbols = [x.Symbol for x in self.algo.ActiveSecurities.Values if x.Symbol.Value in self.alphaTickers] # determine target percent for the given insights (check function DetermineTargetPercent for details) self.weightsDict = self.DetermineTargetPercent(self.algo, symbols) self.CalculateModelLeverageFactor() self.timeToEmitInsights = True if self.algo.LiveMode or self.algo.BackTestWarmUp == True: dictToOS = {symbol.Value: weight for symbol, weight in self.weightsDict.items()} utils.SaveToObjectStore(self.algo, 'RO1x', dictToOS) def CheckCrossoverATR(self): if self.weightsRetrieved == True: self.weightsRetrieved = False self.timeToEmitInsights = True """ Check if there was an ATR Crossover to trigger GenerateSignals """ if (self.atrStocksCrossover is not None and self.atrBondsCrossover is not None and (self.atrStocksCrossover.Current.Value > 1.5 or self.atrBondsCrossover.Current.Value > 1.5) and (self.weekStart)): self.algo.Log('RO; triggering GenerateSignals due to ATR Crossover') self.GenerateSignals() self.weekStart = False def OnWeekStart(self): self.weekStart = True def OnSecuritiesChanged(self, algorithm, changes): """ Actions to take every time the universe changes """ for security in changes.AddedSecurities: if security.Symbol.Value == 'SPY': symbol = security.Symbol algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]), self.GenerateSignals) algorithm.Schedule.On(algorithm.DateRules.EveryDay(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]), self.CheckCrossoverATR) algorithm.Schedule.On(algorithm.DateRules.WeekStart(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, 1), self.OnWeekStart) # initialize atr indicators if self.activateAlpha and security.Symbol.Value in ['SPY', 'TLT']: shortATR = algorithm.ATR(security.Symbol, 10, Resolution.Hour) longATR = algorithm.ATR(security.Symbol, 63, Resolution.Hour) if security.Symbol.Value == 'SPY': self.atrStocksCrossover = IndicatorExtensions.Over(shortATR, longATR) elif security.Symbol.Value == 'TLT': self.atrBondsCrossover = IndicatorExtensions.Over(shortATR, longATR) def DetermineTargetPercent(self, algorithm, symbols): """ Description: Determine the target percent for each insight Args: algorithm: The algorithm instance symbols: The symbols to generate an insight for """ # empty dictionary to store portfolio targets by symbol finalWeights = {} # get symbols for calculations calculationSymbols = [x for x in symbols if x.Value in self.parametersDict.keys()] # get historical data for calculationSymbols for the last n trading days history = algorithm.History(calculationSymbols, self.lookbackHistory, Resolution.Daily) #Changed to Hour # empty dictionary for calculations calculations = {} # iterate over all symbols and perform calculations for symbol in calculationSymbols: # check if we have enough historical data, otherwise just skip this security if not utils.CheckHistory(symbol, history): algorithm.Log(str(symbol.Value) + ': skipping security due to no/not enough historical data') continue else: # add symbol to calculations calculations[symbol] = SymbolData(symbol, parametersDict=self.parametersDict) try: # get series of daily returns calculations[symbol].CalculateDailyReturnSeries(history, self.lookbackOptimization) # update technical indicators calculations[symbol].UpdateIndicators(history) except BaseException as e: algorithm.Log('returning empty weights for now for: ' + str(symbol.Value) + 'due to ' + str(e) + '; we will try again at the next iteration') return finalWeights # calculate optimal weights optWeights = self.CalculateOptimalWeights(algorithm, calculations) algorithm.Log('optimal weights for the period: ' + str(optWeights)) if optWeights is None: algorithm.Log('returning empty weights due to optWeights being None; we will try again at the next iteration') return finalWeights # modify optimal weights using specific criteria finalWeights = self.FilterOptimalWeights(algorithm, calculations, optWeights) algorithm.Log('filtered optimal weights for the period: ' + str({symbol.Value: weight for symbol, weight in finalWeights.items()})) # check if we can trade the tradable symbol, otherwise we trade the original ticker replaceSymbol = {} for symbol in finalWeights: tradableTicker = self.parametersDict[symbol.Value]['addTicker'][1] if symbol.Value != tradableTicker: tradableSymbol = algorithm.Symbol(tradableTicker) # check if the price of tradableSymbol is not zero if algorithm.Securities[tradableSymbol].Price != 0: # we trade the original ticker replaceSymbol[tradableSymbol] = symbol for newSymbol, replacedSymbol in replaceSymbol.items(): finalWeights[newSymbol] = finalWeights[replacedSymbol] finalWeights.pop(replacedSymbol) # check how much we have allocated so far to risky assets totalWeight = sum(finalWeights.values()) if totalWeight >= 1: totalWeight = 1 algorithm.Log('total allocation after weight filtering: ' + str(totalWeight)) # allocate remaining cash to tickerCash for symbol in symbols: if symbol.Value == self.tickerCash: if symbol in finalWeights.keys(): finalWeights[symbol] = finalWeights[symbol] + (1 - totalWeight) else: finalWeights[symbol] = 1 - totalWeight algorithm.Log(str(self.tickerCash) + '; final allocation for tickerCash: ' + str(finalWeights[symbol])) # avoid very small numbers and make them 0 for symbol, weight in finalWeights.items(): if weight <= 1e-10: finalWeights[symbol] = 0 return finalWeights def CalculateOptimalWeights(self, algorithm, calculations): """ Description: Calculate the individual weights for each symbol that optimize some given objective function Args: algorithm: The algorithm instance calculations: Dictionary containing calculations for each symbol """ # create a dictionary keyed by the symbols in calculations with a pandas.Series as value # to create a dataframe of returns dailyReturnsDict = {symbol.Value: symbolData.dailyReturnsSeries for symbol, symbolData in calculations.items()} dailyReturnsDf = pd.DataFrame(dailyReturnsDict) listTickers = list(dailyReturnsDf.columns) try: # portfolio optimizer finds the optimal weights for the given data bounds = [self.parametersDict[x]['optimizationBounds'] for x in dailyReturnsDf.columns] listOptWeights = self.optimizer.Optimize(objFunction=self.objFunction, dailyReturnsDf=dailyReturnsDf, bounds=bounds) except BaseException as e: algorithm.Log('Optimize failed due to ' + str(e)) return None # create dictionary with the optimal weights by symbol weights = {listTickers[i]: listOptWeights[i] for i in range(len(listTickers))} # avoid very small numbers and make them 0 for ticker, weight in weights.items(): if weight <= 1e-10: weights[ticker] = 0 return weights def CheckYieldSignalCrisis(self): """ Check the Yield condition """ yieldSignalCrisis = False # get the last six months of historical USTREASURY/YIELD values histYield = self.algo.History([self.treasuryRatesSymbol], self.lookbackNegativeYield + 1, Resolution.Daily).loc[self.treasuryRatesSymbol] tenYr = histYield['10 yr'] # get the 10-year yield threeMo = histYield['3 mo'] # get the 3-month yield tenYrMinusThreeMo = tenYr - threeMo # calculate the difference between the two indexNegative = tenYrMinusThreeMo[tenYrMinusThreeMo < 0].head(1).index # check if there was actually some negative yield values if len(indexNegative) > 0: cutOff = indexNegative[0] # filter the series for days after that day with negative value afterNegative = tenYrMinusThreeMo[tenYrMinusThreeMo.index > cutOff] # check if at some point it reached our startCrisisYieldValue if len(afterNegative) > 0 and max(afterNegative) > self.startCrisisYieldValue: yieldSignalCrisis = True return yieldSignalCrisis def FilterOptimalWeights(self, algorithm, calculations, optWeights): """ Description: Filter and modify the optimal weights using a combination of technical indicators Args: algorithm: The algorithm instance calculations: Dictionary containing calculations for each symbol optWeights: Dictionary with the optimal weights by symbol """ # check for yield signal crisis yieldSignalCrisis = self.CheckYieldSignalCrisis() # empty dictionary to store weights weights = {} # loop through calculations and check conditions for weight filtering ------------------------ for symbol, symbolData in calculations.items(): if symbolData.SMA.IsReady and symbolData.MACD.IsReady: currentPrice = algorithm.ActiveSecurities[symbol].Price # check if sma condition is met and act accordingly ---------------------------------- smaLowerBoundCondition = self.parametersDict[symbol.Value]['sma'][1][0] smaUpperBoundCondition = self.parametersDict[symbol.Value]['sma'][1][1] smaConditionWeight = self.parametersDict[symbol.Value]['sma'][2] algorithm.Log(str(symbol.Value) + '; current price: ' + str(round(currentPrice, 2)) + '; SMA: ' + str(round(symbolData.SMA.Current.Value, 2)) + '; Price vs SMA: ' + str(round((currentPrice / symbolData.SMA.Current.Value) - 1, 2))) if (currentPrice <= symbolData.SMA.Current.Value * (1 + smaLowerBoundCondition) or currentPrice >= symbolData.SMA.Current.Value * (1 + smaUpperBoundCondition)): weights[symbol] = min(optWeights[symbol.Value], smaConditionWeight) algorithm.Log(str(symbol.Value) + '; modifying weight due to sma filtering from ' + str(optWeights[symbol.Value]) + ' to ' + str(weights[symbol])) else: weights[symbol] = optWeights[symbol.Value] smaModifiedWeight = weights[symbol] # check if macd condition is met and act accordingly ---------------------------------- macdCondition = self.parametersDict[symbol.Value]['macd'][1] macdConditionWeight = self.parametersDict[symbol.Value]['macd'][2] # calculate our macd vs signal score between -1 and 1 macdMinusSignal = symbolData.MACD.Current.Value - symbolData.MACD.Signal.Current.Value macdVsSignalScore = macdMinusSignal / (1 + abs(macdMinusSignal)) algorithm.Log(str(symbol.Value) + '; MACD: ' + str(round(symbolData.MACD.Current.Value, 2)) + '; MACD Signal: ' + str(round(symbolData.MACD.Signal.Current.Value, 2)) + '; MACD vs Signal Score: ' + str(round(macdVsSignalScore, 2))) if macdVsSignalScore <= macdCondition: weights[symbol] = min(smaModifiedWeight, macdConditionWeight) algorithm.Log(str(symbol.Value) + '; modifying weight due to macd filtering from ' + str(smaModifiedWeight) + ' to ' + str(weights[symbol])) else: weights[symbol] = smaModifiedWeight macdModifiedWeight = weights[symbol] # check if yield condition is met and act accordingly ---------------------------------- activateYield = self.parametersDict[symbol.Value]['yield'][0] yieldConditionWeight = self.parametersDict[symbol.Value]['yield'][1] if yieldSignalCrisis and activateYield: weights[symbol] = min(macdModifiedWeight, yieldConditionWeight) else: weights[symbol] = macdModifiedWeight else: weights[symbol] = optWeights[symbol.Value] algorithm.Log(str(symbol.Value) + '; indicators are not ready so assign default weight') return weights def CalculateModelLeverageFactor(self): sp = 0 peakDD = 15 #Set up as 50% of the peak DD in observable history. At this level of DD, the algo will go "all-in" minallocation = 1 # Calculates the scaling percentage based on a straight-line relationship between DD and Scaling # Minimum leveragefactor of 50% that then scales up to a max of 100% when DD increases to 15% #for key in self.algo.alphasDD: # if key == self.Name: # self.algo.leverageFactors[key] = 1 # #self.algo.leverageFactors[key] = max(minallocation,min(1, sp + ((1-sp)/peakDD)*float(self.algo.alphasDD[key]))) # #self.algo.leverageFactors[key] = max(0.5,min(1, (1) - (sp/peakDD)*float(self.algo.alphasDD[key]))) for key in self.weightsDict: self.weightsDict[key] = self.weightsDict[key]* float(self.leverageFactor) # Adds the non-leveraged portion to the "cash" portion of the algo for symbol in self.weightsDict.keys(): if symbol.Value == 'IEF': self.weightsDict[symbol] = self.weightsDict[symbol] + max(0,1-float(self.leverageFactor)) class SymbolData: def __init__(self, symbol, parametersDict): self.Symbol = symbol self.dailyReturnsSeries = None smaPeriod = parametersDict[symbol.Value]['sma'][0] self.SMA = SimpleMovingAverage(smaPeriod) macdFastPeriod = parametersDict[self.Symbol.Value]['macd'][0][0] macdSlowPeriod = parametersDict[self.Symbol.Value]['macd'][0][1] macdSignalPeriod = parametersDict[self.Symbol.Value]['macd'][0][2] self.MACD = MovingAverageConvergenceDivergence(macdFastPeriod, macdSlowPeriod, macdSignalPeriod, MovingAverageType.Exponential) def CalculateDailyReturnSeries(self, history, lookbackOptimization): """ Calculate the daily returns series for each security """ tempReturnsSeries = history.loc[str(self.Symbol)]['close'].pct_change(periods=2).dropna() # 2-day returns self.dailyReturnsSeries = tempReturnsSeries[-lookbackOptimization:] def UpdateIndicators(self, history): """ Update the indicators with historical data """ for index, row in history.loc[str(self.Symbol)].iterrows(): self.SMA.Update(index, row['close']) self.MACD.Update(index, row['close'])