Overall Statistics
Total Orders
1155
Average Win
1.53%
Average Loss
-1.34%
Compounding Annual Return
21.512%
Drawdown
48.000%
Expectancy
0.379
Start Equity
100000
End Equity
1668052.59
Net Profit
1568.053%
Sharpe Ratio
0.718
Sortino Ratio
0.806
Probabilistic Sharpe Ratio
10.977%
Loss Rate
36%
Win Rate
64%
Profit-Loss Ratio
1.14
Alpha
0.014
Beta
1.136
Annual Standard Deviation
0.217
Annual Variance
0.047
Information Ratio
0.307
Tracking Error
0.102
Treynor Ratio
0.137
Total Fees
$5093.16
Estimated Strategy Capacity
$170000000.00
Lowest Capacity Asset
CTAS R735QTJ8XC9X
Portfolio Turnover
2.17%
# region imports
from AlgorithmImports import *
import math
from QuantConnect.Data.UniverseSelection import SecurityChanges
import numpy as np
from scipy import stats
from datetime import timedelta
# endregion

class Licentav3(QCAlgorithm):

    def Initialize(self):
        """
        Initialize the algorithm, setting up the initial parameters, configurations, and universe.
        This includes setting the start date, adding a benchmark, enabling indicator warm-up,
        adding the reference security (SPY), and configuring the universe of ETFs.
        """
        self._securities = set()  # Set to hold the securities being tracked
        self.etf_symbols = []  # List to store ETF constituent symbols
        self.rebalance_flag = False  # Flag to trigger rebalancing
        self.weights = dict()  # Dictionary to store weights of ETF constituents
        self.SetStartDate(2010, 1, 1)  # Set start date of the algorithm
        self.EnableAutomaticIndicatorWarmUp = True  # Enable automatic indicator warm-up
        self.referenceSymbol = self.AddEquity("SPY", Resolution.Daily).Symbol  # Add SPY as the reference symbol
        self.SetBenchmark("SPY")  # Set SPY as the benchmark
        self.SetCash(100000)  # Set starting cash to $100,000

        # Set up reference rate of change (ROC) indicator with a window size of 5 years (252 trading days per year)
        self.referenceROC = self.calculate_log_return(self.referenceSymbol, 1, Resolution.Daily, Field.Close)
        self.referenceROC.Window.Size = 252 * 5  # 5 years of trading days
        history = self.History(self.referenceSymbol, 252 * 5 + 1, Resolution.Daily)  # Fetch 5 years + 1 day of history
        for time, row in history.loc[self.referenceSymbol].iterrows():
            self.referenceROC.Update(time, row.close)  # Update ROC with historical data

        # Universe settings
        self.universe_settings.resolution = Resolution.Daily
        self.universe_settings.fill_forward = True
        # Add universe of ETFs
        self.AddUniverse(self.Universe.ETF("SPY", Market.USA, self.universe_settings, self.etf_constituents_filter))

    def OnData(self, data: Slice):
        """
        Process incoming data (called each time new data is received).
        Updates the securities being tracked and plots various metrics related to the calculated indicators.
        """
        if len(self._securities) == 0:
            return  # If there are no securities, return immediately
        for security in self._securities:
            security.update_indicators()  # Update indicators for each security

        # Filter securities based on availability of various indicators
        all_securities = [x for x in self._securities]
        securities_with_beta = [x for x in all_securities if x.beta is not None]
        securities_with_cost_of_equity = [x for x in all_securities if x.cost_of_equity is not None]
        securities_with_eps = [x for x in all_securities if x.forward_eps is not None]
        securities_with_eps_and_cost_of_equity = [x for x in all_securities if x.forward_eps is not None and x.cost_of_equity is not None]
        securities_with_pvgo = [x for x in all_securities if x.pvgo is not None]
        securities_with_implied_growth = [x for x in all_securities if x.implied_growth is not None]
        securities_with_expected_growth = [x for x in all_securities if x.expected_growth is not None]
        
        # Plot the counts of securities with the various indicators
        self.Plot('Securities', 'Total', len(all_securities))
        self.Plot('Securities', 'Beta', len(securities_with_beta))
        self.Plot('Securities', 'Cost of Equity', len(securities_with_cost_of_equity))
        self.Plot('Securities', 'EPS1', len(securities_with_eps))
        self.Plot('Securities', 'EPS1 and Cost of Equity', len(securities_with_eps_and_cost_of_equity))
        self.Plot('Securities', 'PVGO', len(securities_with_pvgo))
        self.Plot('Securities', 'Implied Growth', len(securities_with_implied_growth))
        self.Plot('Securities', 'Expected Growth', len(securities_with_expected_growth))

    def calculate_market_return(self):
        """
        Calculate the annualized market return (Rm) using the reference rate of change (ROC) indicator.
        This method aggregates the daily returns stored in the ROC window and annualizes the result.
        """
        daily_returns = [x.Value for x in self.referenceROC.Window]
        average_daily_return = sum(daily_returns) / len(daily_returns)
        annualized_return = average_daily_return * 252  # Annualize the daily return (252 trading days per year)
        return annualized_return

    def get_risk_free_rate(self):
        """
        Retrieve the risk-free rate (Rf) from the risk-free interest rate model at the current algorithm time.
        """
        risk_free_rate = self.risk_free_interest_rate_model.GetInterestRate(self.Time)
        return risk_free_rate

    ### Algorithm specific methods

    def etf_constituents_filter(self, constituents: List[ETFConstituentUniverse]) -> List[Symbol]:
        """
        Filter the ETF constituents and update the internal list of ETF symbols and their weights.
        Sets the rebalance flag if new constituents are detected.
        """
        etf_symbols = [c.Symbol.ToString() for c in constituents]
        new_symbols = list(set(etf_symbols) - set(self.etf_symbols))
        if len(new_symbols) > 0:
            self.etf_symbols = etf_symbols
            for c in constituents:
                self.weights[c.Symbol] = c.Weight
            self.rebalance_flag = True
            return [c.Symbol for c in constituents]
        else:
            return Universe.Unchanged

    def OnSecuritiesChanged(self, changes: SecurityChanges) -> None:
        """
        Handle changes in the universe of securities.
        Removes securities that are no longer in the universe and deregisters their indicators.
        Adds new securities to the tracking set and initializes their indicators.
        """
        for security in changes.RemovedSecurities:
            if security.Invested:
                self.Liquidate(security.Symbol)  # Liquidate removed security if it is invested
            security_data = [x for x in self._securities if x.Symbol == security.Symbol]
            if security_data:
                self.DeregisterIndicator(security_data[0].rate_of_change)
                self.DeregisterIndicator(security_data[0].beta_indicator)
                self._securities.discard(security_data[0])

        for security in changes.AddedSecurities:
            if security.Symbol == self.referenceSymbol:
                continue
            self._securities.add(SymbolData(self, security.Symbol))

