Overall Statistics
Total Trades
298
Average Win
0.18%
Average Loss
-0.06%
Compounding Annual Return
1389.967%
Drawdown
3.000%
Expectancy
2.382
Net Profit
37.471%
Sharpe Ratio
42.536
Probabilistic Sharpe Ratio
99.914%
Loss Rate
14%
Win Rate
86%
Profit-Loss Ratio
2.95
Alpha
0
Beta
0
Annual Standard Deviation
0.275
Annual Variance
0.076
Information Ratio
42.536
Tracking Error
0.275
Treynor Ratio
0
Total Fees
$298.69
from QuantConnect.Data.UniverseSelection import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
from itertools import groupby
from math import ceil

class QC500UniverseSelectionModel(FundamentalUniverseSelectionModel):
    '''Defines the QC500 universe as a universe selection model for framework algorithm
    For details: https://github.com/QuantConnect/Lean/pull/1663'''

    def __init__(self, filterFineData = True, universeSettings = None, securityInitializer = None):
        '''Initializes a new default instance of the QC500UniverseSelectionModel'''
        super().__init__(filterFineData, universeSettings, securityInitializer)
        self.numberOfSymbolsCoarse = 1000
        self.numberOfSymbolsFine = 500
        self.dollarVolumeBySymbol = {}
        self.lastMonth = -1

    def SelectCoarse(self, algorithm, coarse):
        if algorithm.Time.month == self.lastMonth:
            return Universe.Unchanged

        # Do not invest in stocks generating errors messages in logs
        filteredErrors = [x for x in coarse if x.Symbol.Value != "VIAC" and x.Symbol.Value != "BEEM" and x.Symbol.Value != "LSI" and x.Symbol.Value != "IQV" and x.Symbol.Value != "GBT" and x.Symbol.Value != "VTRS" and x.Symbol.Value != "FUBO" and x.Symbol.Value != "SPCE" and x.Symbol.Value != "TFC" and x.Symbol.Value != "PEAK"]

        sortedByDollarVolume = sorted([x for x in filteredErrors if x.HasFundamentalData and x.Volume > 0 and x.Price > 0],
                                     key = lambda x: x.DollarVolume, reverse=True)[:self.numberOfSymbolsCoarse]

        self.dollarVolumeBySymbol = {x.Symbol:x.DollarVolume for x in sortedByDollarVolume}

        # If no security has met the QC500 criteria, the universe is unchanged.
        # A new selection will be attempted on the next trading day as self.lastMonth is not updated
        if len(self.dollarVolumeBySymbol) == 0:
            return Universe.Unchanged

        # return the symbol objects our sorted collection
        return list(self.dollarVolumeBySymbol.keys())

    def SelectFine(self, algorithm, fine):
        '''Performs fine selection for the QC500 constituents
        The company's headquarter must in the U.S.
        The stock must be traded on either the NYSE or NASDAQ
        At least half a year since its initial public offering
        The stock's market cap must be greater than 500 million'''

        sortedBySector = sorted([x for x in fine if x.CompanyReference.CountryId == "USA"
                                        and x.CompanyReference.PrimaryExchangeID in ["NYS","NAS"]
                                        and (algorithm.Time - x.SecurityReference.IPODate).days > 180
                                        and x.MarketCap > 5e8],
                               key = lambda x: x.CompanyReference.IndustryTemplateCode)

        count = len(sortedBySector)

        # If no security has met the QC500 criteria, the universe is unchanged.
        # A new selection will be attempted on the next trading day as self.lastMonth is not updated
        if count == 0:
            return Universe.Unchanged

        # Update self.lastMonth after all QC500 criteria checks passed
        self.lastMonth = algorithm.Time.month

        percent = self.numberOfSymbolsFine / count
        sortedByDollarVolume = []

        # select stocks with top dollar volume in every single sector
        for code, g in groupby(sortedBySector, lambda x: x.CompanyReference.IndustryTemplateCode):
            y = sorted(g, key = lambda x: self.dollarVolumeBySymbol[x.Symbol], reverse = True)
            c = ceil(len(y) * percent)
            sortedByDollarVolume.extend(y[:c])

        sortedByDollarVolume = sorted(sortedByDollarVolume, key = lambda x: self.dollarVolumeBySymbol[x.Symbol], reverse=True)
        return [x.Symbol for x in sortedByDollarVolume[:self.numberOfSymbolsFine]]
