Overall Statistics |
Total Trades 842 Average Win 0.29% Average Loss -0.19% Compounding Annual Return 8.604% Drawdown 12.600% Expectancy 1.291 Net Profit 227.421% Sharpe Ratio 1.235 Probabilistic Sharpe Ratio 78.823% Loss Rate 9% Win Rate 91% Profit-Loss Ratio 1.53 Alpha 0.073 Beta -0.012 Annual Standard Deviation 0.058 Annual Variance 0.003 Information Ratio -0.069 Tracking Error 0.192 Treynor Ratio -6.097 Total Fees $846.44 |
import pandas as pd
import numpy as np
from scipy.optimize import minimize


class PortfolioOptimizer:

    '''
    Description:
        Implementation of a custom optimizer that calculates the weights for each asset
        to optimize a given objective function
    Details:
        Optimization can be:
            - Equal Weighting
            - Maximize Portfolio Return
            - Minimize Portfolio Standard Deviation
            - Mean-Variance (minimize Standard Deviation given a target return)
            - Maximize Portfolio Sharpe Ratio
            - Maximize Portfolio Sortino Ratio
            - Risk Parity Portfolio
        Constraints:
            - Weights must be between some given boundaries
            - Weights must sum to 1
    '''

    def __init__(self, minWeight = 0, maxWeight = 1):

        '''
        Description:
            Initialize the PortfolioOptimizer
        Args:
            minWeight(float): The lower bound on portfolio weights
            maxWeight(float): The upper bound on portfolio weights
        '''

        self.minWeight = minWeight
        self.maxWeight = maxWeight

    def Optimize(self, objFunction, dailyReturnsDf, targetReturn = None):

        '''
        Description:
            Perform portfolio optimization given a series of returns
        Args:
            objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance,
                         meanVariance, maxSharpe, maxSortino, riskParity)
            dailyReturnsDf: DataFrame of historical daily arithmetic returns (one column per asset)
            targetReturn: Annualized return required by the meanVariance objective; when None,
                          the annualized return of the equally weighted portfolio is used
        Returns:
            Array of double with the portfolio weights (size: K x 1)
        Raises:
            ValueError: if dailyReturnsDf has no columns, or if objFunction is not a known objective
                        (raised by ObjectiveFunction on the first evaluation)
        Notes:
            The SLSQP solution is returned as-is; the solver's success flag is not inspected.
        '''

        size = dailyReturnsDf.columns.size # K x 1
        if size == 0:
            raise ValueError('PortfolioOptimizer.Optimize: dailyReturnsDf must contain at least one asset column')

        # initial weights: equally weighted (also used as the risk budget for risk parity)
        self.initWeights = np.array(size * [1. / size])

        # equal weighting needs no covariance estimate and no solver
        if objFunction == 'equalWeighting':
            return self.initWeights

        # get sample covariance matrix
        covariance = dailyReturnsDf.cov()

        # the covariance of only negative returns is needed exclusively by the sortino ratio
        covarianceNegativeReturns = None
        if objFunction == 'maxSortino':
            negativeReturnsDf = dailyReturnsDf[dailyReturnsDf < 0]
            covarianceNegativeReturns = negativeReturnsDf.cov()

        bounds = tuple((self.minWeight, self.maxWeight) for _ in range(size))
        constraints = [{'type': 'eq', 'fun': lambda x: np.sum(x) - 1.0}]

        if objFunction == 'meanVariance':
            # if no target return is provided, use the resulting from equal weighting
            if targetReturn is None:
                targetReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, self.initWeights)
            constraints.append( {'type': 'eq', 'fun': lambda weights:
                                self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights) - targetReturn} )

        opt = minimize(lambda weights: self.ObjectiveFunction(objFunction, dailyReturnsDf,
                                                              covariance, covarianceNegativeReturns,
                                                              weights),
                       x0 = self.initWeights,
                       bounds = bounds,
                       constraints = constraints,
                       method = 'SLSQP')

        return opt['x']

    def ObjectiveFunction(self, objFunction, dailyReturnsDf, covariance, covarianceNegativeReturns, weights):

        '''
        Description:
            Compute the objective function
        Args:
            objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance,
                         meanVariance, maxSharpe, maxSortino, riskParity)
            dailyReturnsDf: DataFrame of historical daily returns
            covariance: Sample covariance
            covarianceNegativeReturns: Sample covariance matrix of only negative returns
                                       (only read by the maxSortino objective)
            weights: Portfolio weights
        Returns:
            The value to be minimized (ratios and returns are negated so that minimizing maximizes them)
        Raises:
            ValueError: if objFunction is not one of the objectives handled below
        '''

        if objFunction == 'maxReturn':
            f = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
            return -f # convert to negative to be minimized
        elif objFunction == 'minVariance':
            f = self.CalculateAnnualizedPortfolioStd(covariance, weights)
            return f
        elif objFunction == 'meanVariance':
            # same objective as minVariance; the target return enters as a constraint in Optimize
            f = self.CalculateAnnualizedPortfolioStd(covariance, weights)
            return f
        elif objFunction == 'maxSharpe':
            f = self.CalculateAnnualizedPortfolioSharpeRatio(dailyReturnsDf, covariance, weights)
            return -f # convert to negative to be minimized
        elif objFunction == 'maxSortino':
            f = self.CalculateAnnualizedPortfolioSortinoRatio(dailyReturnsDf, covarianceNegativeReturns, weights)
            return -f # convert to negative to be minimized
        elif objFunction == 'riskParity':
            f = self.CalculateRiskParityFunction(covariance, weights)
            return f
        else:
            raise ValueError(f'PortfolioOptimizer.ObjectiveFunction: objFunction input has to be one of equalWeighting,'
                             + ' maxReturn, minVariance, meanVariance, maxSharpe, maxSortino or riskParity')

    def CalculateAnnualizedPortfolioReturn(self, dailyReturnsDf, weights):

        ''' Weighted sum of each asset's mean daily return compounded over 252 days '''

        annualizedPortfolioReturns = np.sum( ((1 + dailyReturnsDf.mean())**252 - 1) * weights )

        return annualizedPortfolioReturns

    def CalculateAnnualizedPortfolioStd(self, covariance, weights):

        ''' Portfolio standard deviation from the daily covariance scaled by 252; raises ValueError when it is zero '''

        annualizedPortfolioStd = np.sqrt( np.dot(weights.T, np.dot(covariance * 252, weights)) )
        if annualizedPortfolioStd == 0:
            raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioStd: annualizedPortfolioStd cannot be zero. Weights: {weights}')

        return annualizedPortfolioStd

    def CalculateAnnualizedPortfolioNegativeStd(self, covarianceNegativeReturns, weights):

        ''' Same as CalculateAnnualizedPortfolioStd but using the covariance of only negative returns '''

        annualizedPortfolioNegativeStd = np.sqrt( np.dot(weights.T, np.dot(covarianceNegativeReturns * 252, weights)) )
        if annualizedPortfolioNegativeStd == 0:
            raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioNegativeStd: annualizedPortfolioNegativeStd cannot be zero. Weights: {weights}')

        return annualizedPortfolioNegativeStd

    def CalculateAnnualizedPortfolioSharpeRatio(self, dailyReturnsDf, covariance, weights):

        ''' Annualized return divided by annualized standard deviation (no risk-free rate is subtracted) '''

        annualizedPortfolioReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
        annualizedPortfolioStd = self.CalculateAnnualizedPortfolioStd(covariance, weights)
        annualizedPortfolioSharpeRatio = annualizedPortfolioReturn / annualizedPortfolioStd

        return annualizedPortfolioSharpeRatio

    def CalculateAnnualizedPortfolioSortinoRatio(self, dailyReturnsDf, covarianceNegativeReturns, weights):

        ''' Annualized return divided by the annualized standard deviation of negative returns '''

        annualizedPortfolioReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
        annualizedPortfolioNegativeStd = self.CalculateAnnualizedPortfolioNegativeStd(covarianceNegativeReturns, weights)
        annualizedPortfolioSortinoRatio = annualizedPortfolioReturn / annualizedPortfolioNegativeStd

        return annualizedPortfolioSortinoRatio

    def CalculateRiskParityFunction(self, covariance, weights):

        '''
        Description:
            Spinu formulation for risk parity portfolio
        Args:
            covariance: Sample covariance
            weights: Portfolio weights
        Returns:
            Value of the risk parity objective; the risk budget is self.initWeights,
            which is set by Optimize (equal budget per asset)
        '''

        assetsRiskBudget = self.initWeights
        portfolioVolatility = self.CalculateAnnualizedPortfolioStd(covariance, weights)

        x = weights / portfolioVolatility
        # weights sitting on the lower bound (0) would make log(x) = -inf and hand the solver an
        # infinite/NaN objective, so the argument of the log is floored at the smallest positive float
        logX = np.log(np.maximum(x, np.finfo(float).tiny))
        riskParity = (np.dot(x.T, np.dot(covariance, x)) / 2) - np.dot(assetsRiskBudget.T, logX)

        return riskParity
