Overall Statistics
Total Orders
49127
Average Win
0.08%
Average Loss
-0.09%
Compounding Annual Return
0.563%
Drawdown
53.900%
Expectancy
0.009
Start Equity
100000
End Equity
114971.96
Net Profit
14.972%
Sharpe Ratio
-0.023
Sortino Ratio
-0.025
Probabilistic Sharpe Ratio
0.000%
Loss Rate
46%
Win Rate
54%
Profit-Loss Ratio
0.88
Alpha
0.015
Beta
-0.455
Annual Standard Deviation
0.168
Annual Variance
0.028
Information Ratio
-0.169
Tracking Error
0.276
Treynor Ratio
0.009
Total Fees
$2480.10
Estimated Strategy Capacity
$5300000.00
Lowest Capacity Asset
WTW S9QGEXER1E1X
Portfolio Turnover
4.26%
# https://quantpedia.com/strategies/52-weeks-high-effect-in-stocks/
#
# The investment universe consists of all stocks from NYSE, AMEX and NASDAQ (the research paper used the CRSP 
# database for backtesting). The ratio between the current price and 52-week high is calculated for each stock 
# at the end of each month (PRILAG i,t = Price i,t / 52-Week High i,t). Every month, the investor then calculates
# the weighted average of ratios (PRILAG i,t) from all firms in each industry (20 industries are used), where the
# weight is the market capitalization of the stock at the end of month t. The winners (losers) are stocks in the
# six industries with the highest (lowest) weighted averages of PRILAGi,t. The investor buys stocks in the winner
# portfolio and shorts stocks in the loser portfolio and holds them for three months. Stocks are weighted equally
# and the portfolio is rebalanced monthly (which means that 1/3 of the portfolio is rebalanced each month).
#
# QC implementation changes:
#   - Universe consists of 500 most liquid stocks traded on NYSE, AMEX, or NASDAQ.

from numpy import floor
from AlgorithmImports import *
from typing import List, Dict, Tuple