from clr import AddReference

AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")

from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *

from QC500UniverseSelectionModel import QC500UniverseSelectionModel
from ClenowMomentumAlphaModel import ClenowMomentumAlphaModel
from Execution.ImmediateExecutionModel import ImmediateExecutionModel
from Risk.NullRiskManagementModel import NullRiskManagementModel
from QuantConnect.Indicators import *

class A0001(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2021, 1, 1)  # Set Start Date
        # self.SetEndDate(2017, 1, 10)  # Set Start Date
        self.SetCash(100000)  # Set Strategy Cash
        
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)

        # Benchmark
        self.benchmark = Symbol.Create('SPY', SecurityType.Equity, Market.USA)
        self.AddEquity('SPY', Resolution.Daily)
        
        self.referenceSMAperiod = 200
        self.referenceTicker = "SPY"
        self.referenceSMA = self.SMA(self.referenceTicker, self.referenceSMAperiod, Resolution.Daily)
        
        self.linRegPeriod = 90
        self.SMAperiod = 100
        self.ATRperiod = 20
        
        self.riskFactor = 0.1/100
        
        # Data resolution
        self.UniverseSettings.Resolution = Resolution.Daily
        self.Settings.RebalancePortfolioOnInsightChanges = False
        self.Settings.RebalancePortfolioOnSecurityChanges = False

        self.SetUniverseSelection(QC500UniverseSelectionModel())
        self.AddAlpha(ClenowMomentumAlphaModel(Resolution.Daily, self.linRegPeriod, self.SMAperiod, self.ATRperiod, self.referenceSMAperiod, self.referenceTicker, self.referenceSMA, self.riskFactor))
        self.SetExecution(ImmediateExecutionModel())
        self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel(self.DateRules.Every(DayOfWeek.Wednesday)))
        self.SetRiskManagement(NullRiskManagementModel())
from collections import deque
from datetime import datetime, timedelta
import numpy as np
from scipy import stats
import pandas as pd

from QuantConnect.Algorithm.Framework.Alphas import *

