Overall Statistics |
Total Trades 43 Average Win 9.09% Average Loss -0.47% Compounding Annual Return -20.398% Drawdown 14.900% Expectancy -0.028 Net Profit -9.044% Sharpe Ratio -1.423 Probabilistic Sharpe Ratio 2.164% Loss Rate 95% Win Rate 5% Profit-Loss Ratio 19.41 Alpha -0.131 Beta -0.058 Annual Standard Deviation 0.1 Annual Variance 0.01 Information Ratio -2.027 Tracking Error 0.166 Treynor Ratio 2.449 Total Fees $369.80 Estimated Strategy Capacity $1300000000.00 Lowest Capacity Asset NQ Y9CDFY0C6TXD Portfolio Turnover 26.89% |
# --- # --- # Region Imports from AlgorithmImports import * # --- # --- # Alpha model class class alpha_1_month(AlphaModel): def __init__(self, algorithm, ndx): # --- # Initialize various variables self.level = None self.ndx = {} self.last_consolidation_time = None self.minute = 0 self.x = 0 self.ndx_close = RollingWindow[float](2) # Scheduler that calls for the end of the month to liquidate all positions algorithm.Schedule.On(algorithm.DateRules.MonthEnd(ndx), algorithm.TimeRules.BeforeMarketClose(ndx, 10), Action(lambda: self.LiquidatePositions(algorithm))) # --- # Called on every data point def Update(self, algorithm, data): # --- # Initialize some variables to store insights insights = [] insight = None # --- # Function that's handling consolidation. Checking if certain time period has passed and we can continue if self.last_consolidation_time is not None and self.minute != self.last_consolidation_time.minute: self.minute = self.last_consolidation_time.minute # Update the minute value # In case we need to use a rolling window, we update it with 15min Close price here if data.ContainsKey("NDX"): self.ndx_close.Add(data["NDX"].Close) # --- # Created the level and there is data for NDX index if self.level is not None and self.ndx is not None: # Plotting level and NDX on both (Main Chart and Breakout Price) charts for visual purposes algorithm.Plot("Breakout Price", "Level", self.level) algorithm.Plot("Main Chart", "Level", self.level) algorithm.Plot("Main Chart", "NDX", data["NDX"].Close) algorithm.Plot("Main Chart", "Futures", data["/NQ"].Close) # --- # Iterate through all the symbols that are in data (data is a dictionary) for symbol in data.Keys: # If symbol is "NDX" or "/NQ", skip the plotting if str(symbol.Value) == "NDX" or str(symbol.Value) == "/NQ": continue # Plot data for future contracts only algorithm.Plot("Breakout Price", str(symbol.Value) + " Price Close", data[symbol].Close) # First check if we have a future selected as well as check if level has been set if self.latest_contract is not None and self.level is not None: # We are using a rolling window for ndx_close. Thats if we need to access the [2] value. Otherwise can use data["NDX"].Close if self.ndx_close[1] < self.level and self.ndx_close[0] > self.level: insight = Insight(self.latest_contract.Symbol, timedelta(minutes=20), InsightType.Price, InsightDirection.Up) insight.SourceModel = "1 Month" insights.append(insight) algorithm.Plot("Main Chart", "Buy", data[symbol].Close) if self.ndx_close[1] > self.level and self.ndx_close[0] < self.level: insight = Insight(self.latest_contract.Symbol, timedelta(minutes=20), InsightType.Price, InsightDirection.Flat) insight.SourceModel = "1 Month" insights.append(insight) algorithm.Plot("Main Chart", "Sell", data[symbol].Close) return insights # --- # Setting the consolidation time at consolidater bars close # Updates every period, so self.minute is equal to last_cons... every period def consolidation_handler(self, sender, bar): self.last_consolidation_time = bar.Time # --- # Function that's called every time new securities have been added to our universe of securities def OnSecuritiesChanged(self, algorithm, changes): # --- # We want to create data consolidator for NDX Index only for security in changes.AddedSecurities: if security.Symbol == "NDX": self.consolidator = TradeBarConsolidator(timedelta(minutes=15)) self.consolidator.DataConsolidated += self.consolidation_handler algorithm.SubscriptionManager.AddConsolidator(security.Symbol, self.consolidator) self.ndx["NDX"] = security # Here I make self.ndx to be NDX index # --- # Those two schedules have to be here so that we can send NDX index to set level! It has to be the newest contract. algorithm.Schedule.On(algorithm.DateRules.MonthStart(security.Symbol), algorithm.TimeRules.At(0, 0), Action(lambda: self.SetLevel(algorithm, security.Symbol))) # --- # If its future, we add it to array and sort it so we have the latest selected added_contracts = [x for x in changes.AddedSecurities if x.Symbol.SecurityType == SecurityType.Future] if added_contracts: self.latest_contract = sorted(added_contracts, key=lambda x: x.Expiry, reverse=True)[0] # --- # This function sets our breakout level def SetLevel(self, algorithm, symbol): try: history = algorithm.History(symbol, 1, Resolution.Minute) # Get one row of the last history for the given symbol (NDX) except: history = None if history is not None and not history.empty and str(symbol) in history.index: self.level = history.loc[str(symbol)].open[0] # If it's not empty we set our level at the open price. else: self.level = None # --- # Should send an insight (not liquidate()) to liquidate or enter a flat position def LiquidatePositions(self, algorithm): # --- # Initialize some variables to store insights insights = [] insight = None # We send insight here to close all remaining positions at the end of the month insight = Insight(self.latest_contract.Symbol, timedelta(minutes=20), InsightType.Price, InsightDirection.Flat) insights.append(insight) return insights
# --- # Regional imports from alpha_1_month import * # Global imports from AlgorithmImports import * from datetime import datetime, timedelta import threading # --- # Main class class MyAlgorithm(QCAlgorithm): def Initialize(self): self.SetStartDate(2023, 1, 1) self.SetEndDate(2023, 6, 1) self.SetCash(1000000) self.ndx = self.AddIndex("NDX", Resolution.Minute).Symbol self.qqq = self.AddFuture(Futures.Indices.NASDAQ100EMini, Resolution.Minute) self.qqq.SetFilter(timedelta(0), timedelta(180)) # Track symbols in our universe self.universe_symbols = [self.ndx] # Set algorithm framework models self.SetUniverseSelection(ManualUniverseSelectionModel(self.universe_symbols)) # Add Alpha model self.AddAlpha(alpha_1_month(self, self.ndx)) self.InsightsGenerated += self.OnInsightsGenerated self.SetExecution(ImmediateExecutionModel()) # Create a new Chart named "Buy Points" buy_series = Chart("Main Chart") # Create a new Series named "Asset Name" buy_series.AddSeries(Series("Buy", SeriesType.Scatter, 0)) buy_series.AddSeries(Series("Sell", SeriesType.Scatter, 0)) buy_series.AddSeries(Series("Level", SeriesType.Line, 0)) buy_series.AddSeries(Series("Futures", SeriesType.Line, 0)) buy_series.AddSeries(Series("Take Profit", SeriesType.Scatter, 0)) # Add the Chart to the algorithm self.AddChart(buy_series) self.current_holdings = {} self.desired_exposure_per_alpha = 1000000 def OnInsightsGenerated(self, algorithm: IAlgorithm, insights_collection: GeneratedInsightsCollection): insights = insights_collection.Insights for insight in insights: symbol = insight.Symbol multiplier = self.Securities[symbol].SymbolProperties.ContractMultiplier price = self.Securities[symbol].Close # Calculate the closest quantity of future contracts quantity = int(self.desired_exposure_per_alpha / (price * multiplier)) #self.Debug(f"printing some stuff. Price -> {price}, Quantity -> {quantity}, Symbol -> {symbol}") # Update current holdings if insight.SourceModel not in self.current_holdings: self.current_holdings[insight.SourceModel] = 0 if insight.Direction == InsightDirection.Up: if insight.SourceModel in self.current_holdings and self.current_holdings[insight.SourceModel] == 0: orderTicket = self.MarketOrder(symbol, quantity, tag="1 Month") takeProfitPrice = price * 1.10 self.LimitOrder(symbol, -quantity, int(takeProfitPrice), "Take Profit") algorithm.Plot("Main Chart", "Take Profit", takeProfitPrice) else: pass if insight.Direction == InsightDirection.Flat: if insight.SourceModel in self.current_holdings and self.current_holdings[insight.SourceModel] == quantity: orderTicket = self.MarketOrder(symbol, -self.current_holdings[insight.SourceModel], tag="1 Month") else: pass def OnSecuritiesChanged(self, changes): # --- # Put all added futures contracts into an array added_contracts = [x for x in changes.AddedSecurities if x.Symbol.SecurityType == SecurityType.Future] if added_contracts: # if any were added, then we append them to our universe_symbols array which we input into universe model self.universe_symbols.append(sorted(added_contracts, key=lambda x: x.Expiry, reverse=True)[0]) # --- # Put all delisted futures into an array deleted_contracts = [x for x in changes.RemovedSecurities if x.Symbol.SecurityType == SecurityType.Future] if deleted_contracts: # if any were deleted, then we remove them from our universe_symbols array which we input into universe model self.universe_symbols.remove(deleted_contracts) # --- # This is the main function that's handling the data management in the current_holdings dictionary def OnOrderEvent(self, orderEvent): # Check if the order event is filled if orderEvent.Status != OrderStatus.Filled: return # Retrieve the order from the Transactions object order = self.Transactions.GetOrderById(orderEvent.OrderId) # Get the symbol from the filled order symbol = order.Symbol quantity = order.Quantity model = order.Tag # This is assuming your 'SourceModel' is set in the tag if model not in self.current_holdings: self.current_holdings[model] = 0 self.current_holdings[model] += quantity # If the order is a sell order, cancel any open limit orders for the same symbol if order.Direction == OrderDirection.Sell: openOrders = self.Transactions.GetOpenOrders(symbol) for openOrder in openOrders: if openOrder.Type == OrderType.Limit: self.Transactions.CancelOrder(openOrder.Id)