Overall Statistics
Total Orders
248
Average Win
0.30%
Average Loss
-0.20%
Compounding Annual Return
29.635%
Drawdown
10.200%
Expectancy
0.525
Start Equity
1000000
End Equity
1085181.37
Net Profit
8.518%
Sharpe Ratio
0.732
Sortino Ratio
1.018
Probabilistic Sharpe Ratio
46.186%
Loss Rate
39%
Win Rate
61%
Profit-Loss Ratio
1.51
Alpha
0.046
Beta
1.44
Annual Standard Deviation
0.244
Annual Variance
0.06
Information Ratio
0.416
Tracking Error
0.207
Treynor Ratio
0.124
Total Fees
$702.01
Estimated Strategy Capacity
$68000000.00
Lowest Capacity Asset
WING W1BBEDOGB8MD
Portfolio Turnover
6.90%
import numpy as np
from AlgorithmImports import *

class AssetWeightCalculator:
    """Universe filters plus Sharpe-ratio ranking for candidate symbols.

    The instance keeps a reference to the owning algorithm (used for history
    requests and logging) and subscribes to BIL, whose price history is used
    as the risk-free proxy in the Sharpe calculation.
    """

    # Factor used to annualise statistics computed on hourly returns.
    HOURS_PER_YEAR = 1638
    # Default Sharpe look-back, in hourly bars (3 * 1638 = 4914).
    SHARPE_LOOKBACK_BARS = 3 * HOURS_PER_YEAR
    # Annual risk-free rate used when BIL history is missing or unusable.
    DEFAULT_RISK_FREE_RATE = 0.02

    def __init__(self, algorithm: "QCAlgorithm"):
        # The annotation is quoted so the class can be imported (e.g. in unit
        # tests) without the LEAN runtime providing `QCAlgorithm`.
        self.algorithm = algorithm

        self.risk_free = self.algorithm.add_equity("BIL", Resolution.HOUR)

    def coarse_selection(self, coarse):
        """First universe filter.

        Keeps entries priced above 10 that have fundamental data and returns
        the symbols of the 200 with the highest dollar volume.
        """
        eligible = [x for x in coarse if x.price > 10 and x.has_fundamental_data]
        eligible.sort(key=lambda x: x.dollar_volume, reverse=True)
        return [x.symbol for x in eligible[:200]]

    def fine_selection(self, fine):
        """Second universe filter.

        Keeps symbols whose market cap exceeds 10e9 and returns them ranked by
        Sharpe ratio (see ``rank_stocks``).
        """
        filtered = [x.symbol for x in fine if x.market_cap is not None and x.market_cap > 10e9]
        self.algorithm.debug(f"Fine Selection: {len(filtered)} symbols passed filters")

        # Ranking here means every universe update re-ranks the stocks; the
        # MACD indicators are then refreshed by the scheduler in main.
        return self.rank_stocks(filtered)

    def _annual_risk_free_rate(self, period):
        """Estimate the annual risk-free rate from BIL's trailing hourly returns.

        The previous implementation used BIL's *price* divided by 100 as the
        rate, which is a price level rather than a yield. Falls back to
        ``DEFAULT_RISK_FREE_RATE`` when no usable history is available.
        """
        try:
            rf_history = self.algorithm.history(self.risk_free.symbol, period, Resolution.HOUR)
            if not rf_history.empty:
                # NOTE(review): assumes history prices are dividend-adjusted so
                # BIL's price return reflects its yield - confirm.
                rate = rf_history['close'].pct_change().dropna().mean() * self.HOURS_PER_YEAR
                if np.isfinite(rate):
                    return rate
        except Exception as e:
            # Best effort, matching calculate_sharpe_ratio: log and fall back.
            self.algorithm.debug(f"Error estimating risk-free rate: {str(e)}")
        return self.DEFAULT_RISK_FREE_RATE

    @staticmethod
    def _annualized_sharpe(closes, annual_risk_free_rate, periods_per_year):
        """Return the annualised Sharpe ratio of a close-price series.

        Returns ``None`` when the ratio is undefined: fewer than two returns,
        zero volatility, or any non-finite intermediate value.
        """
        returns = closes.pct_change().dropna()
        excess_returns = returns - (annual_risk_free_rate / periods_per_year)
        mean_excess_return = excess_returns.mean() * periods_per_year
        std_dev = excess_returns.std() * np.sqrt(periods_per_year)

        # NaN never equals 0, so it must be rejected explicitly; otherwise a
        # NaN score would count as valid and corrupt the ranking sort.
        if not np.isfinite(std_dev) or std_dev == 0:
            return None

        sharpe = mean_excess_return / std_dev
        return sharpe if np.isfinite(sharpe) else None

    def calculate_sharpe_ratio(self, symbol, period=SHARPE_LOOKBACK_BARS, risk_free_rate=None):
        """Calculate the annualised Sharpe ratio of ``symbol`` from hourly bars.

        Args:
            symbol: A symbol, or a key/value pair whose ``Key`` is the symbol.
            period: Number of hourly bars of history to request.
            risk_free_rate: Annual risk-free rate as a decimal. When ``None``
                it is estimated from BIL's trailing return.

        Returns:
            The Sharpe ratio, or ``None`` when it cannot be computed.
        """
        # If a KeyValuePair was received only take the symbol.
        if hasattr(symbol, "Key"):
            symbol = symbol.Key
        # Resolved up front so the error handler cannot itself fail on
        # objects without a `.value` attribute.
        label = getattr(symbol, "value", symbol)

        try:
            history = self.algorithm.history([symbol], period, Resolution.HOUR)

            if history.empty:
                self.algorithm.debug(f"No history for {label}")
                return None

            if risk_free_rate is None:
                risk_free_rate = self._annual_risk_free_rate(period)

            return self._annualized_sharpe(history['close'], risk_free_rate, self.HOURS_PER_YEAR)

        except Exception as e:
            self.algorithm.debug(f"Error calculating Sharpe for {label}: {str(e)}")
            return None

    def rank_stocks(self, symbols, top_n=20):
        """Rank symbols by Sharpe ratio and return the best ``top_n``.

        Args:
            symbols: Symbols, or key/value pairs whose ``Key`` is the symbol.
            top_n: Maximum number of symbols to return.

        Returns:
            Symbols sorted by descending Sharpe ratio; symbols without a
            computable ratio are dropped.
        """
        if not symbols:
            self.algorithm.debug("No symbols to rank")
            return []

        self.algorithm.debug(f"Ranking {len(symbols)} symbols")

        # Converting from key pair if necessary
        symbols = [s.Key if hasattr(s, 'Key') else s for s in symbols]

        # The risk-free rate is the same for every symbol, so fetch it once
        # instead of once per symbol.
        risk_free_rate = self._annual_risk_free_rate(self.SHARPE_LOOKBACK_BARS)
        scores = {
            symbol: self.calculate_sharpe_ratio(symbol, risk_free_rate=risk_free_rate)
            for symbol in symbols
        }
        valid_scores = {k: v for k, v in scores.items() if v is not None}

        self.algorithm.debug(f"Valid Sharpe ratios: {len(valid_scores)} out of {len(symbols)}")

        if not valid_scores:
            return []

        sorted_scores = sorted(valid_scores, key=valid_scores.get, reverse=True)[:top_n]

        self.algorithm.log(f"All symbols before ranking: {[s.value for s in symbols]}")
        self.algorithm.log(f"Symbols after filtering: {[s.value for s in valid_scores.keys()]}")

        return sorted_scores

    def normalize_scores(self, scores):
        """Z-score normalise a ``{symbol: score}`` mapping.

        Normalised scores can be combined additively. Returns an empty dict
        for empty input, and 0 for every symbol when the scores have no
        variation.
        """
        if not scores:
            # Avoids numpy's mean-of-empty warnings.
            return {}

        values = np.array(list(scores.values()))
        mean = np.mean(values)
        std_dev = np.std(values)

        if std_dev == 0:
            # If no variation in scores, assign equal normalized scores
            return {symbol: 0 for symbol in scores}

        return {symbol: (score - mean) / std_dev for symbol, score in scores.items()}

