Overall Statistics
Total Trades
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Net Profit
0%
Sharpe Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
0.709
Tracking Error
0.469
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
import matplotlib.pyplot as plt
import matplotlib.colors as colors
import matplotlib.ticker as mticker
import random
import pandas as pd
import numpy as np
from PortfolioOptimizerClass import PortfolioOptimizer

def CalculateRelativeRiskContributions(returnsDf, weights):

    ''' Calculate the Relative Risk Contributions for each asset given returns and weights '''

    covariance = returnsDf.cov()
    rrc = weights * np.dot(weights.T, covariance) / np.dot(weights.T, np.dot(covariance, weights))

    return rrc
    
def CalculateDrawdownSeries(timeSeries):

    ''' Calculate the drawdown series from a time series of cumulative returns '''

    df = timeSeries.to_frame()
    lastPeak = df.iloc[0][0]
    df['Drawdown'] = 1
    for i in range(len(df)):
        if df.iloc[i, 0] < lastPeak:
            df.iloc[i, 1] = df.iloc[i, 0] / lastPeak
        else:
            lastPeak = df.iloc[i, 0]
    
    finalDf = (df['Drawdown'] - 1)
    
    return finalDf
    
def CalculateMonthlyReturns(df):

    ''' Calculate the monthly returns from a dataframe of daily prices '''

    dfStartMonth = (df.groupby([df.index.year, df.index.month])
                    .apply(lambda x: x.head(1))) # group by year and month and take the first value of the month
    dfStartMonth.index = dfStartMonth.index.droplevel(0) # drop the index level we don't need
    dfStartMonth.index = dfStartMonth.index.droplevel(0) # drop the index level we don't need
    dfStartMonth = dfStartMonth.append(df.iloc[-1].to_frame().transpose()) # append last day to dfStartMonth
    dfFinal = dfStartMonth.pct_change().shift(-1).dropna() # calculate pct change, shift series up and drop nas

    return dfFinal
    
def CalculateAnnualizedVolatilityReturn(returnsDf, weights = None, weightsType = 'series', factor = 252):
    
    ''' Calculate the pair annualized volatility (std) and annualized return
    given returns, weights and annualization factor '''
    
    if weights is None:
        annualizedVolatility = returnsDf.std() * np.sqrt(factor)
        annualizedReturn = ((1 + returnsDf.mean())**factor) - 1
    else:
        if weightsType =='series':
            annualizedVolatility = returnsDf.mul(weights).sum(axis = 1).std() * np.sqrt(factor)
            annualizedReturn = ((1 + returnsDf.mul(weights).sum(axis = 1).mean())**factor) - 1
        elif weightsType == 'array':
            covariance = returnsDf.cov()
            annualizedVolatility = np.sqrt( np.dot(weights.T, np.dot(covariance * factor, weights)) )
            annualizedReturn = np.sum( ((1 + returnsDf.mean())**factor - 1) * weights )
        else:
            raise ValueError('weightsType must be one of: series or array')
    
    return annualizedVolatility, annualizedReturn
    
def PlotVolWeightsRrcComparison(plotDict, title, colors, plotSize = (20, 5)):
    
    ''' Plot a comparison across optimizations showing volatility, weights and rrc '''
    
    def ApplyPlotStyle(x, labels):
        x.set_xticklabels(labels, rotation = 45)
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.yaxis.set_major_formatter(ticks)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')
    
    keysList = sorted(list(plotDict.keys()) * 3)
    n = len(list(plotDict.keys()))
    seriesList = [0, 1, 2] * n

    fig, axs = plt.subplots(n, int(n / 2), figsize = plotSize)
    #fig, axs = plt.subplots(3, 3, figsize = plotSize)
    fig.suptitle(title, size = 15)
    
    for i, ax in enumerate(fig.axes):
        plot = plotDict[keysList[i]][seriesList[i]]
        ax.bar(plot.index, plot.values, color = colors)
        ApplyPlotStyle(ax, plot.index)
        
        if seriesList[i] == 0:
            ax.set_ylabel(keysList[i], rotation = 90, size = 13)
            
        if i == 0:
            ax.set_title('Annualized Volatility', size = 13)
        elif i == 1:
            ax.set_title('Portfolio Weights', size = 13)
        elif i == 2:
            ax.set_title('Relative Risk Contribution', size = 13)
        
    fig.subplots_adjust(bottom = -1)
    
