Created with Highcharts 12.1.2Equity2000200220042006200820102012201420162018202020222024202695k100k105k110k-4-2000.0010.00200.10100M200M020k40k024
Overall Statistics
Total Orders
138
Average Win
0.33%
Average Loss
-0.42%
Compounding Annual Return
0.111%
Drawdown
3.600%
Expectancy
0.104
Start Equity
100000
End Equity
102838.76
Net Profit
2.839%
Sharpe Ratio
-2.11
Sortino Ratio
-1.809
Probabilistic Sharpe Ratio
0.000%
Loss Rate
38%
Win Rate
62%
Profit-Loss Ratio
0.79
Alpha
-0.023
Beta
0.031
Annual Standard Deviation
0.01
Annual Variance
0
Information Ratio
-0.414
Tracking Error
0.154
Treynor Ratio
-0.707
Total Fees
$13.88
Estimated Strategy Capacity
$21000000.00
Lowest Capacity Asset
AA R735QTJ8XC9X
Portfolio Turnover
0.03%
#region imports
from AlgorithmImports import *
from dataclasses import dataclass
from collections import deque
#endregion

class RollingQuarterlyData:
    """Keep per-quarter symbol lists for the four most recent quarters.

    Each quarter is a plain list of symbols. The lists live in a deque with
    ``maxlen=4``, so opening a fifth quarter silently drops the oldest one.
    """

    def __init__(self):
        # Bounded container: one list of symbols per tracked quarter.
        self._data: Deque = deque(maxlen=4)
        # Not read anywhere in this class; kept for compatibility.
        self._current_quarter: int = -1

    def add_symbol(self, symbol: Symbol):
        """Toggle ``symbol`` in the most recent quarter.

        Nothing happens until at least one quarter has been opened. A symbol
        that is already present in the latest quarter is removed instead of
        being added a second time.
        """
        if not self._data:
            return

        latest_quarter = self._data[-1]
        if symbol in latest_quarter:
            latest_quarter.remove(symbol)
        else:
            latest_quarter.append(symbol)

    def add_new_quartal(self) -> None:
        """Open a new, empty quarter at the end of the window."""
        self._data.append([])

    def get_data(self):
        """Return the tracked quarters, oldest first, as a new outer list."""
        return [*self._data]

    def is_ready(self) -> bool:
        """Return True once the window holds its full number of quarters."""
        return self._data.maxlen == len(self._data)

    def symbol_in_all_quartals(self, symbol: Symbol) -> bool:
        """Return True when ``symbol`` is present in every tracked quarter."""
        for quarter_symbols in self._data:
            if symbol not in quarter_symbols:
                return False
        return True

class EarningsData:
    """Rolling table of earnings performances, one row per date.

    Rows are indexed by earnings date and columns are symbols; a cell is NaN
    when the symbol has no value for that date.
    """

    def __init__(self, top_percentile: int, period: int) -> None:
        self._top_percentile: int = top_percentile  # percentile used for the threshold
        self._period: int = period                  # window length in days
        self._earnings_df: DataFrame = pd.DataFrame()

    def add_earnings_performance(self, earnings_date: datetime.date, performance_dict: Dict[Symbol, float]) -> None:
        """Append one row of performances for ``earnings_date`` and keep rows date-sorted."""
        row = pd.DataFrame(performance_dict, index=[earnings_date])
        self._earnings_df = pd.concat([self._earnings_df, row]).sort_index()

    def update_rolling_window(self, date: datetime.date) -> None:
        """Drop every row dated earlier than ``date`` minus the window length."""
        oldest_allowed = date - timedelta(days=self._period)
        keep_mask = self._earnings_df.index >= oldest_allowed
        self._earnings_df = self._earnings_df[keep_mask]

    def is_ready(self) -> bool:
        """Return True when the stored dates span at least the window length.

        NOTE(review): rows older than the window are trimmed by
        ``update_rolling_window``, so the span can only just reach the window
        length; confirm that this strictness is intended.
        """
        frame = self._earnings_df
        if frame.empty:
            return False

        span = frame.index.max() - frame.index.min()
        return span.days >= self._period

    def get_current_threshold(self, last_selection: List[Symbol]) -> float:
        """Return the configured percentile of stored values for ``last_selection``.

        Only columns that are present in ``last_selection`` are considered and
        NaN cells are ignored. Returns None when no value is left.
        """
        wanted_columns = self._earnings_df.columns.intersection(last_selection)
        all_values = self._earnings_df.loc[:, wanted_columns].values.flatten()

        # Keep only the cells that actually hold a performance value.
        observed = all_values[np.logical_not(np.isnan(all_values))]
        if observed.size == 0:
            return None

        return np.percentile(observed, self._top_percentile)