class Weeks52HighEffectinStocks(QCAlgorithm):
    """Long/short industry strategy built on the 52-week-high effect.

    On every monthly selection the stocks are grouped by Morningstar industry
    group and each group is scored by the market-cap-weighted average of
    ``latest price / maximum price in the rolling window``. All stocks of the
    ``industry_count`` best-scoring groups are bought and all stocks of the
    ``industry_count`` worst-scoring groups are shorted, equally weighted per
    side. Each monthly selection is kept as its own tranche
    (``RebalanceQueueItem``) so that several tranches overlap in time.
    """

    def Initialize(self) -> None:
        """Set up dates, cash, strategy parameters, the universe and the monthly trigger."""
        self.SetStartDate(2000, 1, 1)
        self.SetCash(100000)

        # Give every newly added security a market price taken from its last known price.
        self.SetSecurityInitializer(lambda x: x.SetMarketPrice(self.GetLastKnownPrice(x)))

        # Length of the per-symbol price window in daily bars
        # (presumably ~12 months of ~21 trading days, i.e. the 52-week look-back).
        self.period:int = 12 * 21

        # Tranching.
        self.holding_period:int = 3          # number of monthly rebalances a tranche stays open
        self.managed_queue:List[RebalanceQueueItem] = []
        self.industry_count:int = 6          # industries taken from each end of the ranking
        self.leverage:int = 5
        self.selection_sorting_key = lambda x:x.MarketCap

        # Daily price data, one rolling window per symbol.
        self.data:Dict[Symbol, SymbolData] = {}

        # SPY is only used as the reference symbol for the date/time rules below.
        self.symbol:Symbol = self.AddEquity('SPY', Resolution.Daily).Symbol

        self.fundamental_count:int = 500
        self.selection_flag:bool = False
        self.Settings.MinimumOrderMarginPortfolioPercentage = 0.
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.FundamentalSelectionFunction)
        self.Schedule.On(self.DateRules.MonthEnd(self.symbol), self.TimeRules.AfterMarketOpen(self.symbol), self.Selection)

        self.settings.daily_precise_end_time = False

    def OnSecuritiesChanged(self, changes:SecurityChanges) -> None:
        """Apply the custom fee model and the configured leverage to every added security."""
        for security in changes.AddedSecurities:
            security.SetFeeModel(CustomFeeModel())
            security.SetLeverage(self.leverage)

    def FundamentalSelectionFunction(self, fundamental: List[Fundamental]) -> List[Symbol]:
        """Maintain the price windows and, when flagged, pick this month's long/short stocks.

        On every call the rolling windows of already tracked symbols are fed
        with the stock's adjusted price. When ``selection_flag`` is set, a new
        tranche with the target quantities is appended to ``managed_queue``.

        Args:
            fundamental: Fundamental data objects supplied by the universe selection.

        Returns:
            ``Universe.Unchanged`` when no selection is due, otherwise the
            symbols of the long side followed by the symbols of the short side
            (an empty list when nothing qualified).
        """
        # Update the rolling window every day.
        for stock in fundamental:
            symbol:Symbol = stock.Symbol

            if symbol in self.data:
                # Store daily price.
                self.data[symbol].update(stock.AdjustedPrice)

        if not self.selection_flag:
            return Universe.Unchanged

        exchanges:Tuple[str, ...] = ("NYS", "NAS", "ASE")
        selected:List[Fundamental] = [
            x for x in fundamental
            if x.HasFundamentalData and x.Market == 'usa' and x.MarketCap != 0
            and x.SecurityReference.ExchangeId in exchanges
        ]

        # Keep only the top `fundamental_count` stocks according to the sorting key.
        if len(selected) > self.fundamental_count:
            selected = sorted(selected, key=self.selection_sorting_key, reverse=True)[:self.fundamental_count]

        # industry group code -> [(stock, price / window maximum), ...]
        group:Dict[MorningstarIndustryGroupCode, List[Tuple[Fundamental, float]]] = {}

        # Warmup price rolling windows.
        for stock in selected:
            symbol:Symbol = stock.Symbol
            if symbol not in self.data:
                self.data[symbol] = SymbolData(symbol, self.period)
                history:DataFrame = self.History(symbol, self.period, Resolution.Daily)
                if history.empty:
                    self.Log(f"Not enough data for {symbol} yet")
                    continue
                closes:pd.Series = history.loc[symbol].close
                for time, close in closes.items():
                    self.data[symbol].update(close)

            if not self.data[symbol].is_ready():
                continue

            industry_group_code:MorningstarIndustryGroupCode = stock.AssetClassification.MorningstarIndustryGroupCode
            if industry_group_code == 0: continue

            max_high:float = self.data[symbol].maximum()
            price:float = self.data[symbol].get_latest_price()

            # Adding stocks in groups.
            stock_prilag:Tuple[Fundamental, float] = (stock, price / max_high)
            group.setdefault(industry_group_code, []).append(stock_prilag)

        top_industries:List[MorningstarIndustryGroupCode] = []
        low_industries:List[MorningstarIndustryGroupCode] = []

        if group:
            # Weighted average of ratios calc.
            industry_prilag_weighted_avg:Dict[int, float] = {}
            for industry_code, members in group.items():
                total_market_cap:float = sum([member[0].MarketCap for member in members])
                if total_market_cap == 0: continue
                industry_prilag_weighted_avg[industry_code] = sum([member[1] * (member[0].MarketCap / total_market_cap) for member in members])

            if industry_prilag_weighted_avg:
                # Weighted average industry sorting.
                sorted_by_weighted_avg:List = sorted(industry_prilag_weighted_avg.items(), key=lambda x: x[1], reverse = True)
                top_industries = [x[0] for x in sorted_by_weighted_avg[:self.industry_count]]
                low_industries = [x[0] for x in sorted_by_weighted_avg[-self.industry_count:]]

        long:List[Symbol] = [member[0].Symbol for industry_code in top_industries for member in group[industry_code]]
        short:List[Symbol] = [member[0].Symbol for industry_code in low_industries for member in group[industry_code]]

        # Capital assigned to one side of this month's tranche.
        tranche_value:float = self.Portfolio.TotalPortfolioValue / self.holding_period

        # symbol/quantity collection
        # A side is sized only when it has members: dividing by len() of an
        # empty side would raise ZeroDivisionError and abort universe selection.
        long_symbol_q:List[Tuple[Symbol, int]] = []
        if long:
            long_w:float = tranche_value / len(long)
            long_symbol_q = [(x, floor(long_w / self.data[x].get_latest_price())) for x in long]

        short_symbol_q:List[Tuple[Symbol, int]] = []
        if short:
            short_w:float = tranche_value / len(short)
            short_symbol_q = [(x, -floor(short_w / self.data[x].get_latest_price())) for x in short]

        # A tranche is queued even when it is empty so that exactly one tranche
        # is created per selection; OnData removes at most one tranche per call.
        self.managed_queue.append(RebalanceQueueItem(long_symbol_q + short_symbol_q))

        return long + short

    def OnData(self, data:Slice) -> None:
        """Trade the queued tranches on the first data slice after a selection.

        Tranches whose counter has reached ``holding_period`` are closed by
        sending the opposite quantities, tranches with counter 0 are opened,
        and every tranche's counter is then advanced by one.

        Args:
            data: Current data slice; a new position is opened only for symbols
                that are present in it with data.
        """
        if not self.selection_flag:
            return
        self.selection_flag = False

        # Tranche that was closed during this pass (if any).
        remove_item = None

        # Rebalance portfolio
        for item in self.managed_queue:
            if item.holding_period == self.holding_period:
                # Liquidate
                for symbol, quantity in item.symbol_q:
                    self.MarketOrder(symbol, -quantity)
                remove_item = item

            # Trade execution
            if item.holding_period == 0:
                open_symbol_q:List[Tuple[Symbol, int]] = []

                for symbol, quantity in item.symbol_q:
                    if symbol in data and data[symbol]:
                        self.MarketOrder(symbol, quantity)
                        open_symbol_q.append((symbol, quantity))

                # Only opened orders will be closed
                item.symbol_q = open_symbol_q

            item.holding_period += 1

        # We need to remove closed part of portfolio after loop. Otherwise it will miss one item in self.managed_queue.
        if remove_item:
            self.managed_queue.remove(remove_item)

    def Selection(self) -> None:
        """Scheduled callback: request a new selection and rebalance."""
        self.selection_flag = True

