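# Overall Statistics (backtest result that accompanied this code)
#
# | Statistic                  | Value |
# |----------------------------|-------|
# | Total Trades               | 0     |
# | Average Win                | 0%    |
# | Average Loss               | 0%    |
# | Compounding Annual Return  | 0%    |
# | Drawdown                   | 0%    |
# | Expectancy                 | 0     |
# | Net Profit                 | 0%    |
# | Sharpe Ratio               | 0     |
# | Loss Rate                  | 0%    |
# | Win Rate                   | 0%    |
# | Profit-Loss Ratio          | 0     |
# | Alpha                      | 0     |
# | Beta                       | 0     |
# | Annual Standard Deviation  | 0     |
# | Annual Variance            | 0     |
# | Information Ratio          | 0     |
# | Tracking Error             | 0     |
# | Treynor Ratio              | 0     |
# | Total Fees                 | $0.00 |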
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")

from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Indicators import *
from QuantConnect.Data import SubscriptionDataSource
from QuantConnect.Python import PythonData
from QuantConnect.Orders import OrderStatus
from QuantConnect.Orders.Fills import ImmediateFillModel
from datetime import date, timedelta, datetime
import numpy as np
import pandas as pd
import decimal as d
import scipy.stats as stats
import talib


class cla_test(QCAlgorithm):
    """Monthly job that feeds momentum/covariance estimates into ``CLA``.

    The algorithm subscribes to the proxy and benchmark tickers, warms up for
    ``requiredBarsWarmup`` bars and, on each scheduled rebalance, builds the
    optimizer inputs and runs ``CLA.solve()``.  No orders are placed by this
    class.
    """

    proxies = ["SPY", "QQQ", "EEM"]
    # Per-asset weight bounds, one single-element row per entry of `proxies`.
    caa_lower_bounds = [[0.00], [0.00], [0.00]]
    caa_upper_bounds = [[0.25], [0.25], [1.00]]
    caa_tv = 0.08  # target volatility handed to get_weights()
    benchmark = ["EXTSPY"]
    lookback_mom = 6  # total lookback, in blocks of 21 daily bars
    lookback_corr = 3  # covariance lookback, in blocks of 21 daily bars
    requiredBarsWarmup = lookback_mom * 21

    def Initialize(self):
        """Configure cash, backtest dates, subscriptions and the schedule."""
        # Set the cash we'd like to use for our backtest.
        # This is ignored in live trading.
        self.SetCash(100000)

        # Start and end dates for the backtest.
        # These are ignored in live trading.
        self.SetStartDate(2015, 1, 1)
        self.SetEndDate(2017, 10, 2)
        self.SetBenchmark("SPY")

        # Add assets to be used.  Sorting makes the subscription order
        # deterministic (plain set iteration order is not).
        for symbol in sorted(set(self.proxies) | set(self.benchmark)):
            self.AddEquity(symbol, Resolution.Daily)

        self.SetWarmup(self.requiredBarsWarmup)

        # Schedule functions: first trading day of the month, 30 minutes
        # before the close of SPY.
        self.Schedule.On(self.DateRules.MonthStart("SPY"),
                         self.TimeRules.BeforeMarketClose("SPY", 30),
                         Action(self.Rebalance))

    def OnData(self, slice):
        """Unused: all work happens in the scheduled ``Rebalance``."""
        pass

    def Rebalance(self):
        """Build expected returns and covariance, then run the CLA solver.

        Any exception raised while constructing or solving the optimizer is
        reported through ``self.Debug`` instead of stopping the algorithm.
        """
        if self.IsWarmingUp:
            return

        h = self.History(self.proxies, 21 * self.lookback_mom, Resolution.Daily)
        if h.empty:
            self.Debug("Rebalance skipped: history request returned no data")
            return

        price_hist = h.close.unstack(level=0)
        n = len(self.proxies)
        if price_hist.shape[1] != n:
            self.Debug("Rebalance skipped: expected " + str(n)
                       + " price columns, got " + str(price_hist.shape[1]))
            return
        # NOTE(review): the column order of price_hist comes from unstack(),
        # while the bounds are listed in `proxies` order -- confirm that the
        # two orders agree before trading on the resulting weights.

        # Covariance of daily log returns over the correlation lookback
        # (the most recent bar is excluded by the slice).
        r = np.log(price_hist[-21 * self.lookback_corr:-1]).diff().dropna()
        covar = r.cov().values

        # Month-end prices used for the momentum estimate.
        p_month = price_hist.resample('M').last()

        # Expected return: sum of the 1..4 month simple returns ending at the
        # latest month, scaled by 1/22.
        # NOTE(review): the original marks this computation as a dummy; the
        # divisor 22 does not equal the sum of the lags used (10) -- confirm.
        R_mom = 0
        for lag in (1, 2, 3, 4):
            R_mom = R_mom + p_month.iloc[[-lag - 1, -1]].pct_change()[1:]
        R_mom = R_mom / 22

        mean = R_mom.values.reshape(n, 1)  # expected return as a column vector
        lower_bounds = self.caa_lower_bounds
        upper_bounds = self.caa_upper_bounds

        try:
            cla = CLA(mean, covar, lower_bounds, upper_bounds)
            cla.solve()
            # TODO: derive target weights once the solver output is validated:
            #   pd.Series(get_weights(cla, self.caa_tv).flatten(), index=r.columns)
        except Exception as e:
            self.Debug("\n>>>>>>>>>>>>>>>>>>>>>>EXCEPTION:"+str(e))


def get_weights(cla, tv):
    """Return the frontier portfolio whose volatility is closest to ``tv``.

    Args:
        cla: a solved ``CLA`` instance (``efFrontier`` must be usable).
        tv: target volatility.  It is divided by ``sqrt(252)`` before the
            comparison -- presumably converting an annualized target to the
            daily scale of the covariance matrix; confirm against the caller.

    Returns:
        The weight vector taken from ``cla.efFrontier(1000)`` with the
        smallest absolute volatility gap (first one wins on ties).

    Raises:
        IndexError: if the frontier contains no points.
    """
    mu, sigma, weights = cla.efFrontier(1000)  # sample the efficient frontier
    tv = tv / pow(252, 0.5)
    best_gap = float("inf")
    index = 0
    for i, value in enumerate(sigma):
        gap = abs(value - tv)
        if gap < best_gap:
            best_gap = gap
            index = i
    return weights[index]


def _as_float(value):
    """Convert a size-1 array (any rank) or scalar to a Python float."""
    return float(np.asarray(value).item())


class CLA:
    """Bounded mean-variance optimizer that walks turning points.

    The structure (free/bounded weights, lambdas, gammas, turning points)
    appears to follow the Critical Line Algorithm.  After ``solve()`` the
    lists ``w``, ``l``, ``g`` and ``f`` hold, per turning point, the weight
    vector, lambda, gamma and list of free-asset indices.
    """

    def __init__(self, mean, covar, lB, uB):
        """Store the problem definition.

        Args:
            mean: expected returns, one value per asset (stored as n x 1).
            covar: n x n covariance matrix.
            lB: lower weight bounds, one per asset (stored as n x 1).
            uB: upper weight bounds, one per asset (stored as n x 1).
        """
        # Coerce to float arrays so integer bounds cannot truncate weights and
        # so every later slice works on ndarrays.
        self.mean = np.asarray(mean, dtype=float).reshape(-1, 1)
        self.covar = np.asarray(covar, dtype=float)
        self.lB = np.asarray(lB, dtype=float).reshape(-1, 1)
        self.uB = np.asarray(uB, dtype=float).reshape(-1, 1)
        self.w = []  # solution
        self.l = []  # lambdas
        self.g = []  # gammas
        self.f = []  # free weights

    def solve(self):
        """Compute the turning points, free sets and weights."""
        f, w = self.initAlgo()
        self.w.append(np.copy(w))  # store solution
        self.l.append(None)
        self.g.append(None)
        self.f.append(f[:])
        while True:
            # 1) case a): bound one free weight
            l_in = None
            if len(f) > 1:
                covarF, covarFB, meanF, wB = self.getMatrices(f)
                covarF_inv = np.linalg.inv(covarF)
                for j, i in enumerate(f):
                    l, bi = self.computeLambda(covarF_inv, covarFB, meanF, wB, j,
                                               [self.lB[i], self.uB[i]])
                    # None means "no candidate"; it never beats a real lambda.
                    if l is not None and (l_in is None or l > l_in):
                        l_in, i_in, bi_in = l, i, bi

            # 2) case b): free one bounded weight
            l_out = None
            if len(f) < self.mean.shape[0]:
                for i in self.getB(f):
                    covarF, covarFB, meanF, wB = self.getMatrices(f + [i])
                    covarF_inv = np.linalg.inv(covarF)
                    l, bi = self.computeLambda(covarF_inv, covarFB, meanF, wB,
                                               meanF.shape[0] - 1, self.w[-1][i])
                    if l is None:
                        continue
                    below_last = self.l[-1] is None or l < self.l[-1]
                    if below_last and (l_out is None or l > l_out):
                        l_out, i_out = l, i

            if (l_in is None or l_in < 0) and (l_out is None or l_out < 0):
                # 3) compute minimum variance solution
                self.l.append(0)
                covarF, covarFB, meanF, wB = self.getMatrices(f)
                covarF_inv = np.linalg.inv(covarF)
                meanF = np.zeros(meanF.shape)
            else:
                # 4) decide lambda (a missing candidate loses to a real one)
                if l_in is not None and (l_out is None or l_in > l_out):
                    self.l.append(l_in)
                    f.remove(i_in)
                    w[i_in] = bi_in  # set value at the correct boundary
                else:
                    self.l.append(l_out)
                    f.append(i_out)
                covarF, covarFB, meanF, wB = self.getMatrices(f)
                covarF_inv = np.linalg.inv(covarF)

            # 5) compute solution vector
            wF, g = self.computeW(covarF_inv, covarFB, meanF, wB)
            for k, asset in enumerate(f):
                w[asset] = wF[k]
            self.w.append(np.copy(w))  # store solution
            self.g.append(g)
            self.f.append(f[:])
            if self.l[-1] == 0:
                break

        # 6) purge turning points
        self.purgeNumErr(10e-10)
        self.purgeExcess()

    def initAlgo(self):
        """Build the starting portfolio.

        Starting from the lower bounds, assets are raised to their upper
        bound in order of decreasing expected return until the weights sum to
        at least one; the last asset raised absorbs the excess and becomes the
        first free weight.

        Returns:
            ``([free_index], w)`` where ``w`` is the n x 1 starting weights.

        Raises:
            ValueError: if the bounds cannot bracket a fully invested
                portfolio (lower bounds sum to >= 1 or upper bounds to < 1).
        """
        if self.lB.sum() >= 1 or self.uB.sum() < 1:
            raise ValueError("bounds are infeasible: need sum(lB) < 1 <= sum(uB)")

        # Ascending order of expected return; the stable sort breaks ties by
        # asset index, like the original structured-array sort did.
        order = np.argsort(self.mean[:, 0], kind="stable")

        i, w = order.shape[0], np.copy(self.lB)
        while w.sum() < 1 and i > 0:
            i -= 1
            w[order[i]] = self.uB[order[i]]
        free = int(order[i])
        w[free] += 1 - w.sum()
        return [free], w

    def computeBi(self, c, bi):
        """Pick the bound selected by the sign of ``c``.

        ``bi`` is a ``[lower, upper]`` pair of single-element sequences; the
        upper value is returned for ``c > 0`` and the lower one for ``c < 0``.
        For ``c == 0`` the pair is returned unchanged.
        """
        if c > 0:
            bi = bi[1][0]
        if c < 0:
            bi = bi[0][0]
        return bi

    def computeW(self, covarF_inv, covarFB, meanF, wB):
        """Return ``(free_weights, gamma)`` for the latest lambda ``self.l[-1]``.

        ``wB`` is ``None`` when every asset is free.
        """
        # 1) compute gamma
        onesF = np.ones(meanF.shape)
        g1 = np.dot(np.dot(onesF.T, covarF_inv), meanF)
        g2 = np.dot(np.dot(onesF.T, covarF_inv), onesF)
        if wB is None:
            g, w1 = _as_float(-self.l[-1] * g1 / g2 + 1 / g2), 0
        else:
            onesB = np.ones(wB.shape)
            g3 = np.dot(onesB.T, wB)
            g4 = np.dot(covarF_inv, covarFB)
            w1 = np.dot(g4, wB)
            g4 = np.dot(onesF.T, w1)
            g = _as_float(-self.l[-1] * g1 / g2 + (1 - g3 + g4) / g2)
        # 2) compute weights
        w2 = np.dot(covarF_inv, onesF)
        w3 = np.dot(covarF_inv, meanF)
        return -w1 + g * w2 + self.l[-1] * w3, g

    def computeLambda(self, covarF_inv, covarFB, meanF, wB, i, bi):
        """Return ``(lambda, bound)`` for free asset position ``i``.

        ``bi`` is either a ``[lower, upper]`` list (the bound is then chosen
        by ``computeBi``) or the weight value itself.  Returns
        ``(None, None)`` when the coefficient ``c`` is exactly zero.
        """
        # 1) C
        onesF = np.ones(meanF.shape)
        c1 = np.dot(np.dot(onesF.T, covarF_inv), onesF)
        c2 = np.dot(covarF_inv, meanF)
        c3 = np.dot(np.dot(onesF.T, covarF_inv), meanF)
        c4 = np.dot(covarF_inv, onesF)
        c = _as_float(-c1 * c2[i] + c3 * c4[i])
        if c == 0:
            return None, None
        # 2) bi
        if isinstance(bi, list):
            bi = self.computeBi(c, bi)
        # 3) Lambda
        if wB is None:
            # All free assets
            return _as_float((c4[i] - c1 * bi) / c), bi
        onesB = np.ones(wB.shape)
        l1 = np.dot(onesB.T, wB)
        l2 = np.dot(covarF_inv, covarFB)
        l3 = np.dot(l2, wB)
        l2 = np.dot(onesF.T, l3)
        return _as_float(((1 - l1 + l2) * c4[i] - c1 * (bi + l3[i])) / c), bi

    def getMatrices(self, f):
        """Slice ``covarF, covarFB, meanF, wB`` for the free set ``f``.

        ``covarFB`` and ``wB`` are ``None`` when no asset is bounded.
        """
        covarF = self.reduceMatrix(self.covar, f, f)
        meanF = self.reduceMatrix(self.mean, f, [0])
        b = self.getB(f)
        covarFB = self.reduceMatrix(self.covar, f, b)
        wB = self.reduceMatrix(self.w[-1], b, [0])
        return covarF, covarFB, meanF, wB

    def getB(self, f):
        """Return the indices of the assets that are not in ``f``."""
        return self.diffLists(range(self.mean.shape[0]), f)

    def diffLists(self, list1, list2):
        """Return the elements of ``list1`` absent from ``list2``, sorted."""
        return sorted(set(list1) - set(list2))

    def reduceMatrix(self, matrix, listX, listY):
        """Return ``matrix`` restricted to rows ``listX`` and columns ``listY``.

        Returns ``None`` when either index list is empty.
        """
        if len(listX) == 0 or len(listY) == 0:
            return None
        return matrix[np.ix_(listX, listY)]

    def purgeNumErr(self, tol):
        """Drop turning points that break a constraint by more than ``tol``.

        A point is dropped when its weights do not sum to one or when any
        weight lies outside its bounds (the original comment attributes such
        violations to an ill-conditioned covariance matrix).
        """
        i = 0
        while i < len(self.w):
            w = self.w[i]
            violates = (abs(w.sum() - 1) > tol
                        or bool(np.any(w - self.lB < -tol))
                        or bool(np.any(w - self.uB > tol)))
            if violates:
                del self.w[i]
                del self.l[i]
                del self.g[i]
                del self.f[i]
            else:
                i += 1

    def purgeExcess(self):
        """Remove violations of the convex hull.

        Scanning starts at the second stored point; a point is deleted when
        any later point has a strictly higher expected return.
        """
        i, repeat = 0, False
        while True:
            if not repeat:
                i += 1
            # `>=` also covers the case where only one point is left.
            if i >= len(self.w) - 1:
                break
            mu = np.dot(self.w[i].T, self.mean)[0, 0]
            repeat = False
            for j in range(i + 1, len(self.w)):
                mu_ = np.dot(self.w[j].T, self.mean)[0, 0]
                if mu < mu_:
                    del self.w[i]
                    del self.l[i]
                    del self.g[i]
                    del self.f[i]
                    repeat = True  # re-examine the point that moved into slot i
                    break

    def getMinVar(self):
        """Return ``(volatility, weights)`` of the lowest-variance turning point.

        The volatility keeps the 1 x 1 array shape produced by the matrix
        product.  Raises ``ValueError`` when no turning point is stored.
        """
        var = [np.dot(np.dot(w.T, self.covar), w) for w in self.w]
        idx = int(np.argmin([_as_float(v) for v in var]))
        return var[idx] ** .5, self.w[idx]

    def getMaxSR(self):
        """Return ``(sharpe_ratio, weights)`` of the best portfolio found.

        For each pair of neighbouring turning points the local maximum of the
        mean/volatility ratio along their convex combination is located with
        ``goldenSection``; the best of those local maxima is returned.
        """
        w_sr, sr = [], []
        for i in range(len(self.w) - 1):
            w0 = np.copy(self.w[i])
            w1 = np.copy(self.w[i + 1])
            kargs = {'minimum': False, 'args': (w0, w1)}
            a, b = self.goldenSection(self.evalSR, 0, 1, **kargs)
            w_sr.append(a * w0 + (1 - a) * w1)
            sr.append(b)
        return max(sr), w_sr[sr.index(max(sr))]

    def evalSR(self, a, w0, w1):
        """Evaluate mean/volatility of the portfolio ``a*w0 + (1-a)*w1``."""
        w = a * w0 + (1 - a) * w1
        b = np.dot(w.T, self.mean)[0, 0]
        c = np.dot(np.dot(w.T, self.covar), w)[0, 0] ** .5
        return b / c

    def goldenSection(self, obj, a, b, **kargs):
        """Golden section search of ``obj`` on ``[a, b]``.

        Keyword args:
            minimum: pass ``False`` to search for a maximum (default: minimum).
            args: tuple of extra positional arguments forwarded to ``obj``.

        Returns:
            ``(x, obj(x))`` at the located optimum.
        """
        from math import log, ceil
        # args defaults to an empty tuple so that `*args` is always valid.
        tol, sign, args = 1.0e-9, 1, ()
        if 'minimum' in kargs and kargs['minimum'] == False:
            sign = -1
        if 'args' in kargs:
            args = kargs['args']
        numIter = int(ceil(-2.078087 * log(tol / abs(b - a))))
        r = 0.618033989
        c = 1.0 - r
        # Initialize
        x1 = r * a + c * b
        x2 = c * a + r * b
        f1 = sign * obj(x1, *args)
        f2 = sign * obj(x2, *args)
        # Loop
        for _ in range(numIter):
            if f1 > f2:
                a = x1
                x1 = x2
                f1 = f2
                x2 = c * a + r * b
                f2 = sign * obj(x2, *args)
            else:
                b = x2
                x2 = x1
                f2 = f1
                x1 = r * a + c * b
                f1 = sign * obj(x1, *args)
        if f1 < f2:
            return x1, sign * f1
        return x2, sign * f2

    def efFrontier(self, points):
        """Sample the efficient frontier between stored turning points.

        Args:
            points: total number of samples requested; each segment between
                neighbouring turning points gets ``points // len(self.w)``.

        Returns:
            ``(mu, sigma, weights)`` lists: expected return, volatility and
            weight vector of every sampled portfolio.
        """
        mu, sigma, weights = [], [], []
        per_segment = int(points // len(self.w))  # linspace needs an integer
        a = np.linspace(0, 1, per_segment)[:-1]  # remove the 1, to avoid duplications
        b = range(len(self.w) - 1)
        for i in b:
            w0, w1 = self.w[i], self.w[i + 1]
            if i == b[-1]:
                a = np.linspace(0, 1, per_segment)  # include the 1 in the last iteration
            for j in a:
                w = w1 * j + (1 - j) * w0
                weights.append(np.copy(w))
                mu.append(np.dot(w.T, self.mean)[0, 0])
                sigma.append(np.dot(np.dot(w.T, self.covar), w)[0, 0] ** .5)
        return mu, sigma, weights