from OptimalWeightsAlphaCreation import OptimalWeightsAlphaCreationModel


class PortfolioOptimizationSystem(QCAlgorithmFramework):

    '''
    Description:
        Framework algorithm that holds a fixed list of tickers and delegates the weighting to
        OptimalWeightsAlphaCreationModel, with an insight-weighted portfolio construction model,
        immediate execution and no risk management
    '''

    def Initialize(self):

        '''
        Description:
            Configure dates, cash, charts, brokerage model, universe, alpha, portfolio construction,
            execution and risk management for the algorithm
        '''

        ### user-defined inputs ---------------------------------------------------------------------------------------------------

        self.SetStartDate(2006, 1, 1)   # Set Start Date
        #self.SetEndDate(2020, 5, 1)    # Set End Date
        self.SetCash(100000)            # Set Strategy Cash

        # UNIVERSE -------------------------------------------------------------

        # select tickers
        tickers = ['IEF', 'TLT', 'GLD', 'SPY', 'QQQ']

        # ALPHA ----------------------------------------------------------------

        # select the objective function to optimize the portfolio weights
        # options are: equalWeighting, maxReturn, minVariance, meanVariance, maxSharpe, maxSortino, riskParity
        objectiveFunction = 'riskParity'

        # select number of lookback days for optimization
        lookbackOptimization = 63

        # select the logic for rebalancing period
        # date rules (for the first trading day of period): Expiry.EndOfDay, Expiry.EndOfWeek, Expiry.EndOfMonth,
        # Expiry.EndOfQuarter, Expiry.EndOfYear
        rebalancingPeriod = Expiry.EndOfMonth

        ### plotting ----------------------------------------------------------------------------------------------------------------

        # let's plot the series of total portfolio exposure %
        portfolioExposurePlot = Chart('Chart Total Portfolio Exposure %')
        portfolioExposurePlot.AddSeries(Series('Daily Portfolio Exposure %', SeriesType.Line, ''))
        self.AddChart(portfolioExposurePlot)

        # let's plot the series of drawdown % from the most recent high
        drawdownPlot = Chart('Chart Drawdown %')
        drawdownPlot.AddSeries(Series('Drawdown %', SeriesType.Line, '%'))
        self.AddChart(drawdownPlot)

        # let's plot the series of optimal weights
        optWeightsPlot = Chart('Chart Optimal Weights %')
        for ticker in tickers:
            optWeightsPlot.AddSeries(Series(ticker, SeriesType.Line, '%'))
        # register the chart like the two above; without this the series configured here are never used
        self.AddChart(optWeightsPlot)

        ### select modules ------------------------------------------------------------------------------------------------------------

        # set the brokerage model for slippage and fees
        self.SetBrokerageModel(AlphaStreamsBrokerageModel())

        # set requested data resolution
        self.UniverseSettings.Resolution = Resolution.Daily

        # Universe -------------------------------------------------------------

        # create one equity symbol per ticker for the manual universe
        symbols = [Symbol.Create(ticker, SecurityType.Equity, Market.USA) for ticker in tickers]
        self.SetUniverseSelection(ManualUniverseSelectionModel(symbols))

        # Alpha ---------------------------------------------------------------

        self.SetAlpha(OptimalWeightsAlphaCreationModel(rebalancingPeriod = rebalancingPeriod,
                                                       objectiveFunction = objectiveFunction,
                                                       lookbackOptimization = lookbackOptimization))

        # Portfolio ------------------------------------------------------------

        pcm = InsightWeightingPortfolioConstructionModel(lambda time: rebalancingPeriod(time))
        pcm.RebalanceOnInsightChanges = False   # disable rebalancing on insights changes (new/expired insights)
        pcm.RebalanceOnSecurityChanges = False  # disable rebalancing on universe changes
        self.SetPortfolioConstruction(pcm)

        # Execution ------------------------------------------------------------

        self.SetExecution(ImmediateExecutionModel())

        # Risk -----------------------------------------------------------------

        self.SetRiskManagement(NullRiskManagementModel())