class RebalanceQueueItem():
    """One portfolio tranche: the positions belonging to a single monthly selection."""

    def __init__(self, symbol_q:List[Tuple[Symbol, int]]) -> None:
        """Create a tranche that has not been traded yet.

        Args:
            symbol_q: List of (symbol, signed quantity) pairs to trade.
        """
        # symbol/quantity collection (a list of pairs, not a single pair)
        self.symbol_q:List[Tuple[Symbol, int]] = symbol_q
        # Number of rebalances this tranche has gone through so far.
        self.holding_period:int = 0

class SymbolData():
    """Fixed-length rolling window of the most recent prices of one symbol."""

    def __init__(self, symbol:Symbol, period:int) -> None:
        """Create an empty price window.

        Args:
            symbol: Symbol the prices belong to.
            period: Number of prices the window holds.
        """
        self.Symbol:Symbol = symbol
        self.Price:RollingWindow = RollingWindow[float](period)

    def update(self, value:float) -> None:
        """Add a new price to the window."""
        self.Price.Add(value)

    def is_ready(self) -> bool:
        """Return the window's ``IsReady`` state."""
        return self.Price.IsReady

    def maximum(self) -> float:
        """Return the highest price currently stored in the window."""
        # Iterate the window directly instead of copying it into a list first.
        return max(self.Price)

    def get_latest_price(self) -> float:
        """Return the first element of the window (the most recently added price)."""
        # Index 0 is the element iteration yields first; reading it directly
        # avoids materialising the whole window on every call.
        return self.Price[0]

# Custom fee model.
class CustomFeeModel(FeeModel):
    """Fee model charging a fixed fraction of the traded value of each order."""

    def GetOrderFee(self, parameters):
        """Return the fee for an order.

        The fee is the security price times the absolute order quantity
        times 0.00005, expressed as a USD cash amount.
        """
        traded_value = parameters.Security.Price * parameters.Order.AbsoluteQuantity
        return OrderFee(CashAmount(traded_value * 0.00005, "USD"))