from AlgorithmImports import *

class MACDSignalGenerator:
    """Maintains several MACD variants per symbol and derives position sizes.

    ``macd_indicators`` maps ``symbol -> {variant name -> MACD indicator}``;
    ``symbols`` is the list whose length sets the per-variant position budget.
    """

    # Hourly bars requested to warm up a newly created MACD.
    WARMUP_BARS = 35
    # Multiplier applied to the relative fast/slow distance; acts like a
    # leverage setting on the per-variant budget.
    SIGNAL_SCALE = 70
    # Hard cap on the portfolio fraction a single variant may request.
    MAX_VARIANT_WEIGHT = 0.1

    def __init__(self, algorithm: "QCAlgorithm", symbols: list, cash_buffer: float = 0.05):
        # The annotation is quoted so the class can be imported (e.g. in unit
        # tests) without the LEAN runtime providing `QCAlgorithm`.
        self.algorithm = algorithm
        self.symbols = symbols
        self.cash_buffer = cash_buffer
        self.macd_indicators = {}  # {symbol: {variant: MACD}}

        # Define MACD parameters for different variants
        self.macd_variants = {
            "slow": {"fast": 12, "slow": 26, "signal": 9},
            "slow-med": {"fast": 9, "slow": 19, "signal": 5},
            "med-fast": {"fast": 7, "slow": 15, "signal": 3},
            "fast": {"fast": 5, "slow": 12, "signal": 2},
        }

    def remove_symbols(self, symbols: list):
        """Liquidate the given symbols and drop their MACD indicators."""
        for symbol in symbols:

            # Liquidate position before removing indicator
            self.algorithm.liquidate(symbol)

            # Unregister and delete indicators tied to each symbol
            if symbol in self.macd_indicators:
                for macd in self.macd_indicators[symbol].values():
                    self.algorithm.unregister_indicator(macd)
                del self.macd_indicators[symbol]

            # Keep the sizing denominator in step with what is tracked.
            if symbol in self.symbols:
                self.symbols.remove(symbol)

    def add_symbols(self, new_symbols):
        """Create and warm up MACD indicators for newly selected symbols.

        A symbol is skipped (and left without indicators, so a later call can
        retry it) when it has no valid price yet or no history was returned.
        """
        new_symbols = list(new_symbols)

        # Log initial attempt
        self.algorithm.debug(f"Attempting to add symbols: {[s.value for s in new_symbols]}")
        if not new_symbols:
            return

        # Get historical data for new symbols
        history = self.algorithm.history(new_symbols, self.WARMUP_BARS, Resolution.HOUR)
        history_symbols = history.index.get_level_values(0)

        # Log history data availability
        self.algorithm.debug(f"History data available for: {history_symbols.unique()}")

        for symbol in new_symbols:
            # Avoid duplicates: they would inflate len(self.symbols) and
            # shrink every position in calculate_position_sizes.
            if symbol not in self.symbols:
                self.symbols.append(symbol)

            security = self.algorithm.securities[symbol]

            # Checking if price is 0
            if not (security.has_data and security.is_tradable and security.price > 0):
                self.algorithm.debug(f"Waiting for valid price data: {symbol.value}")
                continue

            if symbol in self.macd_indicators:
                continue

            # Check history BEFORE registering anything, so a symbol without
            # history does not leave an empty indicator entry behind.
            if symbol not in history_symbols:
                self.algorithm.debug(f"No history data for: {symbol.value}")
                continue

            symbol_history = history.loc[symbol]
            self.algorithm.debug(f"History rows for {symbol.value}: {len(symbol_history)}")

            indicators = {}
            for variant, params in self.macd_variants.items():
                macd = self.algorithm.macd(
                    symbol=symbol,
                    fast_period=params["fast"],
                    slow_period=params["slow"],
                    signal_period=params["signal"],
                    type=MovingAverageType.EXPONENTIAL,
                    resolution=Resolution.HOUR,
                    selector=Field.CLOSE
                )

                # Warm up MACD with historical data
                for time, row in symbol_history.iterrows():
                    macd.update(time, row['close'])

                indicators[variant] = macd

            self.macd_indicators[symbol] = indicators

    def _variant_position_size(self, macd, max_position):
        """Return the portfolio fraction requested by one MACD variant.

        The size is the relative distance between the fast and slow averages,
        scaled by ``max_position * SIGNAL_SCALE`` and clamped to
        ``[0, MAX_VARIANT_WEIGHT]``. Variants that are not ready request 0.
        """
        if not macd.is_ready:
            return 0

        slow_value = macd.slow.current.value
        if slow_value == 0:
            # The relative distance is undefined; treat as no signal.
            return 0

        # Distance between fast and slow
        distance = macd.fast.current.value - slow_value

        # Normalize the distance as a fraction of the slow average, then
        # scale it onto the per-variant budget.
        position_size = max_position * (distance / slow_value) * self.SIGNAL_SCALE

        # Only allow positive positions, cap at maximum
        return max(0, min(position_size, self.MAX_VARIANT_WEIGHT))

    def calculate_position_sizes(self):
        """Compute ``{symbol: {variant: size}}`` for every tracked indicator.

        Returns an empty dict when there are no symbols or no indicators.
        """
        position_sizes = {}

        # Check if we have any symbols to process
        if not self.symbols or not self.macd_indicators:
            self.algorithm.debug("No symbols available for position calculation")
            return position_sizes

        # Per-variant budget: investable fraction split across every
        # (symbol, variant) pair.
        max_position = (1 - self.cash_buffer) / (len(self.symbols) * len(self.macd_variants))

        for symbol, variants in self.macd_indicators.items():
            position_sizes[symbol] = {
                variant: self._variant_position_size(macd, max_position)
                for variant, macd in variants.items()
            }

        return position_sizes