import matplotlib.pyplot as plt
import matplotlib.colors as colors
import matplotlib.ticker as mticker
import random
import pandas as pd
import numpy as np

from PortfolioOptimizerClass import PortfolioOptimizer


def CalculateRelativeRiskContributions(returnsDf, weights):

    ''' Calculate the Relative Risk Contributions for each asset given returns and weights '''

    covariance = returnsDf.cov()
    # weight_i * (w' Sigma)_i, normalised by the portfolio variance w' Sigma w
    rrc = weights * np.dot(weights.T, covariance) / np.dot(weights.T, np.dot(covariance, weights))

    return rrc


def CalculateDrawdownSeries(timeSeries):

    '''
    Calculate the drawdown series from a time series of cumulative returns

    Returns a Series named 'Drawdown' with the same index as timeSeries:
    0 at every new running peak, value / runningPeak - 1 otherwise
    '''

    # vectorised running-peak computation (replaces a row-by-row loop that wrote floats into an int column)
    runningPeak = timeSeries.cummax()
    drawdown = (timeSeries / runningPeak) - 1

    return drawdown.rename('Drawdown')


def CalculateMonthlyReturns(df):

    '''
    Calculate the monthly returns from a dataframe of daily prices

    Each row is labelled with the first available day of a month and holds the return
    from that day to the first available day of the next month (or to the last row of df)
    '''

    # first row of each (year, month) group, keeping the original index
    dfStartMonth = df.groupby([df.index.year, df.index.month]).head(1)
    # add the last day so the final (partial) month also gets a return;
    # pd.concat replaces DataFrame.append, which no longer exists in pandas >= 2.0
    dfStartMonth = pd.concat([dfStartMonth, df.iloc[[-1]]])
    # calculate pct change, shift series up and drop nas
    dfFinal = dfStartMonth.pct_change().shift(-1).dropna()

    return dfFinal


