Overall Statistics |
Total Trades 826 Average Win 2.18% Average Loss -1.14% Compounding Annual Return 40.793% Drawdown 22.300% Expectancy 0.896 Net Profit 8581.545% Sharpe Ratio 1.702 Probabilistic Sharpe Ratio 96.538% Loss Rate 35% Win Rate 65% Profit-Loss Ratio 1.92 Alpha 0 Beta 0 Annual Standard Deviation 0.208 Annual Variance 0.043 Information Ratio 1.702 Tracking Error 0.208 Treynor Ratio 0 Total Fees $15340.67 |
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pandas as pd
import numpy as np
from scipy.optimize import minimize


class myTrailingStopRiskManagementModel:
    '''
    Credit goes to: Alex Catarino and many of his friends at QuantConnect

    https://github.com/QuantConnect/Lean/blob/master/Algorithm.Framework/Risk/TrailingStopRiskManagementModel.py

    Description:
        Limits the maximum possible loss measured from the highest unrealized profit
    '''

    def __init__(self, maximumDrawdownPercent = 0.08):
        '''initializes the class
        Args:
            maximumDrawdownPercent: The maximum percentage drawdown allowed for algorithm portfolio
                                    compared with the highest unrealized profit, defaults to 8% drawdown
        '''
        # Stored as a non-positive number so it can be compared directly with a (negative) drawdown.
        self.maximumDrawdownPercent = -abs(maximumDrawdownPercent)
        # symbol -> highest price seen since the position was first observed as invested
        self.trailingHighs = dict()

    def setDD(self, maximumDrawdownPercent = 0.08):
        '''allows to change the drawdown
        Args:
            maximumDrawdownPercent: The maximum percentage drawdown allowed for algorithm portfolio
                                    compared with the highest unrealized profit, defaults to 8% drawdown
        '''
        self.maximumDrawdownPercent = -abs(maximumDrawdownPercent)

    def setWTtoZeroIfDDtooHigh(self, algo, targets=None):
        '''If drawdown is too high, set wt[symbol] to zero
        Args:
            algo: object exposing `Securities` (iterable of pairs with `.Key` / `.Value`) and a
                  dict `wt`; algo.wt[symbol] is set to 0 in case drawdown exceeds the maximum
            targets: unused, kept for interface compatibility
        Returns:
            None
        '''
        for kvp in algo.Securities:
            symbol = kvp.Key
            security = kvp.Value

            # Remove from trailingHighs dict if not invested
            if not security.Invested:
                self.trailingHighs.pop(symbol, None)
                continue

            # Add newly invested securities to trailingHighs dict
            if symbol not in self.trailingHighs:
                self.trailingHighs[symbol] = security.Holdings.AveragePrice
                continue

            # Check for new highs and update trailingHighs dict
            if self.trailingHighs[symbol] < security.High:
                self.trailingHighs[symbol] = security.High
                continue

            # Calc the drawdown of the current low versus the trailing high
            securityHigh = self.trailingHighs[symbol]
            drawdown = (security.Low / securityHigh) - 1

            # If drawdown is too high, set symbol weight to zero
            if drawdown < self.maximumDrawdownPercent:
                algo.wt[symbol] = 0

        return