# NOTE: Manager for new trades. It holds a fixed number of equally weighted slots (brackets) for long and short positions.
# If a slot is free, the new trade is opened and tracked for the duration of the holding period.
class TradeManager():
    def __init__(self, 
                algorithm: QCAlgorithm, 
                long_size: int, 
                short_size: int, 
                holding_period: int) -> None:
        self.algorithm: QCAlgorithm = algorithm  # algorithm to execute orders in.
        
        self.long_size: int = long_size
        self.short_size: int = short_size
        
        self.long_len: int = 0
        self.short_len: int = 0
    
        # Arrays of ManagedSymbols
        self.symbols: List[ManagedSymbol] = []
        
        self.holding_period: int = holding_period    # Days of holding.
    
    # Add stock symbol object
    def add(
        self, 
        symbol: Symbol, 
        long_flag: bool, 
        price: float) -> None:
    
        # Open new long trade.
        quantity: Union[None, int] = None
        
        if long_flag:
            # If there's a place for it.
            if self.long_len < self.long_size:
                quantity: int = int(self.algorithm.portfolio.total_portfolio_value / self.long_size / price)

                self.algorithm.market_order(symbol, quantity)
                self.long_len += 1
            # else:
            #     self.algorithm.Log("There's not place for additional trade.")

        # Open new short trade.
        else:
            # If there's a place for it.
            if self.short_len < self.short_size:
                quantity: int = int(self.algorithm.portfolio.total_portfolio_value / self.short_size / price)

                self.algorithm.market_order(symbol, -quantity)
                self.short_len += 1
            # else:
                # self.algorithm.Log("There's not place for additional trade.")
   
        if quantity:
            managed_symbol: ManagedSymbol = ManagedSymbol(symbol, self.holding_period, quantity)
            self.symbols.append(managed_symbol)

    # Decrement holding period and liquidate symbols.
    def try_liquidate(self) -> None:
        symbols_to_delete: List[ManagedSymbol] = []
        for managed_symbol in self.symbols:
            managed_symbol.days_to_liquidate -= 1
            
            # Liquidate.
            if managed_symbol.days_to_liquidate == 0:
                symbols_to_delete.append(managed_symbol)
                
                self.algorithm.market_order(managed_symbol.symbol, -managed_symbol.quantity)
                self.long_len -= 1

        # Remove symbols from management.
        for managed_symbol in symbols_to_delete:
            self.symbols.remove(managed_symbol)
    
    def liquidate_ticker(self, ticker: str) -> None:
        symbol_to_delete: Union[None, Symbol] = None
        for managed_symbol in self.symbols:
            if managed_symbol.symbol.Value == ticker:
                if managed_symbol.long_flag:
                    self.algorithm.market_order(managed_symbol.symbol, -managed_symbol.quantity)
                    self.long_len -= 1
                else:
                    self.short_len -= 1
                    self.algorithm.market_order(managed_symbol.symbol, managed_symbol.quantity)
                symbol_to_delete = managed_symbol
                
                break
        
        if symbol_to_delete: self.symbols.remove(symbol_to_delete)
        else: self.algorithm.Debug("Ticker is not held in portfolio!")

@dataclass
class ManagedSymbol:
    """Record describing one trade that is waiting to be liquidated."""

    # Security the trade was opened in.
    symbol: Symbol
    # Remaining countdown until the trade is closed.
    days_to_liquidate: int
    # Number of shares recorded when the trade was opened.
    quantity: int

