Overall Statistics
Total Orders
124
Average Win
1.17%
Average Loss
-0.52%
Compounding Annual Return
15.071%
Drawdown
35.900%
Expectancy
1.105
Start Equity
10000
End Equity
34316.11
Net Profit
243.161%
Sharpe Ratio
0.514
Sortino Ratio
0.536
Probabilistic Sharpe Ratio
7.739%
Loss Rate
35%
Win Rate
65%
Profit-Loss Ratio
2.25
Alpha
0.008
Beta
1.05
Annual Standard Deviation
0.195
Annual Variance
0.038
Information Ratio
0.11
Tracking Error
0.117
Treynor Ratio
0.096
Total Fees
$113.00
Estimated Strategy Capacity
$1600000.00
Lowest Capacity Asset
CCNE RF2OA4CJ9N51
Portfolio Turnover
0.07%
from AlgorithmImports import *

class FactorInvestingStrategy(QCAlgorithm):
    def Initialize(self):
        # Set start and end dates
        self.SetStartDate(2016, 1, 1)  # Adjusted start date to ensure data availability
        self.SetEndDate(2024, 10, 10)
        self.SetCash(10000)  # Starting cash

        # Set resolution and universe
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelectionFunction)
        self.lastRebalanceTime = datetime.min
        self.rebalanceInterval = timedelta(weeks=4)  # Rebalance every 4 weeks

        # Dictionaries to store data for trailing stops
        self.highestPrices = {}
        self.stopOrderTickets = {}

        # List to store selected symbols
        self.long_symbols = []

        # Warm up the algorithm to gather historical data
        self.SetWarmUp(timedelta(days=365))  # Warm up for 1 year

        # Flag to indicate warm-up completion
        self.warmupComplete = False

    def CoarseSelectionFunction(self, coarse):
        if (self.Time - self.lastRebalanceTime) < self.rebalanceInterval:
            return Universe.Unchanged

        # Filter for stocks with fundamental data, price > $5, and sufficient volume
        filtered = [c.Symbol for c in coarse if c.HasFundamentalData and c.Price > 5 and c.DollarVolume > 1e6]
        return filtered

    def OnSecuritiesChanged(self, changes):
        if self.IsWarmingUp or not self.warmupComplete:
            return  # Skip processing during warm-up or before initial rebalancing

        self.lastRebalanceTime = self.Time
        self.ScheduleRebalance()

    def ScheduleRebalance(self):
        self.SelectSymbols()
        self.SetHoldingsForSymbols()

    def SelectSymbols(self):
        value_scores = {}
        momentum_scores = {}
        quality_scores = {}

        for security in self.ActiveSecurities.Values:
            # Ensure security has data before processing
            if not security.HasData:
                continue

            # Calculate individual factor scores
            value_score = self.CalculateValueScore(security)
            momentum_score = self.CalculateMomentumScore(security)
            quality_score = self.CalculateQualityScore(security)

            symbol = security.Symbol

            if value_score is not None:
                value_scores[symbol] = value_score
            if momentum_score is not None:
                momentum_scores[symbol] = momentum_score
            if quality_score is not None:
                quality_scores[symbol] = quality_score

        # Standardize scores
        value_scores_std = self.StandardizeScores(value_scores)
        momentum_scores_std = self.StandardizeScores(momentum_scores)
        quality_scores_std = self.StandardizeScores(quality_scores)

        # Combine standardized scores
        combined_scores = {}
        for symbol in value_scores_std.keys():
            if symbol in momentum_scores_std and symbol in quality_scores_std:
                combined_score = (
                    1/3 * value_scores_std[symbol] +
                    1/3 * momentum_scores_std[symbol] +
                    1/3 * quality_scores_std[symbol]
                )
                combined_scores[symbol] = combined_score

        # Rank stocks based on combined score
        ranked_symbols = [symbol for symbol, score in
                          sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)]

        # Select top N stocks
        self.long_symbols = ranked_symbols[:50]

    def CalculateValueScore(self, security):
        pe_ratio = security.Fundamentals.ValuationRatios.PERatio
        if pe_ratio and pe_ratio > 0:
            return 1 / pe_ratio  # Earnings yield
        else:
            return None

    def CalculateMomentumScore(self, security):
        symbol = security.Symbol
        lookback_period = 252
        history = self.History(symbol, lookback_period, Resolution.Daily)
        min_history_length = 60  # Minimum required days
        if history.empty or len(history['close']) < min_history_length:
            return None
        price_start = history['close'][0]
        current_price = history['close'][-1]
        return (current_price / price_start) - 1

    def CalculateQualityScore(self, security):
        fundamentals = security.Fundamentals

        # Return on Equity (ROE)
        roe = fundamentals.OperationRatios.ROE.Value
        if roe is None:
            return None

        # Debt-to-Equity Ratio
        debt_to_equity = fundamentals.OperationRatios.TotalDebtEquityRatio.Value
        if debt_to_equity is None or debt_to_equity <= 0:
            return None

        # For Quality, higher ROE and lower Debt-to-Equity is better
        # We'll use the ratio of ROE to Debt-to-Equity
        return roe / debt_to_equity

    def StandardizeScores(self, scores_dict):
        # Remove None values
        valid_scores = {k: v for k, v in scores_dict.items() if v is not None}
        if not valid_scores:
            return {}

        mean = sum(valid_scores.values()) / len(valid_scores)
        variance = sum((v - mean) ** 2 for v in valid_scores.values()) / len(valid_scores)
        std_dev = variance ** 0.5 if variance > 0 else 1

        standardized_scores = {k: (v - mean) / std_dev for k, v in valid_scores.items()}
        return standardized_scores

    def SetHoldingsForSymbols(self):
        if self.IsWarmingUp:
            return  # Skip order operations during warm-up

        if not self.long_symbols:
            self.Debug("No symbols selected for investment at this time.")
            return  # Avoid division by zero

        # Liquidate positions not in the new selection
        for symbol in list(self.Portfolio.Keys):
            if symbol not in self.long_symbols:
                self.Liquidate(symbol)
                self.highestPrices.pop(symbol, None)
                stop_ticket = self.stopOrderTickets.pop(symbol, None)
                if stop_ticket:
                    self.Transactions.CancelOrder(stop_ticket.OrderId)

        # Set holdings for selected symbols
        weight = 1 / len(self.long_symbols)
        for symbol in self.long_symbols:
            if not self.Portfolio[symbol].Invested:
                self.SetHoldings(symbol, weight)
                # Initialize highest price for trailing stop
                self.highestPrices[symbol] = self.Securities[symbol].Price
                # Place initial stop order
                quantity = self.Portfolio[symbol].Quantity
                stop_price = self.Securities[symbol].Price * 0.95  # 5% below current price
                stop_ticket = self.StopMarketOrder(symbol, -quantity, stop_price)
                self.stopOrderTickets[symbol] = stop_ticket

    def OnData(self, data):
        if self.IsWarmingUp:
            return  # Skip processing during warm-up

        if not self.warmupComplete:
            self.Debug("Warm-up finished. Performing initial symbol selection and rebalancing.")
            self.warmupComplete = True
            self.ScheduleRebalance()
            return

        # Update highest prices and adjust stop orders
        for symbol in list(self.Portfolio.Keys):
            if symbol in data and data[symbol]:
                price = data[symbol].Close
                if symbol in self.highestPrices:
                    if price > self.highestPrices[symbol]:
                        self.highestPrices[symbol] = price
                        # Update stop price
                        new_stop_price = price * 0.95  # 5% below highest price
                        stop_ticket = self.stopOrderTickets.get(symbol, None)
                        if stop_ticket and stop_ticket.Status == OrderStatus.Submitted:
                            update_fields = UpdateOrderFields()
                            update_fields.StopPrice = new_stop_price
                            self.Transactions.UpdateOrder(stop_ticket.OrderId, update_fields)
                else:
                    # Initialize highest price if not set
                    self.highestPrices[symbol] = price

    def OnOrderEvent(self, orderEvent):
        if orderEvent.Status == OrderStatus.Filled:
            symbol = orderEvent.Symbol
            # If a stop order was filled, remove from tracking
            if orderEvent.Direction == OrderDirection.Sell:
                self.highestPrices.pop(symbol, None)
                self.stopOrderTickets.pop(symbol, None)
                if symbol in self.long_symbols:
                    self.long_symbols.remove(symbol)