def PlotMonthlyWeights(weightsDict, lookbackForTitle, colors, plotSize = (20, 5)):
    
    ''' Plot a stackplot of the time series of monthly optimal weights for each objective function '''
    
    def ApplyPlotStyle(x, title):
        x.set_title(title, fontsize = 13)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')
    
    keysList = list(weightsDict.keys())
    n = len(keysList)

    fig, axs = plt.subplots(int(n / 2), int(n / 3), figsize = plotSize)
    fig.suptitle('Portfolio Optimization (' + str(lookbackForTitle) + '-Month Lookback) - Monthly Weights', size = 15)
    
    for i, ax in enumerate(fig.axes):
        plot = weightsDict[keysList[i]]
        ax.stackplot(plot.index, plot.T, colors = colors)
        title = keysList[i]
        if keysList[i] == 'equalWeighting':
            title = keysList[i]
        ApplyPlotStyle(ax, title)
        if i == 0:
            ax.legend(list(plot.columns), ncol = len(list(plot.columns)), bbox_to_anchor = (1.1, 1.25), loc = 'upper center')
        
    fig.subplots_adjust(bottom = -0.1)
    
def GenerateCumRetDf(returnsDf, weightsDict):
    
    ''' Generate a dataframe with the time series of cumulative returns
    for each objective function and individual asset '''
    
    cumRetList = []
    columnsList = []

    for objFunction in weightsDict:
        cumRet = returnsDf.mul(weightsDict[objFunction]).sum(axis = 1).add(1).cumprod() - 1
        cumRetList.append(cumRet)
        columnsList.append(objFunction)
    
    for ticker in returnsDf:
        data = returnsDf[ticker]
        cumRet = data.add(1).cumprod() - 1
        cumRetList.append(cumRet)
        columnsList.append(ticker)

    cumRetDf = pd.concat(cumRetList, axis = 1)
    currentColumnsList = list(cumRetDf.columns)
    cumRetDf = cumRetDf.rename(columns = {currentColumnsList[i]: columnsList[i] for i in range(len(currentColumnsList))})
    
    return cumRetDf
    
def GenerateDrawdownDf(returnsDf, weightsDict):
    
    ''' Generate the time series of drawdown for each objective function and individual asset '''
    
    ddList = []
    columnsList = []
    
    for objFunction in weightsDict:
        cumRet = returnsDf.mul(weightsDict[objFunction]).sum(axis = 1).add(1).cumprod()
        ddSeries = CalculateDrawdownSeries(cumRet)
        ddSeries.name = None
        ddList.append(ddSeries)
        columnsList.append(objFunction)
        
    for ticker in returnsDf:
        data = returnsDf[ticker]
        cumRet = data.add(1).cumprod()
        ddSeries = CalculateDrawdownSeries(cumRet)
        ddSeries.name = None
        ddList.append(ddSeries)
        columnsList.append(ticker)

    ddDf = pd.concat(ddList, axis = 1)
    currentColumnsList = list(ddDf.columns)
    ddDf = ddDf.rename(columns = {currentColumnsList[i]: columnsList[i] for i in range(len(currentColumnsList))})
    
    return ddDf
    
def PlotCumRetDD(cumRetDf, ddDf, title, colors, plotSize = (12, 8)):
    
    ''' Plot the time series of cumulative returns and drawdown '''
    
    def ApplyPlotStyle(x):
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.yaxis.set_major_formatter(ticks)
        x.yaxis.grid(True)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize = plotSize)
    
    ax1.set_prop_cycle('color', colors)
    ax1.plot(cumRetDf)
    ax1.set_title(title, fontsize = 15)
    ax1.axes.get_xaxis().set_visible(False)
    ax1.spines['bottom'].set_visible(False)
    ApplyPlotStyle(ax1)
    
    ax2.set_prop_cycle('color', colors)
    ax2.plot(ddDf)
    ApplyPlotStyle(ax2)

    ax1.legend(list(cumRetDf.columns), loc = 'left')
    fig.tight_layout()
    
