Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -51.397 Tracking Error 0.047 Treynor Ratio 0 Total Fees $0.00 |
from datetime import datetime, timedelta
from collections import deque
import math

import numpy as np
from scipy import stats

from QuantConnect.Data.UniverseSelection import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel


class conners_crash(QCAlgorithm):
    """Short-only strategy driven by a custom ConnorsRSI and historical volatility.

    Symbols from a coarse universe are tracked with two custom indicators
    (``CustomConnors`` and ``CustomVolatility``). A short is opened when
    volatility exceeds ``hist_vol`` and ConnorsRSI exceeds ``crsi_entry``;
    it is closed when ConnorsRSI drops below ``crsi_flat``.
    """

    def Initialize(self):
        """Configure the backtest window, universe and strategy parameters."""
        self.SetStartDate(2018, 1, 1)
        self.SetEndDate(2018, 1, 5)
        self.SetCash(100000)

        # Universe settings are applied before the universe is added.
        self.UniverseSettings.Resolution = Resolution.Daily
        self.UniverseSettings.Leverage = 1
        self.AddUniverse(self.CoarseSelectionFilter)
        self.SetWarmUp(150)

        # Per-symbol indicator registry. It must live for the whole run:
        # re-creating it on every universe change drops the indicators of
        # symbols that are still held in the portfolio.
        self.symbolDataBySymbol = {}

        self.indi = {}
        self.rsi = {}
        self.vola = {}
        self.indi_Filter = {}
        self.rsi_Filter = {}

        # Indicator look-back windows (in daily bars).
        self.rsi_w = 3
        self.streak_w = 3
        self.pct_rank_w = 100
        self.vola_w = 100

        # Trading thresholds.
        self.hist_vol = 100        # minimum annualised volatility, in percent
        self.max_num_shorts = 5    # maximum number of concurrent short positions
        self.crsi_entry = 80       # open a short above this ConnorsRSI
        self.crsi_flat = 30        # close the short below this ConnorsRSI

    def CoarseSelectionFilter(self, coarse):
        """Return up to 10 symbols priced above $5 with volume above 1e6.

        NOTE: this runs as often as the universe resolution. Without the
        10-symbol cap the selection may return a very large number of
        stocks and the algorithm becomes very slow.
        """
        filtered = [x for x in coarse if x.Price > 5 and x.Volume > 1e6]
        final_universe = [x.Symbol for x in filtered][:10]
        self.Debug(len(final_universe))
        return final_universe

    def OnSecuritiesChanged(self, changes):
        """Create indicators for added securities and drop them for removed ones.

        Only universe bookkeeping happens here; trading decisions are made
        in ``OnData``.
        """
        lookback = max(self.rsi_w, self.streak_w, self.pct_rank_w, self.vola_w)

        for security in changes.AddedSecurities:
            ticker = security.Symbol
            if ticker in self.symbolDataBySymbol:
                continue

            crsi = CustomConnors('My_Custom', self.rsi_w, self.streak_w, self.pct_rank_w)
            vola = CustomVolatility('My_Custom', self.vola_w)
            self.RegisterIndicator(ticker, crsi, Resolution.Daily)
            self.RegisterIndicator(ticker, vola, Resolution.Daily)

            # Seed both indicators from history so they are usable immediately.
            history = self.History(ticker, lookback, Resolution.Daily)
            crsi.WarmUp(history)
            vola.WarmUp(history)

            self.symbolDataBySymbol[ticker] = SymbolData(ticker, crsi, vola)

        for security in changes.RemovedSecurities:
            self.symbolDataBySymbol.pop(security.Symbol, None)

    def OnData(self, data):
        """Evaluate entry and exit signals on every new slice.

        Arguments:
            data: Slice object keyed by symbol containing the stock data

        Strategy rules as originally specified:
            1. The stock's closing price must be greater than $5/share.
            2. Average daily volume over 21 trading days > 1e6 shares.
            3. 100-day historical volatility >= 100%.
            4. Enter short on ConnorsRSI (CRSI,3,2,10) > 90
               (RSI, Streak, Percentage Change).
            5. Short next day on a limit order 5% higher.
            6. Exit the trade when ConnorsRSI < 20.

        Reference results from 2007-2017:
            # Trades: around 500, Win Rate: 73%, Avg. Profit Per Trade: 6.5%,
            Avg. Days per Trade: 12, Avg. Win: 16.5%, Avg. Loss: 19.5%

        NOTE: the thresholds actually used are ``self.crsi_entry`` and
        ``self.crsi_flat`` as set in ``Initialize``, and entries are placed
        as market orders rather than the limit order described in rule 5.
        """
        tracked = self.symbolDataBySymbol

        stocks_short = [x.Symbol for x in self.Portfolio.Values if x.IsShort]
        already_short = set(stocks_short)

        # Entry candidates: volatile enough, overbought, and not yet shorted.
        entry = [
            ticker for ticker, symbol_data in tracked.items()
            if symbol_data.VOLA.Value > self.hist_vol
            and symbol_data.CRSI.Value > self.crsi_entry
            and ticker not in already_short
        ]

        # Exit candidates among the open shorts.
        neutral = []
        for ticker in stocks_short:
            symbol_data = tracked.get(ticker)
            if symbol_data is None:
                # No indicator data is tracked for this symbol any more
                # (it was removed from the universe), so the exit signal
                # can no longer be evaluated: close the position.
                neutral.append(ticker)
            elif symbol_data.CRSI.IsReady and symbol_data.CRSI.Value < self.crsi_flat:
                neutral.append(ticker)

        # Position sizing: split the available cash evenly over the free slots.
        new_entrys = self.max_num_shorts - len(stocks_short)
        if new_entrys > 0 and entry:
            one_position_cash = self.Portfolio.Cash / new_entrys
            for ticker in entry:
                if new_entrys <= 0:
                    break
                price = self.Securities[ticker].Price
                if price <= 0:
                    # No usable price yet; sizing would divide by zero.
                    continue
                number_stocks = one_position_cash * 0.95 / price
                self.MarketOrder(ticker, -number_stocks)
                new_entrys -= 1

        for ticker in neutral:
            self.Liquidate(ticker)


