Overall Statistics
Total Trades
24542
Average Win
0.03%
Average Loss
-0.02%
Compounding Annual Return
-46.045%
Drawdown
57.100%
Expectancy
-0.196
Net Profit
-45.495%
Sharpe Ratio
-1.524
Probabilistic Sharpe Ratio
0.112%
Loss Rate
66%
Win Rate
34%
Profit-Loss Ratio
1.37
Alpha
-0.392
Beta
0.069
Annual Standard Deviation
0.248
Annual Variance
0.062
Information Ratio
-1.505
Tracking Error
0.377
Treynor Ratio
-5.45
Total Fees
$24595.17
from datetime import timedelta
import numpy as np
from scipy import stats
from collections import deque

from QuantConnect.Data.UniverseSelection import * 
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel

class ModulatedMultidimensionalReplicator(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2020, 1, 1)     
        #self.SetEndDate(2020, 1, 1)        
        self.SetCash(50000)
        
        # Timedelta for the indicator with longes time
        #self.SetWarmUp(timedelta(200))
        
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverseSelection(MyUniverseSelectionModel())

        
        self.AddAlpha(MOMAlphaModel())
        
        self.Settings.RebalancePortfolioOnInsightChanges = False          
        self.Settings.RebalancePortfolioOnSecurityChanges = False
        
        #self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel(self.DateRules.Every(DayOfWeek.Monday)))
        self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
        self.SetExecution(ImmediateExecutionModel())    
        

        
    def OnData(self, data):
        '''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
            Arguments:
                data: Slice object keyed by symbol containing the stock data
        '''

        # if not self.Portfolio.Invested:
        #    self.SetHoldings("SPY", 1)
        
        
        
class MyUniverseSelectionModel(FundamentalUniverseSelectionModel):

    def __init__(self):
        super().__init__(True, None, None)

    def SelectCoarse(self, algorithm, coarse):
        #filtered = [x for x in coarse if x.Price > 5 and x.DollarVolume > 1e9 ]
        filtered = [x for x in coarse if x.Price > 5 and x.Volume > 1e6 ]
        return [x.Symbol for x in filtered]       
        
    #def FineSelectionFunction(self, algorithm, fine):
        #self.vola = {} 
        
        #for security in fine:
            
            ### Volatility
            # initialize indicator
            #self.vola[symbol] = CustomVolatility( 'My_Custom', 30 )
            #algorithm.RegisterIndicator(symbol, self.vola[symbol], Resolution.Daily)
                
            # warmup indicator
            #history = algorithm.History(symbol, 30, Resolution.Daily)    
            #self.vola[symbol].WarmUp(history)

        
        
        ## filter stocks with the top market cap
        #top = sorted(fine, key = lambda x: x.EarningReports.BasicAverageShares.ThreeMonths * (x.EarningReports.BasicEPS.TwelveMonths*x.ValuationRatios.PERatio), reverse=True)
        #filtered  = [ x for x in fine if (self.vola[x[0]].Value > 100) ]   
        #return [x.Symbol for x in filtered]    
    
   
        
        
        