class myPortfolioOptimizer:
    '''
    Credit goes to: Emilio Freire / InnoQuantivity

    https://innoquantivity.com/blogs/inno-blog/portfolio-optimization-quantconnect-research-algorithm
    https://www.quantconnect.com/forum/discussion/8128/portfolio-optimization-research-amp-algorithm-for-better-workflows/p1/comment-22952

    Description:
        Implementation of a custom optimizer that calculates the weights for each asset
        to optimize a given objective function
    Details:
        Optimization can be:
            - Equal Weighting
            - Maximize Portfolio Return
            - Minimize Portfolio Standard Deviation
            - Mean-Variance (minimize Standard Deviation given a target return)
            - Maximize Portfolio Sharpe Ratio
            - Maximize Portfolio Sortino Ratio
            - Risk Parity Portfolio
        Constraints:
            - Weights must be between some given boundaries
            - Weights must sum to 1
    '''

    # Objective names accepted by Optimize (ObjectiveFunction accepts all but 'equalWeighting').
    _OBJECTIVES = ('equalWeighting', 'maxReturn', 'minVariance', 'meanVariance',
                   'maxSharpe', 'maxSortino', 'riskParity')

    def __init__(self, minWeight = 0, maxWeight = 1):
        '''
        Description:
            Initialize the CustomPortfolioOptimizer
        Args:
            minWeight(float): The lower bound on portfolio weights
            maxWeight(float): The upper bound on portfolio weights
        '''
        self.minWeight = minWeight
        self.maxWeight = maxWeight

    def CalcWeights(self, algo, symbols, objectiveFunction='riskParity', lookback=63, targetReturn=None):
        '''
        Description:
            Calculate weights from daily returns, return a pandas Series
        Args:
            algo: algorithm object providing History() and AddEquity()
            symbols: symbols to request history for
            objectiveFunction: objective name passed on to Optimize
            lookback: number of daily bars requested
            targetReturn: optional target return passed on to Optimize
        Returns:
            pandas Series named 'weights', indexed by the relabelled return columns
        '''
        history = algo.History(symbols, lookback, Resolution.Daily)['close'].unstack(level = 0)
        returnsDf = history.pct_change().dropna()
        # NOTE(review): AddEquity is called per column only to read Symbol.Value for the label;
        # presumably this also (re)subscribes the asset as a side effect - confirm it is intended.
        returnsDf.columns = [algo.AddEquity(i).Symbol.Value for i in list(returnsDf.columns)]
        weights = self.Optimize(objectiveFunction, returnsDf, targetReturn)
        return pd.Series(weights, index=returnsDf.columns, name='weights')

    def Optimize(self, objFunction, dailyReturnsDf, targetReturn = None):
        '''
        Description:
            Perform portfolio optimization given a series of returns
        Args:
            objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance,
                         meanVariance, maxSharpe, maxSortino, riskParity)
            dailyReturnsDf: DataFrame of historical daily arithmetic returns
            targetReturn: Target annualized return for 'meanVariance'; when None the return of the
                          equally weighted portfolio is used
        Returns:
            Array of double with the portfolio weights (size: K x 1)
        Raises:
            ValueError: if objFunction is not a known objective or dailyReturnsDf has no columns
        '''
        # Fail fast instead of surfacing the error from inside the SLSQP callback.
        if objFunction not in self._OBJECTIVES:
            raise ValueError('PortfolioOptimizer.Optimize: objFunction input has to be one of equalWeighting,'
                             + ' maxReturn, minVariance, meanVariance, maxSharpe, maxSortino or riskParity')

        size = dailyReturnsDf.columns.size # K x 1
        if size == 0:
            raise ValueError('PortfolioOptimizer.Optimize: dailyReturnsDf has no columns')

        # initial weights: equally weighted
        self.initWeights = np.array(size * [1. / size])
        if objFunction == 'equalWeighting':
            return self.initWeights

        # get sample covariance matrix
        covariance = dailyReturnsDf.cov()
        # the sample covariance matrix of only negative returns is needed for the sortino ratio only
        covarianceNegativeReturns = None
        if objFunction == 'maxSortino':
            negativeReturnsDf = dailyReturnsDf[dailyReturnsDf < 0]
            covarianceNegativeReturns = negativeReturnsDf.cov()

        bounds = tuple((self.minWeight, self.maxWeight) for _ in range(size))
        constraints = [{'type': 'eq', 'fun': lambda x: np.sum(x) - 1.0}]

        if objFunction == 'meanVariance':
            # if no target return is provided, use the resulting from equal weighting
            if targetReturn is None:
                targetReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, self.initWeights)
            constraints.append(
                {'type': 'eq',
                 'fun': lambda weights: self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights) - targetReturn}
            )

        opt = minimize(lambda weights: self.ObjectiveFunction(objFunction, dailyReturnsDf,
                                                              covariance, covarianceNegativeReturns,
                                                              weights),
                       x0 = self.initWeights,
                       bounds = bounds,
                       constraints = constraints,
                       method = 'SLSQP')
        return opt['x']

    def ObjectiveFunction(self, objFunction, dailyReturnsDf, covariance, covarianceNegativeReturns, weights):
        '''
        Description:
            Compute the objective function
        Args:
            objFunction: The objective function to optimize (maxReturn, minVariance, meanVariance,
                         maxSharpe, maxSortino, riskParity)
            dailyReturnsDf: DataFrame of historical daily returns
            covariance: Sample covariance
            covarianceNegativeReturns: Sample covariance matrix of only negative returns
                                       (only read for 'maxSortino')
            weights: Portfolio weights
        Returns:
            Value to be minimized (maximization objectives are negated)
        Raises:
            ValueError: if objFunction is not one of the objectives handled here
        '''
        if objFunction == 'maxReturn':
            # convert to negative to be minimized
            return -self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
        if objFunction in ('minVariance', 'meanVariance'):
            return self.CalculateAnnualizedPortfolioStd(covariance, weights)
        if objFunction == 'maxSharpe':
            # convert to negative to be minimized
            return -self.CalculateAnnualizedPortfolioSharpeRatio(dailyReturnsDf, covariance, weights)
        if objFunction == 'maxSortino':
            # convert to negative to be minimized
            return -self.CalculateAnnualizedPortfolioSortinoRatio(dailyReturnsDf, covarianceNegativeReturns, weights)
        if objFunction == 'riskParity':
            return self.CalculateRiskParityFunction(covariance, weights)
        raise ValueError('PortfolioOptimizer.ObjectiveFunction: objFunction input has to be one of equalWeighting,'
                         + ' maxReturn, minVariance, meanVariance, maxSharpe, maxSortino or riskParity')

    def CalculateAnnualizedPortfolioReturn(self, dailyReturnsDf, weights):
        ''' Weighted sum of the per-column mean daily return compounded over 252 periods '''
        annualizedPortfolioReturns = np.sum( ((1 + dailyReturnsDf.mean())**252 - 1) * weights )
        return annualizedPortfolioReturns

    def CalculateAnnualizedPortfolioStd(self, covariance, weights):
        ''' sqrt(w' (252 * covariance) w); raises ValueError when the result is exactly zero '''
        annualizedPortfolioStd = np.sqrt( np.dot(weights.T, np.dot(covariance * 252, weights)) )
        if annualizedPortfolioStd == 0:
            raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioStd: annualizedPortfolioStd cannot be zero. Weights: {weights}')
        return annualizedPortfolioStd

    def CalculateAnnualizedPortfolioNegativeStd(self, covarianceNegativeReturns, weights):
        ''' Same as CalculateAnnualizedPortfolioStd but on the covariance of negative returns only '''
        annualizedPortfolioNegativeStd = np.sqrt( np.dot(weights.T, np.dot(covarianceNegativeReturns * 252, weights)) )
        if annualizedPortfolioNegativeStd == 0:
            raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioNegativeStd: annualizedPortfolioNegativeStd cannot be zero. Weights: {weights}')
        return annualizedPortfolioNegativeStd

    def CalculateAnnualizedPortfolioSharpeRatio(self, dailyReturnsDf, covariance, weights):
        ''' Annualized portfolio return divided by annualized portfolio standard deviation '''
        annualizedPortfolioReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
        annualizedPortfolioStd = self.CalculateAnnualizedPortfolioStd(covariance, weights)
        return annualizedPortfolioReturn / annualizedPortfolioStd

    def CalculateAnnualizedPortfolioSortinoRatio(self, dailyReturnsDf, covarianceNegativeReturns, weights):
        ''' Annualized portfolio return divided by the annualized std of negative returns '''
        annualizedPortfolioReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
        annualizedPortfolioNegativeStd = self.CalculateAnnualizedPortfolioNegativeStd(covarianceNegativeReturns, weights)
        return annualizedPortfolioReturn / annualizedPortfolioNegativeStd

    def CalculateRiskParityFunction(self, covariance, weights):
        ''' Spinu formulation for risk parity portfolio

        Uses self.initWeights (set by Optimize) as the per-asset risk budget.
        '''
        assetsRiskBudget = self.initWeights
        portfolioVolatility = self.CalculateAnnualizedPortfolioStd(covariance, weights)
        x = weights / portfolioVolatility
        riskParity = (np.dot(x.T, np.dot(covariance, x)) / 2) - np.dot(assetsRiskBudget.T, np.log(x))
        return riskParity
