from datetime import timedelta
import numpy as np
from AlgorithmImports import *
class MyFrameworkAlgorithm(QCAlgorithm):
    """Intraday SPY algorithm: a 40% core holding plus a pyramided trading sleeve.

    Scheduled events drive everything:

    * one minute after the open, buy a 40% SPY core holding if SPY is not held;
    * at 10:00, fit a line to the last ``lookback`` of minute closes and only
      enable trading when the absolute slope is at most tan(30 degrees);
    * every five minutes, while trading is enabled, enter or add to the
      trading sleeve (up to three tracked tranches) and refresh the stop-loss;
    * at 12:15, liquidate SPY unless the recent closes are still sloping up.

    State:
        positions: entry prices of the tracked trading tranches, oldest first.
        position_count: number of tracked tranches (kept in step with
            ``positions``).
        halt_trades: when True, ``EvaluateMarket`` does nothing.
        initial_price: last close seen when trading was enabled at 10:00.
    """

    def Initialize(self):
        """Configure the backtest window, cash, data, state and scheduled events."""
        self.SetStartDate(2024, 1, 1)
        self.SetEndDate(datetime.now())
        self.SetCash(100000)
        self.spy = self.AddEquity("SPY", Resolution.Minute).Symbol
        self.lookback = timedelta(minutes=5)
        self.position_count = 0
        self.positions = []
        self.halt_trades = True
        self.initial_price = None
        self.stop_loss_percent = 0.03  # Set a stop-loss at 3%
        self.Settings.MinimumOrderMarginPortfolioValue = 0
        self.SetWarmUp(self.lookback)

        # All date rules are keyed to SPY so that events fire only on days SPY
        # trades; a bare EveryDay() also fires on weekends and holidays, where
        # the history window is empty and SellAllPositions would submit a
        # liquidation while the market is closed.

        # Invest 40% of the initial capital in SPY to hold
        self.Schedule.On(
            self.DateRules.EveryDay(self.spy),
            self.TimeRules.AfterMarketOpen(self.spy, 1),  # Ensure the market is open before placing trades
            self.InitialInvestment
        )
        self.Schedule.On(
            self.DateRules.EveryDay(self.spy),
            self.TimeRules.At(10, 0),  # Wait for 30 minutes after market open
            self.CheckInitialSlope
        )
        self.Schedule.On(
            self.DateRules.EveryDay(self.spy),
            self.TimeRules.Every(timedelta(minutes=5)),
            self.EvaluateMarket
        )
        self.Schedule.On(
            self.DateRules.EveryDay(self.spy),
            self.TimeRules.At(12, 15),  # Sell all positions by 12:15 PM
            self.SellAllPositions
        )

    def InitialInvestment(self):
        """Buy the 40% SPY core holding when SPY is not currently held."""
        if self.IsWarmingUp:
            return
        if not self.Portfolio[self.spy].Invested:
            self.SetHoldings(self.spy, 0.4)
            self.Log("Invested 40% of initial capital in SPY to hold.")

    def CheckInitialSlope(self):
        """Enable or halt trading for the day based on the slope of recent closes.

        Fits a first-degree polynomial to the minute closes in the lookback
        window. A shallow slope (``abs(m) <= tan(30 degrees)``) clears
        ``halt_trades`` and records the latest close as ``initial_price``;
        a steeper slope sets ``halt_trades``. When the history is unusable the
        flag is left unchanged.
        """
        closes = self._signal_closes()
        if closes is None:
            return

        x = np.arange(len(closes))
        m, _ = np.polyfit(x, closes, 1)
        self.Log(f"Initial market slope evaluated: m = {m:.4f}")

        if abs(m) <= np.tan(np.radians(30)):  # Slope less than 30 degrees
            self.halt_trades = False
            self.initial_price = closes[-1]
            self.Log("Shallow market slope detected at open. Ready to trade.")
        else:
            self.halt_trades = True
            self.Log("Steep market slope detected at open. Halting trades.")

    def EvaluateMarket(self):
        """Enter, add to, or rotate the trading sleeve based on the latest close.

        Does nothing while warming up or while ``halt_trades`` is set. After
        every successful buy the stop-loss is re-placed for the whole SPY
        holding.
        """
        if self.IsWarmingUp or self.halt_trades:
            return

        closes = self._signal_closes()
        if closes is None:
            return

        current_price = closes[-1]
        self.Log(f"Current market price evaluated at {current_price}")

        # Use 60% of the capital for trading
        # NOTE(review): every tranche is sized at 60% of total portfolio value,
        # so after the first entry the cash check in CanPlaceOrder will usually
        # reject further adds - confirm whether per-tranche sizing was intended.
        quantity = int(0.6 * self.Portfolio.TotalPortfolioValue / current_price)

        # If SPY went flat while tranches were still tracked (e.g. the stop
        # order filled), the tracked entries no longer describe real holdings.
        if self.positions and not self.Portfolio[self.spy].Invested:
            self.positions = []
            self.position_count = 0

        # The first entry is gated on the tracked tranches rather than on
        # Portfolio.Invested: the 40% core holding keeps the portfolio
        # "invested", which previously made this branch unreachable.
        if not self.positions:
            if quantity > 0 and self.CanPlaceOrder(quantity, current_price):
                self.MarketOrder(self.spy, quantity)
                self.positions.append(current_price)
                self.position_count += 1
                self.Log(f"Entering market with initial allocation in SPY at {current_price}.")
                self.SetStopLoss(self.spy, self.stop_loss_percent)
        elif self.position_count < 3:
            if current_price > self.positions[-1]:
                if quantity > 0 and self.CanPlaceOrder(quantity, current_price):
                    self.MarketOrder(self.spy, quantity)
                    self.positions.append(current_price)
                    self.position_count += 1
                    self.Log(f"Adding to position in SPY at {current_price}.")
                    self.SetStopLoss(self.spy, self.stop_loss_percent)
            elif current_price < self.initial_price:
                self.halt_trades = True
                self.Log(f"Holding position due to price dip below initial buy at {current_price}.")
        elif len(self.positions) == 3:
            if current_price > self.positions[-1]:
                if quantity > 0 and self.CanPlaceOrder(quantity, current_price):
                    self.MarketOrder(self.spy, quantity)
                    # Whole shares only: the raw quotient is a float and SPY
                    # is traded here in integer share quantities.
                    sell_quantity = int(self.Portfolio[self.spy].Quantity / len(self.positions))
                    if sell_quantity > 0:
                        self.MarketOrder(self.spy, -sell_quantity)  # Sell the oldest position
                    self.positions.pop(0)
                    self.positions.append(current_price)
                    self.Log(f"Adding to position in SPY at {current_price} and selling the oldest position.")
                    self.SetStopLoss(self.spy, self.stop_loss_percent)
            elif current_price < self.initial_price:
                self.halt_trades = True
                self.Log(f"Holding position due to price dip below initial buy at {current_price}.")

    def CanPlaceOrder(self, quantity, price):
        """Return True when portfolio cash covers ``quantity * price``.

        Logs the check, and logs again when cash is insufficient.
        """
        cost = quantity * price
        self.Log(f"Checking if we can place order for {quantity} shares at {price}. Cost: {cost}, Cash: {self.Portfolio.Cash}")
        if self.Portfolio.Cash >= cost:
            return True
        self.Log(f"Insufficient buying power to place order for {quantity} shares at {price}.")
        return False

    def SetStopLoss(self, symbol, stop_loss_percent):
        """Place a single stop-market order covering the whole long holding.

        Args:
            symbol: Symbol whose holding should be protected.
            stop_loss_percent: Fraction below the average price at which the
                stop is set (0.03 means 3% below).
        """
        quantity = self.Portfolio[symbol].Quantity
        if quantity > 0:
            # Cancel earlier stops first. Without this every call stacks
            # another full-size stop, and several of them triggering could
            # sell more shares than are held.
            self.Transactions.CancelOpenOrders(symbol)
            stop_price = self.Portfolio[symbol].AveragePrice * (1 - stop_loss_percent)
            self.StopMarketOrder(symbol, -quantity, stop_price)

    def SellAllPositions(self):
        """Liquidate SPY at the scheduled exit unless recent closes slope upward.

        When the history is missing or has fewer than two closes, the slope
        cannot be evaluated and the position is liquidated.
        """
        if not self.Portfolio.Invested:
            return

        closes = self._recent_closes()
        if closes is None:
            self.Log("History data is empty or 'close' column is missing. Liquidating positions.")
            self._liquidate_and_reset()
            return

        if len(closes) < 2:  # Ensure there are enough data points for polyfit
            self.Log("Not enough data points to evaluate slope for momentum check. Liquidating positions.")
            self._liquidate_and_reset()
            return

        m, _ = np.polyfit(np.arange(len(closes)), closes, 1)
        if m > 0:
            self.Log("Strong upward momentum detected. Holding positions past 12:15 PM.")
            return

        # NOTE(review): halt_trades is not changed here, so EvaluateMarket can
        # re-enter on its next five-minute tick - confirm this is intended.
        self._liquidate_and_reset()

    def _recent_closes(self):
        """Return SPY minute closes for the lookback window, or None if unusable."""
        history = self.History(self.spy, self.lookback, Resolution.Minute)
        if history.empty or self.spy not in history.index or 'close' not in history.columns:
            return None
        return history.loc[self.spy]["close"].values

    def _signal_closes(self):
        """Return closes spanning the full lookback window, logging and returning None otherwise."""
        closes = self._recent_closes()
        if closes is None:
            self.Log("History data is empty or 'close' column is missing.")
            return None
        # One bar per minute of lookback is required.
        if len(closes) < self.lookback.total_seconds() / 60:
            self.Log("Not enough historical data.")
            return None
        return closes

    def _liquidate_and_reset(self):
        """Liquidate SPY, clear the tracked tranches and log the exit."""
        self.Liquidate(self.spy)
        self.positions = []
        self.position_count = 0
        self.Log("Selling all positions at 12:15 PM.")