Overall Statistics
Total Orders
621
Average Win
0.78%
Average Loss
-0.50%
Compounding Annual Return
90.165%
Drawdown
8.000%
Expectancy
0.447
Start Equity
100000
End Equity
190053.32
Net Profit
90.053%
Sharpe Ratio
3.118
Sortino Ratio
4.314
Probabilistic Sharpe Ratio
96.682%
Loss Rate
44%
Win Rate
56%
Profit-Loss Ratio
1.56
Alpha
0.478
Beta
0.544
Annual Standard Deviation
0.188
Annual Variance
0.035
Information Ratio
2.106
Tracking Error
0.185
Treynor Ratio
1.075
Total Fees
$3640.37
Estimated Strategy Capacity
$0
Lowest Capacity Asset
BNGO WY1XIAJHWBQD
Portfolio Turnover
12.22%
from AlgorithmImports import *
import json
import math
import pandas as pd
from datetime import timedelta
from io import StringIO

class Form4TradingAlgorithm(QCAlgorithm):
    """Long-only, short-holding-period strategy driven by insider-trade signals.

    Signals are loaded once from a CSV in the Object Store. On every trading
    day, one minute after SPY's market open, the algorithm first closes
    positions that hit the stop loss or the maximum holding period, then opens
    new positions for the signals dated that day while free slots remain.
    """

    # Object Store key of the signal CSV. Only its 'index' column (signal date,
    # formatted YYYY-MM-DD) and 'issuerTicker' column are read.
    SIGNAL_CSV_KEY = "StrategyA-2012-2023.csv"

    def Initialize(self):
        """Configure backtest window, capital, risk limits, data and schedule."""
        backtest_start = pd.Timestamp(2021, 1, 1)
        backtest_end = pd.Timestamp(2021, 12, 31)
        self.SetStartDate(backtest_start.year, backtest_start.month, backtest_start.day)
        self.SetEndDate(backtest_end.year, backtest_end.month, backtest_end.day)

        starting_capital = 100000
        self.SetCash(starting_capital)
        self.maxSimultaneousOpenPositions = 10
        self.maxInitialCapitalPerPosition = starting_capital / self.maxSimultaneousOpenPositions
        # 1 = no extra buying power from existing holdings (an earlier run used 3).
        self.leverage = 1

        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.Margin)
        self.stopLossThreshold = -0.15  # -15% relative to the recorded entry price
        self.maxHoldingDays = 5  # business days (Mon-Fri) before a forced exit

        # One dict per open position:
        # Symbol, EntryDate, EntryPrice, StopLossPrice, NumShares.
        self.openPositions = []

        self.allInsiderTrades = self.LoadAllInsiderTrades()

        self.AddEquity("SPY", Resolution.Daily)
        self.SetBenchmark("SPY")

        # Subscribe once per ticker, and only to tickers that have a signal
        # inside the backtest window: signals are matched on the exact current
        # date, so tickers outside the window can never be traded.
        signals = self.allInsiderTrades
        in_window = signals['date'].between(backtest_start, backtest_end)
        for ticker in signals.loc[in_window, 'ticker'].unique():
            self.AddEquity(ticker, Resolution.Daily)

        self.Schedule.On(
            # Align with SPY's trading calendar (skips weekends and holidays).
            self.DateRules.EveryDay("SPY"),
            # Fire 1 minute after SPY's market open.
            self.TimeRules.AfterMarketOpen("SPY", 1),
            self.OnMarketOpen)

    def OnMarketOpen(self):
        """Run the daily cycle: exit due positions, then enter today's signals.

        Free slots are filled from today's signals in frame order. Tickers that
        are already held, repeated within the day, unpriced, too expensive for
        one share, or whose order is rejected do not consume a slot.
        """
        self.Debug(f"Market open logic executed at: {self.Time}")
        self.CheckOpenPositions()

        # Signals whose date equals the current algorithm date.
        todaysInsiderTrades = self.GetTodaysInsiderTrades()
        if todaysInsiderTrades.empty:
            return

        availableSlots = self.maxSimultaneousOpenPositions - len(self.openPositions)
        if availableSlots <= 0:
            return

        # With leverage == 1 this reduces to the available cash.
        availableCash = self.Portfolio.Cash
        availableBuyingPower = availableCash + (self.Portfolio.TotalPortfolioValue - availableCash) * (self.leverage - 1)
        positionSize = min(availableBuyingPower / availableSlots, self.maxInitialCapitalPerPosition)
        if positionSize <= 0:
            return

        heldSymbols = {position["Symbol"] for position in self.openPositions}

        for ticker in todaysInsiderTrades['ticker'].drop_duplicates():
            if availableSlots <= 0:
                break

            # A second entry for a held ticker would be wiped out by the first
            # entry's Liquidate call and then linger as a phantom slot.
            if ticker in heldSymbols:
                continue

            # NOTE(review): subscriptions are daily, so at this time of day
            # Open is presumably the open of the latest completed daily bar,
            # not today's open - confirm against LEAN's data timing.
            openPrice = self.Securities[ticker].Open
            if openPrice is None or openPrice <= 0:
                continue

            numShares = math.floor(positionSize / openPrice)
            if numShares <= 0:
                continue

            ticket = self.MarketOrder(ticker, numShares)
            # Do not track a position for an order that was never accepted.
            if ticket.Status == OrderStatus.Invalid or ticket.Status == OrderStatus.Canceled:
                continue

            self.openPositions.append({
                "Symbol": ticker,
                "EntryDate": self.Time,
                "EntryPrice": openPrice,
                "StopLossPrice": openPrice * (1 + self.stopLossThreshold),
                "NumShares": numShares,
            })
            heldSymbols.add(ticker)
            availableSlots -= 1

    def CheckOpenPositions(self):
        """Liquidate positions that hit their stop loss or maximum holding period.

        The holding period is counted in Monday-Friday business days between
        the entry date and the current date; exchange holidays are not
        excluded. A missing or non-positive price never triggers the stop
        loss, but the holding-period exit still applies.
        """
        today = self.Time.date()
        stillOpen = []

        for position in self.openPositions:
            symbol = position["Symbol"]
            currentPrice = self.Securities[symbol].Open

            businessDays = pd.date_range(start=position["EntryDate"].date(), end=today, freq='B')
            daysHeld = len(businessDays) - 1

            hasPrice = currentPrice is not None and currentPrice > 0
            stopLossHit = hasPrice and currentPrice <= position["StopLossPrice"]

            if stopLossHit or daysHeld >= self.maxHoldingDays:
                self.Liquidate(symbol)
            else:
                stillOpen.append(position)

        self.openPositions = stillOpen

    def GetTodaysInsiderTrades(self):
        """Return the signal rows whose date equals the current algorithm date."""
        today = pd.Timestamp(self.Time.date())
        signals = self.allInsiderTrades
        return signals[signals['date'] == today]

    def GetForm4Filings(self):
        """Return the signals dated the previous calendar day.

        Returns:
            A list of dicts with keys 'date' and 'ticker'; empty when there is
            no signal for that day. The lookback is one calendar day, so a
            call on a Monday looks at Sunday.
        """
        previousDay = pd.Timestamp((self.Time - timedelta(days=1)).date())
        signals = self.allInsiderTrades
        return signals[signals['date'] == previousDay].to_dict('records')

    def LoadAllInsiderTrades(self):
        """Load and parse the signal CSV from the Object Store.

        Returns:
            A DataFrame with columns 'date' (datetime64) and 'ticker', sorted
            by date. On any failure the problem is logged and an empty frame
            with the same columns is returned, so callers can always filter
            and iterate it.
        """
        key = self.SIGNAL_CSV_KEY
        if not self.ObjectStore.ContainsKey(key):
            self.Debug(f"Error reading CSV: Object Store key '{key}' not found")
            return self._EmptySignalFrame()

        try:
            csv_data = self.ObjectStore.Read(key)
        except Exception as e:
            self.Debug(f"Error reading CSV: {e}")
            return self._EmptySignalFrame()

        return self.ParseCsv(csv_data)

    def ParseCsv(self, csv_data):
        """Parse signal CSV text into a date-sorted DataFrame.

        Args:
            csv_data: CSV text containing at least the columns 'index'
                (date as YYYY-MM-DD) and 'issuerTicker'.

        Returns:
            A DataFrame with columns 'date' and 'ticker', sorted by date.
            Rows missing either value are dropped. On a parsing error the
            problem is logged and an empty frame with the same columns is
            returned.
        """
        try:
            df = pd.read_csv(StringIO(csv_data), usecols=['index', 'issuerTicker'], sep=",")
            df = df.rename(columns={'index': 'date', 'issuerTicker': 'ticker'})
            df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d')
            df = df.dropna(subset=['date', 'ticker'])
            # Stable sort: same-day signals keep their CSV order, which decides
            # which ones are taken when slots are scarce.
            return df.sort_values(by='date', kind='mergesort')
        except Exception as e:
            self.Debug(f"CSV Parsing Error: {e}")
            return self._EmptySignalFrame()

    @staticmethod
    def _EmptySignalFrame():
        """Return an empty signal frame with the expected columns and dtypes."""
        return pd.DataFrame({
            'date': pd.Series(dtype='datetime64[ns]'),
            'ticker': pd.Series(dtype='object'),
        })

    def CalculateMaxSimultaneousOpenPositions(self):
        """Derive a slot count from the previous day's filings.

        Returns:
            ceil(number of filings / 5), but never less than 1.
        """
        filings = self.GetForm4Filings()
        if not filings:
            return 1  # Default to 1 if no data

        return max(1, math.ceil(len(filings) / 5))

    def GetStockSymbol(self, symbol):
        """Create a US-equity Symbol for a ticker; log and return None on failure."""
        try:
            return Symbol.Create(symbol, SecurityType.Equity, Market.USA)
        except Exception as e:
            self.Debug(f"Error getting symbol: {e}")
            return None