Overall Statistics |
Total Trades 8491 Average Win 0.09% Average Loss -0.03% Compounding Annual Return 18.908% Drawdown 13.800% Expectancy 2.086 Net Profit 961.640% Sharpe Ratio 1.724 Probabilistic Sharpe Ratio 99.250% Loss Rate 30% Win Rate 70% Profit-Loss Ratio 3.39 Alpha 0.14 Beta 0.174 Annual Standard Deviation 0.092 Annual Variance 0.008 Information Ratio 0.306 Tracking Error 0.175 Treynor Ratio 0.907 Total Fees $9748.85 Estimated Strategy Capacity $5200000.00 Lowest Capacity Asset IEF SGNKIKYGE9NP |
import numpy as np from scipy.optimize import minimize class CustomPortfolioOptimizer: ''' Description: Implementation of a custom optimizer that calculates the weights for each asset to optimize a given objective function Details: Optimization can be: - Maximize Portfolio Sharpe Ratio - Maximize Portfolio Sortino Ratio - Maximize Portfolio Return - Minimize Portfolio Standard Deviation - Risk Parity Portfolio Constraints: - Weights must be between some given boundaries - Weights must sum to 1 ''' def __init__(self, minWeight = -1, maxWeight = 1, objFunction = 'std'): ''' Description: Initialize the CustomPortfolioOptimizer Args: minWeight(float): The lower bound on portfolio weights maxWeight(float): The upper bound on portfolio weights objFunction: The objective function to optimize (sharpe, sortino, return, std, riskParity) ''' self.minWeight = minWeight self.maxWeight = maxWeight self.objFunction = objFunction def Optimize(self, historicalLogReturns): ''' Description: Perform portfolio optimization using a provided matrix of historical returns and covariance (optional) Args: historicalLogReturns: Matrix of historical log-returns where each column represents a security and each row log-returns for the given date/time (size: K x N) Returns: Array of double with the portfolio weights (size: K x 1) ''' # get sample covariance matrix covariance = historicalLogReturns.cov() # get the sample covariance matrix of only negative returns for sortino ratio historicalNegativeLogReturns = historicalLogReturns[historicalLogReturns < 0] covarianceNegativeReturns = historicalNegativeLogReturns.cov() size = historicalLogReturns.columns.size # K x 1 x0 = np.array(size * [1. / size]) # apply equality constraints constraints = ({'type': 'eq', 'fun': lambda weights: self.GetBudgetConstraint(weights)}) opt = minimize(lambda weights: self.ObjectiveFunction(weights, historicalLogReturns, covariance, covarianceNegativeReturns), # Objective function x0, # Initial guess bounds = self.GetBoundaryConditions(size), # Bounds for variables constraints = constraints, # Constraints definition method = 'SLSQP') # Optimization method: Sequential Least Squares Programming return opt['x'] def ObjectiveFunction(self, weights, historicalLogReturns, covariance, covarianceNegativeReturns): ''' Description: Compute the objective function Args: weights: Portfolio weights historicalLogReturns: Matrix of historical log-returns covariance: Covariance matrix of historical log-returns ''' # calculate the annual return of portfolio annualizedPortfolioReturns = np.sum(historicalLogReturns.mean() * 252 * weights) # calculate the annual standard deviation of portfolio annualizedPortfolioStd = np.sqrt( np.dot(weights.T, np.dot(covariance * 252, weights)) ) annualizedPortfolioNegativeStd = np.sqrt( np.dot(weights.T, np.dot(covarianceNegativeReturns * 252, weights)) ) if annualizedPortfolioStd == 0 or annualizedPortfolioNegativeStd == 0: raise ValueError(f'CustomPortfolioOptimizer.ObjectiveFunction: annualizedPortfolioStd/annualizedPortfolioNegativeStd cannot be zero. Weights: {weights}') # calculate annual sharpe ratio of portfolio annualizedPortfolioSharpeRatio = (annualizedPortfolioReturns / annualizedPortfolioStd) # calculate annual sortino ratio of portfolio annualizedPortfolioSortinoRatio = (annualizedPortfolioReturns / annualizedPortfolioNegativeStd) # Spuni's formulation for risk parity portfolio size = historicalLogReturns.columns.size assetsRiskBudget = np.array(size * [1. / size]) portfolioVolatility = np.sqrt( np.dot(weights.T, np.dot(covariance, weights)) ) x = weights / portfolioVolatility riskParity = (np.dot(x.T, np.dot(covariance, x)) / 2) - np.dot(assetsRiskBudget.T, np.log(x)) if self.objFunction == 'sharpe': return -annualizedPortfolioSharpeRatio # convert to negative to be minimized elif self.objFunction == 'sortino': return -annualizedPortfolioSortinoRatio # convert to negative to be minimized elif self.objFunction == 'return': return -annualizedPortfolioReturns # convert to negative to be minimized elif self.objFunction == 'std': return annualizedPortfolioStd elif self.objFunction == 'riskParity': return riskParity else: raise ValueError(f'CustomPortfolioOptimizer.ObjectiveFunction: objFunction input has to be one of sharpe, sortino, return, std or riskParity') def GetBoundaryConditions(self, size): ''' Create the boundary condition for the portfolio weights ''' return tuple((self.minWeight, self.maxWeight) for x in range(size)) def GetBudgetConstraint(self, weights): ''' Define a budget constraint: the sum of the weights equal to 1 ''' return np.sum(weights) - 1
""" Based on 'In & Out' strategy by Peter Guenther 10-04-2020 expanded/inspired by Tentor Testivis, Dan Whitnable (Quantopian), Vladimir, and Thomas Chang. https://www.quantopian.com/posts/new-strategy-in-and-out """ # Import packages import numpy as np import pandas as pd import scipy as sc from QuantConnect.DataSource import * #from PortfolioOptimizer import * from optimizer import CustomPortfolioOptimizer class InOut(QCAlgorithm): def Initialize(self): self.SetStartDate(2008, 1, 1) # Set Start Date self.SetCash(100000) # Set Strategy Cash self.UniverseSettings.Resolution = Resolution.Daily # Feed-in constants self.INI_WAIT_DAYS = 15 # out for 3 trading weeks res = Resolution.Minute self.vix = 'CBOE/VIX' self.vxv = 'CBOE/VXV' self.AddData(QuandlVix, self.vix, Resolution.Daily) self.AddData(Quandl, self.vxv, Resolution.Daily) self.AddData(QuandlTreasuryRates, 'USTREASURY/YIELD', Resolution.Daily) self.SetWarmUp(100, Resolution.Daily) self.vix_sma_long = self.SMA(self.vix, 15, Resolution.Daily) self.vxv_sma_long = self.SMA(self.vxv, 15, Resolution.Daily) self.ratio_long = IndicatorExtensions.Over(self.vxv_sma_long, self.vix_sma_long) self.MRKT = self.AddEquity('QQQ', res).Symbol self.SPY = self.AddEquity('SPY', res).Symbol self.TLT = self.AddEquity('TLT', res).Symbol self.IEF = self.AddEquity('IEF', res).Symbol self.IEI = self.AddEquity('IEI', res).Symbol self.SetWarmup(200) self.spySMA = self.SMA("SPY", 150, Resolution.Daily) # Market and list of signals based on ETFs self.PRDC = self.AddEquity('XLI', res).Symbol # production (industrials) self.METL = self.AddEquity('DBB', res).Symbol # input prices (metals) self.NRES = self.AddEquity('IGE', res).Symbol # input prices (natural res) self.DEBT = self.AddEquity('SHY', res).Symbol # cost of debt (bond yield) self.USDX = self.AddEquity('UUP', res).Symbol # safe haven (USD) self.GOLD = self.AddEquity('GLD', res).Symbol # gold self.SLVA = self.AddEquity('SLV', res).Symbol # VS silver self.UTIL = self.AddEquity('XLU', res).Symbol # utilities self.SHCU = self.AddEquity('FXF', res).Symbol # safe haven (CHF) self.RICU = self.AddEquity('FXA', res).Symbol # risk currency (AUD) self.INDU = self.PRDC # vs industrials self.FORPAIRS = [self.GOLD, self.SLVA, self.UTIL, self.SHCU, self.RICU] self.SIGNALS = [self.PRDC, self.METL, self.NRES, self.DEBT, self.USDX] # 'In' and 'out' holdings incl. weights self.HLD_IN = {self.MRKT: 1.0} self.HLD_OUT = {self.TLT: 0, self.IEF: 1} self.dictParameters = { 'SPY': {'addTicker': [True, 'QQQ'], # [boolean to add/not add the ticker, ticker to actually trade] 'sma': [200, (-0.10, 0.10), 0], # [period, (lower % threshold, upper % threshold; price vs sma), weight if condition met] 'macd': [(231, 567, 168), 0, 0.3], # [(fast, slow, signal), score macd vs signal (-1 to 1), weight if condition met] 'yield': [True, 0], # [boolean to activate the yield curve filtering, weight if condition met] 'atrTrailStop': [True, (10, 63, 1), 6, 0.1]}, # [activate, (recentAtrPeriod, pastAtrPeriod, % above), atrMultiple, emergencyAtrMultiple] 'TLT': {'addTicker': [True, 'TLT'], # [boolean to add/not add the ticker, ticker to actually trade] 'sma': [600, (-0.2, 0.2), 0], # [period, (lower % threshold, upper % threshold; price vs sma), weight if condition met] 'macd': [(63, 168, 42), 0, 0], # [(fast, slow, signal), score macd vs signal (-1 to 1), weight if condition met] 'yield': [False, 0], # [boolean to activate the yield curve filtering, weight if condition met] 'atrTrailStop': [True, (10, 63, 0.35), 6, 0.1]}, # [activate, (recentAtrPeriod, pastAtrPeriod, % above), atrMultiple, emergencyAtrMultiple] } self.StocksWeightInOut = 0 self.BondsWeightInOut = 0 self.StocksWeightVolatility = 0 self.BondsWeightVolatility = 0 self.StocksWeightMA = 0 self.BondsWeightMA = 0 self.StocksWeightPO = 0 self.BondsWeightPO = 0 self.StocksWeight = 0 self.BondsWeight = 0 self.CashWeight = 0 # Initialize variables ## 'In'/'out' indicator self.be_in = 1 ## Day count variables self.dcount = 0 # count of total days since start self.outday = 0 # dcount when self.be_in=0 ## Flexi wait days self.WDadjvar = self.INI_WAIT_DAYS self.InAndOutFactor = 0.35 self.VolatilityFactor = 0.15 self.MovingAverageFactor = 0.15 self.POFactor = 0.35 self.lookbackOptimization = 63 self.activateWeightFiltering = True # activate/deactivate the weights filtering self.lookbackNegativeYield = 147 # number of days to lookback for negative values self.startCrisisYieldValue = 0.3 # the yield value above which we apply the yield weight condition (e.g. 0.1 0.1% yield) self.Schedule.On(self.DateRules.MonthStart(),self.TimeRules.AfterMarketOpen('SPY', 10), self.MovingAverageTrade) self.Schedule.On(self.DateRules.MonthStart(),self.TimeRules.AfterMarketOpen('SPY', 20), self.PortfolioOptimizerTrade) self.Schedule.On(self.DateRules.EveryDay(),self.TimeRules.AfterMarketOpen('SPY', 30), self.rebalance_when_out_of_the_market) self.Schedule.On(self.DateRules.WeekEnd(),self.TimeRules.AfterMarketOpen('SPY', 60), self.rebalance_when_in_the_market) self.Schedule.On(self.DateRules.EveryDay(),self.TimeRules.AfterMarketOpen('SPY', 90), self.VolatilityTrade) self.Schedule.On(self.DateRules.EveryDay(),self.TimeRules.AfterMarketOpen('SPY', 120), self.placeTrades) WeightsPlot = Chart('Weights') WeightsPlot.AddSeries(Series('Stocks', SeriesType.Line, '%')) WeightsPlot.AddSeries(Series('Bonds', SeriesType.Line, '%')) WeightsPlot.AddSeries(Series('Cash', SeriesType.Line, '%')) self.AddChart(WeightsPlot) def placeTrades(self): self.StocksWeight = self.StocksWeightInOut + self.StocksWeightVolatility + self.StocksWeightMA + self.StocksWeightPO self.BondsWeight = self.BondsWeightInOut + self.BondsWeightVolatility + self.BondsWeightMA + self.BondsWeightPO self.totalLeverage = self.InAndOutFactor + self.VolatilityFactor + self.MovingAverageFactor + self.POFactor self.CashWeight = round((self.totalLeverage - self.StocksWeight - self.BondsWeight)*100)/100 self.Log('InOut Weights -- StocksWeight : ' + str(self.StocksWeightInOut) + ' BondsWeight: ' + str(self.BondsWeightInOut)) self.Log('Volatility Weights -- StocksWeight : ' + str(self.StocksWeightVolatility) + ' BondsWeight: ' + str(self.BondsWeightVolatility)) self.Log('MA Weights -- StocksWeight : ' + str(self.StocksWeightMA) + ' BondsWeight: ' + str(self.BondsWeightMA)) self.Log('Overall Weights -- StocksWeight : ' + str(self.StocksWeight) + ' BondsWeight: ' + str(self.BondsWeight) + ' CashWeight: ' + str(self.CashWeight)) self.SetHoldings("QQQ", self.StocksWeight) self.SetHoldings("TLT", self.BondsWeight) self.SetHoldings("IEF", self.CashWeight) self.Plot("Weights", "Stocks", self.StocksWeight) self.Plot("Weights", "Bonds", self.BondsWeight) self.Plot("Weights", "Cash", self.CashWeight) #self.Plot("PO Weights", "Stocks", self.StocksWeightPO/self.POFactor) #self.Plot("PO Weights", "Bonds", self.BondsWeightPO/self.POFactor) def rebalance_when_out_of_the_market(self): # Returns sample to detect extreme observations hist = self.History( self.SIGNALS + [self.MRKT] + self.FORPAIRS, 252, Resolution.Daily)['close'].unstack(level=0).dropna() # hist_shift = hist.rolling(66).apply(lambda x: x[:11].mean()) hist_shift = hist.apply(lambda x: (x.shift(65) + x.shift(64) + x.shift(63) + x.shift(62) + x.shift( 61) + x.shift(60) + x.shift(59) + x.shift(58) + x.shift(57) + x.shift(56) + x.shift(55)) / 11) #hist_shift = hist.apply(lambda x: (x.shift(63) + x.shift(63) + x.shift(63) + x.shift(63) + x.shift( # 63) + x.shift(63) + x.shift(63) + x.shift(63) + x.shift(63) + x.shift(63) + x.shift(63)) / 11) returns_sample = (hist / hist_shift - 1) # Reverse code USDX: sort largest changes to bottom returns_sample[self.USDX] = returns_sample[self.USDX] * (-1) # For pairs, take returns differential, reverse coded returns_sample['G_S'] = -(returns_sample[self.GOLD] - returns_sample[self.SLVA]) returns_sample['U_I'] = -(returns_sample[self.UTIL] - returns_sample[self.INDU]) returns_sample['C_A'] = -(returns_sample[self.SHCU] - returns_sample[self.RICU]) self.pairlist = ['G_S', 'U_I', 'C_A'] # Extreme observations; statist. significance = 1% pctl_b = np.nanpercentile(returns_sample, 1, axis=0) extreme_b = returns_sample.iloc[-1] < pctl_b # Determine waitdays empirically via safe haven excess returns, 50% decay self.WDadjvar = int( max(0.50 * self.WDadjvar, self.INI_WAIT_DAYS * max(1, #returns_sample[self.GOLD].iloc[-1] / returns_sample[self.SLVA].iloc[-1], #returns_sample[self.UTIL].iloc[-1] / returns_sample[self.INDU].iloc[-1], #returns_sample[self.SHCU].iloc[-1] / returns_sample[self.RICU].iloc[-1] np.where((returns_sample[self.GOLD].iloc[-1]>0) & (returns_sample[self.SLVA].iloc[-1]<0) & (returns_sample[self.SLVA].iloc[-2]>0), self.INI_WAIT_DAYS, 1), np.where((returns_sample[self.UTIL].iloc[-1]>0) & (returns_sample[self.INDU].iloc[-1]<0) & (returns_sample[self.INDU].iloc[-2]>0), self.INI_WAIT_DAYS, 1), np.where((returns_sample[self.SHCU].iloc[-1]>0) & (returns_sample[self.RICU].iloc[-1]<0) & (returns_sample[self.RICU].iloc[-2]>0), self.INI_WAIT_DAYS, 1) )) ) adjwaitdays = min(60, self.WDadjvar) # self.Debug('{}'.format(self.WDadjvar)) # Determine whether 'in' or 'out' of the market if (extreme_b[self.SIGNALS + self.pairlist]).any(): self.be_in = False self.outday = self.dcount if self.dcount >= self.outday + adjwaitdays: self.be_in = True self.dcount += 1 # Swap to 'out' assets if applicable if not self.be_in: # Close 'In' holdings for asset, weight in self.HLD_IN.items(): self.StocksWeightInOut = 0 for asset, weight in self.HLD_OUT.items(): self.BondsWeightInOut = weight*self.InAndOutFactor def rebalance_when_in_the_market(self): # Swap to 'in' assets if applicable if self.be_in: # Close 'Out' holdings for asset, weight in self.HLD_OUT.items(): self.BondsWeightInOut = 0 for asset, weight in self.HLD_IN.items(): self.StocksWeightInOut = weight*self.InAndOutFactor def VolatilityTrade(self): if not (self.vix_sma_long.IsReady or self.vxv_sma_long.IsReady or self.ratio_long.IsReady): return if self.ratio_long.Current.Value >= 1.25: self.BondsWeightVolatility = 0 self.StocksWeightVolatility = 1*self.VolatilityFactor elif self.ratio_long.Current.Value >= 0.923: self.BondsWeightVolatility = 0.2*self.VolatilityFactor self.StocksWeightVolatility = 0.8*self.VolatilityFactor else: self.BondsWeightVolatility = 0.8*self.VolatilityFactor self.StocksWeightVolatility = 0.2*self.VolatilityFactor def MovingAverageTrade(self): if self.spySMA is None or not self.spySMA.IsReady: return if self.Securities["SPY"].Price >= self.spySMA.Current.Value*1.10: self.BondsWeightMA = 0.2*self.MovingAverageFactor self.StocksWeightMA = 0.8*self.MovingAverageFactor elif self.Securities["SPY"].Price >= self.spySMA.Current.Value: self.BondsWeightMA = 0.2*self.MovingAverageFactor self.StocksWeightMA = 0.8*self.MovingAverageFactor else: self.BondsWeightMA = 0.8*self.MovingAverageFactor self.StocksWeightMA = 0.2*self.MovingAverageFactor def PortfolioOptimizerTrade(self): # initialize the optimizer calculationSymbols = [] weights = {} self.optimizer = CustomPortfolioOptimizer(minWeight = 0, maxWeight = 1, objFunction = "std") for ticker in self.dictParameters.keys(): calculationSymbols.append(self.Symbol(ticker)) history = self.History(calculationSymbols, 900, Resolution.Daily) calculations = {} for symbol in calculationSymbols: calculations[symbol] = SymbolData(symbol, dictParameters = self.dictParameters) calculations[symbol].CalculateLogReturnSeries(history, self.lookbackOptimization) calculations[symbol].UpdateIndicators(history) logReturnsDict = { symbol.Value: symbolData.logReturnSeries for symbol, symbolData in calculations.items() } logReturnsDf = pd.DataFrame(logReturnsDict) listTickers = list(logReturnsDf.columns) listOptWeights = self.optimizer.Optimize(historicalLogReturns = logReturnsDf) # create dictionary with the optimal weights by symbol weights = {listTickers[i]: listOptWeights[i] for i in range(len(listTickers))} # avoid very small numbers and make them 0 for ticker, weight in weights.items(): if weight <= 1e-10: weights[ticker] = 0 filteredWeights = self.FilterOptimalWeights(calculations, weights) self.BondsWeightPO = filteredWeights["TLT"]*self.POFactor self.StocksWeightPO = filteredWeights["SPY"]*self.POFactor def FilterOptimalWeights(self, calculations, optWeights): # check the yield condition ----------------------------------------------------------------- # get the last six months of historical USTREASURY/YIELD values histYield = self.History(['USTREASURY/YIELD'], self.lookbackNegativeYield + 1, Resolution.Daily).loc['USTREASURY/YIELD'] tenYr = histYield['10 yr'] # get the 10-year yield threeMo = histYield['3 mo'] # get the 3-month yield tenYrMinusThreeMo = tenYr - threeMo # calculate the difference between the two indexNegative = tenYrMinusThreeMo[tenYrMinusThreeMo < 0].head(1).index # check if there was actually some negative yield values if len(indexNegative) > 0: cutOff = indexNegative[0] # filter the series for days after that day with negative value afterNegative = tenYrMinusThreeMo[tenYrMinusThreeMo.index > cutOff] # check if at some point it reached our startCrisisYieldValue if len(afterNegative) > 0 and max(afterNegative) > self.startCrisisYieldValue: self.yieldSignalCrisis = True else: self.yieldSignalCrisis = False else: self.yieldSignalCrisis = False # ------------------------------------------------------------------------------------------- # empty dicitonary to store weights weights = {} # loop through calculations and check conditions for weight filtering ------------------------ for symbol, symbolData in calculations.items(): if symbolData.SMA.IsReady and symbolData.MACD.IsReady: currentPrice = self.ActiveSecurities[symbol].Price # check if sma condition is met and act accordingly ---------------------------------- smaLowerBoundCondition = self.dictParameters[symbol.Value]['sma'][1][0] smaUpperBoundCondition = self.dictParameters[symbol.Value]['sma'][1][1] smaConditionWeight = self.dictParameters[symbol.Value]['sma'][2] if (currentPrice <= symbolData.SMA.Current.Value * (1 + smaLowerBoundCondition) or currentPrice >= symbolData.SMA.Current.Value * (1 + smaUpperBoundCondition)): weights[symbol.Value] = min(optWeights[symbol.Value], smaConditionWeight) else: weights[symbol.Value] = optWeights[symbol.Value] smaModifiedWeight = weights[symbol.Value] # check if macd condition is met and act accordingly ---------------------------------- macdCondition = self.dictParameters[symbol.Value]['macd'][1] macdConditionWeight = self.dictParameters[symbol.Value]['macd'][2] # calculate our macd vs signal score between -1 and 1 macdMinusSignal = symbolData.MACD.Current.Value - symbolData.MACD.Signal.Current.Value macdVsSignalScore = macdMinusSignal / (1 + abs(macdMinusSignal)) if macdVsSignalScore <= macdCondition: weights[symbol.Value] = min(smaModifiedWeight, macdConditionWeight) else: weights[symbol.Value] = smaModifiedWeight macdModifiedWeight = weights[symbol.Value] # check if yield condition is met and act accordingly ---------------------------------- activateYield = self.dictParameters[symbol.Value]['yield'][0] yieldConditionWeight = self.dictParameters[symbol.Value]['yield'][1] if self.yieldSignalCrisis and activateYield: weights[symbol.Value] = min(macdModifiedWeight, yieldConditionWeight) else: weights[symbol.Value] = macdModifiedWeight else: weights[symbol.Value] = 0 return weights class SymbolData: ''' Contain data specific to a symbol required by this model ''' def __init__(self, symbol, dictParameters): self.Symbol = symbol self.logReturnSeries = None smaPeriod = dictParameters[symbol.Value]['sma'][0] self.SMA = SimpleMovingAverage(smaPeriod) macdFastPeriod = dictParameters[self.Symbol.Value]['macd'][0][0] macdSlowPeriod = dictParameters[self.Symbol.Value]['macd'][0][1] macdSignalPeriod = dictParameters[self.Symbol.Value]['macd'][0][2] self.MACD = MovingAverageConvergenceDivergence(macdFastPeriod, macdSlowPeriod, macdSignalPeriod, MovingAverageType.Exponential) def CalculateLogReturnSeries(self, history, lookbackOptimization): ''' Calculate the log-returns series for each security ''' tempLogReturnSeries = np.log(1 + history.loc[str(self.Symbol)]['close'].pct_change(periods = 2).dropna()) # 1-day log-returns self.logReturnSeries = tempLogReturnSeries[-lookbackOptimization:] def UpdateIndicators(self, history): ''' Update the indicators with historical data ''' for index, row in history.loc[str(self.Symbol)].iterrows(): self.SMA.Update(index, row['close']) self.MACD.Update(index, row['close']) class QuandlVix(PythonQuandl): def __init__(self): self.ValueColumnName = "Close" class QuandlTreasuryRates(PythonQuandl): def __init__(self): self.ValueColumnName = 'value'