def PlotRiskRewardProfile(returnsDf, weightsDict, cumReturnsDf, drawdownDf, title, colors, plotSize = (12, 8)):
    
    '''
    Plot multiple scatter plots with different risk/reward profiles:
        1) Annualized Return/Annualized Volatility
        2) Sharpe Ratio/Maximum Drawdown
        3) Annualized Return/Maximum Drawdown
        4) Final summary table with results
    '''
    
    def ApplyPlot1Style(x, title):
        x.set_title(title, fontsize = 15)
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.xaxis.set_major_formatter(ticks)
        x.yaxis.set_major_formatter(ticks)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.set_ylim(ymin = 0)
        x.set_xlim(xmin = 0)
        x.xaxis.label.set_size(12)
        x.yaxis.label.set_size(12)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')

    def ApplyPlot2Style(x, title):
        x.set_title(title, fontsize = 15)
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.xaxis.set_major_formatter(ticks)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.set_ylim(ymin = 0)
        x.set_xlim(xmin = 0)
        x.xaxis.label.set_size(12)
        x.yaxis.label.set_size(12)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')

    def ApplyFormat(a, b, c, d, e):
        a = '{:.0%}'.format(a)
        b = '{:.0%}'.format(b)
        c = round(c, 1)
        d = '{:.0%}'.format(d)
        e = '{:.0%}'.format(e)

        return (a, b, c, d, e)

    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize = plotSize)
    fig.suptitle(title, fontsize = 15)

    dataList = []
    count = 0
    for objFunction in weightsDict:
        annualizedPortfolioVolatility, annualizedPortfolioReturn = CalculateAnnualizedVolatilityReturn(returnsDf = returnsDf,
                                                                                                        weights = weightsDict[objFunction],
                                                                                                        factor = 12)
        ax1.scatter(annualizedPortfolioVolatility, annualizedPortfolioReturn, color = colors[count], s = 100)
        ax1.text(annualizedPortfolioVolatility, annualizedPortfolioReturn, objFunction, verticalalignment = 'top',
                 horizontalalignment = 'left', fontsize = 13)

        cumRet = cumReturnsDf[objFunction][-1]
        maxDD = max(abs(drawdownDf[objFunction]))
        ax2.scatter(maxDD, annualizedPortfolioReturn, color = colors[count], s = 100)
        ax2.text(maxDD, annualizedPortfolioReturn, objFunction, verticalalignment = 'top',
                 horizontalalignment = 'left', fontsize = 13)
                 
        annualizedPortfolioSharpeRatio = annualizedPortfolioReturn / annualizedPortfolioVolatility
        ax3.scatter(maxDD, annualizedPortfolioSharpeRatio, color = colors[count], s = 100)
        ax3.text(maxDD, annualizedPortfolioSharpeRatio, objFunction, verticalalignment = 'top',
                 horizontalalignment = 'left', fontsize = 13)

        dataTuple = ApplyFormat(annualizedPortfolioReturn, annualizedPortfolioVolatility, annualizedPortfolioSharpeRatio, maxDD, cumRet)
        dataList.append(dataTuple)
        
        count += 1

    for ticker in returnsDf:
        data = returnsDf[ticker]
        annualizedVolatility, annualizedReturn = CalculateAnnualizedVolatilityReturn(returnsDf = data, weightsType = 'array', factor = 12)
        ax1.scatter(annualizedVolatility, annualizedReturn, color = 'black', s = 10)
        ax1.text(annualizedVolatility, annualizedReturn, ticker, verticalalignment = 'top',
                 horizontalalignment = 'left', fontsize = 8)
    
        cumRet = cumReturnsDf[[ticker]].iloc[-1][0]
        maxDD = max(abs(drawdownDf[[ticker]].values))[0]    
        ax2.scatter(maxDD, annualizedReturn, color = 'black', s = 10)
        ax2.text(maxDD, annualizedReturn, ticker, verticalalignment = 'top',
                 horizontalalignment = 'left', fontsize = 8)

        annualizedSharpeRatio = annualizedReturn / annualizedVolatility
        ax3.scatter(maxDD, annualizedSharpeRatio, color = 'black', s = 10)
        ax3.text(maxDD, annualizedSharpeRatio, ticker, verticalalignment = 'top',
                 horizontalalignment = 'left', fontsize = 8)

        dataTuple = ApplyFormat(annualizedReturn, annualizedVolatility, annualizedSharpeRatio, maxDD, cumRet)
        dataList.append(dataTuple)

    ApplyPlot1Style(ax1, title = 'Return-Volatility')
    ApplyPlot1Style(ax2, title = 'Return-Max Drawdown')
    ApplyPlot2Style(ax3, title = 'Sharpe Ratio-Max Drawdown')
    ax1.set(xlabel = 'Annualized Volatility (Standard Deviation)', ylabel = 'Annualized Return')
    ax2.set(xlabel = 'Maximum Drawdown', ylabel = 'Annualized Return')
    ax3.set(xlabel = 'Maximum Drawdown', ylabel = 'Annualized Sharpe Ratio')

    table = ax3.table(cellText = dataList,
                rowLabels = list(cumReturnsDf.columns),
                colLabels = ['Ann. Return', 'Ann. Volatility', 'Sharpe Ratio', 'Max DD', 'Cum. Return'],
                loc = 4, cellLoc = 'center', bbox = [0.1, -1, 0.9, 0.8])

    table.auto_set_font_size(False)
    table.set_fontsize(10)
    
    fig.subplots_adjust(top = 0.8, bottom = -1.2)
    