class ClenowMomentumAlphaModel(AlphaModel):
    def __init__(self, resolution, linRegPeriod, SMAperiod, ATRperiod, referenceSMAperiod, referenceTicker, referenceSMA, riskFactor):
        self.resolution = resolution
        self.symbolDataBySymbol = {}
        
        self.linRegPeriod = linRegPeriod
        self.SMAperiod = SMAperiod
        self.ATRperiod = ATRperiod
   
        self.referenceSMAperiod = referenceSMAperiod
        self.referenceTicker = referenceTicker
        self.referenceSMA = referenceSMA
  
        self.riskFactor = riskFactor

        self.isPositionsRebalancingWeek = True
        self.previousStocksBought = []
        
        self.referenceSMAisSet = False
        
    def Update(self, algorithm, data):
        insights = []
        topSelection = []
        stocksToBuy = []
        refSecurity = None

        if algorithm.Time.isoweekday() != 3:
            return insights
        else:
            # Weekly update of insights every Wednesday
            for symbol, symbolData in self.symbolDataBySymbol.items():
                if data.ContainsKey(symbolData.Symbol):
                    if data[symbolData.Symbol] is None:
                        continue
                    else:
                        # collect ClenowMomentum and ATR data for every stock
                        if symbolData.Symbol.Value == self.referenceTicker:
                            refSecurity = symbolData
                        else:
                            selectedStock = []
                            selectedStock.append(symbolData)
                            selectedStock.append(symbolData.ClenowMomentum.Value)
                            selectedStock.append(self.riskFactor*data[symbolData.Symbol].Close/symbolData.ATR.Current.Value)
    
                            topSelection.append(selectedStock)
                    
            #Rank and keep 20% of stock based on momentum indicator
            topSelection = sorted(topSelection, key=lambda x: x[1], reverse=True)            
            topSelection = topSelection[:int(20*len(topSelection)/100)]

            # Setup portfolio available %
            AvailablePorfolioPercent = 1
            
            # buy the previous stock first or sell according to conditions
            for previousStockBought in self.previousStocksBought:
                for selectedStock in topSelection:
                    if previousStockBought[0] == selectedStock[0] and data[selectedStock[0].Symbol].Close > selectedStock[0].SMA.Current.Value:
                        # Every positions rebalancing week, update ATR value for last week stocks
                        if self.isPositionsRebalancingWeek == True:
                            if selectedStock[2] < AvailablePorfolioPercent:
                                AvailablePorfolioPercent = AvailablePorfolioPercent - selectedStock[2]
                                stocksToBuy.append(selectedStock)
                        else:
                        # Every other week, keep the same ATR for positions
                            if previousStockBought[2] < AvailablePorfolioPercent:
                                AvailablePorfolioPercent = AvailablePorfolioPercent - previousStockBought[2]
                                stocksToBuy.append(previousStockBought)

                        topSelection.remove(selectedStock)
                    elif data[selectedStock[0].Symbol].Close < selectedStock[0].SMA.Current.Value:
                        topSelection.remove(selectedStock)

            # If market reference > referenceSMA -> bullish market -> buy more stock
            if data[refSecurity.Symbol].Close > refSecurity.referenceSMA.Current.Value:
                for selectedStock in topSelection:
                    if selectedStock[2] < AvailablePorfolioPercent:
                        stocksToBuy.append(selectedStock)
                        AvailablePorfolioPercent = AvailablePorfolioPercent - selectedStock[2]
                    else:
                        break

            for stockToBuy in stocksToBuy:
                insights.append(Insight.Price(stockToBuy[0].Symbol, timedelta(days=1), InsightDirection.Up, None, 1.00, None, stockToBuy[2]))
            
            self.previousStocksBought = stocksToBuy
            
            if self.isPositionsRebalancingWeek == True:
                self.isPositionsRebalancingWeek = False
            else:
                self.isPositionsRebalancingWeek = True

            return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        # Initialize referenceTicker SMA once
        if self.referenceSMAisSet == False:
            history = algorithm.History([self.referenceTicker], max(self.SMAperiod, self.ATRperiod, self.linRegPeriod, self.referenceSMAperiod), self.resolution)
            tickers = history.index.levels[0]
        
            for ticker in tickers:
                symbol = SymbolCache.GetSymbol(ticker)
                if symbol not in self.symbolDataBySymbol:
                    symbolData = SymbolData(symbol, self.SMAperiod, self.ATRperiod, self.linRegPeriod, self.referenceSMAperiod, self.referenceTicker)
                    self.symbolDataBySymbol[symbol] = symbolData
                    symbolData.RegisterIndicators(algorithm, self.resolution)
                    symbolData.WarmUpIndicators(history, ticker, symbol)

            self.referenceSMAisSet = True

        # initialize data for added securities
        addedSymbols = [x.Symbol for x in changes.AddedSecurities]
        if len(addedSymbols) == 0: 
            return
        else:
            history = algorithm.History(addedSymbols, max(self.SMAperiod, self.ATRperiod, self.linRegPeriod), self.resolution)
            tickers = history.index.levels[0]
        
            for ticker in tickers:
                symbol = SymbolCache.GetSymbol(ticker)
                
                if symbol not in self.symbolDataBySymbol:
                    symbolData = SymbolData(symbol, self.SMAperiod, self.ATRperiod, self.linRegPeriod, self.referenceSMAperiod, self.referenceTicker)
                    self.symbolDataBySymbol[symbol] = symbolData
                    symbolData.RegisterIndicators(algorithm, self.resolution)
                    symbolData.WarmUpIndicators(history, ticker, symbol)
        
        # clean up data for removed securities
        for removed in changes.RemovedSecurities:
            symbolData = self.symbolDataBySymbol.pop(removed.Symbol, None)
            if symbolData is not None:
                symbolData.RemoveConsolidators(algorithm)