def CalculateAnnualizedVolatilityReturn(returnsDf, weights = None, weightsType = 'series', factor = 252):

    '''
    Calculate the pair annualized volatility (std) and annualized return given returns, weights and annualization factor

    Args:
        returnsDf: periodic returns (DataFrame, or Series when weights is None)
        weights: None (per-column statistics), weights aligned to returnsDf via DataFrame.mul
                 ('series'), or an array of fixed weights used with the covariance matrix ('array')
        weightsType: 'series' or 'array'; only read when weights is provided
        factor: number of periods per year used to annualize
    Returns:
        (annualizedVolatility, annualizedReturn)
    Raises:
        ValueError: if weights is provided and weightsType is neither 'series' nor 'array'
    '''

    if weights is None:
        annualizedVolatility = returnsDf.std() * np.sqrt(factor)
        annualizedReturn = ((1 + returnsDf.mean())**factor) - 1
    elif weightsType == 'series':
        portfolioReturns = returnsDf.mul(weights).sum(axis = 1)
        annualizedVolatility = portfolioReturns.std() * np.sqrt(factor)
        annualizedReturn = ((1 + portfolioReturns.mean())**factor) - 1
    elif weightsType == 'array':
        covariance = returnsDf.cov()
        annualizedVolatility = np.sqrt( np.dot(weights.T, np.dot(covariance * factor, weights)) )
        annualizedReturn = np.sum( ((1 + returnsDf.mean())**factor - 1) * weights )
    else:
        raise ValueError('weightsType must be one of: series or array')

    return annualizedVolatility, annualizedReturn


def PlotVolWeightsRrcComparison(plotDict, title, colors, plotSize = (20, 5)):

    '''
    Plot a comparison across optimizations showing volatility, weights and rrc

    plotDict maps a row label to a sequence of three Series
    (annualized volatility, portfolio weights, relative risk contribution)
    '''

    def ApplyPlotStyle(x, labels):
        x.set_xticklabels(labels, rotation = 45)
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.yaxis.set_major_formatter(ticks)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')

    columnTitles = ['Annualized Volatility', 'Portfolio Weights', 'Relative Risk Contribution']
    keysList = sorted(plotDict.keys())

    # one row per key and always three columns (one per series); the previous int(n / 2)
    # column count only matched this layout when there were exactly six keys
    fig, axs = plt.subplots(len(keysList), len(columnTitles), figsize = plotSize, squeeze = False)
    fig.suptitle(title, size = 15)

    for rowIdx, key in enumerate(keysList):
        for colIdx, columnTitle in enumerate(columnTitles):
            ax = axs[rowIdx][colIdx]
            plot = plotDict[key][colIdx]
            ax.bar(plot.index, plot.values, color = colors)
            ApplyPlotStyle(ax, plot.index)

            if colIdx == 0:
                ax.set_ylabel(key, rotation = 90, size = 13)
            if rowIdx == 0:
                ax.set_title(columnTitle, size = 13)

    fig.subplots_adjust(bottom = -1)


def PlotMonthlyWeights(weightsDict, lookbackForTitle, colors, plotSize = (20, 5)):

    '''
    Plot a stackplot of the time series of monthly optimal weights for each objective function

    Raises:
        ValueError: if weightsDict is empty
    '''

    def ApplyPlotStyle(x, title):
        x.set_title(title, fontsize = 13)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')

    keysList = list(weightsDict.keys())
    n = len(keysList)
    if n == 0:
        raise ValueError('weightsDict must contain at least one objective function')

    # two columns and as many rows as needed (3 x 2 for six objective functions, as before);
    # the previous int(n / 2) x int(n / 3) grid dropped or broke on any other count
    nCols = 2 if n > 1 else 1
    nRows = -(-n // nCols) # ceiling division
    fig, axs = plt.subplots(nRows, nCols, figsize = plotSize, squeeze = False)
    fig.suptitle('Portfolio Optimization (' + str(lookbackForTitle) + '-Month Lookback) - Monthly Weights', size = 15)

    for i, ax in enumerate(fig.axes):
        if i >= n:
            # odd number of objective functions: hide the unused trailing cell
            ax.set_visible(False)
            continue

        plot = weightsDict[keysList[i]]
        ax.stackplot(plot.index, plot.T, colors = colors)
        ApplyPlotStyle(ax, keysList[i])

        if i == 0:
            ax.legend(list(plot.columns), ncol = len(list(plot.columns)), bbox_to_anchor = (1.1, 1.25), loc = 'upper center')

    fig.subplots_adjust(bottom = -0.1)


def GenerateCumRetDf(returnsDf, weightsDict):

    '''
    Generate a dataframe with the time series of cumulative returns for each objective function and individual asset

    Columns are the keys of weightsDict followed by the columns of returnsDf
    '''

    cumRetList = []
    columnsList = []

    for objFunction in weightsDict:
        cumRet = returnsDf.mul(weightsDict[objFunction]).sum(axis = 1).add(1).cumprod() - 1
        cumRetList.append(cumRet)
        columnsList.append(objFunction)

    for ticker in returnsDf:
        cumRet = returnsDf[ticker].add(1).cumprod() - 1
        cumRetList.append(cumRet)
        columnsList.append(ticker)

    # keys labels the columns directly instead of renaming them after the fact
    cumRetDf = pd.concat(cumRetList, axis = 1, keys = columnsList)

    return cumRetDf


def GenerateDrawdownDf(returnsDf, weightsDict):

    '''
    Generate the time series of drawdown for each objective function and individual asset

    Columns are the keys of weightsDict followed by the columns of returnsDf
    '''

    ddList = []
    columnsList = []

    for objFunction in weightsDict:
        cumRet = returnsDf.mul(weightsDict[objFunction]).sum(axis = 1).add(1).cumprod()
        ddList.append(CalculateDrawdownSeries(cumRet))
        columnsList.append(objFunction)

    for ticker in returnsDf:
        cumRet = returnsDf[ticker].add(1).cumprod()
        ddList.append(CalculateDrawdownSeries(cumRet))
        columnsList.append(ticker)

    ddDf = pd.concat(ddList, axis = 1, keys = columnsList)

    return ddDf


def PlotCumRetDD(cumRetDf, ddDf, title, colors, plotSize = (12, 8)):

    ''' Plot the time series of cumulative returns and drawdown '''

    def ApplyPlotStyle(x):
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.yaxis.set_major_formatter(ticks)
        x.yaxis.grid(True)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize = plotSize)

    ax1.set_prop_cycle('color', colors)
    ax1.plot(cumRetDf)
    ax1.set_title(title, fontsize = 15)
    ax1.axes.get_xaxis().set_visible(False)
    ax1.spines['bottom'].set_visible(False)
    ApplyPlotStyle(ax1)

    ax2.set_prop_cycle('color', colors)
    ax2.plot(ddDf)
    ApplyPlotStyle(ax2)

    # 'left' is not a valid legend location string; use an explicit corner
    ax1.legend(list(cumRetDf.columns), loc = 'upper left')

    fig.tight_layout()