class SymbolData:
    def __init__(self, algorithm: QCAlgorithm, symbol: Symbol):
        """
        Initialize the SymbolData class, setting up various indicators and attributes for each ETF constituent symbol.
        This includes setting up rate of change (ROC) and beta indicators, and initializing historical data.
        """
        self.Symbol = symbol
        self.algorithm = algorithm
        self.referenceSymbol = algorithm.referenceSymbol
        self.weight: Decimal = algorithm.weights[symbol]

        # Set up rate of change (ROC) indicator with a window size of 5 years (252 trading days per year)
        self.rate_of_change = algorithm.calculate_log_return(symbol, 1)
        self.rate_of_change.Window.Size = 252 * 5
        self.ecdf = None
        self.rate_of_change.Updated += self.update_ecdf
        # Set up beta indicator with a window size of 5 years
        self.beta_indicator = algorithm.B(self.Symbol, self.referenceSymbol, 252 * 5, Resolution.Daily)
        # Warm up the ROC indicator with historical data
        history = algorithm.History([self.referenceSymbol], 252 * 5 + 1, Resolution.Daily)
        for time, row in history.loc[self.referenceSymbol].iterrows():
            self.rate_of_change.Update(time, row.close)

        # Initialize other attributes
        self.price = None
        self.beta = None
        self.cost_of_equity = None
        self.forward_eps = None
        self.pvgo = None
        self.implied_growth = None
        self.expected_growth = None

    def update_price(self):
        """
        Update the current price of the security.
        """
        price = self.algorithm.Securities[self.Symbol].Price
        if not math.isnan(price):
            self.price = price

    def update_beta(self):
        """
        Update the beta value using the beta indicator.
        """
        beta = self.beta_indicator.Current.Value
        if not math.isnan(beta) and beta != 0:
            self.beta = beta

    def update_cost_of_equity(self):
        """
        Calculate and update the cost of equity (k) using the risk-free rate (Rf), market return (Rm), and beta.
        """
        risk_free_rate = self.algorithm.get_risk_free_rate()
        market_return = self.algorithm.calculate_market_return()
        beta = self.beta
        if beta is not None:
            self.cost_of_equity = risk_free_rate + beta * (market_return - risk_free_rate)

    def update_forward_eps(self):
        """
        Calculate and update the forward earnings per share (EPS1) using the forward earning yield and price.
        """
        earning_yield = self.algorithm.Securities[self.Symbol].Fundamentals.ValuationRatios.ForwardEarningYield
        price = self.price
        if not math.isnan(earning_yield) and earning_yield != 0:
            self.forward_eps = earning_yield * price

    def update_pvgo(self):
        """
        Calculate and update the present value of growth opportunities (PVGO).
        """
        price = self.price
        forward_eps = self.forward_eps
        cost_of_equity = self.cost_of_equity
        if forward_eps is not None and cost_of_equity is not None and price is not None:
            self.pvgo = price - forward_eps / cost_of_equity

    def update_implied_growth(self):
        """
        Calculate and update the implied growth rate.
        """
        cost_of_equity = self.cost_of_equity
        pvgo = self.pvgo
        forward_eps = self.forward_eps
        if forward_eps is not None and pvgo is not None:
            self.implied_growth = cost_of_equity * pvgo / (forward_eps + pvgo)

    def update_expected_growth(self):
        """
        Calculate and update the expected growth rate using the empirical cumulative distribution function (ecdf).
        """
        implied_growth = self.implied_growth
        ecdf = self.ecdf
        historical_growth = [x.Value for x in self.rate_of_change.Window]
        if implied_growth is not None and ecdf is not None:
            daily_implied_growth = implied_growth / 252
            p_below = ecdf.cdf.evaluate(daily_implied_growth)
            p_above = 1 - p_below
            mean_below = np.mean([x for x in historical_growth if x < daily_implied_growth])
            mean_above = np.mean([x for x in historical_growth if x > daily_implied_growth])
            self.expected_growth = p_below * mean_below + p_above * mean_above

    def update_ecdf(self, sender, updated_value):
        """
        Update the empirical cumulative distribution function (ecdf) of historical returns on capital (ROC).
        """
        if self.rate_of_change.Window.IsReady:
            historical_values = [x.Value for x in self.rate_of_change.Window]
            self.ecdf = stats.ecdf(historical_values)

    def update_indicators(self):
        """
        Update all calculated values for the symbol, including price, beta, cost of equity, forward EPS, PVGO, implied growth, and expected growth.
        """
        self.update_price()
        self.update_beta()
        self.update_cost_of_equity()
        self.update_forward_eps()
        self.update_pvgo()
        self.update_implied_growth()
        self.update_expected_growth()