def PlotReturnVolatilityMultipleLookback(startMonth, returnsDf, weightsDict, title, colors, plotSize = (12, 8)):
    
    ''' Plot scatter plot with Return/Volatility for multiple combinations of objective functions and lookback periods '''
    
    def ApplyPlotStyle(x, title):
        x.set_title(title, fontsize = 15)
        x.set(xlabel = 'Annualized Volatility (Standard Deviation)', ylabel = 'Annualized Return')
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.xaxis.set_major_formatter(ticks)
        x.yaxis.set_major_formatter(ticks)
        x.xaxis.label.set_size(13)
        x.yaxis.label.set_size(13)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.set_ylim(ymin = 0)
        x.set_xlim(xmin = 0)
    
    plt.figure(figsize = plotSize)
    
    keysList = []
    for month, objFunctions in weightsDict.items():
        adjustedMonthlyReturns = returnsDf[returnsDf.index.isin(startMonth[int(month):])]
        
        count = 0
        for objFunction, weights in objFunctions.items():
            if objFunction == 'equalWeighting' and 'equalWeighting' in keysList:
                count += 1
                continue

            annualizedPortfolioVolatility, annualizedPortfolioReturn = CalculateAnnualizedVolatilityReturn(returnsDf = adjustedMonthlyReturns,
                                                                                                            weights = weights, factor = 12)
            plt.scatter(annualizedPortfolioVolatility, annualizedPortfolioReturn, color = colors[count], s = 100)
            plt.text(annualizedPortfolioVolatility, annualizedPortfolioReturn, month, verticalalignment = 'top',
                     horizontalalignment = 'left', fontsize = 13)
            keysList.append(objFunction)
            count += 1
    
    ApplyPlotStyle(plt.gca(), title = title)
    uniqueKeys = sorted(set(keysList), key = keysList.index)
    plt.legend(uniqueKeys, loc = 'left')
    plt.show()

def GetCoordinatesEfficientFrontier(targetReturnsArray, returnsDf):
        
    ''' Return the pairs of volatility/return coordinates for the efficient frontier '''
    
    # initialize PortfolioOptimizer class
    portOpt = PortfolioOptimizer(minWeight = 0, maxWeight = 1)
    
    volatilities = []
    returns = []
    for targetReturn in targetReturnsArray:
        weights = portOpt.Optimize('meanVariance', returnsDf, targetReturn)
        annualizedPortfolioVolatility, annualizedPortfolioReturn = CalculateAnnualizedVolatilityReturn(returnsDf,
                                                                                                       weights,
                                                                                                       weightsType = 'array',
                                                                                                       factor = 252)
        volatilities.append(annualizedPortfolioVolatility)
        returns.append(annualizedPortfolioReturn)
        
    efficientFrontierList = [volatilities, returns]
    
    return efficientFrontierList
    
def PlotMeanVarianceSpace(returnsDf, objFunctionList, simulations = 1000, plotSize = (20, 5)):
    
    ''' Plot the mean-variance space (and efficient frontier)
    with simulations of portfolios, individual assets and optimal portfolios '''
    
    # initialize PortfolioOptimizer class
    portOpt = PortfolioOptimizer(minWeight = 0, maxWeight = 1)
    
    # dictionary to store pairs of volatility/return for objective functions and tickers
    volatilityReturnDict = {}
    
    # loop through the objective functions to generate optimal weights and calculate the pairs of volatility/return for each
    for objfunction in objFunctionList:
        weights = portOpt.Optimize(objfunction, returnsDf)
        annualizedPortfolioVolatility, annualizedPortfolioReturn = CalculateAnnualizedVolatilityReturn(returnsDf,
                                                                                                       weights,
                                                                                                       weightsType = 'array',
                                                                                                       factor = 252)
        volatilityReturnDict[objfunction] = [annualizedPortfolioVolatility, annualizedPortfolioReturn]
    
    # loop through the tickers and calculate the pairs of volatility/return for each
    for ticker in returnsDf:
        data = returnsDf[ticker]
        annualizedVolatility, annualizedReturn = CalculateAnnualizedVolatilityReturn(returnsDf = data, weightsType = 'array',
                                                                                     factor = 252)
        volatilityReturnDict[ticker] = [annualizedVolatility, annualizedReturn]

    # calculate the efficient frontier
    # make an array of target returns between the minimum return and maximum return to run mean-variance optimization
    returnsList = [value[1] for key, value in volatilityReturnDict.items()]
    
    targetReturnsArray = np.linspace(volatilityReturnDict['minVariance'][1], max(returnsList), 100)
    efficientFrontierList = GetCoordinatesEfficientFrontier(targetReturnsArray, returnsDf)
    
    targetReturnsArray = np.linspace(min(returnsList), volatilityReturnDict['minVariance'][1], 100)
    beforeEfficientFrontierList = GetCoordinatesEfficientFrontier(targetReturnsArray, returnsDf)
    
    # run simulations of portfolios and extract their pairs of volatility/return
    n = returnsDf.columns.size
    volatilities = []
    returns = []
    
    for i in range(simulations):
        weights = np.random.dirichlet(np.ones(n), size = 1)
        weights = weights[0]
        annualizedPortfolioVolatility, annualizedPortfolioReturn = CalculateAnnualizedVolatilityReturn(returnsDf = returnsDf, weights = weights,
                                                                                                        weightsType = 'array', factor = 252)
        volatilities.append(annualizedPortfolioVolatility)
        returns.append(annualizedPortfolioReturn)

    simulationsVolatilitiesReturns = [np.array(volatilities), np.array(returns)]
    
    # plotting
    def ApplyPlotStyle(x, title):
        x.set_title(title, fontsize = 15)
        x.set(xlabel = 'Annualized Volatility (Standard Deviation)', ylabel = 'Annualized Return')
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.xaxis.set_major_formatter(ticks)
        x.yaxis.set_major_formatter(ticks)
        x.xaxis.label.set_size(13)
        x.yaxis.label.set_size(13)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')
        
    # initialize figure size
    plt.figure(figsize = plotSize)
    
    # plot simulated portfolios
    plt.scatter(simulationsVolatilitiesReturns[0], simulationsVolatilitiesReturns[1],
                c = simulationsVolatilitiesReturns[1] / simulationsVolatilitiesReturns[0],
                marker = 'o', alpha = 0.1)
    
    # plot efficient frontier
    plt.plot(efficientFrontierList[0], efficientFrontierList[1], '-', c = 'red')
    plt.plot(beforeEfficientFrontierList[0], beforeEfficientFrontierList[1], '--', c = 'black')
    
    # plot optimal portfolios
    for objFunction, volatilityReturn in volatilityReturnDict.items():
        if objFunction == 'maxReturn':
            offsetText = -0.005
        else:
            offsetText = 0
        plt.plot(volatilityReturn[0], volatilityReturn[1], '-o', c = 'black')
        plt.text(volatilityReturn[0], volatilityReturn[1] + offsetText, objFunction, verticalalignment = 'top',
                horizontalalignment = 'left', fontsize = 12, c = 'black')
    
    ApplyPlotStyle(plt.gca(), title = 'Mean-Variance')
    plt.colorbar(label = 'Sharpe Ratio', pad = 0.1)
    plt.show()
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")