def PlotRiskRewardProfile(returnsDf, weightsDict, cumReturnsDf, drawdownDf, title, colors, plotSize = (12, 8)):

    '''
    Plot multiple scatter plots with different risk/reward profiles:
        1) Annualized Return/Annualized Volatility
        2) Annualized Return/Maximum Drawdown
        3) Sharpe Ratio/Maximum Drawdown
        4) Final summary table with results

    Returns are annualized with factor 12, i.e. returnsDf is treated as periodic with 12 periods per year.
    The table rows are labelled with cumReturnsDf.columns, so that frame is expected to hold the
    objective functions followed by the tickers (as produced by GenerateCumRetDf).
    '''

    def ApplyAxisStyle(x, title, percentY):
        # shared styling of the three scatter plots; the Sharpe ratio axis is not a percentage
        x.set_title(title, fontsize = 15)
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.xaxis.set_major_formatter(ticks)
        if percentY:
            x.yaxis.set_major_formatter(ticks)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.set_ylim(bottom = 0)
        x.set_xlim(left = 0)
        x.xaxis.label.set_size(12)
        x.yaxis.label.set_size(12)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')

    def ApplyFormat(a, b, c, d, e):
        a = '{:.0%}'.format(a)
        b = '{:.0%}'.format(b)
        c = round(c, 1)
        d = '{:.0%}'.format(d)
        e = '{:.0%}'.format(e)

        return (a, b, c, d, e)

    def AddPoint(annReturn, annVolatility, maxDD, label, color, markerSize, fontSize):
        # draw one labelled point on each of the three scatter plots and return its Sharpe ratio
        sharpeRatio = annReturn / annVolatility
        for ax, xValue, yValue in ((ax1, annVolatility, annReturn),
                                   (ax2, maxDD, annReturn),
                                   (ax3, maxDD, sharpeRatio)):
            ax.scatter(xValue, yValue, color = color, s = markerSize)
            ax.text(xValue, yValue, label, verticalalignment = 'top', horizontalalignment = 'left', fontsize = fontSize)

        return sharpeRatio

    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize = plotSize)
    fig.suptitle(title, fontsize = 15)

    dataList = []

    for count, objFunction in enumerate(weightsDict):
        annualizedPortfolioVolatility, annualizedPortfolioReturn = CalculateAnnualizedVolatilityReturn(returnsDf = returnsDf,
                                                                                                       weights = weightsDict[objFunction],
                                                                                                       factor = 12)
        # .iloc avoids the deprecated positional lookup on a labelled index
        cumRet = cumReturnsDf[objFunction].iloc[-1]
        maxDD = drawdownDf[objFunction].abs().max()

        annualizedPortfolioSharpeRatio = AddPoint(annualizedPortfolioReturn, annualizedPortfolioVolatility, maxDD,
                                                  objFunction, colors[count], 100, 13)
        dataList.append(ApplyFormat(annualizedPortfolioReturn, annualizedPortfolioVolatility,
                                    annualizedPortfolioSharpeRatio, maxDD, cumRet))

    for ticker in returnsDf:
        data = returnsDf[ticker]
        annualizedVolatility, annualizedReturn = CalculateAnnualizedVolatilityReturn(returnsDf = data, weightsType = 'array', factor = 12)

        # [[ticker]] + first column keeps working if a column label happens to be duplicated
        cumRet = cumReturnsDf[[ticker]].iloc[-1, 0]
        maxDD = drawdownDf[[ticker]].iloc[:, 0].abs().max()

        annualizedSharpeRatio = AddPoint(annualizedReturn, annualizedVolatility, maxDD, ticker, 'black', 10, 8)
        dataList.append(ApplyFormat(annualizedReturn, annualizedVolatility, annualizedSharpeRatio, maxDD, cumRet))

    ApplyAxisStyle(ax1, title = 'Return-Volatility', percentY = True)
    ApplyAxisStyle(ax2, title = 'Return-Max Drawdown', percentY = True)
    ApplyAxisStyle(ax3, title = 'Sharpe Ratio-Max Drawdown', percentY = False)

    ax1.set(xlabel = 'Annualized Volatility (Standard Deviation)', ylabel = 'Annualized Return')
    ax2.set(xlabel = 'Maximum Drawdown', ylabel = 'Annualized Return')
    ax3.set(xlabel = 'Maximum Drawdown', ylabel = 'Annualized Sharpe Ratio')

    table = ax3.table(cellText = dataList, rowLabels = list(cumReturnsDf.columns),
                      colLabels = ['Ann. Return', 'Ann. Volatility', 'Sharpe Ratio', 'Max DD', 'Cum. Return'],
                      loc = 4, cellLoc = 'center', bbox = [0.1, -1, 0.9, 0.8])
    table.auto_set_font_size(False)
    table.set_fontsize(10)

    fig.subplots_adjust(top = 0.8, bottom = -1.2)


