Overall Statistics |
Total Trades 681 Average Win 0.93% Average Loss -1.31% Compounding Annual Return -33.809% Drawdown 49.000% Expectancy -0.048 Net Profit -33.809% Sharpe Ratio -0.656 Probabilistic Sharpe Ratio 2.157% Loss Rate 44% Win Rate 56% Profit-Loss Ratio 0.71 Alpha -0.236 Beta 0.036 Annual Standard Deviation 0.361 Annual Variance 0.13 Information Ratio -0.54 Tracking Error 0.39 Treynor Ratio -6.571 Total Fees $3017.52 |
from datetime import timedelta import numpy as np from scipy import stats from collections import deque import math from QuantConnect.Data.UniverseSelection import * from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel class conners_crash(QCAlgorithm): filteredByPrice = None def Initialize(self): self.SetStartDate(2018, 1, 1) self.SetEndDate(2018, 12, 31) self.SetCash(100000) self.AddUniverse(self.CoarseSelectionFilter) self.UniverseSettings.Resolution = Resolution.Daily self.SetWarmUp(150) self.UniverseSettings.Leverage = 1 #self.DefaultOrderProperties.TimeInForce = TimeInForce.Day self.indi = {} self.rsi = {} self.vola = {} self.indi_Filter = {} self.rsi_Filter = {} self.rsi_w = 3 self.streak_w = 2 self.pct_rank_w = 100 self.vola_w = 100 self.CRSI_entry = 90 self.CRSI_exit = 30 self.securities = [] self.avtive_sec = [] def CoarseSelectionFilter(self, coarse): filtered = [x for x in coarse if x.Price > 5 and x.Volume > 1e6 ] return [x.Symbol for x in filtered] def OnSecuritiesChanged(self, changes): for security in changes.AddedSecurities: #self.avtive_sec.append(security) symbol = security.Symbol ### Connors RSI # DoDo include the standart RSI # initialize indicator self.indi[symbol] = CustomConnors( 'My_Custom', self.rsi_w, self.streak_w, self.pct_rank_w) self.RegisterIndicator(symbol, self.indi[symbol], Resolution.Daily) # warmup indicator history = self.History(symbol, max(self.rsi_w, self.streak_w, self.pct_rank_w), Resolution.Daily) self.indi[symbol].WarmUp(history) ### Volatility # initialize indicator self.vola[symbol] = CustomVolatility( 'My_Custom', self.vola_w ) self.RegisterIndicator(symbol, self.vola[symbol], Resolution.Daily) # warmup indicator history = self.History(symbol, self.vola_w, Resolution.Daily) self.vola[symbol].WarmUp(history) # remove securities for security in changes.RemovedSecurities: #self.avtive_sec.remove(security) symbol = security.Symbol if security in self.indi: self.indi[symbol].remove(security) if security in self.rsi: self.rsi[symbol].remove(security) if security in self.vola: self.vola[symbol].remove(security) def OnData(self, data): ''' ''' insights = [] C_RSI = {} filter1 = [] stocks_short = [] my_invested = [] # only stock with volatilytiy > 100% filter1 = [x[0] for x in self.vola.items() if (self.vola[x[0]].Value > 100) ] # find short stocks my_invested = [ x.Symbol for x in self.Portfolio.Values if x.Invested ] # check if it is short for ticker in my_invested: if self.Portfolio[ticker].IsShort: stocks_short.append(ticker) ## filtering for buy candiates short = [x for x in filter1 if (self.indi[x].Value > self.CRSI_entry) ] ## filter portfolio for exit candiates neutral = [x for x in stocks_short if (self.indi[x].Value < self.CRSI_exit) ] # calculating shopping size max_num_shorts = 20 actuall_shorts = len(stocks_short) new_entrys = max_num_shorts - actuall_shorts portfolio_cash = self.Portfolio.Cash # Sum of all currencies in account (only settled cash) one_positon_cash = portfolio_cash / new_entrys if new_entrys and len(short) > 0: ## sorting #num_entry=min(,len(short)) #ordered = sorted(short, key=lambda x: self.indi[x].Value, reverse=True)[:num_entry] for ticker in short: if new_entrys <= 0: continue price = self.History( ticker, 1, Resolution.Daily) if not price.empty: latestclose = price.close[-1] numer_stocks = one_positon_cash * 0.95 / latestclose #self.Securities[ticker].FeeModel = ConstantFeeModel(0) # for testing purpose self.LimitOrder(ticker, -numer_stocks, 1.03 * price.close[-1]) #self.StopLimitOrder(ticker, -numer_stocks, ( 5 * self.vola[ticker].Std + latestclose) , 1.03 * latestclose) new_entrys-=1 for ticker in neutral: self.Liquidate(ticker) # checking indicator warmup #self.Plot("CustomConnors", "C RSI", list(self.indi.values())[0].Value) #self.Plot("CustomVolatility", "volatility", list(self.vola.values())[0].Value ) def OnOrderEvent(self, orderEvent): order = self.Transactions.GetOrderById(orderEvent.OrderId) if order.Status == OrderStatus.Filled: self.Log(str(orderEvent)) #if order.Type == OrderType.Limit or order.Type == OrderTpye.StopMarket: # self.Transactions.CancelOpenOrders(order.Symbol) if order.Status == OrderStatus.Canceled: self.Log(str(orderEvent)) class CustomConnors: def __init__(self, name, rsi_p, streak_p, pct_rank_p): period = max(rsi_p, streak_p, pct_rank_p) self.Name = name self.Time = datetime.min self.IsReady = False self.Value = 0 self.Rsi = 0 self.Streak = 0 self.Pct_Rank = 0 self.queue = deque(maxlen=period) self.rsi_p = rsi_p self.streak_p = streak_p self.pct_rank_p = pct_rank_p def __repr__(self): return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(self.Name, self.IsReady, self.Time, self.Value) # Update method is mandatory def Update(self, input): return self.Update_Main(input.Time, input.Close) def Update_warmup(self, input): return self.Update_Main(input.Index, input.close) def Update_Main(self, time, value): self.queue.appendleft(value) # neutral count = len(self.queue) self.Time = time self.IsReady = count == self.queue.maxlen #### start here the indicator calulation if self.IsReady: # Connors streak indicator # Cassic RSI from zipline #"100 - (100 / (1 + (ups / downs)))" close = np.array(self.queue)[-self.rsi_p:] diffs = np.diff(close) ups = np.nanmean(np.clip(diffs, 0, np.inf)) downs = abs(np.nanmean(np.clip(diffs, -np.inf,0))) if downs == 0: self.Rsi = 100 else: self.Rsi = 100 - (100 / (1 + (ups / downs))) # Connors streak indicator curstreak = 0 close = np.array(self.queue)[-self.streak_p:] l=len(close) streak = np.zeros(l) for i in range(l): if close[i-1] < close[i]: streak[i] = curstreak = max(1, curstreak + 1) elif close[i-1] > close[i]: streak[i] = curstreak = min(-1, curstreak - 1) else: streak[i] = curstreak = 0 # cal the rsi from streak diffs = np.diff(streak) ups = np.nanmean(np.clip(diffs, 0, np.inf)) downs = abs(np.nanmean(np.clip(diffs, -np.inf, 0))) if downs == 0: self.Streak = 100 else: self.Streak = 100 - (100 / (1 + (ups / downs))) # Connors Pct Rank Indicator close = np.array(self.queue)[-self.pct_rank_p:] daily_returns = np.diff(close) / close[0:-1] today_gt_past = daily_returns[-1] > daily_returns [0:-1] num = sum(today_gt_past) # sum as as percentage l=np.shape(today_gt_past)[0] self.Pct_Rank = num / l * 100 # combined Connor RSI self.Value = (self.Rsi + self.Streak + self.Pct_Rank ) / 3.0 #### finish the custom indicator return self.IsReady def WarmUp(self,history): for tuple in history.itertuples(): self.Update_warmup(tuple) class CustomVolatility: def __init__(self, name, period): self.Name = name self.Time = datetime.min self.IsReady = False self.Value = 0 self.Std = 0 self.queue = deque(maxlen=period) def __repr__(self): return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(self.Name, self.IsReady, self.Time, self.Value) # Update method is mandatory def Update(self, input): return self.Update_Main(input.Time, input.Close) def Update_warmup(self, input): return self.Update_Main(input.Index, input.close) def Update_Main(self, time, value): self.queue.appendleft(value) # neutral count = len(self.queue) self.Time = time # neutral self.IsReady = count == self.queue.maxlen #### start here the indicator calulation if self.IsReady: # [0:-1] is needed to remove last close since diff is one element shorter close = np.array(self.queue) log_close = np.log(close) diffs = np.diff(log_close) self.Std = diffs.std() self.Value = diffs.std() * math.sqrt(252) * 100 return self.IsReady def WarmUp(self,history): for tuple in history.itertuples(): self.Update_warmup(tuple)