'''
Intersection of ROC comparison using OUT_DAY approach by Vladimir v1.3
(with dynamic selector for fundamental factors and momentum)
inspired by Peter Guenther, Tentor Testivis, Dan Whitnable, Thomas Chang, Miko M, Leandro Maia

Updates by Frank Schikarski:
- Trailing Stop Loss based on logic from Quant Connect
- Adaptation of the level of Trailing Stop Loss following Vlad's regime logic
- Portfolio Optimization adapted based on logic from Emilio Freire
- Weighted Fundamentals logic from some other nice person
'''
import numpy as np
import pandas as pd
from QuantConnect.Data.UniverseSelection import *
from helpers import myPortfolioOptimizer
from helpers import myTrailingStopRiskManagementModel

# NOTE(review): QCAlgorithm, Resolution, BrokerageName, AccountType, TradeBarConsolidator,
# RateOfChange, Universe, TimeSpan and timedelta are not imported explicitly in this file;
# presumably they are provided by the QuantConnect runtime - confirm before running elsewhere.

# ---------------------------------------------------------------------------------------------
BONDS = ['TLT']     # tickers held while the regime signal is "out"
VOLA = 126          # daily bars of history kept for the regime signal
BASE_RET = 85       # base number of days scaled by volatility in derive_vola_waitdays
STK_MOM = 126       # rate-of-change period used to rank stocks
N_COARSE = 100      # symbols kept by the coarse (dollar-volume) filter
N_FACTOR = 20       # symbols kept by the fundamental score
N_MOM = 5           # symbols kept by momentum ranking
LEV = 1.00          # total target leverage
REBA = 84           # daily_check calls between universe refreshes
PFO = 0             # 0 = equal weights, otherwise use myPortfolioOptimizer
TSL = 0.11          # trailing stop level; 0 disables the trailing stop
# ---------------------------------------------------------------------------------------------