def PlotReturnVolatilityMultipleLookback(startMonth, returnsDf, weightsDict, title, colors, plotSize = (12, 8)):

    '''
    Plot scatter plot with Return/Volatility for multiple combinations of objective functions and lookback periods

    weightsDict maps a lookback (in months) to a dict of objective function -> weights;
    returns before startMonth[lookback] are excluded for that lookback
    '''

    def ApplyPlotStyle(x, title):
        x.set_title(title, fontsize = 15)
        x.set(xlabel = 'Annualized Volatility (Standard Deviation)', ylabel = 'Annualized Return')
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.xaxis.set_major_formatter(ticks)
        x.yaxis.set_major_formatter(ticks)
        x.xaxis.label.set_size(13)
        x.yaxis.label.set_size(13)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.set_ylim(bottom = 0)
        x.set_xlim(left = 0)

    plt.figure(figsize = plotSize)
    keysList = []

    for month, objFunctions in weightsDict.items():
        adjustedMonthlyReturns = returnsDf[returnsDf.index.isin(startMonth[int(month):])]
        for count, (objFunction, weights) in enumerate(objFunctions.items()):
            # equal weighting does not depend on the lookback, so it is only drawn once
            if objFunction == 'equalWeighting' and 'equalWeighting' in keysList:
                continue

            annualizedPortfolioVolatility, annualizedPortfolioReturn = CalculateAnnualizedVolatilityReturn(returnsDf = adjustedMonthlyReturns,
                                                                                                           weights = weights,
                                                                                                           factor = 12)

            plt.scatter(annualizedPortfolioVolatility, annualizedPortfolioReturn, color = colors[count], s = 100)
            plt.text(annualizedPortfolioVolatility, annualizedPortfolioReturn, month,
                     verticalalignment = 'top', horizontalalignment = 'left', fontsize = 13)

            keysList.append(objFunction)

    ApplyPlotStyle(plt.gca(), title = title)

    # unique objective functions in order of first appearance
    uniqueKeys = sorted(set(keysList), key = keysList.index)
    # 'left' is not a valid legend location string; use an explicit corner
    plt.legend(uniqueKeys, loc = 'upper left')

    plt.show()


def GetCoordinatesEfficientFrontier(targetReturnsArray, returnsDf):

    '''
    Return the pairs of volatility/return coordinates for the efficient frontier

    Runs one meanVariance optimization (weights within [0, 1]) per target return and
    returns [volatilities, returns] as two parallel lists
    '''

    # initialize PortfolioOptimizer class
    portOpt = PortfolioOptimizer(minWeight = 0, maxWeight = 1)

    volatilities = []
    returns = []
    for targetReturn in targetReturnsArray:
        weights = portOpt.Optimize('meanVariance', returnsDf, targetReturn)
        annualizedPortfolioVolatility, annualizedPortfolioReturn = CalculateAnnualizedVolatilityReturn(returnsDf, weights,
                                                                                                       weightsType = 'array',
                                                                                                       factor = 252)
        volatilities.append(annualizedPortfolioVolatility)
        returns.append(annualizedPortfolioReturn)

    efficientFrontierList = [volatilities, returns]

    return efficientFrontierList


