# Overall Statistics
# Total Trades: 6
# Average Win: 0%
# Average Loss: -0.89%
# Compounding Annual Return: -85.918%
# Drawdown: 2.900%
# Expectancy: -1
# Net Profit: -2.650%
# Sharpe Ratio: -3.569
# Probabilistic Sharpe Ratio: 3.729%
# Loss Rate: 100%
# Win Rate: 0%
# Profit-Loss Ratio: 0
# Alpha: 3.751
# Beta: -1.867
# Annual Standard Deviation: 0.207
# Annual Variance: 0.043
# Information Ratio: -13.641
# Tracking Error: 0.23
# Treynor Ratio: 0.395
# Total Fees: $53.70
from datetime import timedelta
import numpy as np
from scipy import stats
from collections import deque
import math

from QuantConnect.Data.UniverseSelection import * 
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel


class conners_crash(QCAlgorithm):
    """Short-only Connors-RSI strategy on a coarse-filtered equity universe.

    Each symbol added to the universe gets a `CustomConnors` and a
    `CustomVolatility` indicator. A short is opened when volatility exceeds
    `hist_vol` and the Connors RSI exceeds `crsi_entry`; it is closed when
    the Connors RSI drops below `crsi_flat`. At most `max_num_shorts`
    positions are held at once.
    """

    def Initialize(self):
        """Configure the backtest window, universe and strategy parameters."""
        self.SetStartDate(2018, 1, 1)
        self.SetEndDate(2018, 1, 5)
        self.SetCash(100000)

        # Universe settings are assigned before the universe is added.
        self.UniverseSettings.Resolution = Resolution.Daily
        self.UniverseSettings.Leverage = 1
        self.AddUniverse(self.CoarseSelectionFilter)

        self.SetWarmUp(150)

        # Per-symbol indicator state. Created here so OnData can run safely
        # even if it fires before the first OnSecuritiesChanged call.
        self.symbolDataBySymbol = {}

        # Not read anywhere in this class; kept in case external code
        # inspects these attributes.
        self.indi = {}
        self.rsi = {}
        self.vola = {}
        self.indi_Filter = {}
        self.rsi_Filter = {}

        # Connors RSI look-backs (number of closes per component).
        self.rsi_w = 3
        self.streak_w = 3
        self.pct_rank_w = 100

        # Volatility look-back and minimum annualised volatility (percent).
        self.vola_w = 100
        self.hist_vol = 100

        self.max_num_shorts = 5

        # Connors RSI thresholds: enter short above, go flat below.
        self.crsi_entry = 80
        self.crsi_flat = 30

    def CoarseSelectionFilter(self, coarse):
        """Return the symbols priced above 5 with volume above one million."""
        return [x.Symbol for x in coarse if x.Price > 5 and x.Volume > 1e6]

    def OnSecuritiesChanged(self, changes):
        """Create indicators for added symbols and drop them for removed ones.

        Indicator state of symbols that are still held short is kept even
        when the symbol leaves the universe, so the exit rule in OnData can
        still be evaluated for them.
        """
        # NOTE: the dictionary is deliberately NOT rebuilt here. Rebuilding it
        # on every universe change would discard the indicators of every
        # symbol that was already being tracked.
        lookback = max(self.rsi_w, self.streak_w, self.pct_rank_w, self.vola_w)

        for security in changes.AddedSecurities:
            symbol = security.Symbol
            if symbol in self.symbolDataBySymbol:
                continue

            crsi = CustomConnors('My_Custom', self.rsi_w, self.streak_w, self.pct_rank_w)
            vola = CustomVolatility('My_Custom', self.vola_w)
            self.RegisterIndicator(symbol, crsi, Resolution.Daily)
            self.RegisterIndicator(symbol, vola, Resolution.Daily)

            history = self.History(symbol, lookback, Resolution.Daily)
            crsi.WarmUp(history)
            vola.WarmUp(history)

            self.symbolDataBySymbol[symbol] = SymbolData(symbol, crsi, vola)

        short_symbols = [h.Symbol for h in self.Portfolio.Values if h.IsShort]
        for security in changes.RemovedSecurities:
            if security.Symbol not in short_symbols:
                self.symbolDataBySymbol.pop(security.Symbol, None)

    def OnData(self, data):
        '''Evaluate exit and entry signals once per data slice.

        Arguments:
            data: Slice object keyed by symbol containing the stock data
                  (not read directly; the registered indicators are used).

        Reference rules this strategy is modelled on:
        1. The stock's closing price must be greater than $5/share.
        2. Average daily volume over 21 trading days > 1e6 shares.
        3. 100-day historical volatility >= 100%.
        4. Enter short on ConnorsRSI (CRSI,3,2,10) > 90 (RSI, Streak, Percentage Change).
        5. Short next day on a limit order 5% higher.
        6. Exit the trade when ConnorsRSI < 20.
        Results from 2007-2017 should be:
            # Trades: around 500
            Win Rate: 73%
            Avg. Profit Per Trade: 6.5%
            Avg. Days per Trade: 12
            Avg. Win: 16.5%
            Avg. Loss: 19.5%

        This implementation differs from the reference rules: thresholds and
        look-backs come from Initialize, the volume filter uses the coarse
        `Volume` field, and entries are market orders.
        '''
        if self.IsWarmingUp:
            return

        short_symbols = [h.Symbol for h in self.Portfolio.Values if h.IsShort]

        # Exit: Connors RSI fell below the flat threshold, or the indicator
        # state for the position is gone and the position cannot be managed.
        to_close = []
        for symbol in short_symbols:
            symbol_data = self.symbolDataBySymbol.get(symbol)
            if symbol_data is None:
                to_close.append(symbol)
            elif symbol_data.CRSI.IsReady and symbol_data.CRSI.Value < self.crsi_flat:
                to_close.append(symbol)

        # Entry candidates: both indicators ready, volatility and Connors RSI
        # above their thresholds, and not already held short.
        candidates = [
            symbol
            for symbol, symbol_data in self.symbolDataBySymbol.items()
            if symbol not in short_symbols
            and symbol_data.VOLA.IsReady
            and symbol_data.CRSI.IsReady
            and symbol_data.VOLA.Value > self.hist_vol
            and symbol_data.CRSI.Value > self.crsi_entry
        ]

        open_slots = self.max_num_shorts - len(short_symbols)
        if open_slots > 0 and candidates:
            # Equal share of total portfolio value per slot. Cash is not used
            # as the basis because short-sale proceeds increase it, which
            # would inflate the size of later positions.
            position_value = (
                self.Portfolio.TotalPortfolioValue / self.max_num_shorts * 0.95
            )
            for symbol in candidates:
                if open_slots <= 0:
                    break
                price = self.Securities[symbol].Price
                if price <= 0:
                    continue  # no usable price yet
                quantity = int(position_value / price)
                if quantity <= 0:
                    continue
                self.MarketOrder(symbol, -quantity)
                open_slots -= 1

        for symbol in to_close:
            self.Liquidate(symbol)
            