class Fundamental_Factors_Momentum_ROC_Comparison_OUT_DAY(QCAlgorithm):
    '''Stocks-or-bonds rotation: holds the top momentum stocks of a fundamentally scored universe
    while the regime flag `bull` is set, and BONDS otherwise, with an optional trailing stop.'''

    def Initialize(self):
        '''Configure dates, cash, subscriptions, universe, schedules and the signal history.'''
        self.SetStartDate(2008, 1, 1)
        self.SetEndDate(2021, 1, 13)
        self.InitCash = 100000
        self.SetCash(self.InitCash)
        self.MKT = self.AddEquity("SPY", Resolution.Hour).Symbol
        self.mkt = []
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        res = Resolution.Hour

        self.BONDS = [self.AddEquity(ticker, res).Symbol for ticker in BONDS]
        self.INI_WAIT_DAYS = 15
        self.wait_days = self.INI_WAIT_DAYS

        self.GLD = self.AddEquity('GLD', res).Symbol
        self.SLV = self.AddEquity('SLV', res).Symbol
        self.XLU = self.AddEquity('XLU', res).Symbol
        self.XLI = self.AddEquity('XLI', res).Symbol
        self.UUP = self.AddEquity('UUP', res).Symbol
        self.DBB = self.AddEquity('DBB', res).Symbol

        self.pairs = [self.GLD, self.SLV, self.XLU, self.XLI, self.UUP, self.DBB]

        self.bull = 1
        self.bull_prior = 0
        self.count = 0
        self.outday = (-self.INI_WAIT_DAYS+1)
        self.SetWarmUp(timedelta(max(VOLA, BASE_RET, STK_MOM, REBA)))

        self.UniverseSettings.Resolution = res
        self.AddUniverse(self.CoarseFilter, self.FineFilter)
        self.data = {}              # symbol -> SymbolData
        self.UpdateFineFilter = 0
        self.symbols = None         # last selection returned by FineFilter
        self.RebalanceCount = 0
        self.wt = {}                # symbol -> target weight

        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.AfterMarketOpen('SPY', 30),
            self.daily_check)

        if TSL != 0:
            # Stop levels selected in daily_check from the pair signals.
            self.tsl_1 = 0.15
            self.tsl_2 = TSL
            self.tsl_3 = 0.20
            self.tsl_4 = 0.20
            self.tsl = myTrailingStopRiskManagementModel(maximumDrawdownPercent=self.tsl_2)
            self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.Every(TimeSpan.FromMinutes(60)),
                self.stop_loss)

        if PFO != 0:
            self.pfo = myPortfolioOptimizer(minWeight=0, maxWeight=1)
            # NOTE(review): both initial weight sets are computed from BONDS; weights_stks is
            # recomputed in trade_in, weights_bnds is not read anywhere in this file.
            self.weights_stks = self.pfo.CalcWeights(self, symbols=self.BONDS)
            self.weights_bnds = self.pfo.CalcWeights(self, symbols=self.BONDS)

        symbols = [self.MKT] + self.pairs
        for symbol in symbols:
            # One daily consolidator per signal symbol; a local is enough because the
            # subscription manager is handed the consolidator.
            consolidator = TradeBarConsolidator(timedelta(days=1))
            consolidator.DataConsolidated += self.consolidation_handler
            self.SubscriptionManager.AddConsolidator(symbol, consolidator)

        self.history = self.History(symbols, VOLA, Resolution.Daily)
        if self.history.empty or 'close' not in self.history.columns:
            return
        self.history = self.history['close'].unstack(level=0).dropna()

    def consolidation_handler(self, sender, consolidated):
        '''Append the consolidated close to the signal history and keep the last VOLA rows.'''
        self.history.loc[consolidated.EndTime, consolidated.Symbol] = consolidated.Close
        self.history = self.history.iloc[-VOLA:]

    def derive_vola_waitdays(self):
        '''Return (wait_days, period) derived from the volatility of MKT in self.history.

        sigma is 0.6 times the standard deviation of MKT log returns times sqrt(252);
        wait_days = int(sigma * BASE_RET) and period = int((1 - sigma) * BASE_RET).
        '''
        # Select the column as a Series so std() yields a scalar (int() on a one-element
        # Series is deprecated in pandas).
        sigma = 0.6 * np.log1p(self.history[self.MKT].pct_change()).std() * np.sqrt(252)
        wait_days = int(sigma * BASE_RET)
        period = int((1.0 - sigma) * BASE_RET)
        return wait_days, period

    def CoarseFilter(self, coarse):
        '''Return the N_COARSE highest dollar-volume symbols with fundamental data and price > 5,
        but only when a refresh is due; otherwise leave the universe unchanged.'''
        refresh_due = ((self.count - self.RebalanceCount) == REBA) or \
                      (self.count == self.outday + self.wait_days - 1)
        if not refresh_due:
            self.UpdateFineFilter = 0
            return Universe.Unchanged

        self.UpdateFineFilter = 1

        selected = [x for x in coarse if x.HasFundamentalData and float(x.Price) > 5.]
        filtered = sorted(selected, key=lambda x: x.DollarVolume, reverse=True)
        return [x.Symbol for x in filtered[:N_COARSE]]

    def FineFilter(self, fundamental):
        '''Score the fine universe by weighted valuation-ratio ranks and keep the top N_FACTOR.'''
        if self.UpdateFineFilter == 0:
            return Universe.Unchanged

        filtered_fundamental = [x for x in fundamental if x.SecurityReference.IsPrimaryShare
                                and x.SecurityReference.SecurityType == "ST00000001"
                                and x.SecurityReference.IsDepositaryReceipt == 0
                                and x.CompanyReference.IsLimitedPartnership == 0
                                and x.EarningReports.BasicAverageShares.ThreeMonths > 0
                                and float(x.EarningReports.BasicAverageShares.ThreeMonths) * x.Price > 2e9
                                and x.ValuationRatios.EVToEBITDA
                                and x.ValuationRatios.PricetoEBITDA
                                and x.ValuationRatios.PERatio
                                #and x.EarningReports.TotalDividendPerShare.ThreeMonths
                                ]

        # https://www.quantconnect.com/docs/data-library/fundamentals#Fundamentals-Reference-Tables
        # sorting: reverse=False means "longing highest", reverse=True means "longing lowest"
        s1 = sorted(filtered_fundamental, key=lambda x: x.ValuationRatios.EVToEBITDA, reverse=False)
        s2 = sorted(filtered_fundamental, key=lambda x: x.ValuationRatios.PricetoEBITDA, reverse=False)
        s3 = sorted(filtered_fundamental, key=lambda x: x.ValuationRatios.PERatio, reverse=False)

        # Weighted sum of the element's position in each ascending ranking.
        scores = {}
        for i1, elem in enumerate(s1):
            i2 = s2.index(elem)
            i3 = s3.index(elem)
            scores[elem] = sum([i1 * 0.85, i2 * 0.10, i3 * 0.05])

        top = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:N_FACTOR]
        self.symbols = [x[0].Symbol for x in top]

        self.UpdateFineFilter = 0
        self.RebalanceCount = self.count
        return self.symbols

    def OnSecuritiesChanged(self, changes):
        '''Create SymbolData for newly added securities and warm up their ROC from history.'''
        addedSymbols = []
        for security in changes.AddedSecurities:
            addedSymbols.append(security.Symbol)
            if security.Symbol not in self.data:
                self.data[security.Symbol] = SymbolData(security.Symbol, STK_MOM, self)

        if len(addedSymbols) > 0:
            # The request is already limited to addedSymbols; per-symbol lookups below are
            # guarded so one symbol without history does not abort the whole handler.
            history = self.History(addedSymbols, 1 + STK_MOM, Resolution.Daily)
            for symbol in addedSymbols:
                try:
                    self.data[symbol].Warmup(history.loc[symbol])
                except Exception:
                    # Best effort: report the symbol that could not be warmed up and move on.
                    self.Debug(str(symbol))
                    continue

    def daily_check(self):
        '''Update the regime flag from the pair signals, trade accordingly and, when the
        trailing stop is enabled, select the stop level matching the signal strength.'''
        self.wait_days, period = self.derive_vola_waitdays()

        r = self.history.pct_change(period).iloc[-1]

        # "down" and "up" are evaluated separately because both comparisons are strict.
        downs = [r[self.SLV] < r[self.GLD], r[self.XLI] < r[self.XLU], r[self.DBB] < r[self.UUP]]
        ups = [r[self.SLV] > r[self.GLD], r[self.XLI] > r[self.XLU], r[self.DBB] > r[self.UUP]]

        bear = all(downs)
        down2x = sum(bool(d) for d in downs) >= 2
        up2x = sum(bool(u) for u in ups) >= 2
        up3x = all(ups)

        if bear:
            self.bull = False
            self.outday = self.count

        if (self.count >= self.outday + self.wait_days):
            self.bull = True

        self.wt_stk = LEV if self.bull else 0
        self.wt_bnd = 0 if self.bull else LEV

        if bear:
            self.trade_out()

        if (self.bull and not self.bull_prior) or (self.bull and (self.count==self.RebalanceCount)):
            self.trade_in()

        if TSL != 0:
            # Most specific signal first: bear implies down2x and up3x implies up2x, so testing
            # the weaker condition first would make the stronger branch unreachable.
            if bear:
                self.tsl.setDD(maximumDrawdownPercent = self.tsl_1)
            elif down2x:
                self.tsl.setDD(maximumDrawdownPercent = self.tsl_2)
            elif up3x:
                self.tsl.setDD(maximumDrawdownPercent = self.tsl_4)
            elif up2x:
                self.tsl.setDD(maximumDrawdownPercent = self.tsl_3)

        self.bull_prior = self.bull
        self.count += 1

    def trade_out(self):
        '''Zero every non-bond weight, spread wt_bnd over BONDS, liquidate, then set holdings.'''
        for sec in self.Portfolio.Keys:
            if sec not in self.BONDS:
                self.wt[sec] = 0
        for sec in self.BONDS:
            self.wt[sec] = self.wt_bnd/len(self.BONDS)

        # Liquidate first so the non-zero targets are placed afterwards.
        for sec, weight in self.wt.items():
            if weight == 0 and self.Portfolio[sec].IsLong:
                self.Liquidate(sec)

        for sec, weight in self.wt.items():
            if weight != 0:
                self.SetHoldings(sec, weight)

    def trade_in(self):
        '''Allocate wt_stk to the N_MOM highest-ROC symbols of the current selection.'''
        if self.symbols is None:
            return
        stocks = self.calc_return(self.symbols).iloc[:N_MOM].index
        if len(stocks) == 0:
            # No momentum value available for any selected symbol: skip this rebalance.
            return

        for sec in self.Portfolio.Keys:
            if sec not in stocks:
                self.wt[sec] = 0

        if PFO == 0:
            for sec in stocks:
                self.wt[sec] = self.wt_stk/N_MOM
        else:
            self.weights_stks = self.pfo.CalcWeights(self, symbols=stocks.values.tolist())
            for sec in stocks:
                matched = self.weights_stks.loc[self.weights_stks.index == sec.Value]
                if len(matched) > 0:
                    # .iloc[0]: the Series is label-indexed, so positional [0] is ambiguous.
                    self.wt[sec] = self.wt_stk * matched.iloc[0]
                else:
                    self.wt[sec] = 0

        for sec, weight in self.wt.items():
            self.SetHoldings(sec, weight)

    def calc_return(self, stocks):
        '''Return a DataFrame with one 'return' column (current ROC value per symbol),
        sorted descending; empty when no symbol has a value.'''
        ret = {}
        for symbol in stocks:
            try:
                ret[symbol] = self.data[symbol].Roc.Current.Value
            except Exception:
                # Best effort: report the symbol without a usable indicator and move on.
                self.Debug(str(symbol))
                continue

        if not ret:
            # from_dict on an empty mapping yields zero columns, which cannot be renamed.
            return pd.DataFrame(columns=['return'])

        df_ret = pd.DataFrame.from_dict(ret, orient='index')
        df_ret.columns = ['return']
        sort_return = df_ret.sort_values(by = ['return'], ascending = False)
        return sort_return

    def stop_loss(self):
        '''Apply the trailing stop: liquidate long positions whose weight was set to zero.'''
        if self.symbols is None:
            return
        if TSL != 0:
            self.tsl.setWTtoZeroIfDDtooHigh(self)
            for sec, weight in self.wt.items():
                if weight == 0 and self.Portfolio[sec].IsLong:
                    self.Liquidate(sec)
                    # NOTE(review): assumed to apply only when a position was stopped out;
                    # the original indentation was lost - confirm.
                    self.RebalanceCount = self.count + 2

    def OnEndOfDay(self):
        '''Plot the benchmark scaled to the initial cash and the account leverage.'''
        mkt_price = self.Securities[self.MKT].Close
        self.mkt.append(mkt_price)
        mkt_perf = self.InitCash * self.mkt[-1] / self.mkt[0]
        self.Plot('Strategy Equity', self.MKT, mkt_perf)

        account_leverage = self.Portfolio.TotalHoldingsValue / self.Portfolio.TotalPortfolioValue
        self.Plot('Holdings', 'leverage', round(account_leverage, 2))
        self.Plot('Holdings', 'Target Leverage', LEV)


class SymbolData(object):
    '''Holds a RateOfChange indicator for one symbol, registered on a daily consolidator.'''

    def __init__(self, symbol, roc, algorithm):
        '''
        Args:
            symbol: symbol the indicator is registered for
            roc: period passed to RateOfChange
            algorithm: algorithm used to resolve the consolidator and register the indicator
        '''
        self.Symbol = symbol
        self.Roc = RateOfChange(roc)
        self.algorithm = algorithm

        self.consolidator = algorithm.ResolveConsolidator(symbol, Resolution.Daily)
        algorithm.RegisterIndicator(symbol, self.Roc, self.consolidator)

    def Warmup(self, history):
        '''Feed each row's 'close' (keyed by the row index) into the ROC indicator.'''
        for index, row in history.iterrows():
            self.Roc.Update(index, row['close'])