class MOMAlphaModel(AlphaModel):
        
    '''
        1. The stock’s closing price must be greater than $5/share.

        2. average day volume 21 trading > 1e6 shares a day
    
        3. 100-day Historical Volatility >= 100%.
    
        4. Enter Short on ConnorsRSI (CRSI,3,2,10) > 90 (RSI, Streak, Percentage Change)
    
        5. Short next day on a limit order 5% higher. 

        6. The exit Trade ConnorsRSI < 20
        
        Rresults should be 2007-2017:
    
        Results from 2007-2017 should be:
        # Trades: around 500
        Win Rate: 73%
        Avg. Profit Per Trade: 6.5%
        Avg. Days per Tr%ade: 12
        Avg. Win: 16.5%
        Avg. Loss: 19.5 %
    '''
   
    def __init__(self):
        
        self.indi = {}
        self.rsi = {} 
        self.vola = {} 
        self.indi_Filter = {}
        self.rsi_Filter = {}
        
        self.rsi_w = 3
        self.streak_w = 2
        self.pct_rank_w = 100
        self.vola_w = 30

        self.securities = [] 
        
        self.avtive_sec = [] 

    def OnSecuritiesChanged(self, algorithm, changes):
    
        
        for security in changes.AddedSecurities:
            #self.avtive_sec.append(security) 
            symbol = security.Symbol
            
            ### Connors RSI # DoDo include the standart RSI
            # initialize indicator
            self.indi[symbol] = CustomConnors( 'My_Custom', self.rsi_w, self.streak_w, self.pct_rank_w)
            algorithm.RegisterIndicator(symbol, self.indi[symbol], Resolution.Daily)
            
            # warmup indicator
            history = algorithm.History(symbol, max(self.rsi_w, self.streak_w, self.pct_rank_w), Resolution.Daily)    
            self.indi[symbol].WarmUp(history)
            
            ### Standart RSI
            # initialize indicator
            self.rsi[symbol] = algorithm.RSI(symbol, 3, MovingAverageType.Simple, Resolution.Daily)
            algorithm.RegisterIndicator(symbol, self.rsi[symbol], Resolution.Daily)
            # warmup indicator not neccesary as RegisterIndicator is used
            
            ### Volatility
            # initialize indicator
            self.vola[symbol] = CustomVolatility( 'My_Custom', self.vola_w )
            algorithm.RegisterIndicator(symbol, self.vola[symbol], Resolution.Daily)
            
            # warmup indicator
            history = algorithm.History(symbol, self.vola_w, Resolution.Daily)    
            self.vola[symbol].WarmUp(history)

            
            
        # remove securities
        for security in changes.RemovedSecurities:
            #self.avtive_sec.remove(security) 
            symbol = security.Symbol
            if security in self.indi:
                self.indi[symbol].remove(security) 
            if security in self.rsi:
                self.rsi[symbol].remove(security)
            if security in self.vola:
                self.vola[symbol].remove(security)                
                
                
          
    def Update(self, algorithm, data):
        insights = []
        C_RSI = {}
        filter2  = []
        filter1 = []
        
        buy_filter = []
        stocks_short = []
        
        my_invested = []
        
        
        
        
        # only stock with volatilytiy > 100%
        filter1  = [x[0] for x in self.vola.items() if (self.vola[x[0]].Value > 100) ]
        #filter1  = [x[0] for x in self.vola.items()] 
        
        # stock made a > 5% jump yesterday
        for ticker in filter1:
            prices = algorithm.History(ticker, 2, Resolution.Daily)
            if not prices.empty:
                if len(prices.close) == 2:
                    if (prices.close[-1]/prices.close[-2]) > 1.05 :
                        filter2.append(ticker) 
        
        # find short stocks
        #my_invested = [ x.Symbol.Value for x in algorithm.Portfolio.Values if x.Invested ]
        my_invested = [ x.Symbol for x in algorithm.Portfolio.Values if x.Invested  ]
        
        for ticker in  my_invested:
            if algorithm.Portfolio[ticker].IsShort:
                stocks_short.append(ticker)

        
        for ticker in (filter2 + stocks_short):
            C_RSI[ticker]= (self.rsi[ticker].Current.Value + self.indi[ticker].Streak + self.indi[ticker].Pct_Rank)/3
            
        
        ## filtering for buy candiates 
        short  = [x for x in filter2 if (C_RSI[x] > 90) ]
        
        ## filter portfolio for exit candiates
        neutral = [x for x in stocks_short if (C_RSI[x] < 30) ]
            
        ## sorting
        #ordered = sorted(self.indi_Filter.items(), key=lambda x: x[1].Value, reverse=True)[:self.num]        
        for ticker in short:
            
            insights.append( Insight.Price(ticker, timedelta(14), InsightDirection.Down) ) 

        for ticker in neutral:
            insights.append( Insight.Price(ticker, timedelta(5), InsightDirection.Flat) )             
            
        
        # for testing
        #algorithm.Plot("Custom_Slope", "Value", list(self.indi.values())[0].Streak *10000)
        # for testing
        #algorithm.Plot("Custom_Slope", "RSI", list(self.rsi.values())[0].Current.Value )  
        algorithm.Plot("nummer of short", "shorts", len(short)  )  
        algorithm.Plot("nummer of closed", "flat", len(neutral)  )  
        return insights
    