from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import AlphaModel, Insight, InsightType, InsightDirection

import pandas as pd
from PortfolioOptimizerClass import PortfolioOptimizer

class OptimalWeightsAlphaCreationModel(AlphaModel):

    def __init__(self, rebalancingPeriod = Expiry.EndOfMonth, objectiveFunction = 'riskParity', lookbackOptimization = 63):
        
        self.rebalancingPeriod  = rebalancingPeriod
        self.objectiveFunction  = objectiveFunction
        self.lookbackOptimization = lookbackOptimization
        
        self.securities = [] # list to store securities to consider
        self.portfolioValueHigh = 0 # initialize portfolioValueHigh for drawdown calculation
        self.portfolioValueHighInitialized = False # initialize portfolioValueHighInitialized for drawdown calculation
        
        # initialize PortfolioOptimizer class
        self.portOpt = PortfolioOptimizer(minWeight = 0, maxWeight = 1)

        self.nextRebalancingTime = None
        
    def Update(self, algorithm, data):
        
        # initialize nextRebalancingTime
        if self.nextRebalancingTime is None:
            self.nextRebalancingTime = self.rebalancingPeriod(algorithm.Time)
        
        insights = [] # list to store the new insights to be created
        
        # make sure we only send insights at rebalancing
        if algorithm.Time >= self.nextRebalancingTime:
            
            ### plotting ----------------------------------------------------------------------------------------
            currentTotalPortfolioValue = algorithm.Portfolio.TotalPortfolioValue # get current portfolio value
            
            # plot the daily total portfolio exposure %
            totalPortfolioExposure = (algorithm.Portfolio.TotalHoldingsValue / currentTotalPortfolioValue) * 100
            algorithm.Plot('Chart Total Portfolio Exposure %', 'Daily Portfolio Exposure %', totalPortfolioExposure)
            
            # plot the drawdown % from the most recent high
            if not self.portfolioValueHighInitialized:
                self.portfolioHigh = currentTotalPortfolioValue # set initial portfolio value
                self.portfolioValueHighInitialized = True
                
            # update trailing high value of the portfolio
            if self.portfolioValueHigh < currentTotalPortfolioValue:
                self.portfolioValueHigh = currentTotalPortfolioValue
    
            currentDrawdownPercent = ((float(currentTotalPortfolioValue) / float(self.portfolioValueHigh)) - 1.0) * 100
            algorithm.Plot('Chart Drawdown %', 'Drawdown %', currentDrawdownPercent)
        
            ### generate insights ------------------------------------------------------------------------------
            
            # calculate optimal weights
            symbols = [security.Symbol for security in self.securities]
            weights = self.CalculateOptimalWeights(algorithm, symbols, self.objectiveFunction, self.lookbackOptimization)
            
            # insight expiry time and direction
            insightExpiry = Expiry.EndOfDay(algorithm.Time)
            insightDirection = InsightDirection.Up # insight direction
            
            # loop through securities and generate insights
            for security in self.securities:
                weight = weights[str(security.Symbol.Value)]
                
                # append the insights list with the prediction and weights for each symbol
                insights.append(Insight.Price(security.Symbol, insightExpiry, insightDirection,
                                None, None, None, weight))
                                
                algorithm.Plot('Chart Optimal Weights %', security.Symbol.Value, float(weight))
                
            self.nextRebalancingTime = self.rebalancingPeriod(algorithm.Time)
            
        return insights
        
    def CalculateOptimalWeights(self, algorithm, symbols, objectiveFunction, lookbackOptimization):
            
        '''
        Description:
            Calculate the optimal weights for each symbol provided
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            symbols: The symbols in current universe
            objectiveFunction: The objective function for optimization
            lookbackOptimization: The number of days to look back to calculate optimal weights
        '''
        
        # get historical close prices
        historyClosePrices = algorithm.History(symbols, lookbackOptimization, Resolution.Daily)['close'].unstack(level = 0)
        
        # calculate daily returns
        returnsDf = historyClosePrices.pct_change().dropna()
        # rename the columns in the dataframe in order to have tickers and not symbol strings
        columnsList = list(returnsDf.columns)
        returnsDf.rename(columns = {columnsList[i]: algorithm.ActiveSecurities[columnsList[i]].Symbol.Value for i in range(len(columnsList))}, inplace = True)
        
        # calculate optimal weights
        weights = self.portOpt.Optimize(objectiveFunction, returnsDf)
        # convert the weights to a pandas Series
        weights = pd.Series(weights, index = returnsDf.columns, name = 'weights')
        
        algorithm.Log(f"{algorithm.Time}|| Current Weights {weights}")
        
        return weights
        
    def OnSecuritiesChanged(self, algorithm, changes):
        
        '''
        Description:
            Event fired each time the we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm
        '''
        
        # add new securities
        for added in changes.AddedSecurities:
            algorithm.Log(f"{algorithm.Time}|| Added symbol: {str(added.Symbol)}")
            self.securities.append(added)

        # remove securities
        for removed in changes.RemovedSecurities:
            if removed in self.securities:
                algorithm.Log(f"{algorithm.Time}|| Removed symbol: {str(added.Symbol)}")
                self.securities.remove(removed)