class SymbolData:
    '''Contains data specific to a symbol required by this model'''
    def __init__(self, symbol, SMAperiod, ATRperiod, linRegPeriod, referenceSMAperiod, referenceTicker):
        self.Symbol = symbol
        
        self.SMA = SimpleMovingAverage(SMAperiod)
        self.ATR = AverageTrueRange(ATRperiod, MovingAverageType.Simple)
        self.ClenowMomentum = ClenowMomentum('ClenowMomentum', symbol, linRegPeriod)
        self.referenceSMA = SimpleMovingAverage(referenceSMAperiod)
        self.referenceTicker = referenceTicker

        self.SMAConsolidator = None
        self.ATRConsolidator = None
        self.ClenowMomentumConsolidator = None
        self.referenceSMAConsolidator = None
        self.previous = 0

    def RegisterIndicators(self, algorithm, resolution):
        self.SMAConsolidator = algorithm.ResolveConsolidator(self.Symbol, resolution)
        algorithm.RegisterIndicator(self.Symbol, self.SMA, self.SMAConsolidator, Field.Close)
        
        self.ATRConsolidator = algorithm.ResolveConsolidator(self.Symbol, resolution)
        algorithm.RegisterIndicator(self.Symbol, self.ATR, self.ATRConsolidator)
        
        self.ClenowMomentumConsolidator = algorithm.ResolveConsolidator(self.Symbol, resolution)
        algorithm.RegisterIndicator(self.Symbol, self.ClenowMomentum, self.ClenowMomentumConsolidator)
        
        if self.Symbol.Value == self.referenceTicker:
            self.referenceSMAConsolidator = algorithm.ResolveConsolidator(self.Symbol, resolution)
            algorithm.RegisterIndicator(self.Symbol, self.referenceSMA, self.referenceSMAConsolidator, Field.Close)

    def WarmUpIndicators(self, history, ticker, symbol):
        for tuple in history.loc[ticker].itertuples():
            self.SMA.Update(tuple.Index, tuple.close)
            bar = TradeBar(tuple.Index, symbol, tuple.open, tuple.high, tuple.low, tuple.close, tuple.volume)     
            self.ATR.Update(bar)
        self.ClenowMomentum.Warmup(history)
        
        if self.Symbol.Value == self.referenceTicker:
            for tuple in history.loc[ticker].itertuples():
                self.referenceSMA.Update(tuple.Index, tuple.close)
            
    def RemoveConsolidators(self, algorithm):
        if self.SMAConsolidator is not None:
            algorithm.SubscriptionManager.RemoveConsolidator(self.Symbol, self.SMAConsolidator)
        if self.ATRConsolidator is not None:
            algorithm.SubscriptionManager.RemoveConsolidator(self.Symbol, self.ATRConsolidator)
        if self.ClenowMomentumConsolidator is not None:
            algorithm.SubscriptionManager.RemoveConsolidator(self.Symbol, self.ClenowMomentumConsolidator)

    @property
    def CanEmit(self):
        if self.previousSMA == self.SMA.Samples:
            return False
        
        if self.previousATR == self.ATR.Samples:
            return False

        self.previousSMA = self.SMA.Samples
        self.previousATR = self.ATR.Samples
        
        return self.SMA.IsReady

# Define custom indicator
class ClenowMomentum:
    def __init__(self, name, symbol, period):
        self.symbol = symbol
        self.Name = name
        self.Time = datetime.min
        self.Value = 0
        self.IsReady = False
        self.queue = deque(maxlen=period)
        self.queueTime = deque(maxlen=period)
        self.CurrentReturn = 0
    
    # required update method
    def Update(self, input):
        return self.UpdateFromParameters(input.Time, input.Close)
    
    # update called for history warmup
    def UpdateFromParameters(self, time, value):
        self.queue.appendleft(value)
        if not len(self.queue) == self.queue.maxlen:
            return False
        self.queueTime.appendleft(time)
        self.Time = time
        count = len(self.queue)
        self.IsReady = count == self.queue.maxlen
        
        #start indicator calculation
        if self.IsReady:
            y = np.flipud(np.log(self.queue))
            x = np.arange(len(y))
            slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
            # Annualized exponential regression (ratio^252) x correlation
            self.Value = ((1+slope)**252)*(r_value**2)
        
        return self.IsReady

    def Warmup(self,history):
        for index, row in history.loc[self.symbol].iterrows():
            self.UpdateFromParameters(index, row['close'])