class CustomConnors:
    def __init__(self, name, rsi_p, streak_p, pct_rank_p):
        period = max(rsi_p, streak_p, pct_rank_p)
        self.Name = name
        self.Time = datetime.min
        self.IsReady = False
        self.Value = 0
        self.Streak = 0
        self.Pct_Rank = 0
        self.queue = deque(maxlen=period)
        
        self.rsi_p = rsi_p
        self.streak_p = streak_p
        self.pct_rank_p = pct_rank_p

    def __repr__(self):
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(self.Name, self.IsReady, self.Time, self.Value)
        
    # Update method is mandatory
    def Update(self, input):
        return self.Update_Main(input.Time, input.Close)
        
    def Update_warmup(self, input):
        return self.Update_Main(input.Index, input.close)
   
    def Update_Main(self, time, value):
    #def Update(self, input):
        #self.queue.appendleft(input.close)  # used by warm up
        #self.queue.appendleft(input.Close)  # used by RegisterIndicator
        self.queue.appendleft(value) # neutral
        count = len(self.queue)
        #self.Time = input.Index # used by warm up
        #self.Time = input.Time # used by RegisterIndicator
        self.Time = time # neutral

        self.IsReady = count == self.queue.maxlen
        
        # Connors streak indicator
        #### start here the indicator calulation
        if self.IsReady:
            curstreak = 0
            close = np.array(self.queue)[-self.streak_p:]
            streak  = np.zeros(self.streak_p)
            for i in range(self.streak_p):
                if close[i-1] < close[i]:
                    streak[i] = curstreak = max(1, curstreak + 1)
                elif close[i-1] > close[i]:
                    streak[i] = curstreak = min(-1, curstreak - 1)
                else:
                    streak[i] = curstreak = 0
                    
            self.Streak = (streak[-1] / (self.streak_p-1) + 1) * 50 # streak goes from  0 to 100 linear         

        #### finish the custom indicator
        
        # Connors Pct Rank Indicator
        #### start here the indicator calulation
            #cl = np.zeros(count)
            close = np.array(self.queue)[-self.pct_rank_p:]
            #close = np.array([[1, 1],[2, 2],[4,4],[15,15],[226,22]])
            daily_returns = np.diff(close) / close[0:-1] 
            # today return greater than how many past returns
            today_gt_past = daily_returns[-1] > daily_returns [0:-1]
            # sum of today > past
            num = sum(today_gt_past)
            # sum as as percentage
            l=np.shape(today_gt_past)[0]
            self.Pct_Rank = num / l * 100

        return self.IsReady
        
    def WarmUp(self,history):
        for tuple in history.itertuples():
            self.Update_warmup(tuple)

        

class CustomVolatility:
    def __init__(self, name, period):
        self.Name = name
        self.Time = datetime.min
        self.IsReady = False
        self.Value = 0
        self.Mean = 0
        self.Std = 0
        self.queue = deque(maxlen=period)

    def __repr__(self):
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(self.Name, self.IsReady, self.Time, self.Value)
        
    # Update method is mandatory
    def Update(self, input):
        return self.Update_Main(input.Time, input.Close)
        
    def Update_warmup(self, input):
        return self.Update_Main(input.Index, input.close)
   
    def Update_Main(self, time, value):
        self.queue.appendleft(value) # neutral
        count = len(self.queue)
        self.Time = time # neutral

        self.IsReady = count == self.queue.maxlen
        
        #### start here the indicator calulation
        if self.IsReady:
            # [0:-1] is needed to remove last close since diff is one element shorter 
            close = np.array(self.queue)
            daily_returns = np.diff(close) / close[0:-1]  
            self.Std = daily_returns.std() #* math.sqrt(252) 
            self.Mean = daily_returns.mean() #* math.sqrt(252)  
            self.Value = self.Std / abs(self.Mean) * 100 # in %

        return self.IsReady
        
    def WarmUp(self,history):
        for tuple in history.itertuples():
            self.Update_warmup(tuple)