# Custom fee model.
class CustomFeeModel(FeeModel):
    """Fee model charging a fixed fraction of the traded value, in USD."""

    def GetOrderFee(self, parameters):
        """Return the fee for the order: price times absolute quantity times 0.00005."""
        traded_value = parameters.Security.Price * parameters.Order.AbsoluteQuantity
        return OrderFee(CashAmount(traded_value * 0.00005, "USD"))
# region imports
from AlgorithmImports import *
from pandas.tseries.offsets import BDay
from typing import List, Dict
from pandas.core.frame import DataFrame
from pandas.core.series import Series
from numpy import isnan
import data_tools
# endregion

class Metatron004(QCAlgorithm):
    """Earnings-driven long strategy with an optional market short.

    Upcoming earnings dates are collected from the earnings dataset. On the
    evaluation day of a report (report date plus ``_execution_day_offset``
    business days) the return of each reporting stock over the last
    ``_abnormal_perf_period`` business days is compared with the market
    return. Stocks from the current universe whose abnormal performance
    exceeds the rolling percentile threshold are toggled into the current
    quarter's list, and a stock found in all four tracked quarters is bought
    through the trade manager.
    """

    # Minutes before the market close at which the daily rebalance is armed.
    _trade_exec_minute_offset: int = 15
    # Length of the rolling earnings-performance window handed to EarningsData.
    _period: int = 365
    # Number of business days used to measure performance around earnings.
    _abnormal_perf_period: int = 4
    # Percentile of stored abnormal performances used as the entry threshold.
    _top_percentile: int = 80
    # Business days added to the report date to obtain the evaluation day.
    _execution_day_offset: int = 1

    # Trade manager parameters.
    _holding_period: int = 120
    _long_count: int = 50

    # Market short flag.
    _market_short_flag: bool = False
    _traded_percentage: float = .5

    # Quarterly top stock check.
    _quarter_months: List[int] = [12, 3, 6, 9]
    _new_quartal_flag: bool = False

    def initialize(self) -> None:
        """Configure dates, cash, data subscriptions, helpers and scheduled events."""
        self.set_start_date(2000, 1, 1)
        self.set_cash(100_000)

        self._exchange_codes: List[str] = ['NYS', 'NAS', 'ASE']

        # Universe size limit and the key used to rank candidates.
        self._fundamental_count: int = 500
        self._fundamental_sorting_key = lambda x: x.market_cap

        self._dataset_symbol: Symbol = self.add_data(EODHDUpcomingEarnings, "earnings").symbol

        # Evaluation date -> symbols whose earnings are evaluated on that date.
        self._earnings_dates: Dict[datetime.date, List[Symbol]] = {}
        # Symbols returned by the most recent universe selection.
        self._last_selection: List[Symbol] = []

        self._market: Symbol = self.add_equity('SPY', Resolution.MINUTE).symbol

        # Long-only trade manager: zero slots are configured for shorts.
        self._trade_manager: data_tools.TradeManager = data_tools.TradeManager(self, self._long_count, 0, self._holding_period)
        self._earnings_df: data_tools.EarningsData = data_tools.EarningsData(self._top_percentile, self._period)
        self._rolling_data: data_tools.RollingQuarterlyData = data_tools.RollingQuarterlyData()

        self._rebalance_flag: bool = False
        self.universe_settings.resolution = Resolution.MINUTE
        self.add_universe(self.fundamental_selection_function)
        self.settings.minimum_order_margin_portfolio_percentage = 0.
        self.set_security_initializer(lambda security: security.set_fee_model(data_tools.CustomFeeModel()))

        # Arm the daily rebalance shortly before the close.
        self.schedule.on(
            self.date_rules.every_day(self._market),
            self.time_rules.before_market_close(self._market, self._trade_exec_minute_offset),
            self._before_close
        )

        # Month-end bookkeeping: market short and quarter roll-over.
        self.schedule.on(
            self.date_rules.month_end(self._market),
            self.time_rules.before_market_close(self._market),
            self._month_end
        )

    def fundamental_selection_function(self, fundamental: List[Fundamental]) -> List[Symbol]:
        """Select US stocks on the configured exchanges, capped at the largest by market cap.

        The chosen symbols are also remembered in ``_last_selection``.
        """
        selected: List[Fundamental] = [
            x for x in fundamental
            if x.has_fundamental_data
            and x.market == 'usa'
            and x.SecurityReference.ExchangeId in self._exchange_codes
        ]

        if len(selected) > self._fundamental_count:
            selected = sorted(selected, key=self._fundamental_sorting_key, reverse=True)[:self._fundamental_count]

        self._last_selection = [x.symbol for x in selected]

        return self._last_selection

    def on_data(self, slice: Slice) -> None:
        """Record upcoming earnings and, when the rebalance is armed, manage trades."""
        # Get upcoming earnings dates.
        for symbol, upcomings_earnings_data_point in slice.get(EODHDUpcomingEarnings).items():
            execution_date: datetime.date = (upcomings_earnings_data_point.report_date + BDay(self._execution_day_offset)).date()
            scheduled: List[Symbol] = self._earnings_dates.setdefault(execution_date, [])
            if symbol not in scheduled:
                scheduled.append(symbol)

        # Rebalance once a day, after _before_close has armed the flag.
        if not self._rebalance_flag:
            return
        self._rebalance_flag = False

        # Liquidate opened symbols after n days.
        self._trade_manager.try_liquidate()

        today: datetime.date = self.time.date()
        if today in self._earnings_dates:
            todays_symbols: List[Symbol] = self._earnings_dates[today]
            history: DataFrame = self.history(
                TradeBar, todays_symbols + [self._market], start=self.time - BDay(self._abnormal_perf_period), end=self.time
            ).unstack(level=0)

            if history.empty:
                self.log('No history data available.')
                return

            # Business-day closes; performance is last close over first close.
            closes: DataFrame = history.resample('B').last().close[-self._abnormal_perf_period:]
            performance: Series = closes.iloc[-1] / closes.iloc[0] - 1
            abnormal_performance: Series = performance.drop(self._market) - performance[self._market]

            if self._earnings_df.is_ready():
                threshold: float = self._earnings_df.get_current_threshold(self._last_selection)

                # None means "no data"; a threshold of exactly 0.0 is valid and must be used.
                if threshold is not None:
                    for symbol in todays_symbols:
                        if not (slice.contains_key(symbol) and slice[symbol]):
                            continue

                        # Check if stock is in top abnormal performance.
                        if symbol in self._last_selection and abnormal_performance[symbol] > threshold:
                            self._rolling_data.add_symbol(symbol)

                        # Trade only stocks that are present in all 4 quartals.
                        if not self._rolling_data.is_ready():
                            continue
                        if not self._rolling_data.symbol_in_all_quartals(symbol):
                            continue

                        # Open new trades.
                        self._trade_manager.add(symbol, True, slice[symbol].close)
                        if self._market_short_flag and not self.portfolio[self._market].invested:
                            self.set_holdings(self._market, -self._traded_percentage)

            # Save abnormal performance data to dataframe.
            self._earnings_df.add_earnings_performance(today, abnormal_performance.to_dict())
            self._earnings_dates.pop(today)

        # Start a new quarter list when the month-end handler requested it.
        if self._new_quartal_flag:
            self._new_quartal_flag = False
            self._rolling_data.add_new_quartal()

        self._earnings_df.update_rolling_window(today)
        self._last_selection.clear()

    def _before_close(self) -> None:
        """Arm the rebalance that is executed on the next data slice."""
        self._rebalance_flag = True

    def _month_end(self) -> None:
        """Refresh the market short when enabled and flag a quarter roll-over."""
        if (
            self._market_short_flag
            and self.portfolio.invested
            and self.securities[self._market].get_last_data()
        ):
            self.set_holdings(self._market, -self._traded_percentage)

        if self.time.month in self._quarter_months:
            self._new_quartal_flag = True