Overall Statistics |
Total Trades 492 Average Win 3.31% Average Loss -1.39% Compounding Annual Return 55.124% Drawdown 20.200% Expectancy 1.497 Net Profit 9345.241% Sharpe Ratio 2.212 Probabilistic Sharpe Ratio 99.829% Loss Rate 26% Win Rate 74% Profit-Loss Ratio 2.38 Alpha 0.481 Beta -0.126 Annual Standard Deviation 0.211 Annual Variance 0.045 Information Ratio 1.303 Tracking Error 0.274 Treynor Ratio -3.697 Total Fees $2266.24 |
from clr import AddReference AddReference("System") AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm") AddReference("QuantConnect.Algorithm.Framework") from System import * from QuantConnect import * from QuantConnect.Algorithm import * from QuantConnect.Algorithm.Framework import * from QuantConnect.Algorithm.Framework.Alphas import AlphaModel, Insight, InsightType, InsightDirection class LongOnlyConstantAlphaCreationModel(AlphaModel): def __init__(self, rebalanceAtLaunch = False, rebalancingHour = 10): self.rebalanceAtLaunch = rebalanceAtLaunch self.rebalancingHour = rebalancingHour self.insightDirection = InsightDirection.Up # insight direction self.securities = [] # list to store securities to consider self.portfolioValueHigh = 0 # initialize portfolioValueHigh for drawdown calculation self.portfolioValueHighInitialized = False # initialize portfolioValueHighInitialized for drawdown calculation self.initBenchmarkPrice = 0 def Update(self, algorithm, data): insights = [] # list to store the new insights to be created # wait until 9:00 to avoid when alpha runs before universe if algorithm.Time < datetime(algorithm.Time.year, algorithm.Time.month, algorithm.Time.day, 9, 00, 00, tzinfo = algorithm.Time.tzinfo): return insights # make sure we only send insights once a day at a specific time if algorithm.Time.hour == self.rebalancingHour or self.rebalanceAtLaunch: self.rebalanceAtLaunch = False ### plotting ---------------------------------------------------------------------------------------- # simulate buy and hold the benchmark and plot its daily value self.UpdateBenchmarkValue(algorithm) algorithm.Plot('Strategy Equity', 'SPY', self.benchmarkValue) currentTotalPortfolioValue = algorithm.Portfolio.TotalPortfolioValue # get current portfolio value # plot the daily total portfolio exposure % totalPortfolioExposure = (algorithm.Portfolio.TotalHoldingsValue / currentTotalPortfolioValue) * 100 algorithm.Plot('Chart Total Portfolio Exposure %', 'Daily Portfolio Exposure %', totalPortfolioExposure) # plot the drawdown % from the most recent high if not self.portfolioValueHighInitialized: self.portfolioHigh = currentTotalPortfolioValue # set initial portfolio value self.portfolioValueHighInitialized = True # update trailing high value of the portfolio if self.portfolioValueHigh < currentTotalPortfolioValue: self.portfolioValueHigh = currentTotalPortfolioValue currentDrawdownPercent = ((float(currentTotalPortfolioValue) / float(self.portfolioValueHigh)) - 1.0) * 100 algorithm.Plot('Chart Drawdown %', 'Drawdown %', currentDrawdownPercent) ### generate insights ------------------------------------------------------------------------------ # calculate insight expiry time insightExpiry = Expiry.EndOfDay(algorithm.Time) # loop through securities and generate insights for security in self.securities: # append the insights list with the prediction for each symbol insights.append(Insight.Price(security.Symbol, insightExpiry, self.insightDirection)) return insights def UpdateBenchmarkValue(self, algorithm): ''' Simulate buy and hold the Benchmark ''' if self.initBenchmarkPrice == 0: self.initBenchmarkCash = algorithm.Portfolio.Cash self.initBenchmarkPrice = algorithm.Benchmark.Evaluate(algorithm.Time) self.benchmarkValue = self.initBenchmarkCash else: currentBenchmarkPrice = algorithm.Benchmark.Evaluate(algorithm.Time) self.benchmarkValue = (currentBenchmarkPrice / self.initBenchmarkPrice) * self.initBenchmarkCash def OnSecuritiesChanged(self, algorithm, changes): ''' Description: Event fired each time the we add/remove securities from the data feed Args: algorithm: The algorithm instance that experienced the change in securities changes: The security additions and removals from the algorithm ''' # add new securities for added in changes.AddedSecurities: algorithm.Log('adding symbol: ' + str(added.Symbol)) self.securities.append(added) # remove securities for removed in changes.RemovedSecurities: if removed in self.securities: self.securities.remove(removed)
from clr import AddReference AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm.Framework") from QuantConnect import Resolution, Extensions from QuantConnect.Algorithm.Framework.Alphas import * from QuantConnect.Algorithm.Framework.Portfolio import * from itertools import groupby from datetime import datetime, timedelta import numpy as np import pandas as pd from optimizer import CustomPortfolioOptimizer class CustomPortfolioOptimizationConstructionModel(PortfolioConstructionModel): def __init__(self, emails, emailSubject, maxTotalAllocationPercent = 1, rebalanceAtLaunch = False, rebalancingPeriod = Expiry.EndOfMonth, lookbackOptimization = 252, objectiveFunction = 'sharpe', activateWeightFiltering = False, cashDict = None, dictParameters = None, lookbackNegativeYield = 180, startCrisisYieldValue = 0): self.emails = emails self.emailSubject = emailSubject self.rebalanceAtLaunch = rebalanceAtLaunch self.maxTotalAllocationPercent = maxTotalAllocationPercent self.rebalancingPeriod = rebalancingPeriod self.lookbackOptimization = lookbackOptimization self.activateWeightFiltering = activateWeightFiltering self.cashDict = cashDict self.dictParameters = dictParameters self.lookbackNegativeYield = lookbackNegativeYield self.startCrisisYieldValue = startCrisisYieldValue self.yieldSignalCrisis = False # initialize the optimizer self.optimizer = CustomPortfolioOptimizer(minWeight = 0, maxWeight = 1, objFunction = objectiveFunction) # get all the parameters for the indicators valuesList = [] for ticker in dictParameters.keys(): if dictParameters[ticker]['addTicker'][0]: valuesList.append( dictParameters[ticker]['sma'][0] ) valuesList.extend( dictParameters[ticker]['macd'][0] ) # keep the highest parameter provided to call history self.lookbackHistory = max(lookbackOptimization, max(valuesList)) self.insightCollection = InsightCollection() self.rebalancingTime = None def CreateTargets(self, algorithm, insights): ''' Description: Create portfolio targets from the specified insights Args: algorithm: The algorithm instance insights: The insights to create portfolio targets from Returns: An enumerable of portfolio targets to be sent to the execution model ''' # initialize the first rebalancing time if self.rebalanceAtLaunch: # allow to rebalance immediately at launch self.rebalancingTime = algorithm.Time self.rebalanceAtLaunch = False elif self.rebalancingTime is None: # get next rebalancing time self.rebalancingTime = self.rebalancingPeriod(algorithm.Time) # empty list to store portfolio targets targets = [] # check if there is new insights coming if len(insights) == 0: return targets # check if it is time to rebalance if not algorithm.Time >= self.rebalancingTime: return targets # here we get the new insights and add them to our insight collection for insight in insights: self.insightCollection.Add(insight) # get insight that haven't expired of each symbol that is still in the universe activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime) # get the last generated active insight for each symbol lastActiveInsights = [] for symbol, g in groupby(activeInsights, lambda x: x.Symbol): lastActiveInsights.append(sorted(g, key = lambda x: x.GeneratedTimeUtc)[-1]) # determine target percent for the given insights (check function DetermineTargetPercent for details) percents = self.DetermineTargetPercent(algorithm, lastActiveInsights) if not percents: return targets # refactor weights to make sure we only use maxTotalAllocationPercent for insight, weight in percents.items(): percents[insight] = percents[insight] * self.maxTotalAllocationPercent infoLog = {insight.Symbol.Value: weight for insight, weight in percents.items()} algorithm.Log('refactored optimal weights for the period: ' + str(infoLog)) # send email notification with final weights for the period for email in self.emails: algorithm.Notify.Email(email, self.emailSubject + ' Portfolio Optimization Strategy - Portfolio Weights Next Period', str(infoLog)) # loop through insights and send portfolio targets for insight in percents: target = PortfolioTarget.Percent(algorithm, insight.Symbol, percents[insight]) targets.append(target) algorithm.Plot('Chart Optimal Weights %', insight.Symbol.Value, float(percents[insight])) # update rebalancing time self.rebalancingTime = self.rebalancingPeriod(algorithm.Time) return targets def DetermineTargetPercent(self, algorithm, activeInsights): ''' Description: Determine the target percent for each insight Args: algorithm: The algorithm instance activeInsights: The active insights to generate a target for ''' # empty dictionary to store portfolio targets by symbol result = {} # create a mapping dictionary with {calculation insight: tradable insight} mapInsightsDict = {} for insight in activeInsights: if insight.Symbol.Value in self.dictParameters.keys(): tradableTicker = self.dictParameters[insight.Symbol.Value]['addTicker'][1] tradableInsight = [insight for insight in activeInsights if insight.Symbol.Value == tradableTicker][0] mapInsightsDict[insight] = tradableInsight # calculation symbols with active insights calculationSymbols = [x.Symbol for x in activeInsights if x.Symbol.Value in self.dictParameters.keys()] # get historical data for calculationSymbols for the last n trading days history = algorithm.History(calculationSymbols, self.lookbackHistory, Resolution.Daily) # empty dictionary for calculations calculations = {} # iterate over all symbols and perform calculations for symbol in calculationSymbols: # check if we have enough historical data, otherwise just skip this security if not self.CheckData(algorithm, symbol, history): algorithm.Log(str(symbol.Value) + ': skipping security due to no/not enough historical data') continue else: # add symbol to calculations calculations[symbol] = SymbolData(symbol, dictParameters = self.dictParameters) try: # get series of log-returns calculations[symbol].CalculateLogReturnSeries(history, self.lookbackOptimization) # update technical indicators calculations[symbol].UpdateIndicators(history) except: algorithm.Log('returning empty weights for now due to calculations failing for: ' + str(symbol.Value) + '; we will try again at the next iteration') return result # calculate optimal weights optWeights = self.CalculateOptimalWeights(algorithm, calculations) algorithm.Log('optimal weights for the period: ' + str(optWeights)) if optWeights is None: algorithm.Log('returning empty weights due to optWeights being None; we will try again at the next iteration') return result # if activateWeightFiltering is True, modify optimal weights using specific criteria if self.activateWeightFiltering: finalWeights = self.FilterOptimalWeights(algorithm, calculations, optWeights) algorithm.Log('filtered optimal weights for the period: ' + str(finalWeights)) else: finalWeights = optWeights # loop through active securities and generate portfolio weights for insight in activeInsights: if insight.Symbol.Value in finalWeights.keys(): if insight in mapInsightsDict.keys(): # get the tradableInsight tradableInsight = mapInsightsDict[insight] # check if the price of tradableInsight is zero if algorithm.ActiveSecurities[tradableInsight.Symbol].Price == 0: # we trade the original ticker tradableInsight = insight else: # make sure we close positions in original tickers result[insight] = 0 # create the portfolio target for the tradable security result[tradableInsight] = insight.Direction * finalWeights[insight.Symbol.Value] # these are the cash tickers else: # check if the price is zero if algorithm.ActiveSecurities[insight.Symbol].Price > 0: # create the portfolio target for the cash security result[insight] = insight.Direction * finalWeights[insight.Symbol.Value] # check how much we have allocated so far to risky assets (and their cash tickers) totalAllocation = sum(result.values()) if totalAllocation >= 1: totalAllocation = 1 algorithm.Log('total allocation after weight filtering: ' + str(totalAllocation)) # allocate remaining cash to cashDict cashAllocation = 1 - totalAllocation for insight in activeInsights: if insight.Symbol.Value in self.cashDict.keys() and algorithm.ActiveSecurities[insight.Symbol].Price > 0: finalCashAllocation = self.cashDict[insight.Symbol.Value] * cashAllocation if insight in result.keys(): result[insight] = result[insight] + finalCashAllocation else: result[insight] = finalCashAllocation algorithm.Log(str(insight.Symbol.Value) + '; adding remaining cash allocation: ' + str(result[insight])) # avoid very small numbers and make them 0 for insight, weight in result.items(): if weight <= 1e-10: result[insight] = 0 return result def CalculateOptimalWeights(self, algorithm, calculations): ''' Description: Calculate the individual weights for each symbol that optimize some given objective function Args: algorithm: The algorithm instance calculations: Dictionary containing calculations for each symbol ''' # create a dictionary keyed by the symbols in calculations with a pandas.Series as value to create a dataframe of log-returns logReturnsDict = { symbol.Value: symbolData.logReturnSeries for symbol, symbolData in calculations.items() } logReturnsDf = pd.DataFrame(logReturnsDict) listTickers = list(logReturnsDf.columns) try: # portfolio optimizer finds the optimal weights for the given data listOptWeights = self.optimizer.Optimize(historicalLogReturns = logReturnsDf) except: algorithm.Log('optimization failed') return None # create dictionary with the optimal weights by symbol weights = {listTickers[i]: listOptWeights[i] for i in range(len(listTickers))} # avoid very small numbers and make them 0 for ticker, weight in weights.items(): if weight <= 1e-10: weights[ticker] = 0 return weights def FilterOptimalWeights(self, algorithm, calculations, optWeights): ''' Description: Filter and modify the optimal weights using a combination of technical indicators Args: algorithm: The algorithm instance calculations: Dictionary containing calculations for each symbol optWeights: Dictionary with the optimal weights by symbol ''' # check the yield condition ----------------------------------------------------------------- # get the last six months of historical USTREASURY/YIELD values histYield = algorithm.History(['USTREASURY/YIELD'], self.lookbackNegativeYield + 1, Resolution.Daily).loc['USTREASURY/YIELD'] histYield = histYield.rename(columns = {col: col.replace(' ', '') for col in histYield.columns}) tenYr = histYield['10yr'] # get the 10-year yield threeMo = histYield['3mo'] # get the 3-month yield tenYrMinusThreeMo = tenYr - threeMo # calculate the difference between the two #algorithm.Plot('Chart Yield %', 'Yield %', float(tenYrMinusThreeMo[-1])) #algorithm.Plot('Chart Yield %', 'Zero Line', float(0)) # get the first date when the yield turned negative indexNegative = tenYrMinusThreeMo[tenYrMinusThreeMo < 0].head(1).index # check if there was actually some negative yield values if len(indexNegative) > 0: cutOff = indexNegative[0] # filter the series for days after that day with negative value afterNegative = tenYrMinusThreeMo[tenYrMinusThreeMo.index > cutOff] # check if at some point it reached our startCrisisYieldValue if len(afterNegative) > 0 and max(afterNegative) > self.startCrisisYieldValue: self.yieldSignalCrisis = True else: self.yieldSignalCrisis = False else: self.yieldSignalCrisis = False # ------------------------------------------------------------------------------------------- # empty dicitonary to store weights weights = {} # loop through calculations and check conditions for weight filtering ------------------------ for symbol, symbolData in calculations.items(): if symbolData.SMA.IsReady and symbolData.MACD.IsReady: currentPrice = algorithm.ActiveSecurities[symbol].Price # check if sma condition is met and act accordingly ---------------------------------- smaLowerBoundCondition = self.dictParameters[symbol.Value]['sma'][1][0] smaUpperBoundCondition = self.dictParameters[symbol.Value]['sma'][1][1] smaConditionWeight = self.dictParameters[symbol.Value]['sma'][2] algorithm.Log(str(symbol.Value) + '; current price: ' + str(round(currentPrice, 2)) + '; SMA: ' + str(round(symbolData.SMA.Current.Value, 2)) + '; Price vs SMA: ' + str(round((currentPrice / symbolData.SMA.Current.Value) - 1, 2))) #algorithm.Plot('Chart SMA', symbol.Value + ' - sma', symbolData.SMA.Current.Value) #algorithm.Plot('Chart SMA', symbol.Value + ' - price', currentPrice) if (currentPrice <= symbolData.SMA.Current.Value * (1 + smaLowerBoundCondition) or currentPrice >= symbolData.SMA.Current.Value * (1 + smaUpperBoundCondition)): weights[symbol.Value] = min(optWeights[symbol.Value], smaConditionWeight) algorithm.Log(str(symbol.Value) + '; modifying weight due to sma filtering from ' + str(optWeights[symbol.Value]) + ' to ' + str(weights[symbol.Value])) else: weights[symbol.Value] = optWeights[symbol.Value] smaModifiedWeight = weights[symbol.Value] # check if macd condition is met and act accordingly ---------------------------------- macdCondition = self.dictParameters[symbol.Value]['macd'][1] macdConditionWeight = self.dictParameters[symbol.Value]['macd'][2] # calculate our macd vs signal score between -1 and 1 macdMinusSignal = symbolData.MACD.Current.Value - symbolData.MACD.Signal.Current.Value macdVsSignalScore = macdMinusSignal / (1 + abs(macdMinusSignal)) algorithm.Log(str(symbol.Value) + '; MACD: ' + str(round(symbolData.MACD.Current.Value, 2)) + '; MACD Signal: ' + str(round(symbolData.MACD.Signal.Current.Value, 2)) + '; MACD vs Signal Score: ' + str(round(macdVsSignalScore, 2))) #algorithm.Plot('Chart MACD', symbol.Value + ' - macd', symbolData.MACD.Current.Value) #algorithm.Plot('Chart MACD', symbol.Value + ' - signal', symbolData.MACD.Signal.Current.Value) if macdVsSignalScore <= macdCondition: weights[symbol.Value] = min(smaModifiedWeight, macdConditionWeight) algorithm.Log(str(symbol.Value) + '; modifying weight due to macd filtering from ' + str(smaModifiedWeight) + ' to ' + str(weights[symbol.Value])) else: weights[symbol.Value] = smaModifiedWeight macdModifiedWeight = weights[symbol.Value] # check if yield condition is met and act accordingly ---------------------------------- activateYield = self.dictParameters[symbol.Value]['yield'][0] yieldConditionWeight = self.dictParameters[symbol.Value]['yield'][1] if self.yieldSignalCrisis and activateYield: weights[symbol.Value] = min(macdModifiedWeight, yieldConditionWeight) algorithm.Log(str(symbol.Value) + '; modifying weight due to yield curve filtering from ' + str(macdModifiedWeight) + ' to ' + str(weights[symbol.Value])) else: weights[symbol.Value] = macdModifiedWeight # allocate to cashTicker the difference between optimal weight and final weight -------- cashAllocation = optWeights[symbol.Value] - weights[symbol.Value] cashTicker = self.dictParameters[symbol.Value]['cashTicker'][0] multiple = self.dictParameters[symbol.Value]['cashTicker'][1] finalCashAllocation = cashAllocation * multiple if cashTicker in weights: weights[cashTicker] = weights[cashTicker] + finalCashAllocation else: weights[cashTicker] = finalCashAllocation algorithm.Log(str(symbol.Value) + '; adding remaining cash allocation to ' + str(cashTicker) + ': ' + str(weights[cashTicker])) else: weights[symbol.Value] = 0 cashTicker = self.dictParameters[symbol.Value]['cashTicker'][0] weights[cashTicker] = 0 algorithm.Log(str(symbol.Value) + '; indicators are not ready so assign zero weight') return weights def CheckData(self, algorithm, symbol, history): ''' Check if the history dataframe is valid ''' if (str(symbol) not in history.index or history.loc[str(symbol)].get('close') is None or history.loc[str(symbol)].get('close').isna().any() or symbol not in algorithm.ActiveSecurities.Keys or len(history.loc[str(symbol)]) < self.lookbackHistory): return False else: return True class SymbolData: ''' Contain data specific to a symbol required by this model ''' def __init__(self, symbol, dictParameters): self.Symbol = symbol self.logReturnSeries = None smaPeriod = dictParameters[symbol.Value]['sma'][0] self.SMA = SimpleMovingAverage(smaPeriod) macdFastPeriod = dictParameters[self.Symbol.Value]['macd'][0][0] macdSlowPeriod = dictParameters[self.Symbol.Value]['macd'][0][1] macdSignalPeriod = dictParameters[self.Symbol.Value]['macd'][0][2] self.MACD = MovingAverageConvergenceDivergence(macdFastPeriod, macdSlowPeriod, macdSignalPeriod, MovingAverageType.Exponential) def CalculateLogReturnSeries(self, history, lookbackOptimization): ''' Calculate the log-returns series for each security ''' tempLogReturnSeries = np.log(1 + history.loc[str(self.Symbol)]['close'].pct_change(periods = 1).dropna()) # 1-day log-returns self.logReturnSeries = tempLogReturnSeries[-lookbackOptimization:] def UpdateIndicators(self, history): ''' Update the indicators with historical data ''' for index, row in history.loc[str(self.Symbol)].iterrows(): self.SMA.Update(index, row['close']) self.MACD.Update(index, row['close'])
import numpy as np from scipy.optimize import minimize class CustomPortfolioOptimizer: ''' Description: Implementation of a custom optimizer that calculates the weights for each asset to optimize a given objective function Details: Optimization can be: - Maximize Portfolio Sharpe Ratio - Maximize Portfolio Sortino Ratio - Maximize Portfolio Return - Minimize Portfolio Standard Deviation - Risk Parity Portfolio Constraints: - Weights must be between some given boundaries - Weights must sum to 1 ''' def __init__(self, minWeight = -1, maxWeight = 1, objFunction = 'std'): ''' Description: Initialize the CustomPortfolioOptimizer Args: minWeight(float): The lower bound on portfolio weights maxWeight(float): The upper bound on portfolio weights objFunction: The objective function to optimize (sharpe, sortino, return, std, riskParity) ''' self.minWeight = minWeight self.maxWeight = maxWeight self.objFunction = objFunction def Optimize(self, historicalLogReturns): ''' Description: Perform portfolio optimization using a provided matrix of historical returns and covariance (optional) Args: historicalLogReturns: Matrix of historical log-returns where each column represents a security and each row log-returns for the given date/time (size: K x N) Returns: Array of double with the portfolio weights (size: K x 1) ''' # get sample covariance matrix covariance = historicalLogReturns.cov() # get the sample covariance matrix of only negative returns for sortino ratio historicalNegativeLogReturns = historicalLogReturns[historicalLogReturns < 0] covarianceNegativeReturns = historicalNegativeLogReturns.cov() size = historicalLogReturns.columns.size # K x 1 x0 = np.array(size * [1. / size]) # apply equality constraints constraints = ({'type': 'eq', 'fun': lambda weights: self.GetBudgetConstraint(weights)}) opt = minimize(lambda weights: self.ObjectiveFunction(weights, historicalLogReturns, covariance, covarianceNegativeReturns), # Objective function x0, # Initial guess bounds = self.GetBoundaryConditions(size), # Bounds for variables constraints = constraints, # Constraints definition method = 'SLSQP') # Optimization method: Sequential Least Squares Programming return opt['x'] def ObjectiveFunction(self, weights, historicalLogReturns, covariance, covarianceNegativeReturns): ''' Description: Compute the objective function Args: weights: Portfolio weights historicalLogReturns: Matrix of historical log-returns covariance: Covariance matrix of historical log-returns ''' # calculate the annual return of portfolio annualizedPortfolioReturns = np.sum(historicalLogReturns.mean() * 252 * weights) # calculate the annual standard deviation of portfolio annualizedPortfolioStd = np.sqrt( np.dot(weights.T, np.dot(covariance * 252, weights)) ) annualizedPortfolioNegativeStd = np.sqrt( np.dot(weights.T, np.dot(covarianceNegativeReturns * 252, weights)) ) if annualizedPortfolioStd == 0 or annualizedPortfolioNegativeStd == 0: raise ValueError(f'CustomPortfolioOptimizer.ObjectiveFunction: annualizedPortfolioStd/annualizedPortfolioNegativeStd cannot be zero. Weights: {weights}') # calculate annual sharpe ratio of portfolio annualizedPortfolioSharpeRatio = (annualizedPortfolioReturns / annualizedPortfolioStd) # calculate annual sortino ratio of portfolio annualizedPortfolioSortinoRatio = (annualizedPortfolioReturns / annualizedPortfolioNegativeStd) # Spuni's formulation for risk parity portfolio size = historicalLogReturns.columns.size assetsRiskBudget = np.array(size * [1. / size]) portfolioVolatility = np.sqrt( np.dot(weights.T, np.dot(covariance, weights)) ) x = weights / portfolioVolatility riskParity = (np.dot(x.T, np.dot(covariance, x)) / 2) - np.dot(assetsRiskBudget.T, np.log(x)) if self.objFunction == 'sharpe': return -annualizedPortfolioSharpeRatio # convert to negative to be minimized elif self.objFunction == 'sortino': return -annualizedPortfolioSortinoRatio # convert to negative to be minimized elif self.objFunction == 'return': return -annualizedPortfolioReturns # convert to negative to be minimized elif self.objFunction == 'std': return annualizedPortfolioStd elif self.objFunction == 'riskParity': return riskParity else: raise ValueError(f'CustomPortfolioOptimizer.ObjectiveFunction: objFunction input has to be one of sharpe, sortino, return, std or riskParity') def GetBoundaryConditions(self, size): ''' Create the boundary condition for the portfolio weights ''' return tuple((self.minWeight, self.maxWeight) for x in range(size)) def GetBudgetConstraint(self, weights): ''' Define a budget constraint: the sum of the weights equal to 1 ''' return np.sum(weights) - 1
from clr import AddReference AddReference("System") AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm") AddReference("QuantConnect.Algorithm.Framework") from QuantConnect import * from QuantConnect.Algorithm import * from QuantConnect.Algorithm.Framework import * from QuantConnect.Algorithm.Framework.Portfolio import PortfolioTarget from QuantConnect.Algorithm.Framework.Risk import RiskManagementModel class ATRTrailingStopRiskManagementModel(RiskManagementModel): def __init__(self, emails, emailSubject, dictParameters = None, rebalancingPeriod = Expiry.EndOfMonth): self.emails = emails self.emailSubject = emailSubject self.dictParameters = dictParameters self.rebalancingPeriod = rebalancingPeriod # add the relevant keys to the dictionaries for atrPeriod and atrMultiple self.recentAtrPeriodByTicker = {} self.pastAtrPeriodByTicker = {} self.percentRecentAbovePastAtrByTicker = {} self.atrMultipleByTicker = {} self.emergencyAtrMultipleByTicker = {} for ticker in dictParameters.keys(): if dictParameters[ticker]['addTicker'][0] and dictParameters[ticker]['atrTrailStop'][0]: self.recentAtrPeriodByTicker[ticker] = dictParameters[ticker]['atrTrailStop'][1][0] self.pastAtrPeriodByTicker[ticker] = dictParameters[ticker]['atrTrailStop'][1][1] self.percentRecentAbovePastAtrByTicker[ticker] = dictParameters[ticker]['atrTrailStop'][1][2] self.atrMultipleByTicker[ticker] = dictParameters[ticker]['atrTrailStop'][2] self.emergencyAtrMultipleByTicker[ticker] = dictParameters[ticker]['atrTrailStop'][3] tradableTicker = dictParameters[ticker]['addTicker'][1] self.recentAtrPeriodByTicker[tradableTicker] = dictParameters[ticker]['atrTrailStop'][1][0] self.pastAtrPeriodByTicker[tradableTicker] = dictParameters[ticker]['atrTrailStop'][1][1] self.percentRecentAbovePastAtrByTicker[tradableTicker] = dictParameters[ticker]['atrTrailStop'][1][2] self.atrMultipleByTicker[tradableTicker] = dictParameters[ticker]['atrTrailStop'][2] self.emergencyAtrMultipleByTicker[tradableTicker] = dictParameters[ticker]['atrTrailStop'][3] self.rebalancingTime = None def ManageRisk(self, algorithm, targets): ''' Description: Manages the algorithm's risk at each time step Args: algorithm: The algorithm instance targets: The current portfolio targets to be assessed for risk ''' # initialize the first rebalancing time if self.rebalancingTime is None or algorithm.Time >= self.rebalancingTime: self.calculations = {} # dictionary to store calculations for each security self.trailingStopTargetBySymbol = {} # dictionary to store stop-loss target by symbol # get next rebalancing time self.rebalancingTime = self.rebalancingPeriod(algorithm.Time) # empty list to store portfolio targets for liquidation riskAdjustedTargets = list() # make sure we only run risk management once a day at a specific time if algorithm.Time.hour == 15: for security in algorithm.ActiveSecurities.Values: # if not invested in the security or trailing stop not activated, make sure the dictionaries are empty if (not security.Invested or security.Symbol.Value not in self.recentAtrPeriodByTicker.keys() or security.Symbol.Value not in self.pastAtrPeriodByTicker.keys() or security.Symbol.Value not in self.percentRecentAbovePastAtrByTicker.keys() or security.Symbol.Value not in self.atrMultipleByTicker.keys() or security.Symbol.Value not in self.emergencyAtrMultipleByTicker.keys()): self.calculations.pop(security.Symbol, None) self.trailingStopTargetBySymbol.pop(security.Symbol, None) continue # if the security is not already part of calculations we add it to calculations if security.Symbol not in self.calculations: history = algorithm.History(security.Symbol, self.pastAtrPeriodByTicker[security.Symbol.Value], Resolution.Daily) if not self.CheckHistory(security, history): algorithm.Log('no history found for: ' + str(security.Symbol.Value)) continue else: try: # add symbol to calculations # we need to capture the entry price for the stopPercent calculation self.calculations[security.Symbol] = SymbolData(security.Symbol, algorithm.ActiveSecurities[security.Symbol].Holdings.AveragePrice, self.recentAtrPeriodByTicker[security.Symbol.Value], self.pastAtrPeriodByTicker[security.Symbol.Value], self.percentRecentAbovePastAtrByTicker[security.Symbol.Value], self.atrMultipleByTicker[security.Symbol.Value], self.emergencyAtrMultipleByTicker[security.Symbol.Value]) self.calculations[security.Symbol].WarmUpIndicators(history) except Exception as e: algorithm.Log('removing from calculations due to exception: ' + str(e)) self.calculations.pop(security.Symbol, None) self.trailingStopTargetBySymbol.pop(security.Symbol, None) continue if self.calculations[security.Symbol].stopPercent is None: self.calculations.pop(security.Symbol, None) self.trailingStopTargetBySymbol.pop(security.Symbol, None) continue # check if there is already a trailing stop level or need to create one if security.Symbol in self.trailingStopTargetBySymbol: # if current price is already below the trailing stop level, liquidate long position if security.Price < self.trailingStopTargetBySymbol[security.Symbol]: # add portfolio target of zero for the security riskAdjustedTargets.append(PortfolioTarget(security.Symbol, 0)) # remove security from dictionaries as the position is closed self.calculations.pop(security.Symbol, None) self.trailingStopTargetBySymbol.pop(security.Symbol, None) infoLog = str(security.Symbol.Value) + '; liquidate position due to trailing stop triggering' algorithm.Log(infoLog) algorithm.Plot('Chart Optimal Weights %', security.Symbol.Value, float(0)) # send email notification alerting that trailing stop triggered for email in self.emails: algorithm.Notify.Email(email, self.emailSubject + ' Portfolio Optimization Strategy - Trailing Stop Triggered', str(infoLog)) else: # update trailing stop level as max value between current level and (current price * (1 - stopPercent)) self.trailingStopTargetBySymbol[security.Symbol] = max(self.trailingStopTargetBySymbol[security.Symbol], security.Price * (1 - self.calculations[security.Symbol].stopPercent)) else: # get the initial stop level initialStop = self.calculations[security.Symbol].entryPrice * (1 - self.calculations[security.Symbol].stopPercent) self.trailingStopTargetBySymbol[security.Symbol] = initialStop infoLog = (str(security.Symbol.Value) + '; activate trailing stop' + '; recentAverageTrueRange: ' + str(round(self.calculations[security.Symbol].recentAverageTrueRange.Current.Value, 2)) + '; pastAverageTrueRange: ' + str(round(self.calculations[security.Symbol].pastAverageTrueRange.Current.Value, 2)) + '; entry average holding price: ' + str(round(self.calculations[security.Symbol].entryPrice, 2)) + '; initial stop-loss level: ' + str(round(self.trailingStopTargetBySymbol[security.Symbol], 2)) + '; current market price: ' + str(round(security.Price, 2))) algorithm.Log(infoLog) # send email notification with information about the initial trailing stop for the period for email in self.emails: algorithm.Notify.Email(email, self.emailSubject + ' Portfolio Optimization Strategy - Activate Trailing Stop', str(infoLog)) return riskAdjustedTargets def CheckHistory(self, security, history): ''' Check if the history dataframe is valid ''' if (str(security.Symbol) not in history.index or history.loc[str(security.Symbol)].get('open') is None or history.loc[str(security.Symbol)].get('open').isna().any() or history.loc[str(security.Symbol)].get('high') is None or history.loc[str(security.Symbol)].get('high').isna().any() or history.loc[str(security.Symbol)].get('low') is None or history.loc[str(security.Symbol)].get('low').isna().any() or history.loc[str(security.Symbol)].get('close') is None or history.loc[str(security.Symbol)].get('close').isna().any()): return False else: return True class SymbolData: ''' Make all the calculations needed for each symbol ''' def __init__(self, symbol, entryPrice, recentAtrPeriod, pastAtrPeriod, percentRecentAbovePastAtr, atrMultiple, emergencyAtrMultiple): self.symbol = symbol self.entryPrice = entryPrice self.recentAverageTrueRange = AverageTrueRange(recentAtrPeriod, MovingAverageType.Exponential) self.pastAverageTrueRange = AverageTrueRange(pastAtrPeriod, MovingAverageType.Exponential) self.percentRecentAbovePastAtr = percentRecentAbovePastAtr self.atrMultiple = atrMultiple self.emergencyAtrMultiple = emergencyAtrMultiple def WarmUpIndicators(self, history): # get the single index dataframe for the symbol symbolHistory = history.loc[str(self.symbol)] for index, row in symbolHistory.iterrows(): if 'open' in row and 'high' in row and 'low' in row and 'close' in row: bar = TradeBar(index, self.symbol, row['open'], row['high'], row['low'], row['close'], 0) self.recentAverageTrueRange.Update(bar) self.pastAverageTrueRange.Update(bar) else: raise Exception('missing some OHLC data for: ' + str(self.symbol.Value)) @property def multipleTrailingStop(self): if self.recentAverageTrueRange.IsReady and self.pastAverageTrueRange.IsReady: recentAtr = self.recentAverageTrueRange.Current.Value pastAtr = self.pastAverageTrueRange.Current.Value if recentAtr >= pastAtr * (1 + self.percentRecentAbovePastAtr): return self.emergencyAtrMultiple else: return self.atrMultiple else: return None @property def stopPercent(self): if self.entryPrice > 0 and self.recentAverageTrueRange.IsReady and self.multipleTrailingStop is not None: recentAtr = self.recentAverageTrueRange.Current.Value stop = recentAtr * self.multipleTrailingStop # stop value (dollar value) return stop / self.entryPrice # stop % else: return None
from LongOnlyConstantAlphaCreation import LongOnlyConstantAlphaCreationModel from CustomPortfolioOptimizationConstruction import CustomPortfolioOptimizationConstructionModel from ATRTrailingStopRiskManagement import ATRTrailingStopRiskManagementModel from QuantConnect.Python import PythonQuandl class PortfolioOptimizationFrameworkAlgorithm(QCAlgorithmFramework): ''' Trading Logic: Modules: - Universe: Manual Universe of tickers - Alpha: Long-only insights are created on a daily basis - Portfolio: Optimal weights are calculated using Portfolio Sharpe Ratio, Sortino, Return, Variance or Risk Parity, and then modified using a combination of technical indicators and macroeconomic factors. Portfolio will be rebalanced on selected date rule.https://www.quantconnect.com/terminal/#live-view-tab - Execution: Immediate Execution with Market Orders - Risk: ATR-based Trailing Stop with a tighter stop that activates when Recent ATR is greater than (Past ATR + buffer) ''' def Initialize(self): ### user-defined inputs --------------------------------------------------------------------------------------------------- self.SetStartDate(2010, 1, 1) # set start date # self.SetEndDate(2018, 12, 31) # set end date self.SetCash(10000) # set strategy cash # EMAIL NOTIFICATIONS -------------------------------------------------- emails = ['sudhir.holla@gmail.com', 'efb@innoquantivity.com'] emailSubject = 'Collective2' # UNIVERSE ------------------------------------------------------------- # select tickers to create the Universe and add indicators and parameters for weight filtering dictParameters = { 'SPY': {'addTicker': [True, 'TQQQ'], # [boolean to add/not add the ticker, ticker to actually trade] 'cashTicker': ['TVIX', 0.10], # [ticker to allocate remaining cash from this ticker, % (0 to 1) of that cash to use for the ticker] 'sma': [200, (-0.10, 0.10), 0], # [period, (lower % threshold, upper % threshold; price vs sma), weight if condition met] 'macd': [(231, 567, 168), 0, 0], # [(fast, slow, signal), score macd vs signal (-1 to 1), weight if condition met] 'yield': [True, 0], # [boolean to activate the yield curve filtering, weight if condition met] 'atrTrailStop': [True, (10, 63, 1), 4, 0.1]}, # [activate, (recentAtrPeriod, pastAtrPeriod, % above), atrMultiple, emergencyAtrMultiple] 'TLT': {'addTicker': [True, 'TMF'], # [boolean to add/not add the ticker, ticker to actually trade] 'cashTicker': ['GSY', 1], # [ticker to allocate remaining cash from this ticker, % (0 to 1) of that cash to use for the ticker] 'sma': [600, (-0.2, 0.2), 0], # [period, (lower % threshold, upper % threshold; price vs sma), weight if condition met] 'macd': [(63, 168, 42), 0, 0], # [(fast, slow, signal), score macd vs signal (-1 to 1), weight if condition met] 'yield': [False, 0], # [boolean to activate the yield curve filtering, weight if condition met] 'atrTrailStop': [True, (10, 63, 0.5), 4, 0.1]}, # [activate, (recentAtrPeriod, pastAtrPeriod, % above), atrMultiple, emergencyAtrMultiple] 'SLV': {'addTicker': [True, 'USLV'], # [boolean to add/not add the ticker, ticker to actually trade] 'cashTicker': ['UDN', 0], # [ticker to allocate remaining cash from this ticker, % (0 to 1) of that cash to use for the ticker] 'sma': [200, (-0.15, 0.15), 0], # [period, (lower % threshold, upper % threshold; price vs sma), weight if condition met] 'macd': [(63, 168, 42), 0, 0.2], # [(fast, slow, signal), score macd vs signal (-1 to 1), weight if condition met] 'yield': [False, 0], # [boolean to activate the yield curve filtering, weight if condition met] 'atrTrailStop': [True, (10, 63, 0.35), 4, 0.1]}, # [activate, (recentAtrPeriod, pastAtrPeriod, % above), atrMultiple, emergencyAtrMultiple] 'GLD': {'addTicker': [True, 'UGLD'], # [boolean to add/not add the ticker, ticker to actually trade] 'cashTicker': ['DEUR', 0], # [ticker to allocate remaining cash from this ticker, % (0 to 1) of that cash to use for the ticker] 'sma': [200, (-0.15, 0.15), 0], # [period, (lower % threshold, upper % threshold; price vs sma), weight if condition met] 'macd': [(63, 168, 42), 0, 0.2], # [(fast, slow, signal), score macd vs signal (-1 to 1), weight if condition met] 'yield': [False, 0], # [boolean to activate the yield curve filtering, weight if condition met] 'atrTrailStop': [True, (10, 63, 0.35), 4, 0.1]}, # [activate, (recentAtrPeriod, pastAtrPeriod, % above), atrMultiple, emergencyAtrMultiple] } # PORTFOLIO ------------------------------------------------------------ # select the percentage of the total cash to be invested # (when higher than 1, it will use leverage) maxTotalAllocationPercent = 1 # (e.g. 3 for 300%, 2 for 200%, 1 for 100%, 0.5 for 50%) # portfolio rebalancing ------------------ # variable to control if we want to rebalance at launch (True), or we want to wait until next rebalancing date (False) rebalanceAtLaunch = False # select the logic for rebalancing period # options are: # - Date rules (for the first trading day of period): Expiry.EndOfDay, Expiry.EndOfWeek, Expiry.EndOfMonth, Expiry.EndOfQuarter, Expiry.EndOfYear rebalancingPeriod = Expiry.EndOfMonth # select the hour of the day to do the rebalancing rebalancingHour = 11 # integer between 10 and 16 (note 16 would be a MarketOnOpen order for next day) # weights optimization ------------------ # select number of lookback days for optimization lookbackOptimization = 63 # select the objective function to optimize the portfolio weights # options are: sharpe, sortino, return, std, riskParity objectiveFunction = 'std' # weights filtering -------------------- activateWeightFiltering = True # activate/deactivate the weights filtering # tickers to allocate remaining cash after filtering (enter {} to just stay in Cash) cashDict = {'IEF': 0, 'SPY' : 0, 'AGG' : 0, 'TLT': 0, 'UBT': 1} # {ticker: % allocation (as a decimal) from remaining cash} # yield curve crisis signal: # crisis signal will happen when yield curve turnes negative in the last lookbackNegativeYield days, # and then in the last lookbackPositiveYield days it receached startCrisisYieldValue lookbackNegativeYield = 180 # number of days to lookback for negative values startCrisisYieldValue = 0.3 # the yield value above which we apply the yield weight condition (e.g. 0.1 0.1% yield) ### ------------------------------------------------------------------------------------------------------------------------- # add benchmark self.SetBenchmark('SPY') if sum(cashDict.values()) > 1: raise ValueError('the sum of cashDict values must be less than or equal to 1') # let's plot the series of treasury yield #yieldPlot = Chart('Chart Yield %') #yieldPlot.AddSeries(Series('Yield %', SeriesType.Line, '%')) #yieldPlot.AddSeries(Series('Zero Line', SeriesType.Line, '%')) #self.AddChart(yieldPlot) # let's plot the series of daily total portfolio exposure % portfolioExposurePlot = Chart('Chart Total Portfolio Exposure %') portfolioExposurePlot.AddSeries(Series('Daily Portfolio Exposure %', SeriesType.Line, '')) self.AddChart(portfolioExposurePlot) # let's plot the series of drawdown % from the most recent high drawdownPlot = Chart('Chart Drawdown %') drawdownPlot.AddSeries(Series('Drawdown %', SeriesType.Line, '%')) self.AddChart(drawdownPlot) plottedTickers = [] # let's plot the series of optimal weights optWeightsPlot = Chart('Chart Optimal Weights %') for ticker in dictParameters.keys(): if dictParameters[ticker]['addTicker'][0]: optWeightsPlot.AddSeries(Series(ticker, SeriesType.Line, '%')) plottedTickers.append(ticker) tradableTicker = dictParameters[ticker]['addTicker'][1] if tradableTicker not in plottedTickers: optWeightsPlot.AddSeries(Series(tradableTicker, SeriesType.Line, '%')) plottedTickers.append(tradableTicker) cashTicker = dictParameters[ticker]['cashTicker'][0] if cashTicker not in plottedTickers: optWeightsPlot.AddSeries(Series(cashTicker, SeriesType.Line, '%')) plottedTickers.append(cashTicker) # add cashDict as well for ticker in cashDict.keys(): if ticker not in plottedTickers: optWeightsPlot.AddSeries(Series(ticker, SeriesType.Line, '%')) self.AddChart(optWeightsPlot) # let's plot the series of sma #smaPlot = Chart('Chart SMA') #for ticker in dictParameters.keys(): # if dictParameters[ticker]['addTicker'][0]: # smaPlot.AddSeries(Series(ticker + ' - price', SeriesType.Line, '')) # smaPlot.AddSeries(Series(ticker + ' - sma', SeriesType.Line, '')) # tradableTicker = dictParameters[ticker]['addTicker'][1] # if tradableTicker != ticker: # smaPlot.AddSeries(Series(tradableTicker + ' - price', SeriesType.Line, '')) # smaPlot.AddSeries(Series(tradableTicker + ' - sma', SeriesType.Line, '')) #self.AddChart(smaPlot) # let's plot the series of macd #macdPlot = Chart('Chart MACD') #for ticker in dictParameters.keys(): # if dictParameters[ticker]['addTicker'][0]: # macdPlot.AddSeries(Series(ticker + ' - macd', SeriesType.Line, '')) # macdPlot.AddSeries(Series(ticker + ' - signal', SeriesType.Line, '')) # tradableTicker = dictParameters[ticker]['addTicker'][1] # if tradableTicker != ticker: # macdPlot.AddSeries(Series(tradableTicker + ' - macd', SeriesType.Line, '')) # macdPlot.AddSeries(Series(tradableTicker + ' - signal', SeriesType.Line, '')) #self.AddChart(macdPlot) if maxTotalAllocationPercent <= 1: # set the brokerage model for slippage and fees self.SetBrokerageModel(AlphaStreamsBrokerageModel()) else: # add leverage self.UniverseSettings.Leverage = maxTotalAllocationPercent + 1 # set requested data resolution and disable fill forward data self.UniverseSettings.Resolution = Resolution.Hour ### select modules -------------------------------------------------------------------------------------------------------- # Universe Selection --------------------------------------------------- self.AddData(QuandlTreasuryRates, 'USTREASURY/YIELD', Resolution.Daily) addedTickers = [] symbols = [] # loop through the tickers and create symbols for the universe for ticker in dictParameters.keys(): if dictParameters[ticker]['addTicker'][0]: symbols.append(Symbol.Create(ticker, SecurityType.Equity, Market.USA)) addedTickers.append(ticker) tradableTicker = dictParameters[ticker]['addTicker'][1] if tradableTicker not in addedTickers: symbols.append(Symbol.Create(tradableTicker, SecurityType.Equity, Market.USA)) addedTickers.append(tradableTicker) cashTicker = dictParameters[ticker]['cashTicker'][0] if cashTicker not in addedTickers: symbols.append(Symbol.Create(cashTicker, SecurityType.Equity, Market.USA)) addedTickers.append(cashTicker) for ticker in cashDict.keys(): if ticker not in addedTickers: symbols.append(Symbol.Create(ticker, SecurityType.Equity, Market.USA)) self.SetUniverseSelection(ManualUniverseSelectionModel(symbols)) # Alpha Creation ------------------------------------------------------- self.SetAlpha(LongOnlyConstantAlphaCreationModel(rebalanceAtLaunch = rebalanceAtLaunch, rebalancingHour = rebalancingHour)) # Portfolio Construction ----------------------------------------------- self.SetPortfolioConstruction(CustomPortfolioOptimizationConstructionModel(emails = emails, emailSubject = emailSubject, maxTotalAllocationPercent = maxTotalAllocationPercent, rebalanceAtLaunch = rebalanceAtLaunch, rebalancingPeriod = rebalancingPeriod, lookbackOptimization = lookbackOptimization, objectiveFunction = objectiveFunction, activateWeightFiltering = activateWeightFiltering, cashDict = cashDict, dictParameters = dictParameters, lookbackNegativeYield = lookbackNegativeYield, startCrisisYieldValue = startCrisisYieldValue)) # Execution ------------------------------------------------------------ self.SetExecution(ImmediateExecutionModel()) # Risk Management ------------------------------------------------------ self.SetRiskManagement(ATRTrailingStopRiskManagementModel(emails = emails, emailSubject = emailSubject, dictParameters = dictParameters, rebalancingPeriod = rebalancingPeriod)) class QuandlTreasuryRates(PythonQuandl): def __init__(self): self.ValueColumnName = 'value'