# region imports
from AlgorithmImports import *
import math
from QuantConnect.Data.UniverseSelection import SecurityChanges
import numpy as np
from scipy import stats
from datetime import timedelta
# endregion

"""
    Algorithm: Licentav3
    
    Overview:
    This algorithm is designed to invest in the constituents of the SPY ETF, weighted by their implied growth rates, 
    with monthly rebalancing. The core idea is to use various financial indicators and metrics to identify and 
    weight investments based on their potential for growth. The calculations include Logarithmic Returns (LogR), 
    Present Value of Growth Opportunities (PVGO), beta, cost of equity, and Forward Earnings Per Share (EPS1).

    Key Concepts and Calculations:
    1. Logarithmic Returns (LogR):
        - Calculated as: LogR = ln(Current Price / Previous Price)
        - Frequency: Calculated daily
        - Time Period: 5 years (1260 trading days)
        - Purpose: Used to calculate beta, historical market returns, and the empirical cumulative distribution function (ecdf).
    
    2. Present Value of Growth Opportunities (PVGO):
        - Calculated as: PVGO = Price - (EPS1 / Cost of Equity)
        - Frequency: Calculated monthly
        - Inputs: Current price, Forward Earnings Per Share (EPS1), and cost of equity (k)
        - Purpose: Determines the value of growth opportunities beyond the no-growth scenario.
    
    3. Beta:
        - Calculated as: Beta = Covariance(Security Returns, Market Returns) / Variance(Market Returns)
        - Frequency: Calculated monthly
        - Time Period: 5 years (1260 trading days)
        - Purpose: Measures the sensitivity of the security's returns to market returns, used in the cost of equity calculation.
    
    4. Cost of Equity (k):
        - Calculated as: k = Risk-Free Rate + Beta * (Market Return - Risk-Free Rate)
        - Frequency: Calculated monthly
        - Inputs: Risk-free rate, historical market return, and beta
        - Purpose: Represents the return required by investors for taking on the risk of the security.
    
    5. Forward Earnings Per Share (EPS1):
        - Calculated as: EPS1 = Forward Earning Yield * Current Price
        - Frequency: Calculated monthly
        - Inputs: Forward earning yield and current price
        - Purpose: Projects future earnings based on the most recent forecast data.
    
    6. Empirical Cumulative Distribution Function (ecdf):
        - Constructed using historical daily LogR data
        - Purpose: Used to calculate expected growth rates by understanding the distribution of historical returns.
    
    Workflow:
    1. Initialize: Sets up initial parameters, universe settings, and historical data for SPY.
    2. OnData: Processes incoming data, updates indicators for each security, and plots various metrics.
    3. Monthly Rebalancing: Adjusts portfolio weights based on the latest implied growth values.
    
    Detailed Steps:
    - Initialize indicators and historical data for each SPY constituent.
    - Calculate daily LogR to maintain up-to-date returns data.
    - At the end of each month, update beta, cost of equity, EPS1, and PVGO using the latest data.
    - Assign weights to each constituent based on the calculated implied growth.
    - Implement monthly rebalancing to adjust portfolio weights according to the latest calculations.

    By following this structured approach, the algorithm aims to make informed investment decisions based on 
    comprehensive financial metrics, aligning the portfolio with securities that exhibit strong growth potential.
"""
class Licentav3(QCAlgorithm):

    def Initialize(self):
        self._securities = set() # Set to hold the securities being tracked
        self.etf_symbols = []
        self.rebalance_flag = False
        self.weights = dict()
        self.SetStartDate(2010, 1, 1)
        # self.SetEndDate(2010, 12, 1)
        self.EnableAutomaticIndicatorWarmUp = True
        self.referenceSymbol = self.AddEquity("QQQ", Resolution.Daily).Symbol
        self.SetBenchmark("QQQ")
        self.testSymbols = None # ["AAPL"] # "TSLA", "AAPL","MSFT","ZM"]
        self.SetCash(100000)

        # Flags to store or run from QC cache
        self.runFromCache   = False
        self.updateCache    = True
        

        # Set up reference rate of change (ROC) indicator with a window size of 5 years (252 trading days per year)
        self.referenceROC = self.logr(self.referenceSymbol, 1, Resolution.Daily, Field.Close)
        self.referenceROC.Window.Size = 252*5
        history = self.History(self.referenceSymbol, 252*5+1, Resolution.Daily)
        for time, row in history.loc[self.referenceSymbol].iterrows():
            self.referenceROC.Update(time, row.close)

        #Universe settings
        self.universe_settings.resolution = Resolution.Daily
        self.universe_settings.fill_forward = True

        # Add universe of ETFs 
        self.add_universe(self.universe.etf("QQQ", Market.USA, self.universe_settings, self.etf_constituents_filter))

        # schedule monthly rebalance
        self.Schedule.On( self.DateRules.MonthStart(self.referenceSymbol),
                            self.TimeRules.AfterMarketOpen(self.referenceSymbol, 31),
                            self.SetRebalanceFlag )            
    
    
    def SetRebalanceFlag(self):
         if self.Time.month in [1, 4, 7, 10]:
            self.debug("rebalancing")
            self.rebalance_flag = True        
            self.liquidate()

    def OnData(self, data: Slice):

        if len(self._securities) == 0:
            return

        if(self.rebalance_flag):
            self.rebalance_flag = False
            for security in self._securities:
                security.update()

            securities = [x for x in self._securities]
            securities_with_beta = [x for x in securities if x.beta != None]
            securities_with_cost_of_equity = [x for x in securities if x.k != None]
            securities_with_eps = [x for x in securities if x.EPS1 != None]
            securities_with_eps_and_k = [x for x in securities if x.EPS1 != None and x.k != None]
            securities_with_pgvo = [x for x in securities if x.PVGO != None]
            securities_with_implied_growth = [x for x in securities if x.implied_growth != None]
            securities_with_expected_growth = [x for x in securities if x.expected_growth != None]

            # Weighted by implied growth
            # ---------------------------- 
            # top_ten_implied_growth_stocks = sorted(securities_with_implied_growth, key=lambda x: x.implied_growth, reverse=True)[:10]
            # total_implied_growth = sum(x.implied_growth for x in top_ten_implied_growth_stocks)
            # for stock in top_ten_implied_growth_stocks:
            #     weight = stock.implied_growth / total_implied_growth
            #     self.set_holdings(stock.Symbol, weight)

            # Weighted by expected growth
            # ------------------------------ 
            top_ten_expected_growth_stocks = sorted(securities_with_expected_growth, key=lambda x: x.expected_growth, reverse=True)[:10]
            total_expected_growth = sum(x.expected_growth for x in top_ten_expected_growth_stocks)
            for stock in top_ten_expected_growth_stocks:
                weight = stock.expected_growth / total_expected_growth
                self.set_holdings(stock.Symbol, weight)




            self.plot('Secutities', 'Total', len(securities))
            # self.plot('Secutities', 'Beta', len(securities_with_beta))
            # self.plot('Secutities', 'Cost of Equity', len(securities_with_cost_of_equity))   
            # self.plot('Secutities', 'EPS1', len(securities_with_eps))
            # self.plot('Secutities', 'EPS1 and k', len(securities_with_eps_and_k))
            # self.plot('Secutities', 'PVGO', len(securities_with_pgvo))
            # self.plot('Secutities', 'Implied Growth', len(securities_with_implied_growth))
            # self.plot('Secutities', 'Expected Growth', len(securities_with_expected_growth))              




    def getRm(self):
        lst = [x.Value for x in self.referenceROC.Window]
        Rm = sum(lst) / len(lst)
        # Annualize the market return
        Rm = Rm * 252
        return Rm
    
    def getRf(self):
        # Risk free rate
        Rf = self.risk_free_interest_rate_model.get_interest_rate(self.time)
        return Rf    

    ### Algorithm specific methods
               
    def etf_constituents_filter(self, constituents: List[ETFConstituentUniverse]) -> List[Symbol]:
        etf = [c.Symbol.ToString()  for c in constituents]
        difference = list(set(etf) - set(self.etf_symbols))
        if len(difference) > 0:
            self.etf_symbols = etf
            for c in constituents:
                self.weights[c.Symbol] = c.Weight
            # self.rebalance_flag = True
            return [c.Symbol for c in constituents]
        else:
            return Universe.UNCHANGED
    
    def OnSecuritiesChanged(self, changes: SecurityChanges) -> None:
        
        for security in changes.removed_securities:
            # If removed security is in the portfolio, liquidate it
            if security.Invested:
                self.Liquidate(security.Symbol)
            # Remove the security from the _securieties list and deregister the indicators
            
            security_data = [x for x in self._securities if x.Symbol == security.Symbol]
            
            if(len(security_data) > 0):
                security_data = security_data[0]
                self.DeregisterIndicator(security_data.roc)
                self.DeregisterIndicator(security_data.betaIndicator)
                self._securities.discard(security_data)

        for security in changes.added_securities:
            if security.Symbol == self.referenceSymbol:
                continue
            if (self.testSymbols is None or security.Symbol.value in self.testSymbols):
                self._securities.add(SymbolData(self, security.Symbol))