def PlotMeanVarianceSpace(returnsDf, objFunctionList, simulations = 1000, plotSize = (20, 5)):

    '''
    Plot the mean-variance space (and efficient frontier) with simulations of portfolios,
    individual assets and optimal portfolios

    Args:
        returnsDf: DataFrame of daily returns (annualized with factor 252)
        objFunctionList: objective functions whose optimal portfolios are drawn and labelled
        simulations: number of random (Dirichlet-distributed) portfolios to scatter
        plotSize: figure size
    '''

    # initialize PortfolioOptimizer class
    portOpt = PortfolioOptimizer(minWeight = 0, maxWeight = 1)

    # dictionary to store pairs of volatility/return for objective functions and tickers
    volatilityReturnDict = {}

    # loop through the objective functions to generate optimal weights and calculate the pairs of volatility/return for each
    for objfunction in objFunctionList:
        weights = portOpt.Optimize(objfunction, returnsDf)
        annualizedPortfolioVolatility, annualizedPortfolioReturn = CalculateAnnualizedVolatilityReturn(returnsDf, weights,
                                                                                                       weightsType = 'array',
                                                                                                       factor = 252)
        volatilityReturnDict[objfunction] = [annualizedPortfolioVolatility, annualizedPortfolioReturn]

    # loop through the tickers and calculate the pairs of volatility/return for each
    for ticker in returnsDf:
        data = returnsDf[ticker]
        annualizedVolatility, annualizedReturn = CalculateAnnualizedVolatilityReturn(returnsDf = data, weightsType = 'array', factor = 252)
        volatilityReturnDict[ticker] = [annualizedVolatility, annualizedReturn]

    # the minimum-variance return splits the frontier into its efficient and inefficient halves;
    # compute it here when the caller did not ask for 'minVariance' (previously a KeyError)
    if 'minVariance' in volatilityReturnDict:
        minVarianceReturn = volatilityReturnDict['minVariance'][1]
    else:
        minVarianceWeights = portOpt.Optimize('minVariance', returnsDf)
        _, minVarianceReturn = CalculateAnnualizedVolatilityReturn(returnsDf, minVarianceWeights,
                                                                   weightsType = 'array', factor = 252)

    # calculate the efficient frontier
    # make an array of target returns between the minimum return and maximum return to run mean-variance optimization
    returnsList = [value[1] for value in volatilityReturnDict.values()]
    targetReturnsArray = np.linspace(minVarianceReturn, max(returnsList), 100)
    efficientFrontierList = GetCoordinatesEfficientFrontier(targetReturnsArray, returnsDf)
    targetReturnsArray = np.linspace(min(returnsList), minVarianceReturn, 100)
    beforeEfficientFrontierList = GetCoordinatesEfficientFrontier(targetReturnsArray, returnsDf)

    # run simulations of portfolios and extract their pairs of volatility/return
    n = returnsDf.columns.size
    volatilities = []
    returns = []
    for _ in range(simulations):
        weights = np.random.dirichlet(np.ones(n), size = 1)[0]
        annualizedPortfolioVolatility, annualizedPortfolioReturn = CalculateAnnualizedVolatilityReturn(returnsDf = returnsDf,
                                                                                                       weights = weights,
                                                                                                       weightsType = 'array',
                                                                                                       factor = 252)
        volatilities.append(annualizedPortfolioVolatility)
        returns.append(annualizedPortfolioReturn)

    simulationsVolatilitiesReturns = [np.array(volatilities), np.array(returns)]

    # plotting
    def ApplyPlotStyle(x, title):
        x.set_title(title, fontsize = 15)
        x.set(xlabel = 'Annualized Volatility (Standard Deviation)', ylabel = 'Annualized Return')
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.xaxis.set_major_formatter(ticks)
        x.yaxis.set_major_formatter(ticks)
        x.xaxis.label.set_size(13)
        x.yaxis.label.set_size(13)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')

    # initialize figure size
    plt.figure(figsize = plotSize)

    # plot simulated portfolios (coloured by return / volatility)
    plt.scatter(simulationsVolatilitiesReturns[0], simulationsVolatilitiesReturns[1],
                c = simulationsVolatilitiesReturns[1] / simulationsVolatilitiesReturns[0],
                marker = 'o', alpha = 0.1)

    # plot efficient frontier
    plt.plot(efficientFrontierList[0], efficientFrontierList[1], '-', c = 'red')
    plt.plot(beforeEfficientFrontierList[0], beforeEfficientFrontierList[1], '--', c = 'black')

    # plot optimal portfolios
    for objFunction, volatilityReturn in volatilityReturnDict.items():
        # nudge the maxReturn label down so it does not sit on top of its marker
        offsetText = -0.005 if objFunction == 'maxReturn' else 0
        plt.plot(volatilityReturn[0], volatilityReturn[1], '-o', c = 'black')
        plt.text(volatilityReturn[0], volatilityReturn[1] + offsetText, objFunction, verticalalignment = 'top',
                 horizontalalignment = 'left', fontsize = 12, c = 'black')

    ApplyPlotStyle(plt.gca(), title = 'Mean-Variance')
    plt.colorbar(label = 'Sharpe Ratio', pad = 0.1)

    plt.show()
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")

from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import AlphaModel, Insight, InsightType, InsightDirection

import pandas as pd

from PortfolioOptimizerClass import PortfolioOptimizer