import pandas as pd
import numpy as np
from scipy.optimize import minimize

class PortfolioOptimizer:
    
    '''
    Description:
        Implementation of a custom optimizer that calculates the weights for each asset to optimize a given objective function
    Details:
        Optimization can be:
            - Equal Weighting
            - Maximize Portfolio Return
            - Minimize Portfolio Standard Deviation
            - Mean-Variance (minimize Standard Deviation given a target return)
            - Maximize Portfolio Sharpe Ratio
            - Maximize Portfolio Sortino Ratio
            - Risk Parity Portfolio
        Constraints:
            - Weights must be between some given boundaries
            - Weights must sum to 1
    '''
    
    def __init__(self, 
                 minWeight = 0,
                 maxWeight = 1):
                     
        '''
        Description:
            Initialize the CustomPortfolioOptimizer
        Args:
            minWeight(float): The lower bound on portfolio weights
            maxWeight(float): The upper bound on portfolio weights
        '''
        
        self.minWeight = minWeight
        self.maxWeight = maxWeight

    def Optimize(self, objFunction, dailyReturnsDf, targetReturn = None):
        
        '''
        Description:
            Perform portfolio optimization given a series of returns
        Args:
            objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance, meanVariance, maxSharpe, maxSortino, riskParity)
            dailyReturnsDf: DataFrame of historical daily arithmetic returns
        Returns:
            Array of double with the portfolio weights (size: K x 1)
        '''
        

        # initial weights: equally weighted
        size = dailyReturnsDf.columns.size # K x 1
        # if size <= 0: return []
        self.initWeights = np.array(size * [1. / size])

        # get sample covariance matrix
        covariance = dailyReturnsDf.cov()
        # get the sample covariance matrix of only negative returns for sortino ratio
        negativeReturnsDf = dailyReturnsDf[dailyReturnsDf < 0]
        covarianceNegativeReturns = negativeReturnsDf.cov()
        
        # get the sample covariance matrix of only positive returns for volatility skewness
        positiveReturnsDf = dailyReturnsDf[dailyReturnsDf > 0]
        covariancePositiveReturns = positiveReturnsDf.cov()
        
        if objFunction == 'equalWeighting':
            return self.initWeights
        
        bounds = tuple((self.minWeight, self.maxWeight) for x in range(size))
        constraints = [{'type': 'eq', 'fun': lambda x: np.sum(x) - 1.0}]
        
        if objFunction == 'meanVariance':

            # if no target return is provided, use the resulting from equal weighting
            if targetReturn is None:
                targetReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, self.initWeights)

            constraints.append( {'type': 'eq', 'fun': lambda weights:
                                self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights) - targetReturn} )

        opt = minimize(lambda weights: self.ObjectiveFunction(objFunction, dailyReturnsDf,
                                                                covariance, covarianceNegativeReturns,
                                                                covariancePositiveReturns, weights),
                        x0 = self.initWeights,
                        bounds = bounds,
                        constraints = constraints,
                        method = 'SLSQP')

        return opt['x']

    def ObjectiveFunction(self, objFunction, dailyReturnsDf, covariance, covarianceNegativeReturns, covariancePositiveReturns, weights):
        
        '''
        Description:
            Compute the objective function
        Args:
            objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance, meanVariance,
                                                                maxSharpe, maxSortino, riskParity)
            dailyReturnsDf: DataFrame of historical daily returns
            covariance: Sample covariance
            covarianceNegativeReturns: Sample covariance matrix of only negative returns
            weights: Portfolio weights
        '''
    
        if objFunction == 'maxReturn':
            f = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
            return -f # convert to negative to be minimized
        elif objFunction == 'minVariance':
            f = self.CalculateAnnualizedPortfolioStd(covariance, weights)
            return f
        elif objFunction == 'meanVariance':
            f = self.CalculateAnnualizedPortfolioStd(covariance, weights)
            return f
        elif objFunction == 'maxSharpe':
            f = self.CalculateAnnualizedPortfolioSharpeRatio(dailyReturnsDf, covariance, weights)
            return -f # convert to negative to be minimized
        elif objFunction == 'maxSortino':
            f = self.CalculateAnnualizedPortfolioSortinoRatio(dailyReturnsDf, covarianceNegativeReturns, weights)
            return -f # convert to negative to be minimized
        elif objFunction == 'volSkewness':
            f = self.CalculateVolatilitySkewness(dailyReturnsDf, covarianceNegativeReturns, covariancePositiveReturns, weights)
            return -f # convert to negative to be minimized
            
            
        elif objFunction == 'riskParity':
            f = self.CalculateRiskParityFunction(covariance, weights)
            return f
        else:
            raise ValueError(f'PortfolioOptimizer.ObjectiveFunction: objFunction input has to be one of equalWeighting,'
             + ' maxReturn, minVariance, meanVariance, maxSharpe, maxSortino or riskParity')
        
    def CalculateAnnualizedPortfolioReturn(self, dailyReturnsDf, weights):
        
        annualizedPortfolioReturns = np.sum( ((1 + dailyReturnsDf.mean())**252 - 1) * weights )
        
        return annualizedPortfolioReturns
            
    def CalculateAnnualizedPortfolioStd(self, covariance, weights):
        
        annualizedPortfolioStd = np.sqrt( np.dot(weights.T, np.dot(covariance * 252, weights)) )
        
        if annualizedPortfolioStd == 0:
            raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioStd: annualizedPortfolioStd cannot be zero. Weights: {weights}')
            
        return annualizedPortfolioStd
    
    def CalculateAnnualizedPortfolioNegativeStd(self, covarianceNegativeReturns, weights):
    
        annualizedPortfolioNegativeStd = np.sqrt( np.dot(weights.T, np.dot(covarianceNegativeReturns * 252, weights)) )        
    
        if annualizedPortfolioNegativeStd == 0:
            raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioNegativeStd: annualizedPortfolioNegativeStd cannot be zero. Weights: {weights}')
        
        return annualizedPortfolioNegativeStd
        
    def CalculateAnnualizedPortfolioSharpeRatio(self, dailyReturnsDf, covariance, weights):
        
        annualizedPortfolioReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
        annualizedPortfolioStd = self.CalculateAnnualizedPortfolioStd(covariance, weights)
        annualizedPortfolioSharpeRatio = annualizedPortfolioReturn / annualizedPortfolioStd
            
        return annualizedPortfolioSharpeRatio
    
    def CalculateAnnualizedPortfolioSortinoRatio(self, dailyReturnsDf, covarianceNegativeReturns, weights):
        
        annualizedPortfolioReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
        annualizedPortfolioNegativeStd = self.CalculateAnnualizedPortfolioNegativeStd(covarianceNegativeReturns, weights)
        annualizedPortfolioSortinoRatio = annualizedPortfolioReturn / annualizedPortfolioNegativeStd
            
        return annualizedPortfolioSortinoRatio


    def CalculateVolatilitySkewness(self, dailyReturnsDf, covarianceNegativeReturns, covariancePositiveReturns, weights):
        
        return covariancePositiveReturns / covarianceNegativeReturns

    def CalculateRiskParityFunction(self, covariance, weights):
        
        ''' Spinu formulation for risk parity portfolio '''
        
        assetsRiskBudget = self.initWeights
        portfolioVolatility = self.CalculateAnnualizedPortfolioStd(covariance, weights)
        
        x = weights / portfolioVolatility
        riskParity = (np.dot(x.T, np.dot(covariance, x)) / 2) - np.dot(assetsRiskBudget.T, np.log(x))
            
        return riskParity