class SymbolData:
    def __init__(self, algorithm: QCAlgorithm , symbol: Symbol):
        self.Symbol = symbol
        self.algorithm = algorithm
        self.referenceSymbol = algorithm.referenceSymbol
        self.weight: Decimal = self.algorithm.weights[symbol]

        # Setup ROC indicator and ecdf
        self.roc = algorithm.logr(symbol, 1)
        self.roc.Window.Size = 252*5
        self.ecdf = None
        self.roc.Updated += self.update_ecdf
        
        # Setup Beta indicator
        self.betaIndicator = self.algorithm.B(self.Symbol, self.referenceSymbol, 252*5, Resolution.Daily)
        
        # Warm up the indicator
        history = self.algorithm.History([self.referenceSymbol], 252*5+1, Resolution.Daily)
        for time, row in history.loc[self.referenceSymbol].iterrows():
            self.roc.Update(time, row.close) 
       
        self.price = None
        self.beta = None
        self.k = None
        self.EPS1 = None
        self.PVGO = None
        self.implied_growth = None
        self.expected_growth = None

    def updatePrice(self):
        price = self.algorithm.Securities[self.Symbol].Price
        if not math.isnan(price):
            self.price = price

    def updateBeta(self):
        beta = self.betaIndicator.Current.Value
        if not math.isnan(beta) and beta != 0:
            self.beta = beta

    def updateCostOfEquity(self):
        Rf = self.algorithm.getRf()
        Rm = self.algorithm.getRm()
        beta = self.beta
        if beta != None:
            # Calculate the cost of equity
            k = Rf + beta * (Rm - Rf)   
            self.k = k 

    def updateEPS1(self):
        earningYield = self.algorithm.Securities[self.Symbol].Fundamentals.ValuationRatios.ForwardEarningYield
        price = self.price
        if math.isnan(earningYield) == False and earningYield != 0:
            eps1 = earningYield * price
            self.EPS1 = eps1

    def updatePVGO(self):
        price = self.price
        EPS1 = self.EPS1
        k = self.k
        if EPS1 != None and k != None and price != None:
            #Calculate PVGO
            PVGO = price - EPS1/k
            self.PVGO = PVGO

    def updateImpliedGrowth(self):
        k = self.k
        PVGO = self.PVGO
        EPS1 = self.EPS1
        if EPS1 != None and PVGO != None:
            implied_growth = k * PVGO/(EPS1+PVGO)
            self.implied_growth = implied_growth

    def updateExpectedGrowth(self):
        implied_growth = self.implied_growth
        ecdf = self.ecdf
        historical_growth = [x.Value for x in self.roc.Window]
        if implied_growth != None and ecdf != None:
            daily_implied_growth = implied_growth/252
            p_below = ecdf.cdf.evaluate(daily_implied_growth)
            p_above = 1 - p_below
            mean_below = np.mean([x for x in historical_growth if x < daily_implied_growth])
            mean_above = np.mean([x for x in historical_growth if x > daily_implied_growth])
            expected_growth = p_below * mean_below + p_above * mean_above
            self.expected_growth = expected_growth
            
    def update_ecdf(self, s, item):
        if self.roc.Window.IsReady:
            lst = [x.Value for x in self.roc.Window]
            ecdf = stats.ecdf(lst)
            self.ecdf = ecdf

    def update(self):
        self.updatePrice()
        self.updateBeta()
        self.updateCostOfEquity()
        self.updateEPS1()
        self.updatePVGO()
        self.updateImpliedGrowth()
        self.updateExpectedGrowth()
        

        if( self.algorithm.testSymbols is not None and \
            self.Symbol.value in self.algorithm.testSymbols and \
            self.implied_growth is not None ):

            # self.algorithm.plot('Growth', f"{self.Symbol.value} Implied", self.implied_growth)
            self.algorithm.plot('Growth', f"{self.Symbol.value} Expected", self.expected_growth)
            self.algorithm.plot('EPS', f"{self.Symbol.value} EPS", self.EPS1)