from AlgorithmImports import *
from ContinuousMACDSignalGenerator import MACDSignalGenerator
from AssetWeightCalculator import AssetWeightCalculator

class TestMACDInitializationAlgorithm(QCAlgorithm):
    """Weekly-rebalanced multi-MACD strategy with a SPY trend filter.

    While SPY trades below its long SMA the portfolio is moved into BIL;
    otherwise positions are sized from the MACD variants of each tracked
    symbol. A portfolio-level drawdown stop liquidates everything.
    """

    # Length of the SPY regime-filter SMA, in hourly bars.
    SMA_PERIOD = 1750
    # Drawdown from the high-water mark that triggers a full liquidation.
    DRAWDOWN_STOP = -0.15
    # Targets whose absolute size is at or below this are treated as flat.
    MIN_POSITION_SIZE = 0.001

    def Initialize(self):
        """Configure dates, cash, subscriptions, universe and schedules."""
        self.set_start_date(2024, 2, 7)
        self.set_end_date(2024, 6, 1)
        self.set_cash(1000000)
        self.high_water_mark = self.portfolio.total_portfolio_value

        self.set_benchmark("SPY")
        self.bond_etf = self.add_equity("BIL", Resolution.HOUR)
        self.spy = self.add_equity("SPY", Resolution.HOUR)

        # Create the SMA and warm it up from history so the regime filter is
        # usable from the first rebalance.
        history = self.history([self.spy.symbol], self.SMA_PERIOD, Resolution.HOUR)
        self.spy_sma = self.SMA(self.spy.symbol, self.SMA_PERIOD, Resolution.HOUR)

        if not history.empty:
            for time, row in history.loc[self.spy.symbol].iterrows():
                self.spy_sma.update(time, row['close'])

        self.debug(f"SMA initialized: {self.spy_sma.is_ready}, Current Value: {self.spy_sma.current.value}")

        # Initialize tracking set for universe changes
        self.current_symbols = set()

        # Initialize the asset weight calculator
        self.asset_calculator = AssetWeightCalculator(self)

        # Universe settings are assigned before the universe is created so
        # the universe is built with the intended resolution.
        self.universe_settings.Resolution = Resolution.HOUR

        # Add universe for coarse and fine selection
        self.add_universe(self.asset_calculator.coarse_selection, self.asset_calculator.fine_selection)

        # Initialize MACD generator
        self.macd_generator = MACDSignalGenerator(self, [])

        # NOTE(review): the original comments call this day "Wednesday";
        # confirm which weekday `week_start("SPY", 3)` actually selects.
        # Scheduled ranking update, 1 minute after the open.
        self.schedule.on(
            self.date_rules.week_start("SPY", 3),
            self.time_rules.after_market_open("SPY", 1),
            self.rank_and_update_symbols
        )

        # Scheduled rebalancing, 120 minutes after the open on the same day.
        self.schedule.on(
            self.date_rules.week_start("SPY", 3),
            self.time_rules.after_market_open("SPY", 120),
            self.rebalance_positions
        )

    def rank_and_update_symbols(self):
        """Sync MACD indicators with the active securities and re-rank them."""
        # Skip during warmup
        if self.is_warming_up:
            self.debug("Skipping rank_and_update during warmup")
            return

        # Get new universe composition
        new_symbols = set(self.active_securities.keys)

        # Determine added and removed symbols
        removed_symbols = self.current_symbols - new_symbols
        added_symbols = new_symbols - self.current_symbols

        # Update tracking set
        self.current_symbols = new_symbols

        # Handle removals and additions
        if removed_symbols:
            self.macd_generator.remove_symbols(list(removed_symbols))
            self.log(f"Weekly Update - Removed: {[x.value for x in removed_symbols]}")

        if added_symbols:
            self.macd_generator.add_symbols(list(added_symbols))
            self.log(f"Weekly Update - Added: {[y.value for y in added_symbols]}")

        # Rank current universe; the ranked list becomes the generator's
        # symbol list, which drives its per-variant position budget.
        ranked_symbols = self.asset_calculator.rank_stocks(self.active_securities)
        self.macd_generator.symbols = ranked_symbols

    def on_data(self, data: "Slice"):
        """Track the high-water mark and apply the drawdown stop."""
        # If anything is still needing to receive data before running then return
        if self.is_warming_up:
            return

        portfolio_value = self.portfolio.total_portfolio_value

        # Update high water mark
        self.high_water_mark = max(self.high_water_mark, portfolio_value)

        # Calculate drawdown
        drawdown = (portfolio_value / self.high_water_mark) - 1

        if drawdown < self.DRAWDOWN_STOP:
            # Reset the mark so the stop measures from the new level.
            self.high_water_mark = portfolio_value
            self.liquidate()
            self.debug(f"Hitting stop!")

        # Position sizes are intentionally NOT recomputed here: the result was
        # discarded on every bar, and rebalance_positions computes them when
        # they are actually used.

    def rebalance_positions(self):
        """Actual position rebalancing 119 mins after re-ranking."""
        if self.is_warming_up:
            return

        bond_symbol = self.bond_etf.symbol

        # Trying to use a market regime filter to reduce drawdowns
        if self.spy.price < self.spy_sma.current.value:
            # Check if we're already heavily in bonds
            bond_position = self.portfolio[bond_symbol].holdings_value / self.portfolio.total_portfolio_value

            if bond_position >= 0.5:  # Already more than 50% in bonds
                self.debug(f"Maintaining bond position at {bond_position:.2%}")

                # Liquidate any non-bond positions
                for symbol in self.portfolio.keys():
                    if symbol != bond_symbol:
                        self.liquidate(symbol)
                return

            self.debug(f"Bearish regime - Moving to bonds. SPY: {self.spy.price}, SMA: {self.spy_sma.current.value}")
            self.liquidate()
            self.set_holdings(bond_symbol, 1.0)
            return

        # Bullish regime - first liquidate any bond position
        if self.portfolio[bond_symbol].holdings_value > 0:
            self.debug("Exiting bonds and returning to MACD strategy")
            self.liquidate(bond_symbol)

        position_sizes = self.macd_generator.calculate_position_sizes()

        # Apply the positions
        for symbol, variants in position_sizes.items():
            security = self.securities[symbol]

            if not security.has_data or not security.is_tradable:
                self.debug(f"Skipping trade for {symbol.value} - not ready")
                continue

            total_size = sum(variants.values())  # Use variant values
            if abs(total_size) > self.MIN_POSITION_SIZE:
                self.set_holdings(symbol, total_size)
                self.debug(f"Setting {symbol.value} position to {total_size}")
            elif self.portfolio[symbol].invested:
                # A target of ~0 means the MACD variants no longer want
                # exposure. Previously this case was skipped, so an existing
                # holding stayed open until the symbol left the universe.
                self.liquidate(symbol)
                self.debug(f"Closing {symbol.value} position - target below minimum size")

    def on_warmup_finished(self):
        """No action is taken when warm-up finishes."""
        pass