Overall Statistics
Total Orders
5148
Average Win
0.07%
Average Loss
-0.07%
Compounding Annual Return
-2.838%
Drawdown
20.300%
Expectancy
-0.074
Start Equity
1000000
End Equity
910632.05
Net Profit
-8.937%
Sharpe Ratio
-0.531
Sortino Ratio
-0.525
Probabilistic Sharpe Ratio
0.437%
Loss Rate
52%
Win Rate
48%
Profit-Loss Ratio
0.95
Alpha
-0.041
Beta
-0.035
Annual Standard Deviation
0.081
Annual Variance
0.007
Information Ratio
-0.665
Tracking Error
0.168
Treynor Ratio
1.22
Total Fees
$6428.98
Estimated Strategy Capacity
$29000000.00
Lowest Capacity Asset
NET X7TG3O4R7O11
Portfolio Turnover
5.20%
# region imports
from AlgorithmImports import *
from scipy.stats import linregress
import numpy as np
# endregion

class SimpleDynamicMomentumAlgorithm(QCAlgorithm):
    def Initialize(self):
        self.SetStartDate(2021, 1, 1)  # Set start date
        self.SetEndDate(datetime.now())  # Set end date
        self.SetCash(1000000)  # Set initial capital

        self.lookback = 90 # Lookback period for momentum calculation (e.g., 3 months)
        self.rebalance_period = 30  # Rebalance period (e.g., monthly)
        self.next_rebalance = self.Time + timedelta(days=self.rebalance_period)

        self.stop_loss_percentage = 0.07

        self.entry_prices = {}  # Store the entry prices for positions
        self.highest_prices = {}  # Store the highest price reached by a stock for trailing stop loss

        # Market index to gauge overall market conditions
        self.market = self.AddEquity("SPY", Resolution.Daily).Symbol

        # Moving averages for market condition
        self.short_sma = self.SMA(self.market, 50, Resolution.Daily)
        self.long_sma = self.SMA(self.market, 200, Resolution.Daily)

        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)

        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(10, 0), self.Rebalance)
        self.last_month = -1

    def CoarseSelectionFunction(self, coarse):
        if self.Time.month == self.last_month:
            return Universe.Unchanged

        self.last_month = self.Time.month
        filtered = [x.Symbol for x in coarse if x.HasFundamentalData and x.Price > 10]
        return filtered

    def FineSelectionFunction(self, fine):
        tech_sector_code = 311  # GICS code for the technology sector
        min_market_cap = 1e10  # Minimum market cap for large-cap stocks
        min_volume = 1e6  # Minimum average daily volume

        filtered = [x for x in fine if x.AssetClassification.MorningstarSectorCode == tech_sector_code 
                    and x.MarketCap >= min_market_cap 
                    and x.Volume > min_volume]
        sorted_by_market_cap = sorted(filtered, key=lambda x: x.MarketCap, reverse=True)[:500]
        self.symbols = [x.Symbol for x in sorted_by_market_cap]
        return self.symbols

    def OnData(self, data):
        self.UpdateTrailingStopLoss(data)

    def UpdateTrailingStopLoss(self, data):
        for symbol in list(self.entry_prices.keys()):
            if symbol in data and data[symbol] is not None:
                current_price = data[symbol].Price
                # Update the highest price reached
                if symbol not in self.highest_prices:
                    self.highest_prices[symbol] = current_price
                else:
                    self.highest_prices[symbol] = max(self.highest_prices[symbol], current_price)

                # Calculate trailing stop price
                trailing_stop_price = self.highest_prices[symbol] * (1 - self.stop_loss_percentage)

                # Check if the current price is below the stop price
                if current_price < trailing_stop_price:
                    self.Liquidate(symbol)
                    self.Debug(f"Trailing stop-loss triggered for {symbol.Value} at {current_price}")
                    del self.entry_prices[symbol]
                    del self.highest_prices[symbol]

    # Calculate momentum using annualized exponential regression slope
    def calculate_momentum(self, history):
        log_prices = np.log(history['close'])
        days = np.arange(len(log_prices))
        slope, _, _, _, _ = linregress(days, log_prices)
        annualized_slope = slope * 252  # Assuming 252 trading days in a year
        return annualized_slope

    # Calculate historical volatility (annualized standard deviation of daily returns)
    def calculate_volatility(self, history):
        daily_returns = history['close'].pct_change().dropna()
        annualized_volatility = daily_returns.std() * np.sqrt(252)  # Assuming 252 trading days in a year
        return annualized_volatility

    def Rebalance(self):
        if self.Time < self.next_rebalance:
            return

        if self.short_sma.Current.Value > self.long_sma.Current.Value:
            long_weight = 0.8
        else:
            long_weight = 0.2
        short_weight = 1 - long_weight

        momentum = {}
        volatility = {}
        for symbol in self.symbols:
            history = self.History(symbol, self.lookback, Resolution.Daily)
            if not history.empty:
                momentum[symbol] = self.calculate_momentum(history)
                volatility[symbol] = self.calculate_volatility(history)

        sorted_symbols = sorted(momentum.items(), key=lambda x: x[1], reverse=True)
    
        num_long = int(len(sorted_symbols) * long_weight)
        num_short = int(len(sorted_symbols) * short_weight)

        long_symbols = [symbol for symbol, mom in sorted_symbols[:num_long]]
        short_symbols = [symbol for symbol, mom in sorted_symbols[-num_short:]]

        # Calculate inverse volatility weights
        long_volatility_sum = sum(1/volatility[symbol] for symbol in long_symbols)
        short_volatility_sum = sum(1/volatility[symbol] for symbol in short_symbols)

        for symbol in self.symbols:
            if symbol in long_symbols:
                weight = (1 / volatility[symbol]) / long_volatility_sum * long_weight
                self.SetHoldings(symbol, weight)
                self.entry_prices[symbol] = self.Securities[symbol].Price * (1 - self.stop_loss_percentage)
            elif symbol in short_symbols:
                weight = (1 / volatility[symbol]) / short_volatility_sum * short_weight
                self.SetHoldings(symbol, -weight)
                self.entry_prices[symbol] = self.Securities[symbol].Price * (1 + self.stop_loss_percentage)
            else:
                self.Liquidate(symbol)
                if symbol in self.entry_prices:
                    del self.entry_prices[symbol]

        self.next_rebalance = self.Time + timedelta(days=self.rebalance_period)

    def OnEndOfAlgorithm(self):
        self.Debug("Algorithm finished running.")