Overall Statistics
Total Trades
208
Average Win
0.09%
Average Loss
-0.22%
Compounding Annual Return
4.294%
Drawdown
7.900%
Expectancy
-0.227
Net Profit
4.294%
Sharpe Ratio
0.345
Probabilistic Sharpe Ratio
21.707%
Loss Rate
46%
Win Rate
54%
Profit-Loss Ratio
0.42
Alpha
-0.016
Beta
0.577
Annual Standard Deviation
0.101
Annual Variance
0.01
Information Ratio
-0.584
Tracking Error
0.091
Treynor Ratio
0.06
Total Fees
$210.18
Estimated Strategy Capacity
$5900000.00
Lowest Capacity Asset
TPL R735QTJ8XC9X
Portfolio Turnover
1.47%
#region imports
from AlgorithmImports import *
#endregion
# https://quantpedia.com/Screener/Details/14

import numpy as np
import pandas as pd
from scipy.stats import linregress
from collections import deque
from datetime import timedelta
from datetime import datetime
import math

class MomentumEffectAlgorithm(QCAlgorithm):

    def Initialize(self) -> None:
        """Configure the backtest window, universe, strategy parameters and benchmark.

        Sets the one-year backtest period and starting cash, registers the
        coarse/fine universe selection callbacks, initialises every piece of
        state the other methods of this class read (ranking lists, indicator
        dictionaries, rebalance flags) and adds SPY as market index and benchmark.
        """
        self.SetStartDate(2016, 1, 1)  # backtest start date
        self.SetEndDate(2017, 1, 1)    # backtest end date
        self.SetCash(100000)           # starting strategy cash

        # create the equities universe (coarse filter first, then the fine filter/ranking)
        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
        self.resolution = Resolution.Daily

        self.UniverseSettings.Resolution = self.resolution
        self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Adjusted
        self.UniverseSettings.MinimumTimeInUniverse = timedelta(days=7)
        
        self.symbol_dictionary = {} # dictionary for holding SymbolData keyed by symbol
        self.invested = []   # list of securities currently invested in portfolio 
        self.ranked_selection = [] # list of securities selected by momentum ranking and strategy rules on rerank
        self.excluded_securities = ['SPY'] # tickers left out of the "Added" log in OnSecuritiesChanged

        self.N_FACTOR = 20 # number of top-scoring stocks FineSelectionFunction keeps
        self.risk_free_rate = 0.03 # subtracted from the annualised return in the Sharpe calculation

        # adjust these in line with Clenow - want approx 20 stocks in the portfolio
        self.num_coarse = 500 # Number of symbols selected at Coarse Selection
        self.num_positions = 20     # Number of symbols with open positions
        self.num_current_holdings = 0

        self.risk_factor = 0.001    # targeting 10 basis point move per day

        self.momentum_period = 63 # 3 months of daily bars; History lookback in get_indicator_data
        self.momentum_threshold = 0.0  # TODO: check the value to see if this is reasonable
        self.filter_period = 100 # lookback (bars) of the per-stock moving-average exit filter
        self.volatility_period = 20
        self.index_filter_period = 200 # lookback (bars) of the market-index trend filter

        # dictionaries for holding moving average, momentum, and volatility values for each symbol
        self.ma = {}
        self.sharpe = {}
        self.returns = {}
        self.momentum = {}
        self.exp_momentum = {}
        self.volatility = {}
        
        # variables to control the portfolio rebalance and the stock selection reranking
        self.UpdateFineFilter = 1
        self.month = -1 # last month seen by OnData; -1 makes the first bar count as a new month
        self.dayofweek = 3 # weekday() value on which OnData requests a rerank (3 = Thursday, Monday = 0)
        self.weekly_rebalance = False # set by FineSelectionFunction after a rerank, cleared by OnData
        self.monthly_rebalance = False # set by OnData when the calendar month changes
        self.rerank = True # when False, FineSelectionFunction returns Universe.Unchanged
        
        # set up market index TODO change this to AddIndex
        market = self.AddEquity("SPY", self.resolution)
        market.SetDataNormalizationMode(DataNormalizationMode.Raw)
        self.market = market.Symbol
        
        self.SetBenchmark(self.market)

        # NOTE(review): OnData recomputes the index average from History and does not
        # read this indicator in the code visible here - confirm whether it is still needed.
        self.index_sma = self.SMA(self.market, self.index_filter_period, self.resolution)
        # self.RegisterIndicator(self.market, self.index_sma, self.resolution)
        # self.WarmUpIndicator(self.market, self.index_sma)

        # set Brokerage model and Fee Structure
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Cash)

        # set Free Cash to 0.5%
        self.Settings.FreePortfolioValuePercentage = 0.005


    # risk parity position sizing - takes the desired percentage and adjusts the requested holding by volatility to give constant risk
    def CalculateRiskParityPositionSizePercentage(self, symbol, vol, percentage)-> float:
        """Return the order quantity for ``percentage`` divided by ``vol``.

        The quantity comes from ``self.CalculateOrderQuantity(symbol, percentage)``.
        The quotient is rounded to two decimals (nearest, not down). ``0.0`` is
        returned when ``vol`` is not strictly positive (including NaN) or when
        the order quantity is zero.
        """
        shares = self.CalculateOrderQuantity(symbol, percentage)
        # `not (vol > 0)` rather than `vol <= 0` so that a NaN volatility also yields 0.0
        if not (vol > 0) or shares == 0:
            return 0.0
        return round(shares / vol, 2)

    # risk parity position sizing
    def CalculateRiskParityPositionSize(self, symbol, percentage, vol_risk_weighting)-> float:
        """Return the order quantity for ``percentage`` scaled by ``vol_risk_weighting``.

        The base quantity comes from ``self.CalculateOrderQuantity(symbol, percentage)``.
        The product is rounded to zero decimals, so the result is a
        whole-number float.
        """
        base_quantity = self.CalculateOrderQuantity(symbol, percentage)
        return round(base_quantity * vol_risk_weighting, 0)

    # QC methods and overrides
    def OnWarmUpFinished(self) -> None:
        """Write a single log line announcing that the algorithm is ready."""
        ready_message = "Equities Momentum Algorithm Ready"
        self.Log(ready_message)

    # may eventually switch this out for small or mid cap stocks
    def CoarseSelectionFunction(self, coarse):
        """Return the symbols of the most liquid coarse candidates.

        Keeps entries that have fundamental data and a price above 5, orders
        them by dollar volume (largest first) and returns the symbols of the
        first ``self.num_coarse`` of them.
        """
        eligible = []
        for candidate in coarse:
            if candidate.HasFundamentalData and candidate.Price > 5:
                eligible.append(candidate)

        # stable in-place sort: equal dollar volumes keep their incoming order
        eligible.sort(key=lambda candidate: candidate.DollarVolume, reverse=True)

        most_liquid = eligible[:self.num_coarse]
        return [candidate.Symbol for candidate in most_liquid]

    
    # the approach we take to universe selection is (filtered, then ranked), then those symbols are managed (via symbol data or risk management) in ondata
    def FineSelectionFunction(self, fundamental):
        """Filter the fine universe, score it by weighted factor ranks and keep the best.

        Returns ``Universe.Unchanged`` unless ``self.rerank`` is set. Otherwise
        the candidates are filtered on share-class and fundamental-data
        availability, price-based indicators are refreshed through
        ``self.get_indicator_data`` and every candidate receives a score equal
        to the weighted sum of its rank positions in each factor ordering.
        The ``self.N_FACTOR`` highest scores form ``self.ranked_selection``.

        Side effects: replaces the indicator dictionaries on ``self``, stores
        the scored list in ``self.sorted_stock_dict``, clears ``self.rerank``
        and raises ``self.weekly_rebalance`` so that OnData trades the result.

        Returns:
            list of the selected symbols, best score first.
        """
        if not self.rerank:
            return Universe.Unchanged

        universe_valid = [x for x in fundamental
            if float(x.EarningReports.BasicAverageShares.ThreeMonths) * x.Price > 1e9
            and x.SecurityReference.IsPrimaryShare
            and x.SecurityReference.SecurityType == "ST00000001"
            and x.SecurityReference.IsDepositaryReceipt == 0
            and x.CompanyReference.IsLimitedPartnership == 0
            and x.OperationRatios.ROIC
            and x.OperationRatios.CapExGrowth
            and x.OperationRatios.FCFGrowth
            and x.ValuationRatios.BookValueYield
            and x.ValuationRatios.EVToEBITDA
            and x.ValuationRatios.PricetoEBITDA
            and x.ValuationRatios.PERatio
            and x.MarketCap
            ]

        universe_symbols = [x.Symbol for x in universe_valid]
        (self.returns, self.momentum, self.exp_momentum,
         self.volatility, self.sharpe, self.ma) = self.get_indicator_data(universe_symbols)

        # A candidate without an entry in the return table cannot be ranked on
        # factor 0; drop it instead of failing inside the sort key.
        universe_valid = [x for x in universe_valid if x.Symbol in self.returns]

        # (sort key, sort descending?, weight). A stock's rank is its position in
        # the ordering, so the *last* position is the best one: ascending order
        # favours high values, descending order favours low values.
        factors = [
            (lambda x: self.returns[x.Symbol],                   False, 1.0),  # high return
            (lambda x: x.OperationRatios.ROIC.OneYear,           False, 1.0),  # high ROIC
            (lambda x: x.OperationRatios.CapExGrowth.ThreeYears, False, 0.0),  # high growth
            (lambda x: x.OperationRatios.FCFGrowth.ThreeYears,   False, 0.3),  # high growth
            (lambda x: x.ValuationRatios.BookValueYield,         False, 0.0),  # high book value yield
            (lambda x: x.ValuationRatios.EVToEBITDA,             True,  0.0),  # low EV / EBITDA
            (lambda x: x.ValuationRatios.PricetoEBITDA,          True,  0.0),  # low price / EBITDA
            (lambda x: x.ValuationRatios.PERatio,                True,  0.0),  # low P/E
            (lambda x: x.MarketCap,                              True,  0.0),  # market cap
        ]

        # Seed the score table in factor-0 order: the final sort is stable, so this
        # insertion order decides ties between equal scores.
        primary_key, primary_descending, _ = factors[0]
        stock_dict = dict.fromkeys(
            sorted(universe_valid, key=primary_key, reverse=primary_descending), 0.0)

        for key, descending, weight in factors:
            if weight == 0:
                continue  # contributes nothing to the score; skip the sort entirely
            # One pass builds an O(1) rank lookup instead of calling list.index per stock.
            rank_of = {}
            for position, elem in enumerate(sorted(universe_valid, key=key, reverse=descending)):
                rank_of.setdefault(elem, position)
            for elem in stock_dict:
                stock_dict[elem] += rank_of[elem] * weight

        self.sorted_stock_dict = sorted(stock_dict.items(), key=lambda item: item[1], reverse=True)

        top = [elem for elem, _ in self.sorted_stock_dict[:self.N_FACTOR]]
        self.ranked_selection = [elem.Symbol for elem in top]
        self.rerank = False
        self.weekly_rebalance = True
        return self.ranked_selection
    
    # functions to calculate momentum, exponential momentum, moving averages, returns, volatility and sharpe
    def momentum_func(self, closes):
        """Return a regression-based momentum score for a series of closes.

        Fits a straight line to the natural log of ``closes`` against the
        bar index, compounds ``1 + slope`` over 252 bars and multiplies the
        result by the fit's R-squared.
        """
        log_prices = np.log(closes)
        bar_index = np.arange(len(log_prices))
        fit = linregress(bar_index, log_prices)
        annualised = (1 + fit.slope) ** 252
        r_squared = fit.rvalue ** 2
        return annualised * r_squared

    def exponential_momentum_func(self, closes):
        """Return an exponential-regression momentum score, in percent, for ``closes``.

        Fits a straight line to the natural log of ``closes`` against the
        bar index, converts the slope to a 252-bar compounded percentage
        (``(exp(slope) ** 252 - 1) * 100``) and multiplies it by the fit's
        R-squared.
        """
        log_prices = np.log(closes)
        bar_index = np.arange(len(log_prices))
        fit = linregress(bar_index, log_prices)
        growth_per_bar = np.exp(fit.slope)
        annualised_pct = (np.power(growth_per_bar, 252) - 1) * 100
        return annualised_pct * (fit.rvalue ** 2)

    def get_indicator_data(self, symbols):
        """Compute return, momentum, volatility and Sharpe figures per symbol.

        Requests ``self.momentum_period`` daily bars for ``symbols`` and, for
        each symbol with enough data, derives from its closes:

        * the two regression momentum scores (``momentum_func`` and
          ``exponential_momentum_func``),
        * the mean daily log return times 252,
        * the standard deviation of daily log returns times ``sqrt(252)``,
        * the Sharpe ratio of those two against ``self.risk_free_rate``.

        Symbols with no rows in the history frame, or with fewer than three
        closes (the sample standard deviation needs at least two returns), are
        left out of every dictionary, so callers must not assume that each
        requested symbol is present in the result.

        Args:
            symbols: list of symbols to evaluate; an empty list issues no
                history request.

        Returns:
            tuple ``(returns, momentum, exp_momentum, volatility, sharpe, ma)``
            of dictionaries keyed by symbol. ``ma`` is always empty because the
            moving-average calculation is currently disabled.
        """
        returns, ma, mom, exp_mom, volatility, sharpe = {}, {}, {}, {}, {}, {}
        if not symbols:
            return returns, mom, exp_mom, volatility, sharpe, ma

        hist_df = self.History(symbols, self.momentum_period, Resolution.Daily)
        for s in symbols:
            try:
                closes = hist_df.loc[str(s)]['close']
            except KeyError:
                # no bars came back for this symbol
                continue
            if len(closes) < 3:
                continue

            mom[s] = self.momentum_func(closes)
            exp_mom[s] = self.exponential_momentum_func(closes)

            log_returns = np.log(closes / closes.shift(1))
            returns[s] = log_returns.mean() * 252   # annualised average return
            volatility[s] = log_returns.std() * np.sqrt(252)
            sharpe[s] = (returns[s] - self.risk_free_rate) / volatility[s]

        return returns, mom, exp_mom, volatility, sharpe, ma
     
    def rma(s: pd.Series, period: int) -> pd.Series:
        return s.ewm(alpha=1 / period).mean()

    def atr(df: pd.DataFrame, length: int = 14) -> pd.Series:
        high, low, prev_close = df['high'], df['low'], df['close'].shift()
        tr_all = [high - low, high - prev_close, low - prev_close]
        tr_all = [tr.abs() for tr in tr_all]
        tr = pd.concat(tr_all, axis=1).max(axis=1)
        atr_ = rma(tr, length)
        return atr_

    def OnData(self, data):
        """Trade the weekly rerank result and rebalance holdings once a month.

        On the configured weekday ``self.rerank`` is raised so that the next
        fine selection produces a new ranking; that selection in turn raises
        ``self.weekly_rebalance``, which is handled here on the following
        call. A change of calendar month triggers a rebalance of all holdings.

        Args:
            data: the current data slice; only its trade bars are read.
        """
        if self.IsWarmingUp:
            return

        if self.Time.weekday() == self.dayofweek:
            self.rerank = True

        if self.month != self.Time.month:
            self.monthly_rebalance = True
            self.month = self.Time.month

        # rerank weekly, rebalance monthly
        if self.weekly_rebalance:
            self._trade_reranked_selection(data)
            self.weekly_rebalance = False
            portfolio_update = f'Value: {self.Portfolio.TotalPortfolioValue:.2f}, Holdings: {self.Portfolio.TotalHoldingsValue:.2f} Cash: {self.Portfolio.Cash:.2f}, [Margin Used: {self.Portfolio.TotalMarginUsed:.2f}, Margin Remaining {self.Portfolio.MarginRemaining:.2f}]'
            self.Debug(f'Portfolio: {portfolio_update}')

        if self.monthly_rebalance:
            self._rebalance_holdings()
            self.monthly_rebalance = False

    def _last_price(self, data, symbol):
        """Return the close of ``symbol``'s bar in ``data``, else the security's last price."""
        if data.Bars.ContainsKey(symbol):
            return data[symbol].Close
        return self.Securities[symbol].Price

    def _risk_parity_weights(self, symbols, budget):
        """Split ``budget`` across ``symbols`` in inverse proportion to their volatility.

        A lower-volatility symbol receives a larger share, so each position
        contributes roughly the same risk. Symbols whose entry in
        ``self.volatility`` is missing, NaN, infinite or not positive get no
        weight and are absent from the result. Weights are truncated to three
        decimals, so their sum never exceeds ``budget``.
        """
        inverse_vol = {}
        for symbol in symbols:
            vol = self.volatility.get(symbol)
            if vol is not None and math.isfinite(vol) and vol > 0:
                inverse_vol[symbol] = 1.0 / vol

        total = sum(inverse_vol.values())
        return {symbol: math.floor(share / total * budget * 1000) / 1000
                for symbol, share in inverse_vol.items()}

    def _trade_reranked_selection(self, data):
        """Sell deselected holdings that fell below their filter average, then buy new picks."""
        self.Debug(f'{self.Time} OnData Weekly Portfolio Rerank')
        liquidated = []

        # Start from the live portfolio: the cached list predates last week's
        # purchases and is not refreshed at all while the market filter blocks buying.
        self.invested = [x.Symbol for x in self.Portfolio.Values if x.Invested]
        self.num_current_holdings = len(self.invested)

        # Sell
        selected_symbols = [x.Value for x in self.ranked_selection]
        self.Debug(f'selected: {selected_symbols}')

        deselected_symbols = [symbol for symbol in self.invested if symbol not in self.ranked_selection]
        self.Debug(f'deselected: {deselected_symbols}')

        invested_symbols = [x.Value for x in self.invested]
        self.Debug(f'invested: {invested_symbols}')

        # Liquidate securities that we currently hold, but that are no longer in the
        # top rankings and have closed below their moving-average filter
        if deselected_symbols:
            exit_df = self.History(deselected_symbols, self.filter_period, self.resolution)
            for symbol in deselected_symbols:
                try:
                    filterMa = exit_df.loc[str(symbol)]['close'].mean()
                except KeyError:
                    self.Debug(f'OnData: no history for {symbol}, exit filter skipped')
                    continue
                close = self._last_price(data, symbol)
                if close < filterMa:
                    self.Liquidate(symbol, f'{str(symbol)} as closed {close} is below filter {filterMa}')
                    liquidated.append(symbol.ID.ToString().split(' ')[0])
                    self.num_current_holdings -= 1

        # log which ones were removed and margin now
        self.Debug(f'OnData: SOLD {liquidated}. Current Cash: {self.Portfolio.Cash:.2f} Margin: {self.Portfolio.MarginRemaining:.2f}')

        # Buy
        # check if the market is in an uptrend: index close > index moving average
        index_df = self.History(self.market, self.index_filter_period, self.resolution)
        self.sma200 = index_df.loc[str(self.market)]['close'].mean()
        market_price = self._last_price(data, self.market)
        buying = (market_price > self.sma200)
        self.Debug(f'OnData: Buying = {buying} [{market_price:.2f}, {self.sma200:.2f}]')

        if not buying:
            return

        # buy selected securities that are not already held
        self.invested = [x.Symbol for x in self.Portfolio.Values if x.Invested]
        self.num_current_holdings = len(self.invested)

        invested_symbols = [x.Value for x in self.invested]
        self.Debug(f'OnData: {self.num_current_holdings} invested: {invested_symbols}')

        portfolio_additions = [symbol for symbol in self.ranked_selection if symbol not in self.invested]
        new_symbols = [x.Value for x in portfolio_additions]
        target_new_holdings = len(new_symbols)
        self.Debug(f"OnData: {target_new_holdings} potential new holdings: " + str(new_symbols))

        # Each new position is budgeted 1/num_positions of the portfolio; the
        # 0.99 factor keeps about 1% of that budget in cash.
        budget = 0.99 * target_new_holdings / self.num_positions
        weights = self._risk_parity_weights(portfolio_additions, budget)

        purchased = []
        total_weight = 0
        # set the holdings based on the weights and available cash
        for symbol in portfolio_additions:
            if symbol not in weights:
                continue  # no usable volatility figure, so no position size
            quantity = self.CalculateOrderQuantity(symbol, weights[symbol])
            price = self._last_price(data, symbol)
            if self.Portfolio.GetBuyingPower(symbol) > (price * quantity):
                self.SetHoldings(symbol, weights[symbol])
                purchased.append(f"{symbol.ID.ToString().split(' ')[0]} {weights[symbol]:.3f}")
                total_weight = total_weight + weights[symbol]

        self.Debug(f'OnData: BOUGHT {purchased} based on rankings and available cash. {total_weight}')

    def _rebalance_holdings(self):
        """Re-weight every current holding by inverse volatility (monthly)."""
        self.Debug(f'{self.Time} OnData Monthly Portfolio Rebalance')

        rebalanced = []
        current_portfolio = [x.Symbol for x in self.Portfolio.Values if x.Invested]
        # recalculate indicator data for our current portfolio TODO: break apart the calculations so we can limit this to just volatility recalc
        self.returns, self.momentum, self.exp_momentum, self.volatility, self.sharpe, self.ma = self.get_indicator_data(current_portfolio)

        self.num_current_holdings = len(current_portfolio)

        current_portfolio_symbols = [x.Value for x in current_portfolio]
        self.Debug(f'current portfolio: {current_portfolio_symbols}')

        # target 99% of the portfolio so the weights always sum to less than 1
        weights = self._risk_parity_weights(current_portfolio, 0.99)

        # a single holding is left as it is
        if len(current_portfolio) <= 1:
            return

        # TODO: Might have to liquidate all positions before rebalance, unless SetHoldings can handle the deltas
        target_portfolio = []
        total_weight = 0
        for symbol in current_portfolio:
            if symbol not in weights:
                continue  # no usable volatility figure: leave this position untouched
            target_portfolio.append(PortfolioTarget(symbol, weights[symbol]))
            rebalanced.append(f"{symbol.ID.ToString().split(' ')[0]} {weights[symbol]:.3f}")
            total_weight = total_weight + weights[symbol]

        # set holdings to rebalance to target portfolio
        self.SetHoldings(target_portfolio)
        self.Debug(f'OnData: Portfolio Rebalanced {self.num_current_holdings} Holdings {total_weight}, Portfolio Weights: {rebalanced}')
 
    def OnSecuritiesChanged(self, changes):
        """Log the tickers that entered and left the universe.

        The ticker is taken as the first space-separated token of the
        security identifier string. Securities listed in
        ``self.excluded_securities`` are omitted from the "Added" line.
        Removed securities are only reported here, not liquidated.

        Args:
            changes: the universe change set; its ``AddedSecurities`` and
                ``RemovedSecurities`` collections are read.
        """
        removedSymbols = [
            security.Symbol.ID.ToString().split(' ')[0]
            for security in changes.RemovedSecurities
        ]

        addedSymbols = [
            security.Symbol.ID.ToString().split(' ')[0]
            for security in changes.AddedSecurities
            if security.Symbol not in self.excluded_securities
        ]

        self.Debug(f'OnSecuritiesChanged: Added: {addedSymbols}')
        self.Debug(f'OnSecuritiesChanged: Removed: {removedSymbols}')