class SymbolData:
    '''Contains data specific to a symbol required by this model'''

    def __init__(self, symbol, crsi, vola):
        self.Symbol = symbol
        self.CRSI = crsi
        self.VOLA = vola


class CustomConnors:
    """Custom ConnorsRSI indicator.

    ``Value`` is the average of three components computed on closing prices:
    a short RSI, the RSI of the up/down streak length, and the percent rank
    of the latest daily return.
    """

    def __init__(self, name, rsi_p, streak_p, pct_rank_p):
        period = max(rsi_p, streak_p, pct_rank_p)
        self.Name = name
        self.Time = datetime.min
        self.IsReady = False
        self.Value = 0
        self.Rsi = 0
        self.Streak = 0
        self.Pct_Rank = 0
        # Rolling window of closes, oldest first and newest last.
        self.queue = deque(maxlen=period)
        self.rsi_p = rsi_p
        self.streak_p = streak_p
        self.pct_rank_p = pct_rank_p

    def __repr__(self):
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(
            self.Name, self.IsReady, self.Time, self.Value)

    # Update method is mandatory
    def Update(self, input):
        """Update from a registered data point exposing ``Time`` and ``Close``."""
        return self.Update_Main(input.Time, input.Close)

    def Update_warmup(self, input):
        """Update from a history row exposing ``Index`` and ``close``."""
        return self.Update_Main(input.Index, input.close)

    @staticmethod
    def _rsi_from_diffs(diffs):
        """Return 100 - 100 / (1 + ups / downs) for a series of differences."""
        ups = np.nanmean(np.clip(diffs, 0, np.inf))
        downs = abs(np.nanmean(np.clip(diffs, -np.inf, 0)))
        if downs == 0:
            return 100
        return 100 - (100 / (1 + (ups / downs)))

    def Update_Main(self, time, value):
        """Push a new close and recompute the indicator once the window is full.

        Returns:
            True when the indicator is ready, False otherwise.
        """
        # Append on the right so the window is chronological; the ``[-p:]``
        # slices below then select the most recent ``p`` closes in order.
        self.queue.append(value)
        self.Time = time
        self.IsReady = len(self.queue) == self.queue.maxlen

        if self.IsReady:
            closes = np.array(self.queue)

            # Classic RSI on the most recent closes.
            self.Rsi = self._rsi_from_diffs(np.diff(closes[-self.rsi_p:]))

            # Connors streak: consecutive up (+) / down (-) closes.
            window = closes[-self.streak_p:]
            streak = np.zeros(len(window))
            curstreak = 0
            # Start at 1: the first close has no predecessor in the window
            # (index -1 would wrap around to the last element).
            for i in range(1, len(window)):
                if window[i - 1] < window[i]:
                    curstreak = max(1, curstreak + 1)
                elif window[i - 1] > window[i]:
                    curstreak = min(-1, curstreak - 1)
                else:
                    curstreak = 0
                streak[i] = curstreak
            self.Streak = self._rsi_from_diffs(np.diff(streak))

            # Percent rank: share of past returns that today's return exceeds.
            window = closes[-self.pct_rank_p:]
            daily_returns = np.diff(window) / window[:-1]
            past_returns = daily_returns[:-1]
            if past_returns.size:
                num = np.count_nonzero(daily_returns[-1] > past_returns)
                self.Pct_Rank = num / past_returns.size * 100
            else:
                # Not enough returns to rank against.
                self.Pct_Rank = 0

            # Combined Connors RSI.
            self.Value = (self.Rsi + self.Streak + self.Pct_Rank) / 3.0

        return self.IsReady

    def WarmUp(self, history):
        """Feed every row of a history DataFrame into the indicator."""
        for row in history.itertuples():
            self.Update_warmup(row)


class CustomVolatility:
    """Annualised historical volatility (in percent) of log close differences."""

    def __init__(self, name, period):
        self.Name = name
        self.Time = datetime.min
        self.IsReady = False
        self.Value = 0
        # Rolling window of closes, oldest first and newest last.
        self.queue = deque(maxlen=period)

    def __repr__(self):
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(
            self.Name, self.IsReady, self.Time, self.Value)

    # Update method is mandatory
    def Update(self, input):
        """Update from a registered data point exposing ``Time`` and ``Close``."""
        return self.Update_Main(input.Time, input.Close)

    def Update_warmup(self, input):
        """Update from a history row exposing ``Index`` and ``close``."""
        return self.Update_Main(input.Index, input.close)

    def Update_Main(self, time, value):
        """Push a new close and recompute volatility once the window is full.

        Returns:
            True when the indicator is ready, False otherwise.
        """
        self.queue.append(value)
        self.Time = time
        self.IsReady = len(self.queue) == self.queue.maxlen

        if self.IsReady:
            log_close = np.log(np.array(self.queue))
            diffs = np.diff(log_close)
            # Scale the standard deviation by sqrt(252) and express in percent.
            self.Value = diffs.std() * math.sqrt(252) * 100

        return self.IsReady

    def WarmUp(self, history):
        """Feed every row of a history DataFrame into the indicator."""
        for row in history.itertuples():
            self.Update_warmup(row)