class SymbolData:
    '''Bundles a symbol with its Connors RSI and volatility indicators.'''

    def __init__(self, symbol, crsi, vola):
        # Stored as-is; no copying or validation is performed.
        self.Symbol, self.CRSI, self.VOLA = symbol, crsi, vola

  

class CustomConnors:
    """Connors RSI: the average of a price RSI, a streak RSI and a percent rank.

    Closes are kept in chronological order (oldest first, newest last) in a
    bounded window of ``max(rsi_p, streak_p, pct_rank_p)`` values. Each
    component looks at the most recent ``p`` closes, i.e. ``p - 1`` changes:

    * ``Rsi``      - simple-average RSI of the closes.
    * ``Streak``   - the same RSI applied to the up/down streak series.
    * ``Pct_Rank`` - percentage of earlier returns in the window that are
                     smaller than the latest return.

    ``Value`` is the mean of the three and is only updated once the window
    is full (``IsReady``).
    """

    def __init__(self, name, rsi_p, streak_p, pct_rank_p):
        period = max(rsi_p, streak_p, pct_rank_p)
        self.Name = name
        self.Time = datetime.min
        self.IsReady = False
        self.Value = 0
        self.Rsi = 0
        self.Streak = 0
        self.Pct_Rank = 0

        # Chronological window of closes: oldest at index 0, newest at -1.
        self.queue = deque(maxlen=period)

        self.rsi_p = rsi_p
        self.streak_p = streak_p
        self.pct_rank_p = pct_rank_p

    def __repr__(self):
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(self.Name, self.IsReady, self.Time, self.Value)

    # Update method is mandatory
    def Update(self, input):
        """Feed a data point exposing ``Time`` and ``Close``; return IsReady."""
        return self.Update_Main(input.Time, input.Close)

    def Update_warmup(self, input):
        """Feed a history row exposing ``Index`` and ``close``; return IsReady."""
        return self.Update_Main(input.Index, input.close)

    def Update_Main(self, time, value):
        """Append one close, recompute the indicator if ready, return IsReady."""
        # append (not appendleft) keeps the window chronological, so the
        # [-p:] slices below select the MOST RECENT p closes and np.diff
        # yields new-minus-old changes.
        self.queue.append(value)
        self.Time = time
        self.IsReady = len(self.queue) == self.queue.maxlen

        if self.IsReady:
            closes = np.array(self.queue, dtype=float)
            self.Rsi = self._rsi(closes[-self.rsi_p:])
            self.Streak = self._rsi(self._streaks(closes[-self.streak_p:]))
            self.Pct_Rank = self._pct_rank(closes[-self.pct_rank_p:])
            self.Value = (self.Rsi + self.Streak + self.Pct_Rank) / 3.0

        return self.IsReady

    @staticmethod
    def _rsi(series):
        """Return 100 - 100 / (1 + mean gain / mean loss) of the changes in series."""
        diffs = np.diff(series)
        if diffs.size == 0:
            return 50.0  # no changes to measure: report a neutral reading
        ups = np.nanmean(np.clip(diffs, 0, np.inf))
        downs = abs(np.nanmean(np.clip(diffs, -np.inf, 0)))
        if downs == 0:
            return 100.0
        return 100 - (100 / (1 + (ups / downs)))

    @staticmethod
    def _streaks(closes):
        """Return the running up/down streak length for each close.

        The first element has no predecessor and is 0; an unchanged close
        resets the streak to 0.
        """
        streak = np.zeros(len(closes))
        run = 0
        # Start at 1: index 0 must not be compared with closes[-1].
        for i in range(1, len(closes)):
            if closes[i] > closes[i - 1]:
                run = max(1, run + 1)
            elif closes[i] < closes[i - 1]:
                run = min(-1, run - 1)
            else:
                run = 0
            streak[i] = run
        return streak

    @staticmethod
    def _pct_rank(closes):
        """Return the percentage of earlier returns below the latest return."""
        returns = np.diff(closes) / closes[:-1]
        if returns.size < 2:
            return 50.0  # nothing to rank against: report a neutral reading
        past = returns[:-1]
        return np.count_nonzero(returns[-1] > past) / past.size * 100.0

    def WarmUp(self, history):
        """Replay rows of ``history.itertuples()`` (oldest first) into the indicator."""
        if history is None or getattr(history, "empty", False):
            return
        for row in history.itertuples():
            self.Update_warmup(row)

        

