Overall Statistics |
Total Trades 345 Average Win 0.24% Average Loss -0.28% Compounding Annual Return 11.835% Drawdown 12.300% Expectancy 0.109 Net Profit 4.609% Sharpe Ratio 0.66 Probabilistic Sharpe Ratio 40.423% Loss Rate 40% Win Rate 60% Profit-Loss Ratio 0.86 Alpha -0.294 Beta 0.761 Annual Standard Deviation 0.214 Annual Variance 0.046 Information Ratio -2.513 Tracking Error 0.171 Treynor Ratio 0.185 Total Fees $361.55 |
from clr import AddReference AddReference("QuantConnect.Algorithm.Framework") AddReference("QuantConnect.Indicators") AddReference("QuantConnect.Common") from QuantConnect import * from QuantConnect.Indicators import * from QuantConnect.Algorithm.Framework.Alphas import * import numpy as np import pandas as pd import scipy as sc class Alpha_InOut: def __init__(self, variables, *args, **kwargs): self.Name = "Alpha_InOut" self.var = variables # Feed-in constants self.INI_WAIT_DAYS = 15 # out for 3 trading weeks # stock selection self.STKSEL = 'QQQ' self.MRKT = 'SPY' self.TLT = 'TLT' self.IEF = 'IEF' self.stocks = [self.STKSEL, self.TLT, self.IEF] # Market and list of signals based on ETFs self.PRDC = 'XLI' # production (industrials) self.METL = 'DBB' # input prices (metals) self.NRES = 'IGE' # input prices (natural res) self.DEBT = 'SHY' # cost of debt (bond yield) self.USDX = 'UUP' # safe haven (USD) self.GOLD = 'GLD' # gold self.SLVA = 'SLV' # VS silver self.UTIL = 'XLU' # utilities self.SHCU = 'FXF' # safe haven (CHF) self.RICU = 'FXA' # risk currency (AUD) self.INDU = self.PRDC # vs industrials self.FORPAIRS = [self.GOLD, self.SLVA, self.UTIL, self.SHCU, self.RICU] self.SIGNALS = [self.PRDC, self.METL, self.NRES, self.DEBT, self.USDX] # 'In' and 'out' holdings incl. weights self.HLD_IN = {self.STKSEL: 1.0} self.HLD_OUT = {self.TLT: .5, self.IEF: .5} # Initialize variables ## 'In'/'out' indicator self.be_in = 1 ## Day count variables self.dcount = 0 # count of total days since start self.outday = 0 # dcount when self.be_in=0 ## Flexi wait days self.WDadjvar = self.INI_WAIT_DAYS self.allocation = {} for symbol in self.stocks: self.allocation[symbol] = 0.0 def Update(self, algorithm, data): insights = [] if not self.var.Update_InOut: return insights # Returns sample to detect extreme observations hist = self.var.History( self.SIGNALS + [self.MRKT] + self.FORPAIRS, 252, Resolution.Daily)['close'].unstack(level=0).dropna() # hist_shift = hist.rolling(66).apply(lambda x: x[:11].mean()) hist_shift = hist.apply(lambda x: (x.shift(65) + x.shift(64) + x.shift(63) + x.shift(62) + x.shift( 61) + x.shift(60) + x.shift(59) + x.shift(58) + x.shift(57) + x.shift(56) + x.shift(55)) / 11) returns_sample = (hist / hist_shift - 1) # Reverse code USDX: sort largest changes to bottom returns_sample[self.USDX] = returns_sample[self.USDX] * (-1) # For pairs, take returns differential, reverse coded returns_sample['G_S'] = -(returns_sample[self.GOLD] - returns_sample[self.SLVA]) returns_sample['U_I'] = -(returns_sample[self.UTIL] - returns_sample[self.INDU]) returns_sample['C_A'] = -(returns_sample[self.SHCU] - returns_sample[self.RICU]) self.pairlist = ['G_S', 'U_I', 'C_A'] # Extreme observations; statist. significance = 1% pctl_b = np.nanpercentile(returns_sample, 1, axis=0) extreme_b = returns_sample.iloc[-1] < pctl_b # Determine waitdays empirically via safe haven excess returns, 50% decay self.WDadjvar = int( max(0.50 * self.WDadjvar, self.INI_WAIT_DAYS * max(1, #returns_sample[self.GOLD].iloc[-1] / returns_sample[self.SLVA].iloc[-1], #returns_sample[self.UTIL].iloc[-1] / returns_sample[self.INDU].iloc[-1], #returns_sample[self.SHCU].iloc[-1] / returns_sample[self.RICU].iloc[-1] np.where((returns_sample[self.GOLD].iloc[-1]>0) & (returns_sample[self.SLVA].iloc[-1]<0) & (returns_sample[self.SLVA].iloc[-2]>0), self.INI_WAIT_DAYS, 1), np.where((returns_sample[self.UTIL].iloc[-1]>0) & (returns_sample[self.INDU].iloc[-1]<0) & (returns_sample[self.INDU].iloc[-2]>0), self.INI_WAIT_DAYS, 1), np.where((returns_sample[self.SHCU].iloc[-1]>0) & (returns_sample[self.RICU].iloc[-1]<0) & (returns_sample[self.RICU].iloc[-2]>0), self.INI_WAIT_DAYS, 1) )) ) adjwaitdays = min(60, self.WDadjvar) # self.Debug('{}'.format(self.WDadjvar)) # Determine whether 'in' or 'out' of the market if (extreme_b[self.SIGNALS + self.pairlist]).any(): self.be_in = False self.outday = self.dcount if self.dcount >= self.outday + adjwaitdays: self.be_in = True self.dcount += 1 for symbol in self.stocks: # Swap to 'out' assets if applicable if not self.be_in: if symbol in self.HLD_IN: weight = 0 direction = InsightDirection.Flat if symbol in self.HLD_OUT: weight = self.HLD_OUT[symbol] direction = InsightDirection.Up # Swap to 'in' assets if applicable if self.be_in: if symbol in self.HLD_IN: weight = self.HLD_IN[symbol] direction = InsightDirection.Up if symbol in self.HLD_OUT: weight = 0 direction = InsightDirection.Flat if self.allocation[symbol] != weight: #insights.append(Insight(symbol, predictionInterval, InsightType.Price, stock_data[symbol][3], stock_data[symbol][2], weight, sourceModel="Test")) insights.append(Insight(symbol, timedelta(days=30), InsightType.Price, direction, None, None, self.Name, weight)) self.allocation[symbol] = weight self.var.Update_InOut = False return Insight.Group(insights) def OnSecuritiesChanged(self, algorithm, changes): pass
from clr import AddReference AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm") AddReference("QuantConnect.Algorithm.Framework") AddReference("QuantConnect.Indicators") from QuantConnect import * from QuantConnect.Indicators import * from QuantConnect.Algorithm import * from QuantConnect.Algorithm.Framework import * from QuantConnect.Algorithm.Framework.Alphas import * import numpy as np import pandas as pd from datetime import datetime, timedelta from scipy.stats import linregress import decimal as d class Alpha_AdapVol: def __init__(self, variables, *args, **kwargs): self.Name = "Alpha_AdapVol" self.allocation = {} self.rising = {} self.var = variables #self.securities = [self.var.Symbol(x) for x in self.var.assets_AdapVol] self.securities = [ "SPY", "TLT", ] self.back_period = 21 * 12 + 1 # 3 months self.vol_period = 252 # days for calc vol self.target_vol = 0.08 #.2 oRIGINAL self.delta = 0.10 #.05 original min rebalancing self.w = 0. self.x = np.asarray(range(self.vol_period)) self.fastPeriod = 50 self.slowPeriod = 200 self.resolution = Resolution.Daily self.symbolDataBySymbol = {} self.lookback = 10 #for RoC def Update(self, algorithm, data): insights = [] if not self.var.Update_AdapVol: return insights # get all weights try: # get daily returns for period = self.back_period allPrices = algorithm.History(self.securities, self.back_period, Resolution.Daily).close.unstack(level=0) #RoC_history = algorithm.History(self.securities, self.lookback, self.resolution) self.w = 1. / len(self.securities) pos = {} # calculate alpha for EWM for symbol in self.securities: if symbol not in allPrices: continue ''' symbolData = self.symbolDataBySymbol.get(symbol) if symbolData is None: # create fast/slow EMAs symbolData = SymbolData(symbol, self.lookback) #symbolData.Fast = algorithm.EMA(symbol, self.fastPeriod, self.resolution) #symbolData.Slow = algorithm.EMA(symbol, self.slowPeriod, self.resolution) if symbolData is None: Continue self.symbolDataBySymbol[symbol] = symbolData symbolData.RegisterIndicators(algorithm, self.resolution) symbolData.WarmUpIndicators(RoC_history.loc[symbol]) ''' prices = allPrices[symbol] change = prices.pct_change().dropna() try: rsq = self.rsquared(self.x, prices[-self.vol_period:]) alpha = max(0.5, np.exp(-10. * (1. - rsq))) vol = change.ewm(alpha=alpha).std() # alpha = 2/(span+1) = 1-exp(log(0.5)/halflife) ann_vol = np.float(vol.tail(1)) * np.sqrt(252) except: continue weight = (self.target_vol / ann_vol).clip(0.0, 1.0) # NB: self.w = 1/no_assets pos[symbol] = weight msg = f"{symbol}: {pos[symbol]}, rsqr: {rsq}, alpha: {alpha}, ann_vol = {ann_vol}" #algorithm.Log(msg) except Exception as e: msg = f'Exception: {e}' algorithm.Log(msg) return ''' for symbol, symbolData in self.symbolDataBySymbol.items(): #if symbolData.Fast.IsReady and symbolData.Slow.IsReady: #symbolData.FastIsOverSlow = symbolData.Fast > symbolData.Slow pass total = sum(pos.values()) if total < 1.0: pos[algorithm.Symbol('SHY')] = (1.0-total) ''' for symbol, weight in pos.items(): new_weight = weight ''' price = algorithm.ActiveSecurities[symbol].Price if price == 0: continue ''' # gauge if needs to trade (new weight vs. current one > self.delta) if not symbol in self.allocation: curr_weight = 0 self.allocation[symbol] = 0 else: curr_weight = self.allocation[symbol] #rising1 = self.symbolDataBySymbol[symbol].FastIsOverSlow #rising1 = True #rising2 = self.symbolDataBySymbol[symbol].ROC_current > 0 #if not rising1 or not rising2: #new_weight = 0 #pass shall_trade = abs(new_weight - curr_weight) > self.delta or new_weight == 0 if curr_weight == 0 and new_weight == 0: shall_trade == False if shall_trade: # store old weights self.allocation[symbol] = new_weight direction = InsightDirection.Flat if self.allocation[symbol] > 0: direction = InsightDirection.Up else: direction = InsightDirection.Flat insights.append(Insight(symbol, timedelta(days=30), InsightType.Price, direction, None, None, self.Name, new_weight)) msg = f"{symbol} -- weight: {new_weight:.2f} (old weight was: {curr_weight:.2f})" #algorithm.Log(msg) self.var.Update_AdapVol = False return Insight.Group( insights ) ###################################### def rsquared(self, x, y): # slope, intercept, r_value, p_value, std_err _, _, r_value, _, _ = linregress(x, y) return r_value**2 def OnSecuritiesChanged(self, algorithm, changes): pass ''' class SymbolData: def __init__(self, symbol, lookback): self.Symbol = symbol self.ROC = RateOfChange('{}.ROC({})'.format(symbol, lookback), lookback) self.Consolidator = None self.previous = 0 self.Fast = None self.Slow = None self.FastIsOverSlow = False def RegisterIndicators(self, algorithm, resolution): self.Consolidator = algorithm.ResolveConsolidator(self.Symbol, resolution) algorithm.RegisterIndicator(self.Symbol, self.ROC, self.Consolidator) def RemoveConsolidators(self, algorithm): if self.Consolidator is not None: algorithm.SubscriptionManager.RemoveConsolidator(self.Symbol, self.Consolidator) def WarmUpIndicators(self, history): for tuple in history.itertuples(): self.ROC.Update(tuple.Index, tuple.close) @property def ROC_current(self): return float(self.ROC.Current.Value) @property def CanEmit(self): if self.previous == self.ROC.Samples: return False self.previous = self.ROC.Samples return self.ROC.IsReady def __str__(self, **kwargs): return '{}: {:.2%}'.format(self.ROC.Name, (1 + self.Return)**252 - 1) @property def SlowIsOverFast(self): return not self.FastIsOverSlow '''
from clr import AddReference AddReference("System") AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm.Framework") from System import * from QuantConnect import * from QuantConnect.Orders import * from QuantConnect.Algorithm.Framework.Execution import ExecutionModel from QuantConnect.Algorithm.Framework.Portfolio import PortfolioTargetCollection from datetime import datetime, timedelta from pytz import utc UTCMIN = datetime.min.replace(tzinfo=utc) class Zero_Leverage_Execution_Model(ExecutionModel): def __init__(self, variables, resolution = Resolution.Daily, min_order = 500): self.targetsCollection = PortfolioTargetCollection() self.finished = False self.rebalancingTime = UTCMIN self.rebalancingPeriod = Extensions.ToTimeSpan(resolution) self.min_order_value = min_order self.var = variables def Execute(self, algorithm, targets): #Run until all orders are completed once per day ''' if (self.finished and algorithm.UtcTime <= self.rebalancingTime) or not algorithm.Securities["QQQ"].Exchange.ExchangeOpen: return elif algorithm.UtcTime > self.rebalancingTime: self.finished = False self.rebalancingTime = algorithm.UtcTime + self.rebalancingPeriod ''' #if (self.finished and algorithm.UtcTime <= self.rebalancingTime) or not algorithm.Securities["SPY"].Exchange.ExchangeOpen: if (self.finished and not self.var.execute) or not algorithm.Securities["SPY"].Exchange.ExchangeOpen: return #elif algorithm.UtcTime > self.rebalancingTime: elif self.var.execute: self.finished = False self.var.execute = False #self.rebalancingTime = algorithm.UtcTime + self.rebalancingPeriod self.targetsCollection.AddRange(targets) open_orders = False bought = False for target in self.targetsCollection.OrderByMarginImpact(algorithm): open_quantity = sum([x.Quantity for x in algorithm.Transactions.GetOpenOrders(target.Symbol)]) existing = algorithm.Securities[target.Symbol].Holdings.Quantity + open_quantity qty_to_order = int(round(target.Quantity) - existing) if existing == 0: pct_diff = 1 else: pct_diff = abs(qty_to_order)/abs(existing) value = abs(qty_to_order) * algorithm.Securities[target.Symbol].Price portfolio_value = float(algorithm.Portfolio.TotalPortfolioValue) if portfolio_value == 0: port_pct_diff = 1 else: port_pct_diff = value/portfolio_value cash = algorithm.Portfolio.Cash + algorithm.Portfolio.UnsettledCash #Only sell if it exeeds min order value or if selling everything, to prevent being left with a tiny amount of stock unable to sell if qty_to_order <= -1.0 or target.Quantity == 0: if value < self.min_order_value or port_pct_diff < .005: self.targetsCollection.Remove(target.Symbol) continue algorithm.MarketOrder(target.Symbol, qty_to_order) #algorithm.Debug("Sell = " + str(target.Symbol) + " > " + str(quantity)) open_orders = True #Only buy if it exeeds min order value and no sells have also processed in this same instant to avoid leverage elif qty_to_order >= 1.0 and open_orders == False: #Don't buy stocks if there are still open sell orders to prevent leverage for order in algorithm.Transactions.GetOpenOrders(): if order.Quantity < 0: open_orders = True if value < self.min_order_value or port_pct_diff < .005 or value > cash: self.targetsCollection.Remove(target.Symbol) continue if not open_orders: algorithm.MarketOrder(target.Symbol, qty_to_order) #algorithm.Debug("Buy = " + str(target.Symbol) + " > " + str(quantity)) bought = True for order in algorithm.Transactions.GetOpenOrders(): open_orders = True self.targetsCollection.ClearFulfilled(algorithm) pass if self.targetsCollection.Count == 0 and not open_orders and not bought: #algorithm.Debug("execution finished at " + str(algorithm.UtcTime)) self.finished = True
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. # Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from clr import AddReference AddReference("System") AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm") AddReference("QuantConnect.Algorithm.Framework") from QuantConnect import * from QuantConnect.Algorithm import * from QuantConnect.Algorithm.Framework import * from QuantConnect.Algorithm.Framework.Portfolio import PortfolioTarget from QuantConnect.Algorithm.Framework.Risk import RiskManagementModel from datetime import datetime, timedelta from pytz import utc UTCMIN = datetime.min.replace(tzinfo=utc) class MaximumDrawdownPercentPerSecurity(RiskManagementModel): '''Provides an implementation of IRiskManagementModel that limits the drawdown per holding to the specified percentage''' def __init__(self, variables, maximumDrawdownPercent = 1000, maximumUnrealizedProfitPercent = 1000): '''Initializes a new instance of the MaximumDrawdownPercentPerSecurity class Args: maximumDrawdownPercent: The maximum percentage drawdown allowed for any single security holding''' self.maximumDrawdownPercent = -abs(maximumDrawdownPercent) self.maximumUnrealizedProfitPercent = abs(maximumUnrealizedProfitPercent) self.checkTime = UTCMIN #initialize with Current time self.time_between_checks = Extensions.ToTimeSpan(Resolution.Minute)*10 #10 minutes per risk check def ManageRisk(self, algorithm, targets): '''Manages the algorithm's risk at each time step Args: algorithm: The algorithm instance targets: The current portfolio targets to be assessed for risk''' targets = [] MarketOpen = algorithm.Securities["SPY"].Exchange.ExchangeOpen if (algorithm.UtcTime <= self.checkTime or MarketOpen == False): return targets for kvp in algorithm.Securities: security = kvp.Value if not security.Invested: continue #price = security.Holdings.Price #close = algorithm.History(algorithm.Symbol(str(security)), 5, Resolution.Daily).close[-1] #if close == 0: continue #pnl = (price - close)/close pnl = security.Holdings.UnrealizedProfitPercent if pnl < self.maximumDrawdownPercent or pnl > self.maximumUnrealizedProfitPercent: # liquidate targets.append(PortfolioTarget(security.Symbol, 0)) self.checkTime = algorithm.UtcTime + self.time_between_checks return targets
from System import * from QuantConnect import * from QuantConnect.Orders import * from QuantConnect.Algorithm import * from QuantConnect.Algorithm.Framework import * from QuantConnect.Algorithm.Framework.Execution import * from QuantConnect.Algorithm.Framework.Selection import * from QuantConnect.Algorithm.Framework.Alphas import * #from Alpha_VAA import Alpha_VAA #from Alpha_ACR import Alpha_ACR #from Alpha_ZScore import Alpha_ZScore #from Alpha_CAA import Alpha_CAA from Alpha_AdapVol import Alpha_AdapVol from Alpha_SimpleRebalance import Alpha_SimpleRebalance from Alpha_InOut import Alpha_InOut from Insight_Weighted_Portfolio import Insight_Weighted_Portfolio #from Optimized_Portfolio_Model import Optimized_Portfolio_Model #from Risk_Management_Model import MaximumDrawdownPercentPerSecurity from Risk_Synthetic_Leverage import Risk_Synthetic_Leverage from Zero_Leverage_Execution_Model import Zero_Leverage_Execution_Model class Modular_Framework(QCAlgorithmFramework): '''Modular Framework Algorithm.''' def Initialize(self): """ General Algorithm Parameters """ self.UniverseSettings.Resolution = Resolution.Hour #Set Data Resolution self.SetStartDate(2020, 6, 26) #Set Start Date #self.SetEndDate(2020, 5, 4) #Set End Date self.SetCash(100000) #Set Strategy Cash self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin) #Set Brokerage Model self.AddEquity("SPY") self.SetBenchmark("SPY") self.SetWarmup(252) #252 self.SetTimeZone("America/New_York") """ Portfolio Construction Parameters """ self.max_leverage = .99 self.reserved = 750 #Reserved cash will not trade with self.normalize = True #noramalize leverage (attempt to stay at max leverage always) """ Portfolio Execution Parameters """ self.Minimum_order_value = 500 #This reduces fees by trading only when a large reblance is needed """ Portfolio Risk Management Parameters """ #self.maximumDrawdownPercent = 0.0017 #self.maximumUnrealizedProfitPercent = 0.0013 self.drawdown_leverageMax = 0.03 self.drawdown_leverageMin = 0.06 """ Universe """ self.assets_VAA = [ "SPY", #SPDR S&P 500 Trust ETF "VEA", #Vanguard FTSE Developed Markets ETF "VTI", #Vanguard Total Stock Market ETF "AGG", #iShares Barclays Aggregate Bond Fund "HYD", #VanEck Vectors High-Yield Municipal ETF "VMBS", #Vanguard Mortgage-Backed Securities ETF "BKLN", #Invesco Senior Loan ETF "JNK", #SPDR Barclays High Yield Bond ETF "VT", ##Vanguard Total World Stock Index Fund "VWO", #Vanguard FTSE Emerging Markets ETF "TLT", #iShares 20+ Year Treasury Bond ETF ] self.assets_ACR = [ "VOE", "VDC", "XLP", "IJR", "TLT", "TIP", "DBC", "SHY", "XLB", #Materials "XLY", #Consumer Cyclical "XLF", #Financials "IYR", #ISHARES Real Estate "XLP", #Consumer Defensive "XLV", #Healthcare "XLU", #Utilities "XLE", #Energy "XLI", #Industrials "XLK", #Tech "SPY", ] self.assets_ZScore = [ "SPY", "TLT", "MUB", "TIP", "XLP", ] self.assets_CAA = [ "SPY", "IJR", "EEM", "EFA", "GLD", "TLT", "VNQ", "XLE", "XLK", "XLY", "XLU", "XLP", "XLI", "XLF", "XLV", "XLB" ] self.assets_AdapVol = [ "SPY", "TLT", ] self.assets_SimpleRebalance = [ "QQQ", "TLT", "EFZ", "IEF", "VXX", ] self.assets_InOut = [ "QQQ", "SPY", "TLT", "IEF", "XLI", "DBB", "IGE", "SHY", "UUP", "GLD", "SLV", "XLU", "FXF", "FXA", ] self.assets_Leveraged = [ "TQQQ", "TMF", "TYD", "SPXL", #"DRN", #"UGLD", ] self.universe = self.assets_AdapVol + self.assets_SimpleRebalance + self.assets_Leveraged + self.assets_InOut #self.assets_CAA #self.assets_VAA #self.assets_ACR #self.assets_ZScore self.universe = (list(set(self.universe))) universe = [ Symbol.Create(symbol, SecurityType.Equity, Market.USA) for symbol in self.universe ] """ Rebalancing """ self.first_run = True self.rebalance = False self.execute = False self.updating = False self.Update_AdapVol = False self.Update_VAA = False self.Update_ZScore = False self.Update_ACR = False self.Update_CAA = False self.Update_SimpleRebalance = False self.Update_InOut = False #dateRule = self.DateRules.EveryDay("SPY") #dateRule = self.DateRules.WeekStart("SPY") #dateRule = self.DateRules.MonthStart("SPY") #timerule = self.TimeRules.At(13,15) #timerule = self.TimeRules.At(9,30) timerule = self.TimeRules.AfterMarketOpen("SPY", 0) self.Schedule.On(self.DateRules.EveryDay("SPY"), timerule, self.First_Run_Update_Alphas) #self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.At(9,30), self.Execute_rebalance) self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", 60), self.Execute_rebalance) self.Schedule.On(self.DateRules.WeekStart("SPY"), timerule, self.Update_alpha_AdapVol) #self.Schedule.On(self.DateRules.EveryDay("SPY"), timerule, self.Update_alpha_VAA) #self.Schedule.On(self.DateRules.EveryDay("SPY"), timerule, self.Update_alpha_ZScore) #self.Schedule.On(self.DateRules.EveryDay("SPY"), timerule, self.Update_alpha_ACR) #self.Schedule.On(self.DateRules.EveryDay("SPY"), timerule, self.Update_alpha_CAA) self.Schedule.On(self.DateRules.WeekStart("SPY"), timerule, self.Update_alpha_SimpleRebalance) self.Schedule.On(self.DateRules.EveryDay("SPY"), timerule, self.Update_alpha_InOut) """ Framework Parameters """ ############################################################################ ############################################################################ #SET UNIVERSE MODEL ############################################################################ ############################################################################ self.SetUniverseSelection( ManualUniverseSelectionModel(universe) ) ############################################################################ ############################################################################ #SET ALPHA MODEL ############################################################################ ############################################################################ self.alphas = [ #Alpha_VAA(self), #depreciated 2020 Alpha_AdapVol(self), #Alpha_ACR(self), #depreciated, high drawdown #Alpha_ZScore(self), #depreciated 2020 #Alpha_CAA(self), Alpha_SimpleRebalance(self), Alpha_InOut(self), ] self.SetAlpha( CompositeAlphaModel( *self.alphas ) ) ############################################################################ ############################################################################ #SET PORTFOLIO CONSTRUCTION MODEL ############################################################################ ############################################################################ self.SetPortfolioConstruction( Insight_Weighted_Portfolio( self, Resolution.Daily, self.reserved, self.max_leverage, len(self.alphas), self.normalize, ) ) ''' objective function for portfolio optimizer options are: equal (Equal Weighting), return (Maximize Portfolio Return), std (Minimize Portfolio Standard Deviation), sharpe (Maximize Portfolio Sharpe Ratio), no_optimization (use alpha's weighting) ''' ''' objectiveFunction = 'no_optimization' # rebalancing period (to enable rebalancing enter an integer for number of calendar days, e.g. 1, 7, 30, 365) rebalancingParam = 1 self.SetPortfolioConstruction( Optimized_Portfolio_Model( objectiveFunction = objectiveFunction, rebalancingParam = rebalancingParam, reserve = self.reserved, leverage = self.max_leverage, num_alphas = len(self.alphas), normalize = self.normalize, ) ) ''' ############################################################################ ############################################################################ #SET RISK MANAGEMENT ############################################################################ ############################################################################ self.SetRiskManagement( Risk_Synthetic_Leverage( self, self.drawdown_leverageMax, self.drawdown_leverageMin, ) ) #self.SetRiskManagement( MaximumDrawdownPercentPerSecurity(self.maximumDrawdownPercent,self.maximumUnrealizedProfitPercent) ) #self.SetRiskManagement( NullRiskManagementModel() ) #self.SetRiskManagement(CompositeRiskManagementModel(MaximumUnrealizedProfitPercentPerSecurity(self.maximumUnrealizedProfitPercent),MaximumDrawdownPercentPerSecurity(self.maximumDrawdownPercent))) ############################################################################ ############################################################################ #SET EXECUTION MODEL ############################################################################ ############################################################################ self.SetExecution( Zero_Leverage_Execution_Model( self, Resolution.Daily, self.Minimum_order_value, ) ) #### END INITIALIZATION #### def Update_alpha_SimpleRebalance(self): self.Update_SimpleRebalance = True self.updating = True def Update_alpha_AdapVol(self): self.Update_AdapVol = True self.updating = True def Update_alpha_VAA(self): self.Update_VAA = True self.updating = True def Update_alpha_ZScore(self): self.Update_ZScore = True self.updating = True def Update_alpha_ACR(self): self.Update_ACR = True self.updating = True def Update_alpha_CAA(self): self.Update_CAA = True self.updating = True def Update_alpha_InOut(self): self.Update_InOut = True self.updating = True def First_Run_Update_Alphas(self): if self.first_run: self.Update_CAA = True self.Update_ACR = True self.Update_ZScore = True self.Update_VAA = True self.Update_AdapVol = True self.Update_SimpleRebalance = True self.Update_InOut = True self.updating = True self.first_run = False def Execute_rebalance(self): if self.updating: #self.Debug("execute/Rebalance initiated at " + str(self.UtcTime)) self.rebalance = True self.execute = True
from clr import AddReference AddReference("QuantConnect.Algorithm.Framework") AddReference("QuantConnect.Indicators") AddReference("QuantConnect.Common") from QuantConnect import * from QuantConnect.Indicators import * from QuantConnect.Algorithm.Framework.Alphas import * #from datetime import datetime, timedelta #from pytz import utc #UTCMIN = datetime.min.replace(tzinfo=utc) import numpy as np class Alpha_VAA: def __init__(self, variables, *args, **kwargs): self.Name = "Alpha_VAA" self.var = variables """ VAA parameters """ self.periods = [42,21,63,126,252] self.weights = [0, 12, 6, 3, 1] self.resolution = Resolution.Daily #self.rebalancingTime = UTCMIN self.days_predicted = 5 #self.rebalancingPeriod = Extensions.ToTimeSpan(self.resolution)*self.days_predicted self.GrowthSymbols = [ "SPY", #SPDR S&P 500 Trust ETF "VEA", #Vanguard FTSE Developed Markets ETF "VTI", #Vanguard Total Stock Market ETF "VT", #Vanguard Total World Stock Index Fund "VWO", #Vanguard FTSE Emerging Markets ETF "AGG", #iShares Barclays Aggregate Bond ETF ] self.SafetySymbols = [ "HYD", #VanEck Vectors High-Yield Municipal ETF "VMBS", #Vanguard Mortgage-Backed Securities ETF "BKLN", #Invesco Senior Loan ETF "JNK", #SPDR Barclays High Yield Bond ETF "TLT", #iShares 20+ Year Treasury Bond ETF ] #Variable Definitions self.symbolDataBySymbol = {} self.allocation = {} self.stocks = self.GrowthSymbols + self.SafetySymbols for symbol in self.stocks: self.allocation[symbol] = 0.0 def Update(self, algorithm, data): insights = [] ''' if (algorithm.UtcTime <= self.rebalancingTime): return insights ''' if not self.var.Update_VAA: return insights ##Using a weighted average, compute the score for each risky asset. growthdata = [] for symbol in self.GrowthSymbols: if not symbol in self.symbolDataBySymbol.keys(): continue temp_values = [] for i, period in enumerate(self.periods): if self.symbolDataBySymbol[symbol][i].CanEmit: temp_values.append(self.symbolDataBySymbol[symbol][i].Return) else: temp_values.append(0.0) score = sum([i*j for i,j in zip(temp_values,self.weights)]) growthdata.append([symbol, score, temp_values[0]]) orderedGrowthScores = sorted(growthdata, key=lambda x: x[1], reverse=True) #algorithm.Debug(orderedGrowthScores) ##Using a weighted average, compute the score for each risk-free asset. ##This approach overweights the front month momentum value and progressively underweights older momentum values safetydata = [] for symbol in self.SafetySymbols: #algorithm.Debug(symbol) #algorithm.Debug(self.symbolDataBySymbol.keys()) if not symbol in self.symbolDataBySymbol.keys(): continue #algorithm.Debug("2") temp_values = [] for i, period in enumerate(self.periods): #algorithm.Debug("3") if self.symbolDataBySymbol[symbol][i].CanEmit: #algorithm.Debug("4") temp_values.append(self.symbolDataBySymbol[symbol][i].Return) else: temp_values.append(0.0) #algorithm.Debug("5") #score = np.average(temp_values, weights = self.weights) score = sum([i*j for i,j in zip(temp_values,self.weights)]) safetydata.append([symbol, score, temp_values[0]]) orderedSafeScores = sorted(safetydata, key=lambda x: x[1], reverse=True) if safetydata == [] or growthdata == []: return insights #algorithm.Debug(orderedSafeScores) ##Count the number of risky assets with negative momentum scores and store in N. If all four of the offensive assets exhibit positive momentum scores, ##select the offensive asset with the highest score and allocate 100% of the portfolio to that asset at the close count = 0 negative_flag = False for stock in orderedGrowthScores: if stock[1] <= 0.0: count += 1 if count > 0: negative_flag = True top_growth = orderedGrowthScores[0][0] second_growth = orderedGrowthScores[1][0] top_safe = orderedSafeScores[0][0] second_safe = orderedSafeScores[1][0] for data_item in (orderedGrowthScores + orderedSafeScores): symbol = data_item[0] weight = 0 if symbol == top_growth: if negative_flag: weight = 0.1 else: weight = 0.5 elif symbol == second_growth: if negative_flag: weight = 0.1 else: weight = 0.5 elif symbol == top_safe: if negative_flag: weight = 0.4 else: weight = 0.0 elif symbol == second_safe: if negative_flag: weight = 0.4 else: weight = 0.0 else: weight = 0 direction = InsightDirection.Flat if weight > 0: direction = InsightDirection.Up else: direction = InsightDirection.Flat if self.allocation[symbol] != weight: #insights.append(Insight(symbol, predictionInterval, InsightType.Price, stock_data[symbol][3], stock_data[symbol][2], weight, sourceModel="Test")) insights.append(Insight(symbol, timedelta(days=30), InsightType.Price, direction, None, None, self.Name, weight)) self.allocation[symbol] = weight #self.rebalancingTime = algorithm.UtcTime + self.rebalancingPeriod self.var.Update_VAA = False #return insights return Insight.Group( insights ) def OnSecuritiesChanged(self, algorithm, changes): algorithm.Debug("Securities Changed. Added " + str(changes.AddedSecurities)) symbols = [ x.Symbol for x in changes.AddedSecurities if str(x.Symbol) in self.stocks ] history = algorithm.History(symbols, (np.max(self.periods)), Resolution.Daily) if history.empty: algorithm.Debug("History Error") return for removed in changes.RemovedSecurities: if removed.Symbol in self.stocks: symbolData_temp = self.symbolDataBySymbol.pop(removed.Symbol, None) if symbolData_temp is not None: symbolData_temp.RemoveConsolidators(algorithm) for stock in history.index.levels[0]: symbol = SymbolCache.GetSymbol(stock) algorithm.Debug("Getting Data for " + str(symbol)) if str(symbol) not in self.symbolDataBySymbol.keys(): algorithm.Debug("registering symboldata for " + str(symbol)) symbolData_temp = [] for period in self.periods: #algorithm.Debug("C") tempData = SymbolData(symbol, period) tempData.RegisterIndicators(algorithm, Resolution.Daily) tempData.WarmUpIndicators(history.loc[stock]) symbolData_temp.append(tempData) self.symbolDataBySymbol[str(symbol)] = symbolData_temp #algorithm.Debug(symbolData_temp) class SymbolData: def __init__(self, symbol, lookback): self.Symbol = symbol self.MOM = Momentum('{}.MOM({})'.format(symbol, lookback), lookback) self.Consolidator = None self.previous = 0 def RegisterIndicators(self, algorithm, resolution): #algorithm.Debug("Register Indicators. Alpha") self.Consolidator = algorithm.ResolveConsolidator(self.Symbol, resolution) algorithm.RegisterIndicator(self.Symbol, self.MOM, self.Consolidator) def RemoveConsolidators(self, algorithm): if self.Consolidator is not None: algorithm.SubscriptionManager.RemoveConsolidator(self.Symbol, self.Consolidator) def WarmUpIndicators(self, history): for tuple in history.itertuples(): self.MOM.Update(tuple.Index, tuple.close) @property def Return(self): return float(self.MOM.Current.Value) @property def CanEmit(self): if self.previous == self.MOM.Samples: return False self.previous = self.MOM.Samples return self.MOM.IsReady ''' def __str__(self, **kwargs): return '{}: {:.2%}'.format(self.MOM.Name, (1 + self.Return)**252 - 1) '''
from clr import AddReference AddReference("QuantConnect.Algorithm.Framework") AddReference("QuantConnect.Indicators") AddReference("QuantConnect.Common") import numpy as np import pandas as pd import math from QuantConnect import * from QuantConnect.Indicators import * from QuantConnect.Algorithm.Framework.Alphas import * from datetime import datetime, timedelta from pytz import utc UTCMIN = datetime.min.replace(tzinfo=utc) class Alpha_ZScore: def __init__(self, variables, *args, **kwargs): self.Name = "Alpha_ZScore" self.var = variables #zscore assets self.stocks = [ "SPY", "TLT", ] """ zscore parameters """ self.fixed_wt_pct = 0.0 #.50 self.fixed_wt = { } self.vol_factor = 0.50 #TLT = .5, SPY = 1 self.ext_factor = 4.0 #move too extreme, leave asset self.lookback = 252 self.allocation = {} #Variable Definitions self.resolution = Resolution.Daily self.rebalancingTime = UTCMIN self.days_predicted = 5 self.rebalancingPeriod = Extensions.ToTimeSpan(self.resolution)*self.days_predicted for symbol in self.stocks: self.allocation[symbol] = 0.0 def Update(self, algorithm, data): insights = [] if not self.var.Update_ZScore: return insights safestock = "TLT" history = algorithm.History([safestock], self.lookback, Resolution.Daily) mean = history['close'].mean() sigma = history['close'].std() price = float(algorithm.Securities[safestock].Price) if sigma != 0.0: z = (price - mean) / sigma**self.vol_factor else: z = 0.0 if -self.ext_factor <= z <= self.ext_factor: tlt_target = 1.0/(1+math.exp(-1.2*z))*.75 # Pure momentum adding 1 to the sin wave to prevent shorting else: tlt_target = 0.0 spy_target = (1.0-tlt_target) allocation_temp = {} magnitude = {} direction = InsightDirection.Flat for sid in self.stocks: allocation_temp[sid] = 0.0 if sid in self.fixed_wt: allocation_temp[sid] = self.fixed_wt[sid] * self.fixed_wt_pct allocation_temp[safestock] += tlt_target * (1.0 - self.fixed_wt_pct) allocation_temp["SPY"] += spy_target * (1.0 - self.fixed_wt_pct) #predictionInterval = Time.Multiply(Extensions.ToTimeSpan(self.resolution), self.days_predicted) #expected = algorithm.History(self.stocks, 5, Resolution.Daily)["close"].unstack(level=0).dropna().pct_change().dropna().mean() + 1.0 for symbol in self.allocation: if (self.allocation[symbol] != allocation_temp[symbol]) or (self.allocation[symbol] != 0.0) or (allocation_temp[symbol] != 0.0): self.allocation[symbol] = allocation_temp[symbol] direction = InsightDirection.Flat if self.allocation[symbol] > 0: direction = InsightDirection.Up else: direction = InsightDirection.Flat #insights.append(Insight(symbol, predictionInterval, InsightType.Price, direction[symbol], magnitude[symbol], self.allocation[symbol], sourceModel="Test")) insights.append(Insight(symbol, timedelta(days=30), InsightType.Price, direction, None, None, self.Name, self.allocation[symbol])) #algorithm.Debug(str(symbol) + " = " + str(self.allocation[symbol])) #insights = [] #Remove me #return insights self.rebalancingTime = algorithm.UtcTime + self.rebalancingPeriod self.var.Update_ZScore = False return Insight.Group( insights ) def OnSecuritiesChanged(self, algorithm, changes): pass
from clr import AddReference AddReference("QuantConnect.Algorithm.Framework") AddReference("QuantConnect.Indicators") AddReference("QuantConnect.Common") from QuantConnect import * from QuantConnect.Indicators import * from QuantConnect.Algorithm.Framework.Alphas import * import numpy as np import pandas as pd import itertools from datetime import datetime, timedelta from pytz import utc UTCMIN = datetime.min.replace(tzinfo=utc) class Alpha_ACR: def __init__(self, variables, *args, **kwargs): self.Name = "Alpha_ACR" self.ACR_assets = [ "VOE", "VDC", "XLP", "IJR", "SPY", #NEW #"XLU", #NEW #"IYR", #NEW #"XLB", #NEW ] self.ACR_bonds = [ "TLT", "TIP", "DBC", "SHY", ] self.ACR_sectors = [ "XLB", #Materials "XLY", #Consumer Cyclical "XLF", #Financials "IYR", #ISHARES Real Estate "XLP", #Consumer Defensive "XLV", #Healthcare "XLU", #Utilities "XLE", #Energy "XLI", #Industrials "XLK", #Tech ] self.ACR_fixed = [ "SPY", ] #self.Hedge = ["VXX",] self.stocks = self.ACR_assets+self.ACR_bonds+self.ACR_fixed # + self.Hedge self.stocks_all = self.ACR_assets+self.ACR_bonds+self.ACR_fixed+self.ACR_sectors """ ACR (Asset Class Rotation) parameters """ self.ACR_sector_step = 13 #12% step change = all bonds if 9 of 11 sectors down self.ACR_asset_step = 20 #20% step change self.allocation = {} self.ACR_fixed_weight = [ 0.0, #SPY ] #Variable Definitions self.resolution = Resolution.Daily self.rebalancingTime = UTCMIN self.days_predicted = 5 self.rebalancingPeriod = Extensions.ToTimeSpan(self.resolution)*self.days_predicted self.var = variables #for stock in self.ACR_sectors: #self.var.AddEquity(stock) self.symbolData = {} for symbol in self.stocks: self.allocation[symbol] = 0.0 def Update(self, algorithm, data): #algorithm.Debug("Updating Alpha Model.") insights = [] if not self.var.Update_ACR: return insights if not all([data.Bars.ContainsKey(symbol) for symbol in self.stocks_all]): return insights for sid in self.stocks: if not sid in self.allocation: self.allocation[sid] = 0.0 ACR_assets_weight = np.zeros(len(self.ACR_assets)) ACR_bonds_data = pd.DataFrame(0, columns=['Weight','Ratio','20Day','60Day'],index=self.ACR_bonds) ACR_sectors_data = pd.DataFrame(0, columns=['Ratio','20Day','200Day'],index=self.ACR_sectors) """ Determine sector trends and calculate weight to assets/bonds """ ACR_sectors_data.loc[:,'20Day'] = algorithm.History(self.ACR_sectors, 20, Resolution.Daily)["close"].unstack(level=0).mean() ACR_sectors_data.loc[:, '200Day'] = algorithm.History(self.ACR_sectors, 200, Resolution.Daily)["close"].unstack(level=0).mean() ACR_sectors_data['Ratio'] = ACR_sectors_data['20Day']/ACR_sectors_data['200Day'] - 1 ACR_bonds_weight = len(ACR_sectors_data[ACR_sectors_data['Ratio'] < 0]) * self.ACR_sector_step/100.0 if ACR_bonds_weight > 1.0: ACR_bonds_weight = 1.0 ACR_bonds_weight = ACR_bonds_weight * (1-sum(self.ACR_fixed_weight)) """ Determine bond trends and which duration to be in """ if ACR_bonds_weight > 0.0: ACR_bonds_data.loc[:,'20Day'] = algorithm.History(self.ACR_bonds, 20, Resolution.Daily)["close"].unstack(level=0).mean() ACR_bonds_data.loc[:, '60Day'] = algorithm.History(self.ACR_bonds, 60, Resolution.Daily)["close"].unstack(level=0).mean() ACR_bonds_data['Ratio'] = ACR_bonds_data['20Day']/ACR_bonds_data['60Day'] - 1 ACR_bonds_data['Weight'] = 0 ACR_bonds_data.loc[ACR_bonds_data['Ratio'].idxmax(), 'Weight'] = ACR_bonds_weight #log.info(self.ACR_bonds_data) returns = algorithm.History(self.ACR_assets, 126, Resolution.Daily)["close"].unstack(level=0).dropna().pct_change().dropna() + 1.0 expected = algorithm.History(self.stocks, 20, Resolution.Daily)["close"].unstack(level=0).dropna().pct_change().dropna().mean() + 1.0 """ Create portfolio combinations """ n = len(self.ACR_assets) steps = [x/100.0 for x in range(0,101,int(self.ACR_asset_step))] a = [steps for x in range(n)] b = list(itertools.product(*a)) x = [sum(i) for i in b] port = pd.DataFrame(b) port['Sum'] = x port = port[port.Sum == 1] del port['Sum'] """ Score and Weight portoflio """ port_returns = pd.DataFrame(np.dot(returns, port.T), index=returns.index) port_metrics = self.ACR_get_specs(port_returns) port_metrics = self.ACR_score(port_metrics) port_metrics['Z3'] = port_metrics.ZMean\ -port_metrics.ZSTD\ -port_metrics.ZDownSide\ +port_metrics.ZAnnualized\ -port_metrics.ZMax_Draw\ -port_metrics.ZSTD10\ +port_metrics.ZMean10\ +port_metrics.ZMinimum15 port_metrics = port_metrics.sort_values(by='Z3', ascending=False) portfolios = port portfolios.columns = list(returns.columns.values) best = pd.concat([pd.DataFrame(portfolios.iloc[port_metrics['Z3'].idxmax()]).T]) #log.info(best.loc[:, (best != 0).any(axis=0)].T) best = pd.DataFrame(portfolios.iloc[port_metrics['Z3'].idxmax()]) #log.info(best) ACR_assets_weight = [i[0]*(1-ACR_bonds_weight-sum(self.ACR_fixed_weight)) for i in best.values] predictionInterval = Time.Multiply(Extensions.ToTimeSpan(self.resolution), self.days_predicted) allocation_temp = {} magnitude = {} direction = {} risk_weight = 0 for x in range(n): allocation_temp[self.var.Symbol(portfolios.iloc[port_metrics['Z3'].idxmax()].keys()[x]).Value] = portfolios.iloc[port_metrics['Z3'].idxmax()][x]#*.95 #expected_temp[self.ACR_assets[x]] = risk_weight += ACR_assets_weight[x] for stock in self.stocks: magnitude[stock] = 0.0075 #self.allocation[self.Hedge[0]] = risk_weight * .05 for stock in self.ACR_bonds: allocation_temp[stock] = ACR_bonds_data.loc[stock, 'Weight'] for x in range(len(self.ACR_fixed)): allocation_temp[self.ACR_fixed[x]] = self.ACR_fixed_weight[x] for symbol in self.allocation: if (self.allocation[symbol] != allocation_temp[symbol]) or (self.allocation[symbol] != 0.0) or (allocation_temp[symbol] != 0.0): #if allocation_temp[symbol] > self.allocation[symbol]: self.allocation[symbol] = allocation_temp[symbol] direction = InsightDirection.Flat if self.allocation[symbol] > 0: direction = InsightDirection.Up else: direction = InsightDirection.Flat #insights.append(Insight(symbol, predictionInterval, InsightType.Price, InsightDirection.Up, magnitude[symbol], self.allocation[symbol], sourceModel="Test")) insights.append(Insight(symbol, timedelta(days=30), InsightType.Price, direction, None, None, self.Name, self.allocation[symbol])) #algorithm.Debug(str(symbol) + " = " + str(self.allocation[symbol])) self.rebalancingTime = algorithm.UtcTime + self.rebalancingPeriod self.var.Update_ACR = False return Insight.Group( insights ) def ACR_drawdown(self, returns): mat = returns.cumprod().values [n, m] = np.shape(mat) maxes = np.maximum.accumulate(np.array(mat)) for i in range(0,n): for j in range(m): mat[i,j] = mat[i,j] / maxes[i,j] df = pd.DataFrame(mat) df[df > 1] = 1 return df def ACR_moving_returns(self, returns, w): mat = returns.values [n, m] = np.shape(mat) ret = np.zeros(shape = (n-w+1,m)) for i in range(w-1,n): for j in range(m): ret[i-w+1,j] = np.power(np.prod(mat[(i-w+1):i+1,j]),(1.0/w))- 1.0 return pd.DataFrame(ret) def ACR_get_specs(self, returns): metrics = pd.DataFrame((returns.mean()),columns=['Mean']) - 1.0 metrics['STD'] = pd.DataFrame((returns.std())) metrics['Annualized'] = np.power(returns.cumprod().values.tolist()[-1],1.0/len(returns))- 1.0 downside = returns.copy(deep=True) - 1 downside[downside > 0] = 0 downside = downside ** 2 metrics['DownSide'] = pd.DataFrame(downside.mean() ** 0.5) draw = self.ACR_drawdown(returns) metrics['Max_Draw'] = 1.0 - draw.min().values ret15 = self.ACR_moving_returns(returns,21) metrics['Minimum15'] = ret15.min().values ret10 = self.ACR_moving_returns(returns,21) metrics['Mean10'] = ret10.mean().values metrics['STD10'] = ret10.std().values return metrics def ACR_zscore(self, stocks, var, var_save): stocks[var_save] = (stocks[var] - stocks[var].mean())/stocks[var].std(ddof=0) return stocks def ACR_score(self, metrics): metrics = self.ACR_zscore(metrics, 'Mean', 'ZMean') metrics = self.ACR_zscore(metrics, 'STD', 'ZSTD') metrics = self.ACR_zscore(metrics, 'Annualized', 'ZAnnualized') metrics = self.ACR_zscore(metrics, 'DownSide', 'ZDownSide') metrics = self.ACR_zscore(metrics, 'Max_Draw', 'ZMax_Draw') metrics = self.ACR_zscore(metrics, 'Minimum15', 'ZMinimum15') metrics = self.ACR_zscore(metrics, 'STD10', 'ZSTD10') metrics = self.ACR_zscore(metrics, 'Mean10', 'ZMean10') return metrics def OnSecuritiesChanged(self, algorithm, changes): pass
from clr import AddReference AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm.Framework") from QuantConnect import Resolution, Extensions from QuantConnect.Algorithm.Framework.Alphas import InsightCollection, InsightDirection from QuantConnect.Algorithm.Framework.Portfolio import PortfolioConstructionModel, PortfolioTarget from itertools import groupby from datetime import datetime from pytz import utc UTCMIN = datetime.min.replace(tzinfo=utc) import numpy as np class Insight_Weighted_Portfolio(PortfolioConstructionModel): def __init__(self, variables, resolution = Resolution.Daily, reserve = 0, leverage = 1.0, num_alphas = 1.0, normalize = False): self.insightCollection = InsightCollection() self.removedSymbols = [] self.addedSymbols = [] self.rebalancingTime = UTCMIN self.rebalancingPeriod = Extensions.ToTimeSpan(resolution) self.allocation = {} self.reserved = reserve self.max_leverage = leverage self.normalize = normalize self.num_alphas = num_alphas self.var = variables self.insight_list = [] self.symbol_list = [] def CreateTargets(self, algorithm, insights): targets = [] self.insightCollection.AddRange(insights) # Create Sell target for each security that was removed from the universe if self.removedSymbols is not None: universeDeselectionTargets = [ PortfolioTarget(symbol, 0) for symbol in self.removedSymbols ] targets.extend(universeDeselectionTargets) self.removedSymbols = None if self.addedSymbols is not None: for symbol in self.addedSymbols: self.allocation[symbol] = {} self.addedSymbols = None #if algorithm.UtcTime <= self.rebalancingTime: if not self.var.rebalance: return targets # Get insight that haven't expired of each symbol that is still in the universe activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime) # Get the last generated active insight for each symbol lastActiveInsights = [] for symbol, g in groupby(activeInsights, lambda x: x.Symbol): lastActiveInsights.append(sorted(g, key = lambda x: x.GeneratedTimeUtc)[-1]) #Gets Adjusted Portfolio value port_value_adjusted = ((float(algorithm.Portfolio.TotalPortfolioValue)* self.max_leverage) - self.reserved) for insight in lastActiveInsights: if insight in self.insight_list: #continue pass #self.insight_list.append(insight) symbol = insight.Symbol alpha = insight.SourceModel if insight.Weight: allocation = insight.Weight else: allocation = 0 #Calculate required rebalance amount for each allocation if self.num_alphas == 0: return targets else: goal_value = (port_value_adjusted * allocation / self.num_alphas) #Adjusted % allocation if float(algorithm.Portfolio.TotalPortfolioValue) == 0.0: return targets else: try: self.allocation[symbol][alpha] = goal_value/float(algorithm.Portfolio.TotalPortfolioValue) except: pass #algorithm.Log(str(symbol) + " " + str(alpha)) combined_weight = {} for symbol in self.allocation.keys(): wt = sum(list(self.allocation[symbol].values())) combined_weight[symbol] = round(wt*200.0)/200.0 #option to normalize target_portfolio to fill all availible leverage original_wt = sum(np.abs(list(combined_weight.values()))) if original_wt == 0.0: factor = 0.0 else: if self.normalize: factor = self.max_leverage/original_wt elif original_wt > self.max_leverage: factor = self.max_leverage/original_wt else: factor = 1.0 for symbol in combined_weight.keys(): combined_weight[symbol] = combined_weight[symbol]*factor if algorithm.Securities[symbol].Price == 0.0: continue target = PortfolioTarget.Percent(algorithm, symbol, combined_weight[symbol]) if target != None: targets.append(target) #Log final desired allocation if str(symbol) in self.var.assets_Leveraged: continue algorithm.Log(str(symbol) + " : " + str(round(combined_weight[symbol]*100,1)) + "% = " + str(int(target.Quantity)) + " shares.") else: pass #algorithm.Debug("Target Error on " + str(symbol) + ". Should be " + str(round(combined_weight[symbol]*100,1)) + "%") #self.rebalancingTime = algorithm.UtcTime + self.rebalancingPeriod #algorithm.Debug("rebalance finished at " + str(algorithm.UtcTime)) self.var.rebalance = False return targets def OnSecuritiesChanged(self, algorithm, changes): self.addedSymbols = [x.Symbol for x in changes.AddedSecurities] # Get removed symbol and invalidate them in the insight collection self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities] self.insightCollection.Clear(self.removedSymbols)
from clr import AddReference AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm.Framework") from QuantConnect import Resolution, Extensions from QuantConnect.Algorithm.Framework.Alphas import * from QuantConnect.Algorithm.Framework.Portfolio import * from itertools import groupby from datetime import datetime, timedelta from pytz import utc UTCMIN = datetime.min.replace(tzinfo=utc) import numpy as np import pandas as pd from scipy.optimize import minimize class Optimized_Portfolio_Model(PortfolioConstructionModel): ''' Description: Allocate optimal weights to each security in order to optimize the portfolio objective function provided Details: - The target percent holdings of each security is 1/N where N is the number of securities with active Up/Down insights - For InsightDirection.Up, long targets are returned - For InsightDirection.Down, short targets are returned - For InsightDirection.Flat, closing position targets are returned ''' def __init__(self, objectiveFunction = 'std', rebalancingParam = False, reserve = 0, leverage = 1.0, num_alphas = 1.0, normalize = False): ''' Description: Initialize a new instance of CustomOptimizationPortfolioConstructionModel Args: objectiveFunction: The function to optimize. If set to 'equal', it will just perform equal weighting rebalancingParam: Integer indicating the number of days for rebalancing (default set to False, no rebalance) - Independent of this parameter, the portfolio will be rebalanced when a security is added/removed/changed direction ''' self.optWeights = None self.objectiveFunction = objectiveFunction self.insightCollection = InsightCollection() self.removedSymbols = [] self.nextExpiryTime = UTCMIN self.rebalancingTime = UTCMIN self.allocation = {} self.reserved = reserve self.max_leverage = leverage self.normalize = normalize self.num_alphas = num_alphas if objectiveFunction != 'equal': # minWeight set to 0 to ensure long only weights self.optimizer = CustomPortfolioOptimizer(minWeight = 0, maxWeight = self.max_leverage, objFunction = objectiveFunction) # initialize the optimize # if the rebalancing parameter is not False but a positive integer # convert rebalancingParam to timedelta and create rebalancingFunc if rebalancingParam > 0: self.rebalancing = True rebalancingParam = timedelta(days = rebalancingParam) self.rebalancingFunc = lambda dt: dt + rebalancingParam else: self.rebalancing = rebalancingParam def CreateTargets(self, algorithm, insights): ''' Description: Create portfolio targets from the specified insights Args: algorithm: The algorithm instance insights: The insights to create portfolio targets from Returns: An enumerable of portfolio targets to be sent to the execution model ''' targets = [] # check if we have new insights coming from the alpha model or if some existing insights have expired # or if we have removed symbols from the universe if (len(insights) == 0 and algorithm.UtcTime <= self.nextExpiryTime and self.removedSymbols is None): return targets # here we get the new insights and add them to our insight collection for insight in insights: self.insightCollection.Add(insight) # create flatten target for each security that was removed from the universe if self.removedSymbols is not None: universeDeselectionTargets = [ PortfolioTarget(symbol, 0) for symbol in self.removedSymbols ] targets.extend(universeDeselectionTargets) self.removedSymbols = None # get insight that haven't expired of each symbol that is still in the universe activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime) # get the last generated active insight for each symbol lastActiveInsights = [] for symbol, g in groupby(activeInsights, lambda x: x.Symbol): lastActiveInsights.append(sorted(g, key = lambda x: x.GeneratedTimeUtc)[-1]) # check if we actually want to create new targets for the securities (check function ShouldCreateTargets for details) if self.rebalancing and algorithm.UtcTime >= self.rebalancingTime: #check for empty insights if lastActiveInsights == []: return targets #Gets Adjusted Portfolio value port_value_adjusted = ((float(algorithm.Portfolio.TotalPortfolioValue)* self.max_leverage) - self.reserved) for insight in lastActiveInsights: symbol = insight.Symbol alpha = insight.SourceModel if insight.Weight: allocation = insight.Weight else: allocation = 0 #Calculate required rebalance amount for each allocation if self.num_alphas == 0: return targets else: goal_value = (port_value_adjusted * allocation / self.num_alphas) #Adjusted % allocation if float(algorithm.Portfolio.TotalPortfolioValue) == 0.0: return targets else: if symbol not in self.allocation: self.allocation[symbol] = 0 self.allocation[symbol] += goal_value/float(algorithm.Portfolio.TotalPortfolioValue) #self.allocation[symbol][alpha] = goal_value/float(algorithm.Portfolio.TotalPortfolioValue) combined_weight = {} for symbol in self.allocation.keys(): #if isinstance(self.allocation[symbol],float): # return targets wt = self.allocation[symbol] if abs(wt) >= 1e-2: combined_weight[symbol] = round(wt*20.0)/20.0 wt_abs = sum(np.abs(list(combined_weight.values()))) factor = 1.0 if wt_abs > self.max_leverage: factor = self.max_leverage/wt_abs elif self.normalize and wt_abs > 0: factor = self.max_leverage/wt_abs for symbol in combined_weight.keys(): combined_weight[symbol] = combined_weight[symbol]*factor # symbols with active insights lastActiveSymbols = list(combined_weight.keys()) # get historical data for all symbols for the last 253 trading days (to get 252 returns) history = algorithm.History(lastActiveSymbols, 22, Resolution.Daily) #253 # empty dictionary for calculations calculations = {} # iterate over all symbols and perform calculations for symbol in lastActiveSymbols: if (str(symbol) not in history.index or history.loc[str(symbol)].get('close') is None or history.loc[str(symbol)].get('close').isna().any()): algorithm.Log('(Portfolio) no historical data for: ' + str(symbol.Value)) continue else: # add symbol to calculations calculations[symbol] = SymbolData(symbol) try: # get series of log-returns calculations[symbol].CalculateLogReturnSeries(history) except Exception: algorithm.Log('(Portfolio) removing from calculations due to CalculateLogReturnSeries failing: ' + str(symbol.Value)) calculations.pop(symbol) continue # determine target percent for the given insights (check function DetermineTargetPercent for details) self.optWeights = self.DetermineTargetPercent(calculations, lastActiveInsights, initial_wt = combined_weight) #errorSymbols = {} if not self.optWeights.isnull().values.any(): algorithm.Log('(Portfolio) optimal weights: ' + str(self.optWeights)) for symbol in self.allocation: if str(symbol) in self.optWeights: # avoid very small numbers and make them 0 if self.optWeights[str(symbol)] <= 1e-2: self.optWeights[str(symbol)] = 0 #algorithm.Plot('Optimal Allocation', symbol.Value, float(self.optWeights[str(symbol)])) self.allocation[symbol] = self.optWeights[str(symbol)] target = PortfolioTarget.Percent(algorithm, symbol, self.allocation[symbol]) if not target is None: targets.append(target) #else: # errorSymbols[symbol] = symbol else: self.allocation[symbol] = 0.0 target = PortfolioTarget.Percent(algorithm, symbol, self.allocation[symbol]) if not target is None: targets.append(target) #else: # errorSymbols[symbol] = symbol # update rebalancing time if self.rebalancing: self.rebalancingTime = self.rebalancingFunc(algorithm.UtcTime) # get expired insights and create flatten targets for each symbol expiredInsights = self.insightCollection.RemoveExpiredInsights(algorithm.UtcTime) expiredTargets = [] for symbol, f in groupby(expiredInsights, lambda x: x.Symbol): if not self.insightCollection.HasActiveInsights(symbol, algorithm.UtcTime): expiredTargets.append(PortfolioTarget(symbol, 0)) continue targets.extend(expiredTargets) # here we update the next expiry date in the insight collection self.nextExpiryTime = self.insightCollection.GetNextExpiryTime() if self.nextExpiryTime is None: self.nextExpiryTime = UTCMIN return targets def ShouldCreateTargets(self, algorithm, optWeights, lastActiveInsights): ''' Description: Determine whether we should rebalance the portfolio to keep equal weighting when: - It is time to rebalance regardless - We want to include some new security in the portfolio - We want to modify the direction of some existing security Args: optWeights: Series containing the current optimal weight for each security lastActiveInsights: The last active insights to check ''' # it is time to rebalance if self.rebalancing and algorithm.UtcTime >= self.rebalancingTime: return True for insight in lastActiveInsights: # if there is an insight for a new security that's not invested and it has no existing optimal weight, then rebalance if (not algorithm.Portfolio[insight.Symbol].Invested and insight.Direction != InsightDirection.Flat and str(insight.Symbol) not in optWeights): return True # if there is an insight to close a long position, then rebalance elif algorithm.Portfolio[insight.Symbol].IsLong and insight.Direction != InsightDirection.Up: return True # if there is an insight to close a short position, then rebalance elif algorithm.Portfolio[insight.Symbol].IsShort and insight.Direction != InsightDirection.Down: return True else: continue return False def DetermineTargetPercent(self, calculations, lastActiveInsights, initial_wt): ''' Description: Determine the target percent for each symbol provided Args: calculations: Dictionary with calculations for symbols lastActiveInsights: Dictionary with calculations for symbols ''' if self.objectiveFunction == 'equal': # give equal weighting to each security count = 0 for symbol, wt in initial_wt: if abs(wt) > 0: count += 1 percent = 0 if count == 0 else 1.0 / count result = {} for symbol in initial_wt: if wt < 0: result[str(symbol)] = -percent else: result[str(symbol)] = percent weights = pd.Series(result) return weights elif self.objectiveFunction == 'no_optimization': result = {} for symbol in initial_wt: result[str(symbol)] = initial_wt[symbol] weights = pd.Series(result) return weights else: # create a dictionary keyed by the symbols in calculations with a pandas.Series as value to create a dataframe of log-returns logReturnsDict = { str(symbol): symbolData.logReturnSeries for symbol, symbolData in calculations.items() } logReturnsDf = pd.DataFrame(logReturnsDict) # portfolio optimizer finds the optimal weights for the given data weights = self.optimizer.Optimize(logReturnsDf, initial_wt, covariance = None) weights = pd.Series(weights, index = logReturnsDf.columns) return weights def OnSecuritiesChanged(self, algorithm, changes): ''' Description: Event fired each time the we add/remove securities from the data feed Args: algorithm: The algorithm instance that experienced the change in securities changes: The security additions and removals from the algorithm ''' # get removed symbol and invalidate them in the insight collection self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities] self.insightCollection.Clear(self.removedSymbols) class SymbolData: ''' Contain data specific to a symbol required by this model ''' def __init__(self, symbol): self.Symbol = symbol self.logReturnSeries = None def CalculateLogReturnSeries(self, history): ''' Calculate the log-returns series for each security ''' self.logReturnSeries = np.log(1 + history.loc[str(self.Symbol)]['close'].pct_change(periods = 1).dropna()) # 1-day log-returns ### class containing the CustomPortfolioOptimizer ----------------------------------------------------------------------------------------- class CustomPortfolioOptimizer: ''' Description: Implementation of a custom optimizer that calculates the weights for each asset to optimize a given objective function Details: Optimization can be: - Maximize Portfolio Return - Minimize Portfolio Standard Deviation - Maximize Portfolio Sharpe Ratio Constraints: - Weights must be between some given boundaries - Weights must sum to 1 ''' def __init__(self, minWeight = -1, maxWeight = 1, objFunction = 'std'): ''' Description: Initialize the CustomPortfolioOptimizer Args: minWeight(float): The lower bound on portfolio weights maxWeight(float): The upper bound on portfolio weights objFunction: The objective function to optimize (return, std, sharpe) ''' self.minWeight = minWeight self.maxWeight = maxWeight self.objFunction = objFunction def Optimize(self, historicalLogReturns, initial_wt, covariance = None): ''' Description: Perform portfolio optimization using a provided matrix of historical returns and covariance (optional) Args: historicalLogReturns: Matrix of historical log-returns where each column represents a security and each row log-returns for the given date/time (size: K x N) covariance: Multi-dimensional array of double with the portfolio covariance of returns (size: K x K) Returns: Array of double with the portfolio weights (size: K x 1) ''' # if no covariance is provided, calculate it using the historicalLogReturns if covariance is None: covariance = historicalLogReturns.cov() size = historicalLogReturns.columns.size # K x 1 #x0 = np.array(size * [1. / size]) x0 = np.array(list(initial_wt.values())) #initial guess is alpha's allocation # apply equality constraints constraints = ({'type': 'eq', 'fun': lambda weights: self.GetBudgetConstraint(weights)}) opt = minimize(lambda weights: self.ObjectiveFunction(weights, historicalLogReturns, covariance), # Objective function x0, # Initial guess bounds = self.GetBoundaryConditions(size), # Bounds for variables constraints = constraints, # Constraints definition method = 'SLSQP') # Optimization method: Sequential Least Squares Programming return opt['x'] def ObjectiveFunction(self, weights, historicalLogReturns, covariance): ''' Description: Compute the objective function Args: weights: Portfolio weights historicalLogReturns: Matrix of historical log-returns covariance: Covariance matrix of historical log-returns ''' # calculate the annual return of portfolio annualizedPortfolioReturns = np.sum(historicalLogReturns.mean() * 21 * weights) #252 # calculate the annual standard deviation of portfolio annualizedPortfolioStd = np.sqrt( np.dot(weights.T, np.dot(covariance * 21, weights)) ) #252 if annualizedPortfolioStd == 0: raise ValueError(f'CustomPortfolioOptimizer.ObjectiveFunction: annualizedPortfolioStd cannot be zero. Weights: {weights}') # calculate annual sharpe ratio of portfolio annualizedPortfolioSharpeRatio = (annualizedPortfolioReturns / annualizedPortfolioStd) if self.objFunction == 'sharpe': return -annualizedPortfolioSharpeRatio # convert to negative to be minimized elif self.objFunction == 'return': return -annualizedPortfolioReturns # convert to negative to be minimized elif self.objFunction == 'std': return annualizedPortfolioStd else: raise ValueError(f'CustomPortfolioOptimizer.ObjectiveFunction: objFunction input has to be one of sharpe, return or std') def GetBoundaryConditions(self, size): ''' Create the boundary condition for the portfolio weights ''' return tuple((self.minWeight, self.maxWeight) for x in range(size)) def GetBudgetConstraint(self, weights): ''' Define a budget constraint: the sum of the weights equal to maximum leverage''' return np.sum(weights) - self.maxWeight
from clr import AddReference AddReference("QuantConnect.Algorithm.Framework") AddReference("QuantConnect.Indicators") AddReference("QuantConnect.Common") from QuantConnect import * from QuantConnect.Indicators import * from QuantConnect.Algorithm.Framework.Alphas import * import math import pandas as pd import numpy as np from scipy.optimize import minimize from datetime import datetime, timedelta from pytz import utc UTCMIN = datetime.min.replace(tzinfo=utc) import numpy as np class Alpha_CAA: def __init__(self, variables, *args, **kwargs): self.Name = "Alpha_CAA" self.var = variables """ VAA parameters """ self.resolution = Resolution.Daily self.rebalancingTime = UTCMIN self.days_predicted = 5 self.rebalancingPeriod = Extensions.ToTimeSpan(self.resolution)*self.days_predicted self.tickers = [ "SPY", "IJR", "EEM", "EFA", "GLD", "TLT", "VNQ", "XLE", "XLK", "XLY", "XLU", "XLP", "XLI", "XLF", "XLV", "XLB" ] #Variable Definitions self.allocation = {} for symbol in self.tickers: self.allocation[symbol] = 0.0 def Update(self, algorithm, data): insights = [] if not self.var.Update_CAA: return insights df_price = algorithm.History(self.tickers, 252, Resolution.Daily)["close"].unstack(level=0) #df_price = pd.DataFrame(data,columns=data.keys()) daily_return = (df_price / df_price.shift(1)).dropna() #algorithm.Debug("df_price = " + str(df_price)) #algorithm.Debug("daily_return = " + str(daily_return)) a = PortfolioOptimization(daily_return, 0, len(self.tickers)) opt_weight = a.opt_portfolio() #algorithm.Debug("opt_weight = " + str(opt_weight)) allocation_temp = {} if math.isnan(sum(opt_weight)): self.rebalancingTime = algorithm.UtcTime + self.rebalancingPeriod return insights for i in range(len(self.tickers)): symbol = df_price.columns[i] weight = opt_weight[i] allocation_temp[symbol] = weight d = {key: abs(allocation_temp[key] - self.allocation.get(key, 0)) for key in allocation_temp.keys()} difference = sum(d.values()) for ticker, weight in allocation_temp.items(): symbol = algorithm.Symbol(ticker).Value if self.allocation[symbol] != weight and difference >= .05: if weight > 0: direction = InsightDirection.Up else: direction = InsightDirection.Flat #insights.append(Insight(symbol, predictionInterval, InsightType.Price, direction, magnitude, weight, sourceModel="Test")) insights.append(Insight(symbol, timedelta(days=30), InsightType.Price, direction, None, None, self.Name, weight)) self.allocation[symbol] = weight self.rebalancingTime = algorithm.UtcTime + self.rebalancingPeriod self.var.Update_CAA = False return Insight.Group(insights) def OnSecuritiesChanged(self, algorithm, changes): pass class PortfolioOptimization(object): def __init__(self, df_return, risk_free_rate, num_assets): self.daily_return = df_return self.n = num_assets # numbers of risk assets in portfolio def annual_port_return(self, weights): # calculate the annual return of portfolio return np.sum(self.daily_return.mean() * weights) * 252 def annual_port_vol(self, weights): # calculate the annual volatility of portfolio return np.sqrt(np.dot(weights.T, np.dot(self.daily_return.cov() * 252, weights))) def min_func(self, weights): # method 1: maximize sharp ratio return - (self.annual_port_return(weights)) / self.annual_port_vol(weights) def opt_portfolio(self): # maximize the sharpe ratio to find the optimal weights cons = ({'type': 'eq', 'fun': lambda x: np.sum(np.abs(x)) - 1}) bnds = [(0, .5)] * self.n opt = minimize(self.min_func, # object function np.array(self.n * [1. / self.n]), # initial value method='SLSQP', # optimization method bounds=bnds, # bounds for variables constraints=cons) # constraint conditions opt_weights = opt['x'] return opt_weights
from clr import AddReference AddReference("QuantConnect.Algorithm.Framework") AddReference("QuantConnect.Indicators") AddReference("QuantConnect.Common") import math from QuantConnect import * from QuantConnect.Algorithm.Framework.Alphas import * class Alpha_SimpleRebalance: def __init__(self, variables, *args, **kwargs): self.Name = "Alpha_SimpleRebalance" self.var = variables #assets self.stocks = [ "QQQ", "TLT", "EFZ", "IEF", "VXX", ] """ zscore parameters """ self.fixed_wt_pct = 1.0 self.fixed_wt = { 'QQQ':0.50, #.5 'TLT':0.30, #.3 'EFZ':0.05, #.05 'IEF':0.10, #.1 'VXX':0.05, #.05 } self.allocation = {} for symbol in self.stocks: self.allocation[symbol] = 0.0 def Update(self, algorithm, data): insights = [] allocation_temp = {} magnitude = {} direction = InsightDirection.Flat for symbol in self.stocks: if not algorithm.Securities.ContainsKey(symbol): continue if algorithm.Securities[symbol].Price == 0.0: continue allocation_temp[symbol] = 0.0 if symbol in self.fixed_wt: allocation_temp[symbol] = self.fixed_wt[symbol] * self.fixed_wt_pct if self.var.Update_SimpleRebalance: for symbol in allocation_temp: self.allocation[symbol] = allocation_temp[symbol] if self.allocation[symbol] > 0: direction = InsightDirection.Up else: direction = InsightDirection.Flat insights.append(Insight(symbol, timedelta(days = 30), InsightType.Price, direction, None, None, self.Name, self.allocation[symbol])) #algorithm.Debug(str(symbol) + " = " + str(self.allocation[symbol])) self.var.Update_SimpleRebalance = False return Insight.Group( insights ) def OnSecuritiesChanged(self, algorithm, changes): pass
from clr import AddReference AddReference("System") AddReference("QuantConnect.Common") AddReference("QuantConnect.Algorithm") AddReference("QuantConnect.Algorithm.Framework") from QuantConnect import * from QuantConnect.Algorithm import * from QuantConnect.Algorithm.Framework import * from QuantConnect.Algorithm.Framework.Portfolio import PortfolioTarget from QuantConnect.Algorithm.Framework.Risk import RiskManagementModel class Risk_Synthetic_Leverage(RiskManagementModel): '''Provides an implementation of IRiskManagementModel that that limits the sector exposure to the specified percentage''' def __init__(self, variables, drawdown_leverageMax = 0.03, drawdown_leverageMin = 0.06): self.leverageMax = abs(drawdown_leverageMax) # the % drawdown above which all levearged assets will be used self.leverageMin = abs(drawdown_leverageMin) # the % drawdown below which no levearged assets will be used self.trailingHighs = dict() self.trailingLows = dict() self.targetsCollection = PortfolioTargetCollection() self.risk_symbols = { 'QQQ':'TQQQ', 'TLT':'TMF', 'IEF':'TYD', 'SPY':'SPXL', } def ManageRisk(self, algorithm, targets): '''Manages the algorithm's risk at each time step Args: algorithm: The algorithm instance''' #maximumSectorExposureValue = float(algorithm.Portfolio.TotalPortfolioValue) * self.maximumSectorExposure self.targetsCollection.AddRange(targets) risk_targets = list() if not list(targets) == []: pass #algorithm.Debug("risk management started at " + str(algorithm.UtcTime)) for target in targets: symbol = target.Symbol security = algorithm.Securities[symbol] quantity = target.Quantity if str(symbol) not in self.risk_symbols: continue if not algorithm.Securities.ContainsKey(symbol): continue if not algorithm.Securities.ContainsKey(self.risk_symbols[str(symbol)]): continue if security.Price == 0.0: continue if algorithm.Securities[self.risk_symbols[str(symbol)]].Price == 0.0: continue try: self.trailingHighs[symbol] = max(algorithm.History(symbol, 252, Resolution.Daily).high) # Set to highest of past x days except: continue # Check for securities past the drawdown limit securityHigh = self.trailingHighs[symbol] drawdown = -1*((security.Low / securityHigh) - 1) non_levered_pct = max(0.0,min(1.0,(drawdown - self.leverageMax)/(self.leverageMin - self.leverageMax))) levered_pct = 1.0-non_levered_pct profit = security.Holdings.UnrealizedProfitPercent if profit > .10: levered_pct = levered_pct * .5 non_levered_pct = 1.0 - levered_pct total_value = quantity * security.Price non_levered_quant = int((total_value * non_levered_pct) / security.Price) levered_quant = int((total_value * levered_pct) / algorithm.Securities[self.risk_symbols[str(symbol)]].Price) risk_targets.append(PortfolioTarget(symbol, non_levered_quant)) risk_targets.append(PortfolioTarget(self.risk_symbols[str(symbol)], levered_quant)) algorithm.Log("LEVERAGE: " + str(symbol) + " : " + str(round(non_levered_pct*100,1)) + "% = " + str(int(non_levered_quant)) + " shares.") algorithm.Log("LEVERAGE: " + str(self.risk_symbols[str(symbol)]) + " : " + str(round(levered_pct*100,1)) + "% = " + str(int(levered_quant)) + " shares.") return risk_targets