# https://www.quantconnect.com/forum/discussion/8128/portfolio-optimization-research-amp-algorithm-for-better-workflows/p1

from OptimalWeightsAlphaCreation import OptimalWeightsAlphaCreationModel

class PortfolioOptimizationSystem(QCAlgorithmFramework):

    def Initialize(self):
        
        ### user-defined inputs ---------------------------------------------------------------------------------------------------
        
        self.SetStartDate(2021, 10, 1)  # Set Start Date
        #self.SetEndDate(2020, 5, 1)     # Set End Date
        self.SetCash(100000)            # Set Strategy Cash
        
        # UNIVERSE -------------------------------------------------------------
        
        # select tickers
        # tickers = ['IEF', 'TLT', 'GLD', 'SPY', 'QQQ']
        tickers = ['BTCUSD', 'ETHUSD', 'SOLUSD']
        
        # ALPHA ----------------------------------------------------------------
        
        # select the objective function to optimize the portfolio weights
        # options are: equalWeighting, maxReturn, minVariance, meanVariance, maxSharpe, maxSortino, riskParity, volSkewness
        objectiveFunction = 'riskParity'
        
        # select number of lookback days for optimization
        lookbackOptimization = 63
        
        # select the logic for rebalancing period
        # date rules (for the first trading day of period): Expiry.EndOfDay, Expiry.EndOfWeek, Expiry.EndOfMonth,
        #                                                   Expiry.EndOfQuarter, Expiry.EndOfYear
        rebalancingPeriod = Expiry.EndOfMonth
        
        ### plotting ----------------------------------------------------------------------------------------------------------------
        
        # let's plot the series of daily total portfolio exposure %
        portfolioExposurePlot = Chart('Chart Total Portfolio Exposure %')
        portfolioExposurePlot.AddSeries(Series('Daily Portfolio Exposure %', SeriesType.Line, ''))
        self.AddChart(portfolioExposurePlot)
        
        # let's plot the series of drawdown % from the most recent high
        drawdownPlot = Chart('Chart Drawdown %')
        drawdownPlot.AddSeries(Series('Drawdown %', SeriesType.Line, '%'))
        self.AddChart(drawdownPlot)
        
        
        self.benchmarkSymbol = "BTCUSD" 
        self.SetBrokerageModel(BrokerageName.Bitfinex, AccountType.Margin)
        self.AddCrypto(self.benchmarkSymbol, Resolution.Daily)
        self.SetBenchmark(Symbol.Create("BTCUSD", SecurityType.Crypto, Market.Bitfinex))   

        
        # let's plot the series of optimal weights
        optWeightsPlot = Chart('Chart Optimal Weights %')
        for ticker in tickers:
            optWeightsPlot.AddSeries(Series(ticker, SeriesType.Line, '%'))
        
        ### select modules ------------------------------------------------------------------------------------------------------------
        
        # set the brokerage model for slippage and fees
        # self.SetBrokerageModel(AlphaStreamsBrokerageModel())
        
        # set requested data resolution
        self.UniverseSettings.Resolution = Resolution.Daily
        
        # Universe -------------------------------------------------------------
        symbols = []
        # loop through the tickers list and create symbols for the Universe
        for ticker in tickers:
            # symbols.append(Symbol.Create(ticker, SecurityType.Equity, Market.USA))
            symbols.append(Symbol.Create(ticker, SecurityType.Crypto, Market.Bitfinex))            
        self.SetUniverseSelection(ManualUniverseSelectionModel(symbols))
        
        # Alpha ---------------------------------------------------------------
        self.SetAlpha(OptimalWeightsAlphaCreationModel(rebalancingPeriod = rebalancingPeriod,
                                                        objectiveFunction = objectiveFunction,
                                                        lookbackOptimization = lookbackOptimization))
        
        # Portfolio ------------------------------------------------------------
        pcm = InsightWeightingPortfolioConstructionModel(lambda time: rebalancingPeriod(time))
        pcm.RebalanceOnInsightChanges = False # disable rebalancing on insights changes (new/expired insights)
        pcm.RebalanceOnSecurityChanges = False # enable rebalancing on universe changes
        self.SetPortfolioConstruction(pcm)
        
        # Execution ------------------------------------------------------------
        self.SetExecution(ImmediateExecutionModel())
        
        # Risk -----------------------------------------------------------------
        self.SetRiskManagement(NullRiskManagementModel())