class OptimalWeightsAlphaCreationModel(AlphaModel):

    '''
    Description:
        Alpha model that, at every rebalancing time, optimizes the portfolio weights of the
        tracked securities over a lookback window and emits one weighted Up insight per security
    '''

    def __init__(self, rebalancingPeriod = Expiry.EndOfMonth, objectiveFunction = 'riskParity', lookbackOptimization = 63):

        '''
        Description:
            Initialize the alpha model
        Args:
            rebalancingPeriod: Function mapping the current time to the next rebalancing time
            objectiveFunction: The objective function passed to PortfolioOptimizer.Optimize
            lookbackOptimization: The number of daily bars requested from history for the optimization
        '''

        self.rebalancingPeriod = rebalancingPeriod
        self.objectiveFunction = objectiveFunction
        self.lookbackOptimization = lookbackOptimization

        self.securities = [] # list to store securities to consider
        self.portfolioValueHigh = 0 # initialize portfolioValueHigh for drawdown calculation
        self.portfolioValueHighInitialized = False # initialize portfolioValueHighInitialized for drawdown calculation

        # initialize PortfolioOptimizer class
        self.portOpt = PortfolioOptimizer(minWeight = 0, maxWeight = 1)

        self.nextRebalancingTime = None

    def Update(self, algorithm, data):

        '''
        Description:
            Plot exposure/drawdown and generate weighted insights, but only once the
            rebalancing time has been reached
        Args:
            algorithm: The algorithm instance
            data: The new data available (not read by this model)
        Returns:
            List of insights (empty outside rebalancing, or when no weights could be computed)
        '''

        # initialize nextRebalancingTime
        if self.nextRebalancingTime is None:
            self.nextRebalancingTime = self.rebalancingPeriod(algorithm.Time)

        insights = [] # list to store the new insights to be created

        # make sure we only send insights at rebalancing
        if algorithm.Time < self.nextRebalancingTime:
            return insights

        ### plotting (one point per rebalancing) ------------------------------------------------------------

        currentTotalPortfolioValue = algorithm.Portfolio.TotalPortfolioValue # get current portfolio value

        # plot the total portfolio exposure %
        totalPortfolioExposure = (algorithm.Portfolio.TotalHoldingsValue / currentTotalPortfolioValue) * 100
        algorithm.Plot('Chart Total Portfolio Exposure %', 'Daily Portfolio Exposure %', totalPortfolioExposure)

        # plot the drawdown % from the most recent high
        if not self.portfolioValueHighInitialized:
            # set initial portfolio value (this used to be stored in a misspelled attribute that nothing read)
            self.portfolioValueHigh = currentTotalPortfolioValue
            self.portfolioValueHighInitialized = True

        # update trailing high value of the portfolio
        if self.portfolioValueHigh < currentTotalPortfolioValue:
            self.portfolioValueHigh = currentTotalPortfolioValue

        currentDrawdownPercent = ((float(currentTotalPortfolioValue) / float(self.portfolioValueHigh)) - 1.0) * 100
        algorithm.Plot('Chart Drawdown %', 'Drawdown %', currentDrawdownPercent)

        ### generate insights ------------------------------------------------------------------------------

        # nothing to optimize until the universe has delivered securities
        if self.securities:
            # calculate optimal weights
            symbols = [security.Symbol for security in self.securities]
            weights = self.CalculateOptimalWeights(algorithm, symbols, self.objectiveFunction, self.lookbackOptimization)

            # insight expiry time and direction
            insightExpiry = Expiry.EndOfDay(algorithm.Time)
            insightDirection = InsightDirection.Up # insight direction

            # loop through securities and generate insights
            for security in self.securities:
                ticker = str(security.Symbol.Value)
                # a security without usable history has no weight; skip it instead of failing the whole update
                if ticker not in weights.index:
                    continue
                weight = weights[ticker]

                # append the insights list with the prediction and weights for each symbol
                insights.append(Insight.Price(security.Symbol, insightExpiry, insightDirection, None, None, None, weight))

                algorithm.Plot('Chart Optimal Weights %', security.Symbol.Value, float(weight))

        self.nextRebalancingTime = self.rebalancingPeriod(algorithm.Time)

        return insights

    def CalculateOptimalWeights(self, algorithm, symbols, objectiveFunction, lookbackOptimization):

        '''
        Description:
            Calculate the optimal weights for each symbol provided
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            symbols: The symbols in current universe
            objectiveFunction: The objective function for optimization
            lookbackOptimization: The number of days to look back to calculate optimal weights
        Returns:
            pandas Series named 'weights' indexed by ticker; empty when there is no usable history
        '''

        noWeights = pd.Series(dtype = float, name = 'weights')

        # get historical close prices
        history = algorithm.History(symbols, lookbackOptimization, Resolution.Daily)
        if history.empty:
            return noWeights
        historyClosePrices = history['close'].unstack(level = 0)

        # calculate daily returns
        returnsDf = historyClosePrices.pct_change().dropna()
        if returnsDf.empty:
            return noWeights

        # rename the columns in the dataframe in order to have tickers and not symbol strings
        returnsDf.rename(columns = {column: algorithm.ActiveSecurities[column].Symbol.Value
                                    for column in returnsDf.columns},
                         inplace = True)

        # calculate optimal weights
        weights = self.portOpt.Optimize(objectiveFunction, returnsDf)
        # convert the weights to a pandas Series
        weights = pd.Series(weights, index = returnsDf.columns, name = 'weights')
        algorithm.Log(weights)

        return weights

    def OnSecuritiesChanged(self, algorithm, changes):

        '''
        Description:
            Event fired each time the we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm
        '''

        # add new securities (ignoring ones already tracked so no symbol is weighted twice)
        for added in changes.AddedSecurities:
            algorithm.Log('adding symbol: ' + str(added.Symbol))
            if added not in self.securities:
                self.securities.append(added)

        # remove securities
        for removed in changes.RemovedSecurities:
            if removed in self.securities:
                self.securities.remove(removed)