class CustomVolatility:
    """Rolling volatility of log price changes over a fixed window.

    Values are stored newest-first in a bounded deque of `period` entries.
    Once the window is full, `Value` is the standard deviation of the
    differences of the log values, multiplied by sqrt(252) and by 100.
    """

    def __init__(self, name, period):
        self.queue = deque(maxlen=period)
        self.Value = 0
        self.IsReady = False
        self.Time = datetime.min
        self.Name = name

    def __repr__(self):
        template = "{0} -> IsReady: {1}. Time: {2}. Value: {3}"
        return template.format(self.Name, self.IsReady, self.Time, self.Value)

    # Update method is mandatory
    def Update(self, input):
        """Feed an object exposing `Time` and `Close`; return IsReady."""
        return self.Update_Main(input.Time, input.Close)

    def Update_warmup(self, input):
        """Feed a history row exposing `Index` and `close`; return IsReady."""
        return self.Update_Main(input.Index, input.close)

    def Update_Main(self, time, value):
        """Record one value and refresh `Value` when the window is full."""
        window = self.queue
        window.appendleft(value)
        self.Time = time
        self.IsReady = len(window) == window.maxlen

        if not self.IsReady:
            return False

        # The window is newest-first, so each difference has the opposite
        # sign of the chronological change; the standard deviation is
        # unaffected by that sign flip.
        log_prices = np.log(np.array(window))
        changes = np.diff(log_prices)
        self.Value = changes.std() * math.sqrt(252) * 100
        return True

    def WarmUp(self, history):
        """Replay every row of `history.itertuples()` into the indicator."""
        for row in history.itertuples():
            self.Update_warmup(row)