Overall Statistics |
Total Orders 8 Average Win 2.24% Average Loss -1.62% Compounding Annual Return 14.318% Drawdown 5.800% Expectancy 0.191 Start Equity 100000 End Equity 101086 Net Profit 1.086% Sharpe Ratio 1.693 Sortino Ratio 0 Probabilistic Sharpe Ratio 90.879% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 1.38 Alpha 0.089 Beta 0.077 Annual Standard Deviation 0.033 Annual Variance 0.001 Information Ratio 3.855 Tracking Error 0.128 Treynor Ratio 0.715 Total Fees $39.00 Estimated Strategy Capacity $510000.00 Lowest Capacity Asset SPXW Y5ZD03V9LCLQ|SPX 31 Portfolio Turnover 0.26% |
from AlgorithmImports import * from collections import deque import config class AvgDown(): def __init__(self, algorithm, symbol, config): self.algo = algorithm self.symbol = symbol self.config = config self.pivot_queue = deque(maxlen=3) self.entered_condor = None self.entered_key = None self.avg_down_counter = 0 self.first_entry = None ####################################################################################################### self.avg_down_layers = self.calculate_avg_down_layers() def calculate_avg_down_layers(self): layers = {} total_contracts = 0 layer_index = 1 while total_contracts < self.config['max_contracts']: # Determine the quantity to buy for this layer quantity_to_buy = min(self.config["contracts_to_buy_per_signal"], self.config['max_contracts'] - total_contracts) layers[str(layer_index)] = [quantity_to_buy, False] # Update the total contracts bought so far total_contracts += quantity_to_buy if total_contracts >= self.config['max_contracts']: break # Move to the next layer layer_index += 1 return layers def append_pivot(self): if self.entered_condor is not None: self.entered_condor.calculate_premium() self.pivot_queue.appendleft(self.entered_condor.Premium) def check_avg_down(self, condor): quantity_to_enter = 0 premium_return = [] for premium, entry in self.avg_down_layers.items(): quantity, entered = entry if not entered: condor.entry_time_algo_2 = self.algo.Time quantity_to_enter += quantity premium_return.append(premium) self.avg_down_layers[premium][1] = True break if quantity_to_enter != 0: return True, quantity_to_enter, premium_return else: return False, 0, "" def check_percentage_avg_down(self): if not self.use_most_recent_entry: self.entered_condor.Calculate_Premium() if self.entered_condor.Premium >= self.first_entry * self.avg_down_percentage: return True else: return False else: self.entered_condor.Calculate_Premium() if self.entered_condor.Premium >= self.entered_condor.Entry_Premium * self.avg_down_percentage: return True else: return False def check_pivot_avg_down(self): if len(self.pivot_queue) == 3 and self.pivot_queue[0] < self.pivot_queue[1] > self.pivot_queue[2]: return True else: return False def reset(self): self.pivot_queue = deque(maxlen=3) self.entered_condor = None self.entered_key = None self.avg_down_counter = 0 self.first_entry = None self.avg_down_layers = self.calculate_avg_down_layers()
#region imports from AlgorithmImports import * USE_POWELL = False POWELL = [ #datetime(2021, 1, 14).strftime("%Y%-m%-d"), #Powell 12:30 # #datetime(2021, 2, 10).strftime("%Y%-m%-d"), #Powell 14:00 # datetime(2021, 2, 23).strftime("%Y%-m%-d"), #Powell 10:00 datetime(2021, 2, 24).strftime("%Y%-m%-d"), #Powell 10:00 #datetime(2021, 3, 4).strftime("%Y%-m%-d"), #Powell 12:05 # datetime(2021, 3, 22).strftime("%Y%-m%-d"), #Powell 08:00 datetime(2021, 3, 23).strftime("%Y%-m%-d"), #Powell 11:00 datetime(2021, 3, 24).strftime("%Y%-m%-d"), #Powell 09:00 datetime(2021, 4, 8).strftime("%Y%-m%-d"), #Powell 11:00 datetime(2021, 4, 14).strftime("%Y%-m%-d"), #Powell 11:00 #datetime(2021, 5, 3).strftime("%Y%-m%-d"), #Powell 13:20 # datetime(2021, 6, 4).strftime("%Y%-m%-d"), #Powell 06:00 #datetime(2021, 6, 22).strftime("%Y%-m%-d"), #Powell 13:00 # datetime(2021, 7, 14).strftime("%Y%-m%-d"), #Powell 11:00 datetime(2021, 7, 15).strftime("%Y%-m%-d"), #Powell 08:30 #datetime(2021, 8, 17).strftime("%Y%-m%-d"), #Powell 12:30 # datetime(2021, 8, 27).strftime("%Y%-m%-d"), #Powell 09:00 datetime(2021, 9, 24).strftime("%Y%-m%-d"), #Powell 09:00 datetime(2021, 9, 28).strftime("%Y%-m%-d"), #Powell 09:00 datetime(2021, 9, 29).strftime("%Y%-m%-d"), #Powell 09:45 datetime(2021, 10, 22).strftime("%Y%-m%-d"), #Powell 10:00 datetime(2021, 11, 8).strftime("%Y%-m%-d"), #Powell 10:30 datetime(2021, 11, 9).strftime("%Y%-m%-d"), #Powell 9:00 # datetime(2021, 11, 29).strftime("%Y%-m%-d"), #Powell 15:05 # datetime(2021, 11, 30).strftime("%Y%-m%-d"), #Powell 10:00 datetime(2021, 12, 1).strftime("%Y%-m%-d"), #Powell 10:00 datetime(2022, 1, 11).strftime("%Y%-m%-d"), #Powell 10:00 datetime(2022, 3, 2).strftime("%Y%-m%-d"), #Powell 10:00 datetime(2022, 3, 3).strftime("%Y%-m%-d"), #Powell 10:00 datetime(2022, 3, 21).strftime("%Y%-m%-d"), #Powell 11:00 datetime(2022, 3, 23).strftime("%Y%-m%-d"), #Powell 07:00 datetime(2022, 4, 21).strftime("%Y%-m%-d"), #Powell 10:00 # datetime(2022, 5, 17).strftime("%Y%-m%-d"), #Powell 13:00 # datetime(2022, 5, 24).strftime("%Y%-m%-d"), #Powell 11:20 datetime(2022, 6, 17).strftime("%Y%-m%-d"), #Powell 07:45 datetime(2022, 6, 22).strftime("%Y%-m%-d"), #Powell 08:30 datetime(2022, 6, 23).strftime("%Y%-m%-d"), #Powell 09:00 datetime(2022, 6, 29).strftime("%Y%-m%-d"), #Powell 08:00 datetime(2022, 8 ,26).strftime("%Y%-m%-d"), #Powell 09:00 datetime(2022, 9, 8).strftime("%Y%-m%-d"), #Powell 08:10 # datetime(2022, 9, 23).strftime("%Y%-m%-d"), #Powell 13:00 # datetime(2022, 9, 27).strftime("%Y%-m%-d"), #Powell 06:30 datetime(2022, 9, 28).strftime("%Y%-m%-d"), #Powell 09:15 #datetime(2022, 11, 30).strftime("%Y%-m%-d"), #Powell 13:30 # ] USE_POWELL_NEXTDAY = False POWELL_NEXTDAY = [ datetime(2021, 1, 15).strftime("%Y%-m%-d"), #DayAfterPowell datetime(2021, 2, 11).strftime("%Y%-m%-d"), #DAP datetime(2021, 3, 5).strftime("%Y%-m%-d"), #DAP datetime(2021, 3, 25).strftime("%Y%-m%-d"), #DAP datetime(2021, 4, 9).strftime("%Y%-m%-d"), #DAP datetime(2021, 4, 15).strftime("%Y%-m%-d"), #DAP datetime(2021, 5, 4).strftime("%Y%-m%-d"), #DAP datetime(2021, 6, 5).strftime("%Y%-m%-d"), #DAP datetime(2021, 6, 23).strftime("%Y%-m%-d"), #DAP datetime(2021, 7, 16).strftime("%Y%-m%-d"), #DAP datetime(2021, 9, 30).strftime("%Y%-m%-d"), #DAP datetime(2021, 11, 10).strftime("%Y%-m%-d"), #DAP datetime(2021, 12, 2).strftime("%Y%-m%-d"), #DAP datetime(2022, 1, 12).strftime("%Y%-m%-d"), #DAP datetime(2022, 3, 4).strftime("%Y%-m%-d"), #DAP datetime(2022, 3, 22).strftime("%Y%-m%-d"), #DAP datetime(2022, 3, 24).strftime("%Y%-m%-d"), #DAP datetime(2022, 4, 22).strftime("%Y%-m%-d"), #DAP datetime(2022, 5, 17).strftime("%Y%-m%-d"), #DAP datetime(2022, 5, 25).strftime("%Y%-m%-d"), #DAP datetime(2022, 6, 24).strftime("%Y%-m%-d"), #DAP datetime(2022, 8, 27).strftime("%Y%-m%-d"), #DAP datetime(2022, 9, 9).strftime("%Y%-m%-d"), #DAP datetime(2022, 9, 29).strftime("%Y%-m%-d"), #DAP datetime(2022, 12, 1).strftime("%Y%-m%-d"), #DAP ] USE_FED_INTEREST_RATE_DECISION = False FED_INTEREST_RATE_DECISION = [ datetime(2021, 1, 27).strftime("%Y%-m%-d"), datetime(2021, 3, 17).strftime("%Y%-m%-d"), datetime(2021, 4, 28).strftime("%Y%-m%-d"), datetime(2021, 6, 16).strftime("%Y%-m%-d"), datetime(2021, 7, 28).strftime("%Y%-m%-d"), datetime(2021, 9, 22).strftime("%Y%-m%-d"), datetime(2021, 11, 3).strftime("%Y%-m%-d"), datetime(2021, 12, 15).strftime("%Y%-m%-d"), datetime(2022, 1, 26).strftime("%Y%-m%-d"), datetime(2022, 3, 16).strftime("%Y%-m%-d"), datetime(2022, 5, 4).strftime("%Y%-m%-d"), datetime(2022, 6, 15).strftime("%Y%-m%-d"), datetime(2022, 7, 27).strftime("%Y%-m%-d"), datetime(2022, 9, 21).strftime("%Y%-m%-d"), datetime(2022, 11, 2).strftime("%Y%-m%-d"), datetime(2022, 12, 14).strftime("%Y%-m%-d"), ] USE_FED_INTEREST_RATE_DECISION_NEXTDAY = False FED_INTEREST_RATE_DECISION_NEXTDAY = [ datetime(2021, 1, 28).strftime("%Y%-m%-d"), datetime(2021, 3, 18).strftime("%Y%-m%-d"), datetime(2021, 4, 29).strftime("%Y%-m%-d"), datetime(2021, 6, 17).strftime("%Y%-m%-d"), datetime(2021, 7, 29).strftime("%Y%-m%-d"), datetime(2021, 9, 23).strftime("%Y%-m%-d"), datetime(2021, 11, 4).strftime("%Y%-m%-d"), datetime(2021, 12, 16).strftime("%Y%-m%-d"), datetime(2022, 1, 27).strftime("%Y%-m%-d"), datetime(2022, 3, 17).strftime("%Y%-m%-d"), datetime(2022, 5, 5).strftime("%Y%-m%-d"), datetime(2022, 6, 17).strftime("%Y%-m%-d"), datetime(2022, 7, 28).strftime("%Y%-m%-d"), datetime(2022, 9, 22).strftime("%Y%-m%-d"), datetime(2022, 11, 3).strftime("%Y%-m%-d"), datetime(2022, 12, 15).strftime("%Y%-m%-d"), ] USE_FOMC_MEETINGS_FIRSTDAY = False FOMC_MEETINGS_FIRSTDAY = [ datetime(2021, 1, 26).strftime("%Y%-m%-d"), datetime(2021, 3, 16).strftime("%Y%-m%-d"), datetime(2021, 4, 27).strftime("%Y%-m%-d"), datetime(2021, 6, 16).strftime("%Y%-m%-d"), datetime(2021, 7, 27).strftime("%Y%-m%-d"), datetime(2021, 9, 21).strftime("%Y%-m%-d"), datetime(2021, 11, 2).strftime("%Y%-m%-d"), datetime(2021, 12, 14).strftime("%Y%-m%-d"), datetime(2022, 1, 25).strftime("%Y%-m%-d"), datetime(2022, 3, 15).strftime("%Y%-m%-d"), datetime(2022, 5, 3).strftime("%Y%-m%-d"), datetime(2022, 6, 14).strftime("%Y%-m%-d"), datetime(2022, 7, 26).strftime("%Y%-m%-d"), datetime(2022, 9, 20).strftime("%Y%-m%-d"), datetime(2022, 11, 1).strftime("%Y%-m%-d"), datetime(2022, 12, 13).strftime("%Y%-m%-d"), ] USE_FOMC_MEETINGS = False FOMC_MEETINGS = [ datetime(2021, 1, 27).strftime("%Y%-m%-d"), datetime(2021, 3, 17).strftime("%Y%-m%-d"), datetime(2021, 4, 28).strftime("%Y%-m%-d"), datetime(2021, 6, 15).strftime("%Y%-m%-d"), datetime(2021, 7, 28).strftime("%Y%-m%-d"), datetime(2021, 9, 22).strftime("%Y%-m%-d"), datetime(2021, 11, 3).strftime("%Y%-m%-d"), datetime(2021, 12, 15).strftime("%Y%-m%-d"), datetime(2022, 1, 26).strftime("%Y%-m%-d"), datetime(2022, 3, 16).strftime("%Y%-m%-d"), datetime(2022, 5, 4).strftime("%Y%-m%-d"), datetime(2022, 6, 15).strftime("%Y%-m%-d"), datetime(2022, 7, 27).strftime("%Y%-m%-d"), datetime(2022, 9 ,21).strftime("%Y%-m%-d"), datetime(2022, 11, 2).strftime("%Y%-m%-d"), datetime(2022, 12, 14).strftime("%Y%-m%-d"), ] USE_FOMC_MINUTES = False FOMC_MINUTES = [ datetime(2021, 1 ,6).strftime("%Y%-m%-d"), datetime(2021, 2 ,17).strftime("%Y%-m%-d"), datetime(2021, 4 ,7).strftime("%Y%-m%-d"), datetime(2021, 5 ,19).strftime("%Y%-m%-d"), datetime(2021, 7 ,7).strftime("%Y%-m%-d"), datetime(2021, 8 ,18).strftime("%Y%-m%-d"), datetime(2021, 10 ,13).strftime("%Y%-m%-d"), datetime(2021, 11 ,24).strftime("%Y%-m%-d"), datetime(2022, 1 ,5).strftime("%Y%-m%-d"), datetime(2022, 2 ,16).strftime("%Y%-m%-d"), datetime(2022, 4, 6).strftime("%Y%-m%-d"), datetime(2022, 5, 25).strftime("%Y%-m%-d"), datetime(2022, 7, 6).strftime("%Y%-m%-d"), datetime(2022, 8, 17).strftime("%Y%-m%-d"), datetime(2022, 10, 12).strftime("%Y%-m%-d"), datetime(2022, 11, 23).strftime("%Y%-m%-d"), ] USE_CPI = False CPI = [ datetime(2021, 1, 13).strftime("%Y%-m%-d"), datetime(2021, 2, 10).strftime("%Y%-m%-d"), datetime(2021, 3, 10).strftime("%Y%-m%-d"), datetime(2021, 4, 13).strftime("%Y%-m%-d"), datetime(2021, 5, 12).strftime("%Y%-m%-d"), datetime(2021, 6, 10).strftime("%Y%-m%-d"), datetime(2021, 7, 13).strftime("%Y%-m%-d"), datetime(2021, 8, 11).strftime("%Y%-m%-d"), datetime(2021, 9, 14).strftime("%Y%-m%-d"), datetime(2021, 10, 13).strftime("%Y%-m%-d"), datetime(2021, 11, 10).strftime("%Y%-m%-d"), datetime(2021, 12, 10).strftime("%Y%-m%-d"), datetime(2022, 1, 12).strftime("%Y%-m%-d"), datetime(2022, 2, 10).strftime("%Y%-m%-d"), datetime(2022, 3, 10).strftime("%Y%-m%-d"), datetime(2022, 4, 12).strftime("%Y%-m%-d"), datetime(2022, 5, 11).strftime("%Y%-m%-d"), datetime(2022, 6, 10).strftime("%Y%-m%-d"), datetime(2022, 7, 13).strftime("%Y%-m%-d"), datetime(2022, 8, 10).strftime("%Y%-m%-d"), datetime(2022, 9, 13).strftime("%Y%-m%-d"), datetime(2022, 10, 13).strftime("%Y%-m%-d"), datetime(2022, 11, 10).strftime("%Y%-m%-d"), datetime(2022, 12, 13).strftime("%Y%-m%-d"), ] USE_CPI_NEXTDAY = False CPI_NEXTDAY = [ datetime(2021, 1, 14).strftime("%Y%-m%-d"), datetime(2021, 2, 11).strftime("%Y%-m%-d"), datetime(2021, 3, 11).strftime("%Y%-m%-d"), datetime(2021, 4, 14).strftime("%Y%-m%-d"), datetime(2021, 5, 13).strftime("%Y%-m%-d"), datetime(2021, 6, 11).strftime("%Y%-m%-d"), datetime(2021, 7, 14).strftime("%Y%-m%-d"), datetime(2021, 8, 12).strftime("%Y%-m%-d"), datetime(2021, 9, 15).strftime("%Y%-m%-d"), datetime(2021, 10, 14).strftime("%Y%-m%-d"), datetime(2021, 11, 11).strftime("%Y%-m%-d"), datetime(2021, 12, 11).strftime("%Y%-m%-d"), datetime(2022, 1, 13).strftime("%Y%-m%-d"), datetime(2022, 2, 11).strftime("%Y%-m%-d"), datetime(2022, 3, 11).strftime("%Y%-m%-d"), datetime(2022, 4, 13).strftime("%Y%-m%-d"), datetime(2022, 5, 12).strftime("%Y%-m%-d"), datetime(2022, 6, 11).strftime("%Y%-m%-d"), datetime(2022, 7, 14).strftime("%Y%-m%-d"), datetime(2022, 8, 11).strftime("%Y%-m%-d"), datetime(2022, 9, 14).strftime("%Y%-m%-d"), datetime(2022, 10, 14).strftime("%Y%-m%-d"), datetime(2022, 11, 11).strftime("%Y%-m%-d"), datetime(2022, 12, 14).strftime("%Y%-m%-d"), ] DISABLE_SPX_WRONG_DATES = False SPX_WRONG_DATES = [ datetime(2021, 3, 23).strftime("%Y%-m%-d"), datetime(2021, 3, 24).strftime("%Y%-m%-d"), datetime(2021, 5, 10).strftime("%Y%-m%-d"), datetime(2021, 5, 11).strftime("%Y%-m%-d"), datetime(2021, 8, 19).strftime("%Y%-m%-d"), datetime(2021, 8, 20).strftime("%Y%-m%-d"), datetime(2021, 12, 3).strftime("%Y%-m%-d"), datetime(2022, 2, 25).strftime("%Y%-m%-d"), datetime(2022, 3, 4).strftime("%Y%-m%-d"), datetime(2022, 5, 6).strftime("%Y%-m%-d"), datetime(2022, 9, 30).strftime("%Y%-m%-d"), datetime(2023, 1, 26).strftime("%Y%-m%-d"), ] DISABLE_SPX_POTENTIALLY_WRONG_DATES = False SPX_POTENTIALLY_WRONG_DATES = [ datetime(2019, 8, 12).strftime("%Y%-m%-d"), datetime(2019, 11, 29).strftime("%Y%-m%-d"), datetime(2021, 11, 26).strftime("%Y%-m%-d"), datetime(2022, 7, 19).strftime("%Y%-m%-d"), datetime(2022, 7, 27).strftime("%Y%-m%-d"), datetime(2022, 7, 28).strftime("%Y%-m%-d"), datetime(2022, 8, 22).strftime("%Y%-m%-d"), datetime(2022, 9, 2).strftime("%Y%-m%-d"), datetime(2022, 9, 6).strftime("%Y%-m%-d"), datetime(2022, 9, 22).strftime("%Y%-m%-d"), datetime(2022, 9, 23).strftime("%Y%-m%-d"), datetime(2022, 9, 26).strftime("%Y%-m%-d"), datetime(2022, 9, 27).strftime("%Y%-m%-d"), datetime(2022, 9, 28).strftime("%Y%-m%-d"), datetime(2022, 9, 29).strftime("%Y%-m%-d"), datetime(2022, 10, 14).strftime("%Y%-m%-d"), datetime(2022, 10, 18).strftime("%Y%-m%-d"), datetime(2022, 10, 25).strftime("%Y%-m%-d"), datetime(2022, 10, 27).strftime("%Y%-m%-d"), datetime(2022, 11, 10).strftime("%Y%-m%-d"), datetime(2022, 11, 17).strftime("%Y%-m%-d"), datetime(2022, 11, 18).strftime("%Y%-m%-d"), datetime(2022, 11, 30).strftime("%Y%-m%-d"), datetime(2022, 12, 15).strftime("%Y%-m%-d"), datetime(2022, 12, 16).strftime("%Y%-m%-d"), ]
from AlgorithmImports import * import calendar_info import config class CheckCalendar(): def __init__(self, algorithm, close_trading_window, open_window, close_window): self.algorithm = algorithm # POWELL self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 1, 14), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 2, 10), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 3, 4), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 5, 3), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 6, 22), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 8, 17), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 11, 29), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 5, 17), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 9, 23), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 11, 30), self.algorithm.TimeRules.At(12,0), close_trading_window) # FED INTEREST RATE self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 1, 27), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 3, 17), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 4, 28), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 6, 16), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 7, 28), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 9, 22), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 11, 3), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 12, 15), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 1, 26), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 3, 16), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 5, 4), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 6, 15), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 7, 27), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 9, 21), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 11, 2), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 12, 4), self.algorithm.TimeRules.At(12,0), close_trading_window) # FOMC MINUTES self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 1, 6), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 2, 17), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 4, 7), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 5, 19), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 7, 7), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 8, 18), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 10, 13), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2021, 11, 24), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 1, 5), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 2, 16), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 4, 6), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 5, 25), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 7, 6), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 8, 17), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 10, 12), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.On(2022, 11, 23), self.algorithm.TimeRules.At(12,0), close_trading_window) self.algorithm.Schedule.On(self.algorithm.DateRules.Every(DayOfWeek.Friday), self.algorithm.TimeRules.At(9, 31), open_window) self.algorithm.Schedule.On(self.algorithm.DateRules.Every(DayOfWeek.Friday), self.algorithm.TimeRules.At(15, 59), close_window) self.algorithm.Schedule.On(self.algorithm.DateRules.Every(DayOfWeek.Monday), self.algorithm.TimeRules.At(9, 31), open_window) self.algorithm.Schedule.On(self.algorithm.DateRules.Every(DayOfWeek.Monday), self.algorithm.TimeRules.At(15, 59), close_window) self.algorithm.Schedule.On(self.algorithm.DateRules.Every(DayOfWeek.Wednesday), self.algorithm.TimeRules.At(9, 31), open_window) self.algorithm.Schedule.On(self.algorithm.DateRules.Every(DayOfWeek.Wednesday), self.algorithm.TimeRules.At(15, 59), close_window) self.algorithm.Schedule.On(self.algorithm.DateRules.Every(DayOfWeek.Tuesday), self.algorithm.TimeRules.At(9, 31), open_window) self.algorithm.Schedule.On(self.algorithm.DateRules.Every(DayOfWeek.Tuesday), self.algorithm.TimeRules.At(15, 59), close_window) self.algorithm.Schedule.On(self.algorithm.DateRules.Every(DayOfWeek.Thursday), self.algorithm.TimeRules.At(9, 31), open_window) self.algorithm.Schedule.On(self.algorithm.DateRules.Every(DayOfWeek.Thursday), self.algorithm.TimeRules.At(15, 59), close_window) def check_calendar(self): if calendar_info.USE_FOMC_MINUTES: if self.algorithm.Time.strftime("%Y%-m%-d") in calendar_info.FOMC_MINUTES: return True if calendar_info.USE_FOMC_MEETINGS: if self.algorithm.Time.strftime("%Y%-m%-d") in calendar_info.FOMC_MEETINGS: return True if calendar_info.USE_CPI: if self.algorithm.Time.strftime("%Y%-m%-d") in calendar_info.CPI: return True if calendar_info.USE_CPI_NEXTDAY: if self.algorithm.Time.strftime("%Y%-m%-d") in calendar_info.CPI_NEXTDAY: return True if calendar_info.USE_POWELL: if self.algorithm.Time.strftime("%Y%-m%-d") in calendar_info.POWELL: return True if calendar_info.USE_POWELL_NEXTDAY: if self.algorithm.Time.strftime("%Y%-m%-d") in calendar_info.POWELL_NEXTDAY: return True if calendar_info.USE_FED_INTEREST_RATE_DECISION: if self.algorithm.Time.strftime("%Y%-m%-d") in calendar_info.FED_INTEREST_RATE_DECISION: return True if calendar_info.USE_FED_INTEREST_RATE_DECISION_NEXTDAY: if self.algorithm.Time.strftime("%Y%-m%-d") in calendar_info.FED_INTEREST_RATE_DECISION_NEXTDAY: return True if calendar_info.USE_FOMC_MEETINGS_FIRSTDAY: if self.algorithm.Time.strftime("%Y%-m%-d") in calendar_info.FOMC_MEETINGS_FIRSTDAY: return True if calendar_info.DISABLE_SPX_WRONG_DATES: if self.algorithm.Time.strftime("%Y%-m%-d") in calendar_info.SPX_WRONG_DATES: return True if calendar_info.DISABLE_SPX_POTENTIALLY_WRONG_DATES: if self.algorithm.Time.strftime("%Y%-m%-d") in calendar_info.SPX_POTENTIALLY_WRONG_DATES: return True return False
# region imports from AlgorithmImports import * # endregion config = { "start_date": "2018-01-11", "end_date": "2024-06-11", "contracts_to_buy_per_signal": 15.241583161505584, "max_contracts": 40, "strike_difference": 30, "score_threshold": 0.0425768751869364, "use_plot": False, "use_print": False, "enable_score_exit": False, "score_exit_threshold": -0.3, "enable_premium_comparison": True, "weight_premium_comparison": 3.7575290481228145, "premium_comparison_lower_threshold": -1, "premium_comparison_upper_threshold": 1, "enable_cef": True, "weight_cef": 3.4039516813385275, "enable_ret": True, "weight_ret": 2.753352525305311, "enable_pf": True, "weight_pf": 1.6917137902045185, "pf_baseline": 3, "enable_tf": True, "weight_tf": 3.303661286696551, "enable_scaled_risk_score": True, "weight_scaled_risk_score": 5.9797665864779015, "profit_percentage": 0.3173681672192944, "midpoint": 0.5, "scale_factor": 15, "enable_rtf": True, "weight_rtf": 3.386706690784247, "rtf_lower_threshold": -1, "rtf_upper_threshold": 1, "enable_risk_score_diff": True, "weight_risk_score_diff": 3.8579905623219872, "rsd_lower_threshold": -1, "rsd_upper_threshold": 1, "enable_npf": True, "weight_npf": 1.7369795094005807, "enable_itmf": True, "weight_itmf": 1.0686406445710226, "enable_cf": False, "weight_cf": 0.8676121851188381, "enable_cnnf": False, "weight_cnnf": 0.9980020139821485, "rank_limit": 5, "enable_cf_risk": True, "cf_risk_weight": 0, "enable_cnnf_risk": True, "cnnf_risk_weight": 0, "enable_itmf_risk": True, "itmf_risk_weight": 0, "risk_score_threshold": -0.3, "factors_version": { "ret": "v2", "npf": "v1", "itmf": "v1", "cef": "v1", "rsd": "v1", "rtf": "v1", "rss": "v1", "pf": "v3", "pd": "v1", "cf": "v2" }, "cut_out_periods": [ { "start_date": "2020-02-20", "end_date": "2020-04-01" } ], "enable_noise_addition": False, "na_mean": 0, "na_std": 0.0001, "na_num_knots": 15, "na_trend_strength": 0.01, "na_cluster_size": 20, "na_volatility_factor": 2, "is_additional": False }
from AlgorithmImports import * from iron_condor import IronCondor import config class CreateCondor(): def __init__(self, algo): self.algo = algo def select_strikes(self, symbol, open_price, range_used, df, current_date, bar_time): call_sell_strike = (math.ceil(open_price * (1 + range_used) / 5) * 5) put_sell_strike = (math.floor(open_price * (1 - range_used) / 5) * 5) df['date'] = pd.to_datetime(df['date'], errors='coerce') df['date'] = df['date'].dt.date call_buy_strike = call_sell_strike + 30 put_buy_strike = put_sell_strike - 30 call_buy_symbol = Symbol.CreateOption(symbol, "SPXW", Market.USA, OptionStyle.European, OptionRight.Call, call_buy_strike, datetime(bar_time.year, bar_time.month, bar_time.day)) call_sell_symbol = Symbol.CreateOption(symbol, "SPXW", Market.USA, OptionStyle.European, OptionRight.Call, call_sell_strike, datetime(bar_time.year, bar_time.month, bar_time.day)) put_sell_symbol = Symbol.CreateOption(symbol, "SPXW", Market.USA, OptionStyle.European, OptionRight.Put, put_sell_strike, datetime(bar_time.year, bar_time.month, bar_time.day)) put_buy_symbol = Symbol.CreateOption(symbol, "SPXW", Market.USA, OptionStyle.European, OptionRight.Put, put_buy_strike, datetime(bar_time.year, bar_time.month, bar_time.day)) call_buy_symbol = self.algo.AddIndexOptionContract(call_buy_symbol, Resolution.Minute) call_sell_symbol = self.algo.AddIndexOptionContract(call_sell_symbol, Resolution.Minute) put_sell_symbol = self.algo.AddIndexOptionContract(put_sell_symbol, Resolution.Minute) put_buy_symbol = self.algo.AddIndexOptionContract(put_buy_symbol, Resolution.Minute) #self.algo.Debug(f"{call_buy_symbol} {call_sell_symbol} {put_sell_symbol} {put_buy_symbol}") #current_date = self.algo.time.date() #self.algo.Debug(f"{current_date} {df['date']} //////") #self.algo.Debug(f"bar_date: {repr(current_date)} - df['date']: {repr(df['date'].iloc[-1])}") df.loc[df['date'] == current_date, 'put_buy_strike'] = put_buy_strike df.loc[df['date'] == current_date, 'put_sell_strike'] = put_sell_strike df.loc[df['date'] == current_date, 'call_sell_strike'] = call_sell_strike df.loc[df['date'] == current_date, 'call_buy_strike'] = call_buy_strike df.loc[df['date'] == current_date, 'put_buy_symbol'] = put_buy_symbol df.loc[df['date'] == current_date, 'put_sell_symbol'] = put_sell_symbol df.loc[df['date'] == current_date, 'call_sell_symbol'] = call_sell_symbol df.loc[df['date'] == current_date, 'call_buy_symbol'] = call_buy_symbol return df
# region imports from AlgorithmImports import * import pandas as pd import numpy as np from iron_condor import IronCondor # endregion # Your New Python File def premium_threshold_vectorized(risk_scores): thresholds = np.select( [ risk_scores < 0.3, (risk_scores >= 0.3) & (risk_scores < 0.5), (risk_scores >= 0.5) & (risk_scores <= 0.6), risk_scores > 0.6 ], [ 1.5 + 3.5 * (risk_scores / 0.3), 5 + 5 * ((risk_scores - 0.3) / 0.2), 10 + 10 * ((risk_scores - 0.5) / 0.1), 20 ] ) return thresholds def compute_base_columns(algo, data): columns_to_check = ['close', 'call_sell_strike', 'open', 'daily_range_used'] # Convert the relevant columns to numeric, forcing any non-numeric values to NaN data[columns_to_check] = data[columns_to_check].apply(pd.to_numeric, errors='coerce') # Check for any NaN values (which now include any non-numeric values) for column in columns_to_check: nan_mask = data[column].isna() if nan_mask.any(): nan_timestamps = data['timestamp'][nan_mask].tolist() #algo.Debug(f"Found non-numeric value in column '{column}' at timestamps: {nan_timestamps}") # Only proceed with calculation if there are no NaN values if not data[columns_to_check].isna().any().any(): data['proximity_call'] = (data['close'] - data['call_sell_strike']) / (data['open'] * data['daily_range_used']) #algo.Debug(f"proximity_call: {repr(data['timestamp'].iloc[-1])} {repr(data['close'].iloc[-1])} {repr(data['call_sell_strike'].iloc[-1])} {repr(data['open'].iloc[-1])} {repr(data['daily_range_used'].iloc[-1])}") else: algo.Debug("Cannot calculate 'proximity_call' due to non-numeric values in required columns.") #algo.Debug(f"proximity_call: {repr(data['timestamp'].iloc[0])} {repr(data['close'].iloc[0])} {repr(data['call_sell_strike'].iloc[0])} {repr(data['open'].iloc[0])} {repr(data['daily_range_used'].iloc[0])}") data['proximity_put'] = (data['put_sell_strike'] - data['close']) / (data['open'] * data['daily_range_used']) data['normalized_proximity_call'] = 1 / (1 + np.exp(-data['proximity_call'])) data['normalized_proximity_put'] = 1 / (1 + np.exp(-data['proximity_put'])) market_open_times = pd.to_datetime(data['date'].astype(str) + ' 09:30') data['minutes_since_open'] = (data['timestamp'] - market_open_times).dt.total_seconds() / 60 data['normalized_minutes'] = data['minutes_since_open'] / 390 scaled_minutes = 2 * data['normalized_minutes'] - 0.5 data['time_elapsed'] = 1 / (1 + np.exp(-20 * scaled_minutes)) data['time_elapsed'] = 1 data['risk_score_call'] = data['normalized_proximity_call'] * data['time_elapsed'] data['risk_score_put'] = data['normalized_proximity_put'] * data['time_elapsed'] data['max_risk_score'] = data[['risk_score_call', 'risk_score_put']].max(axis=1) data['ma20_max_risk_score'] = data['max_risk_score'].rolling(window=20).mean() data['premium_threshold'] = premium_threshold_vectorized(data['max_risk_score']) #data['premium'] = -data['premium'] if 'contracts_entered' not in data.columns: data['contracts_entered'] = 0 if 'recent_entry_time' not in data.columns: data['recent_entry_time'] = pd.to_datetime(data['date'].astype(str) + ' 09:30') if 'average_premium' not in data.columns: data['average_premium'] = 0 data['score'] = np.nan data['trading_minutes_per_day'] = 390 # Assuming trading_minutes_per_day is constant def compute_backtest_columns(df, config): df['score_threshold'] = config["score_threshold"] df['max_contracts'] = config["max_contracts"] df['contracts_to_buy'] = config["contracts_to_buy_per_signal"] from factors import factors class TradeUpdater: def __init__(self, algo, config, entry_manager, exit_manager, progress_callback=None): self.current_date = None self.algo = algo self.progress_callback = progress_callback self.entry_manager = entry_manager self.exit_manager = exit_manager self.config = config self.factor_versions = config.get("factors_version", {}) self.relevant_min = 0.2 self.relevant_max = 0.6 self.max_profit_percentage = 1 self.scaling_method = 'log' self.threshold_query = 15 self.went_back = 0 self.lock = False # Instance variable to manage trade lock status self.premium_15_analysis = {"premium_above_lower": 0, "premium_above_higher": 0, "premium_above_lower_amount": 0, "premium_above_higher_amount": 0} def normalize_risk_score(self, score): if score < self.relevant_min: score = self.relevant_min elif score > self.relevant_max: score = self.relevant_max return (score - self.relevant_min) / (self.relevant_max - self.relevant_min) def scale_profit_percentage_linear(self, risk_score): normalized_score = self.normalize_risk_score(risk_score) return (1 - normalized_score) * self.max_profit_percentage def scale_profit_percentage_log(self, risk_score): normalized_score = self.normalize_risk_score(risk_score) adjusted_score = np.log1p(normalized_score) / np.log1p(1) return (1 - adjusted_score) * self.max_profit_percentage def scale_profit_percentage_exp(self, risk_score): normalized_score = self.normalize_risk_score(risk_score) adjusted_score = np.expm1(normalized_score) / np.expm1(1) return (1 - adjusted_score) * self.max_profit_percentage def get_adjusted_profit_percentage(self, risk_score): if self.scaling_method == 'linear': return self.scale_profit_percentage_linear(risk_score) elif self.scaling_method == 'log': return self.scale_profit_percentage_log(risk_score) elif self.scaling_method == 'exp': return self.scale_profit_percentage_exp(risk_score) else: raise ValueError("Invalid scaling method. Choose from 'linear', 'log', or 'exp'.") def get_factor_function(self, factor_name): factor_version = self.factor_versions.get(factor_name, "v1") function_key = f"{factor_name}_{factor_version}" function_name = factors.function_dict.get(function_key) if function_name: return getattr(factors, function_name) else: raise ValueError(f"Unknown {factor_name} version: {factor_version}") def calculate_RET(self, time_since_last_entry_minutes, trading_minutes_per_day): func = self.get_factor_function("ret") return factors.convert_and_calculate(func, time_since_last_entry_minutes, trading_minutes_per_day) def calculate_NPF(self, contracts_entered, entry_premiums, new_premium, strike_difference=30): func = self.get_factor_function("npf") if not entry_premiums or contracts_entered == 0: return 0 else: return func(contracts_entered, entry_premiums, new_premium, strike_difference) def calculate_ITMF(self, current_price, call_strike, put_strike): func = self.get_factor_function("itmf") return factors.convert_and_calculate(func, current_price, call_strike, put_strike) def calculate_CEF(self, max_contracts, contracts_entered): func = self.get_factor_function("cef") return func(max_contracts, contracts_entered) def calculate_cf(self, prediction): func = self.get_factor_function("cf") return factors.convert_and_calculate(func, prediction) def calculate_CEF_score(self, CEF, enable_cef, weight_cef): if enable_cef: return CEF * weight_cef return 0 def calculate_DEF_score(self, DEF): return DEF * 0.3 def calculate_RET_score(self, RET, enable_ret, weight_ret): if enable_ret: return RET * weight_ret return 0 def calculate_NPF_score(self, NPF, enable_npf, weight_npf): if enable_npf: return NPF * weight_npf return 0 def calculate_RTF(self, last_risk_score, risk_score, lower_threshold, upper_threshold): func = self.get_factor_function("rtf") return func(last_risk_score, risk_score, lower_threshold, upper_threshold) def calculate_RTF_score(self, RTF, enable_rtf, weight_rtf): if enable_rtf: return RTF * weight_rtf return 0 def calculate_Risk_Score_Diff(self, max_risk_score, ma20_max_risk_score, lower_threshold, upper_threshold): func = self.get_factor_function("rsd") return func(max_risk_score, ma20_max_risk_score, lower_threshold, upper_threshold) def calculate_Risk_Score_Diff_score(self, risk_score_diff, enable_risk_score_diff, weight_risk_score_diff): if enable_risk_score_diff: return risk_score_diff * weight_risk_score_diff return 0 def calculate_ITMF_score(self, ITMF, enable_itmf, weight_itmf): if enable_itmf: return ITMF * weight_itmf return 0 def scaled_risk_score_sigmoid(self, risk_score, midpoint, scale): func = self.get_factor_function("rss") return factors.convert_and_calculate(func, risk_score, midpoint, scale) def calculate_PF(self, premium, avg_premium_10_days): func = self.get_factor_function("pf") premium_baseline = self.config.get("pf_baseline", 0) return factors.convert_and_calculate(func, self.algo, premium, avg_premium_10_days, premium_baseline) def calculate_PF_score(self, PF, enable_pf, weight_pf): if enable_pf: return PF * weight_pf return 0 def calculate_premium_factor(self, premium, current_average_premium, lower_threshold, upper_threshold): func = self.get_factor_function("pd") return factors.convert_and_calculate(func, premium, current_average_premium, lower_threshold, upper_threshold) def calculate_premium_factor_score(self, premium_factor, enable_premium_comparison, weight_premium_comparison): if enable_premium_comparison: return premium_factor * weight_premium_comparison return 0 def calculate_cf_score(self, cf, enable_cf, cf_weight): if enable_cf: return cf * cf_weight return 0 def calculate_cnnf(self, df, rank_limit): func = self.get_factor_function("cnnf") result = func(df, rank_limit) # If the result is a DataFrame, we need to extract a single column if isinstance(result, pd.DataFrame): # Assuming the first column is the one we want return result.iloc[:, 0] # If it's already a Series, return it as is return result def calculate_cnnf_score(self, cnnf, enable_cnnf, weight_cnnf): if enable_cnnf: return cnnf * weight_cnnf return 0 def calculate_cf_risk_score(self, cf, enable_cf_risk, cf_risk_weight): if enable_cf_risk: return cf * cf_risk_weight return 0 def calculate_cnnf_risk_score(self, cnnf, enable_cnnf_risk, weight_cnnf_risk): if enable_cnnf_risk: return cnnf * weight_cnnf_risk return 0 def calculate_ITMF_risk_score(self, ITMF, enable_itmf_risk, weight_itmf_risk): if enable_itmf_risk: return ITMF * weight_itmf_risk return 0 def calculate_risk_score(self, current_row): risk_score = 0.0 len_enabled = 0.0 if self.config["enable_cf_risk"]: risk_score += self.calculate_cf_risk_score( current_row['cf_risk_score'], self.config["enable_cf_risk"], self.config["cf_risk_weight"] ) len_enabled += 1 if self.config["enable_cnnf_risk"]: risk_score += self.calculate_cnnf_risk_score( current_row['cnnf_risk_score'], self.config["enable_cnnf_risk"], self.config["cnnf_risk_weight"] ) len_enabled += 1 if self.config["enable_itmf_risk"]: risk_score += self.calculate_ITMF_risk_score( current_row['itmf_risk_score'], self.config["enable_itmf_risk"], self.config["itmf_risk_weight"] ) len_enabled += 1 return risk_score, len_enabled def calculate_score(self, results, current_row): score = 0.0 len_enabled = 0.0 if self.config["enable_cef"]: if pd.isna(current_row['CEF_score']): self.algo.debug("CEF_score is NaN") else: score += current_row['CEF_score'] len_enabled +=1 if self.config["enable_ret"]: if pd.isna(current_row['RET_score']): #self.algo.debug("RET_score is NaN") pass else: score += current_row['RET_score'] len_enabled += 1 if self.config["enable_npf"]: if pd.isna(current_row['NPF_score']): self.algo.debug("NPF_score is NaN") else: score += current_row['NPF_score'] len_enabled += 1 if self.config["enable_rtf"]: if pd.isna(current_row['RTF_score']): self.algo.debug("RTF_score is NaN") else: score += current_row['RTF_score'] len_enabled += 1 if self.config["enable_itmf"]: if pd.isna(current_row['ITMF_score']): self.algo.debug("ITMF_score is NaN") else: score += current_row['ITMF_score'] len_enabled += 1 if self.config["enable_pf"]: if pd.isna(current_row['PF_score']): self.algo.debug("PF_score is NaN") else: score += current_row['PF_score'] len_enabled += 1 if self.config["enable_premium_comparison"]: if pd.isna(current_row['premium_factor_score']): self.algo.debug("premium_factor_score is NaN") else: score += current_row['premium_factor_score'] len_enabled += 1 if self.config["enable_scaled_risk_score"]: if pd.isna(current_row['scaled_risk_score_weighted']): self.algo.debug("scaled_risk_score_weighted is NaN") else: score += current_row['scaled_risk_score_weighted'] len_enabled += 1 if self.config["enable_risk_score_diff"]: if pd.isna(current_row['risk_score_diff_score']): self.algo.debug("risk_score_diff_score is NaN") else: score += current_row['risk_score_diff_score'] len_enabled += 1 if self.config["enable_cf"]: if pd.isna(current_row['cf_score']): self.algo.debug("cf_score is NaN") else: score += current_row['cf_score'] len_enabled += 1 if self.config["enable_cnnf"]: if pd.isna(current_row['cnnf_score']): self.algo.debug("cnnf_score is NaN") else: score += current_row['cnnf_score'] len_enabled += 1 return score, len_enabled # Function to normalize the risk score def calculate_dynamic_threshold(self, avg_entry_premium): if avg_entry_premium <= 5: # Linear interpolation between 70% and 50% for premiums between $0 and $5 return avg_entry_premium * np.interp(avg_entry_premium, [0, 5], [0.70, 0.50]) elif avg_entry_premium <= 10: # Linear interpolation between 50% and 30% for premiums between $5 and $10 return avg_entry_premium * np.interp(avg_entry_premium, [5, 10], [0.50, 0.30]) elif avg_entry_premium <= 20: # Linear interpolation between 30% and 20% for premiums between $10 and $20 return avg_entry_premium * np.interp(avg_entry_premium, [10, 20], [0.30, 0.20]) else: # Linear interpolation between 20% and 10% for premiums between $20 and $30 return avg_entry_premium * np.interp(avg_entry_premium, [20, 30], [0.20, 0.10]) def recalculate_scores(self, results, i, current_row, config_to_use): current_row['ITMF_score'] = self.calculate_ITMF_score(current_row['ITMF'], config_to_use["enable_itmf"], config_to_use["weight_itmf"]) current_row['PF_score'] = self.calculate_PF_score(current_row['PF'], config_to_use["enable_pf"], config_to_use["weight_pf"]) current_row['premium_factor_score'] = self.calculate_premium_factor_score(current_row['premium_factor'], config_to_use[ "enable_premium_comparison"], config_to_use[ "weight_premium_comparison"]) if config_to_use["enable_scaled_risk_score"]: current_row['scaled_risk_score'] = self.scaled_risk_score_sigmoid(current_row['max_risk_score'], config_to_use["midpoint"], config_to_use["scale_factor"]) current_row['scaled_risk_score_weighted'] = current_row['scaled_risk_score'] * config_to_use[ "weight_scaled_risk_score"] else: current_row['scaled_risk_score_weighted'] = 0.0 current_row['risk_score_diff_score'] = self.calculate_Risk_Score_Diff_score(current_row['risk_score_diff'], config_to_use[ "enable_risk_score_diff"], config_to_use[ "weight_risk_score_diff"]) current_row['cf_score'] = self.calculate_cf_score(current_row['cf'], config_to_use["enable_cf"], config_to_use["weight_cf"]) current_row['cnnf_score'] = self.calculate_cnnf_score(current_row['cnnf'], config_to_use["enable_cnnf"], config_to_use["weight_cnnf"]) results['NPF_score'][i] = self.calculate_NPF_score(results['NPF'][i], config_to_use["enable_npf"], config_to_use["weight_npf"]) results['RTF_score'][i] = self.calculate_RTF_score(results['RTF'][i], config_to_use["enable_rtf"], config_to_use["weight_rtf"]) current_row['CEF_score'] = self.calculate_CEF_score(current_row['CEF'], config_to_use["enable_cef"], config_to_use["weight_cef"]) current_row['DEF_score'] = self.calculate_DEF_score(current_row['CEF']) current_row['RET_score'] = self.calculate_RET_score(current_row['RET'], config_to_use["enable_ret"], config_to_use["weight_ret"]) return current_row def update(self, df): # Reset index to ensure it starts at 0 df.reset_index(drop=True, inplace=True) # Convert dates to datetime df['time_now'] = pd.to_datetime(df['time_now']) df['recent_entry_time'] = pd.to_datetime(df['recent_entry_time']) # List of columns to check for NaN values columns_to_check = [ 'premium', 'date', 'max_risk_score', 'close', 'call_sell_strike', 'put_sell_strike', 'avg_premium_10_days', 'current_average_premium', 'risk_score_diff', 'ITMF', 'PF', ] # Function to print columns with NaN values def print_nan_columns(columns, step): nan_columns = [column for column in columns if df[column].isna().any()] #if nan_columns: #self.algo.debug(f"NaN values found in columns before {step}: {nan_columns}") # Initial check for NaN values before any calculations #print_nan_columns(columns_to_check, "initial calculations") # Calculate moving averages and initialize entry premiums df['avg_premium_10_days'] = df['premium'].rolling(window=3900, min_periods=1).mean() # Check for NaN after calculating avg_premium_10_days print_nan_columns(['avg_premium_10_days'], "calculating avg_premium_10_days") df['current_average_premium'] = df.groupby('date')['premium'].transform(lambda x: x.expanding().mean()) # Check for NaN after calculating current_average_premium print_nan_columns(['current_average_premium'], "calculating current_average_premium") # Initialize other columns df['average_positional_premium'] = 0.0 # To keep track of the average entry premium df['exited_contracts'] = 0 # To keep track of the number of exited contracts df['profit_exit_premium'] = 0.0 # To keep track of the exit premium at profit-taking df['realized_pnl'] = 0.0 # To keep track of realized PnL df['unrealized_pnl'] = 0.0 # To keep track of unrealized PnL lower_threshold = self.config["premium_comparison_lower_threshold"] upper_threshold = self.config["premium_comparison_upper_threshold"] # Calculate various metrics df['risk_score_diff'] = self.calculate_Risk_Score_Diff( df['max_risk_score'], df['ma20_max_risk_score'], self.config["rsd_lower_threshold"], self.config["rsd_upper_threshold"] ) # Check for NaN after calculating risk_score_diff print_nan_columns(['risk_score_diff'], "calculating risk_score_diff") df['ITMF'] = self.calculate_ITMF(df['close'], df['call_sell_strike'], df['put_sell_strike']) # Check for NaN after calculating ITMF print_nan_columns(['ITMF'], "calculating ITMF") print_nan_columns(['premium'], "premium") df['PF'] = self.calculate_PF(df['premium'], df['avg_premium_10_days']) # Check for NaN after calculating PF print_nan_columns(['PF'], "calculating PF") df['premium_factor'] = self.calculate_premium_factor(df['premium'], df['current_average_premium'], lower_threshold, upper_threshold) df['ITMF_score'] = self.calculate_ITMF_score(df['ITMF'], self.config["enable_itmf"], self.config["weight_itmf"]) df['PF_score'] = self.calculate_PF_score(df['PF'], self.config["enable_pf"], self.config["weight_pf"]) df['premium_factor_score'] = self.calculate_premium_factor_score( df['premium_factor'], self.config["enable_premium_comparison"], self.config["weight_premium_comparison"] ) if self.config["enable_scaled_risk_score"]: df['scaled_risk_score'] = self.scaled_risk_score_sigmoid( df['max_risk_score'], self.config["midpoint"], self.config["scale_factor"] ) df['scaled_risk_score_weighted'] = df['scaled_risk_score'] * self.config["weight_scaled_risk_score"] else: df['scaled_risk_score_weighted'] = 0.0 df['risk_score_diff_score'] = self.calculate_Risk_Score_Diff_score(df['risk_score_diff'], self.config["enable_risk_score_diff"], self.config["weight_risk_score_diff"]) # Calculate CF df['cf'] = self.calculate_cf(df['prediction']) df['cf_score'] = self.calculate_cf_score(df['cf'], self.config["enable_cf"], self.config["weight_cf"]) #df['cnnf'] = self.calculate_cnnf(df, self.config["rank_limit"]) #df['cnnf_score'] = self.calculate_cnnf_score(df['cnnf'], self.config["enable_cnnf"], self.config["weight_cnnf"]) df['cf_risk'] = df['cf'] df['cf_risk_score'] = self.calculate_cf_risk_score(df['cf_risk'], self.config["enable_cf_risk"], self.config["cf_risk_weight"]) #df['cnnf_risk'] = df['cnnf'] #df['cnnf_risk_score'] = self.calculate_cnnf_risk_score(df['cnnf_risk'], self.config["enable_cnnf_risk"], #self.config["cnnf_risk_weight"]) df['itmf_risk'] = df['ITMF'] df['itmf_risk_score'] = self.calculate_ITMF_risk_score(df['itmf_risk'], self.config["enable_itmf_risk"], self.config["itmf_risk_weight"]) def create_ic(self, put_buy_symbol, put_sell_symbol, call_sell_symbol, call_buy_symbol): condor = IronCondor(put_buy_symbol, put_sell_symbol, call_sell_symbol, call_buy_symbol) return condor def calculate_dict(self, df, config, invested_condor=None, model_configs=None): if len(df) == 0: return df, invested_condor, None # Convert the DataFrame to a dictionary for the most recent row current_row = df.iloc[-1].to_dict() orders_trim = [] initial_equity = 100000 total_realized_pnl = df['realized_pnl'].iloc[-1] if len(df) > 1 else 0.0 recent_entry_time = df['recent_entry_time'].iloc[-1] if len(df) > 1 else current_row['time_now'] # Accessing previous row's data if len(df) > 1: previous_row = df.iloc[-2].to_dict() else: previous_row = current_row # Initialize the results for the current row entry_premiums = [] if current_row['date'] != previous_row['date']: entry_premiums.clear() recent_entry_time = current_row['time_now'] else: current_row['contracts_entered'] = df['contracts_entered'].iloc[-2] entry_premiums = entry_premiums.copy() avg_entry_premium = np.mean(entry_premiums) if entry_premiums else 0 current_row['average_positional_premium'] = avg_entry_premium if current_row['timestamp'].time() == pd.Timestamp("16:00:00").time(): if current_row['contracts_entered'] > 0: if current_row['close'] > current_row['put_strike'] and current_row['close'] < current_row['call_strike']: realized_pnl = current_row['contracts_entered'] * avg_entry_premium * 100 elif current_row['close'] <= current_row['put_strike']: loss = min(30, current_row['put_strike'] - current_row['close']) * 100 realized_pnl = current_row['contracts_entered'] * (avg_entry_premium - loss / 100) * 100 elif current_row['close'] >= current_row['call_strike']: loss = min(30, current_row['close'] - current_row['call_strike']) * 100 realized_pnl = current_row['contracts_entered'] * (avg_entry_premium - loss / 100) * 100 realized_pnl -= current_row['contracts_entered'] total_realized_pnl += realized_pnl current_row['realized_pnl'] = total_realized_pnl current_row['exited_contracts'] = current_row['contracts_entered'] current_row['contracts_entered'] = 0 entry_premiums.clear() current_row['profit_exit_premium'] = current_row['premium'] current_row['new_contracts'] = 0 else: current_row['realized_pnl'] = total_realized_pnl else: profit_p = config["profit_percentage"] exit_condition = current_row['premium'] <= (avg_entry_premium * (1 - profit_p)) if exit_condition and current_row['contracts_entered'] > 0: profit_exit_contracts = current_row['contracts_entered'] current_row['contracts_entered'] = 0 entry_premiums.clear() current_row['exited_contracts'] = profit_exit_contracts current_row['profit_exit_premium'] = current_row['premium'] realized_pnl = profit_exit_contracts * (avg_entry_premium - current_row['premium']) * 100 realized_pnl -= profit_exit_contracts total_realized_pnl += realized_pnl current_row['realized_pnl'] = total_realized_pnl current_row['new_contracts'] = 0 self.exit_manager.exit_now(invested_condor) self.lock = True else: current_row['NPF'] = self.calculate_NPF(current_row['contracts_entered'], entry_premiums, current_row['premium']) current_row['NPF_score'] = self.calculate_NPF_score(current_row['NPF'], config["enable_npf"], config["weight_npf"]) current_row['RTF'] = self.calculate_RTF(previous_row['max_risk_score'], current_row['max_risk_score'], config["rtf_lower_threshold"], config["rtf_upper_threshold"]) current_row['RTF_score'] = self.calculate_RTF_score(current_row['RTF'], config["enable_rtf"], config["weight_rtf"]) time_since_last_entry = (current_row['time_now'] - recent_entry_time).total_seconds() / 60 current_row['RET'] = self.calculate_RET(time_since_last_entry, current_row['trading_minutes_per_day']) current_row['CEF'] = self.calculate_CEF(current_row['max_contracts'], current_row['contracts_entered']) current_row['CEF_score'] = self.calculate_CEF_score(current_row['CEF'], config["enable_cef"], config["weight_cef"]) current_row['DEF_score'] = self.calculate_DEF_score(current_row['CEF']) current_row['RET_score'] = self.calculate_RET_score(current_row['RET'], config["enable_ret"], config["weight_ret"]) if model_configs is not None and model is not None: data_point = prepare_real_time_data(current_row, current_row) prediction = make_prediction(model, data_point) config_to_use = model_configs[prediction[0]] current_row['config_to_use'] = prediction[0] current_row = self.recalculate_scores(current_row, current_row, config_to_use) current_row['score'], len_enabled = self.calculate_score(current_row, current_row) score_threshold = len_enabled * config["score_threshold"] date_found = current_row.get('date_found', False) #self.algo.debug(f"{current_row['score']} {score_threshold}") if not self.lock and current_row['timestamp'].time() < pd.Timestamp("15:00:00").time() and not date_found and (current_row['score'] > score_threshold) and (current_row['contracts_entered'] < current_row['max_contracts']): remaining_contracts = min(current_row['contracts_to_buy'], (current_row['max_contracts'] - current_row['contracts_entered'])) current_row['new_contracts'] = remaining_contracts current_row['contracts_entered'] += remaining_contracts current_row['contracts_entered'] = min(current_row['contracts_entered'], current_row['max_contracts']) entry_premiums.extend([current_row['premium']] * remaining_contracts) recent_entry_time = current_row['time_now'] avg_entry_premium = np.mean(entry_premiums) current_row['average_positional_premium'] = avg_entry_premium self.lock = True if invested_condor is None: invested_condor = self.create_ic(current_row["put_buy_symbol"], current_row["put_sell_symbol"], current_row["call_sell_symbol"], current_row["call_buy_symbol"]) invested_condor, orders_trim = self.entry_manager.place_trade(current_row, invested_condor) if current_row['contracts_entered'] > 0: unrealized_pnl = current_row['contracts_entered'] * (avg_entry_premium - current_row['premium']) * 100 else: unrealized_pnl = 0 current_row['unrealized_pnl'] = unrealized_pnl current_row['account_equity'] = initial_equity + total_realized_pnl + unrealized_pnl # Update the DataFrame with the current row's results for key, value in current_row.items(): df.at[len(df) - 1, key] = value return df, invested_condor, orders_trim
from AlgorithmImports import * import config class EntryManager(): def __init__(self, algorithm, symbol, avg_down_manager): self.algo = algorithm self.symbol = symbol self.avg_down_manager = avg_down_manager ############################################################################################ def get_target_quantity(self, contract, target_quantity, condor): total_quantity = 0 # Iterate over each entry for entry, info in condor.entry_premiums_trading.items(): # Check if the boolean value is True if info and len(info) >= 3: if info[2]: # Add the quantity to the total total_quantity += info[1] if target_quantity < 0: total_quantity = -total_quantity target_quantity += total_quantity # Get the current quantity of the contract in the portfolio. It can be negative or positive. current_quantity = self.algo.Portfolio[contract.Symbol].Quantity self.algo.Debug(f"Current quantity {contract.Symbol} {current_quantity}") # Calculate the quantity needed to reach the target. If the target or current is negative, # this calculation will account for it and return the correct adjustment amount. quantity_needed = target_quantity - current_quantity # Return the calculated quantity needed to reach the target. return quantity_needed def complete_trade(self, layer, condor, quantity, key): for key_ in layer: condor.active_layers.append(key_) self.avg_down_manager.entered_key = key self.avg_down_manager.avg_down_counter += 1 # condor.entry_premium = condor.Premium # condor.calculate_avg_entry() if condor.first_entry_time is None: condor.first_entry_time = int(self.algo.Time.strftime("%H%M").replace(":", "")) condor.entry_time = int(self.algo.Time.strftime("%H%M").replace(":", "")) condor.entry_time_algo = self.algo.Time condor.entry_time_algo_2 = self.algo.Time condor.is_re_entry = False if self.avg_down_manager.first_entry is None: self.avg_down_manager.first_entry = condor.Premium if condor.first_entry is None: condor.first_entry = condor.Premium # self.ago.Debug("ENTRY") return condor def trim_condor(self, condor): legs = [] # legs.append(Leg.Create(condor.Put_Buy.Symbol, 1)) # legs.append(Leg.Create(condor.Put_Sell.Symbol, -1)) # legs.append(Leg.Create(condor.Call_Buy.Symbol, 1)) # legs.append(Leg.Create(condor.Call_Sell.Symbol, -1)) # self.algo.ComboMarketOrder(legs, quantity) weekly_canonical_symbol = Symbol.CreateCanonicalOption(self.symbol, "SPXW", Market.USA, "?SPXW") call_buy = condor.Call_Buy call_sell = condor.Call_Sell put_sell = condor.Put_Sell put_buy = condor.Put_Buy call_buy_quantity = self.get_target_quantity(call_buy, quantity, condor) call_sell_quantity = self.get_target_quantity(call_sell, -quantity, condor) put_sell_quantity = self.get_target_quantity(put_sell, -quantity, condor) put_buy_quantity = self.get_target_quantity(put_buy, quantity, condor) if call_buy_quantity != quantity: self.algo.MarketOrder(call_buy.Symbol, call_buy_quantity) if put_buy_quantity != quantity: self.algo.MarketOrder(put_buy.Symbol, put_buy_quantity) def place_trade(self, row, condor): avg_down_sig, quantity, layer = self.avg_down_manager.check_avg_down(condor) call_buy = condor.Call_Buy call_sell = condor.Call_Sell put_sell = condor.Put_Sell put_buy = condor.Put_Buy call_buy_quantity = self.get_target_quantity(call_buy, quantity, condor) call_sell_quantity = self.get_target_quantity(call_sell, -quantity, condor) put_sell_quantity = self.get_target_quantity(put_sell, -quantity, condor) put_buy_quantity = self.get_target_quantity(put_buy, quantity, condor) condor.time_exit = True self.algo.Debug(f"{quantity} {call_buy_quantity} {call_sell_quantity} {put_sell_quantity} {put_buy_quantity}") cs_premium = self.algo.Securities[condor.Call_Sell.Symbol].BidPrice cb_premium = self.algo.Securities[condor.Call_Buy.Symbol].AskPrice ps_premium = self.algo.Securities[condor.Put_Sell.Symbol].BidPrice pb_premium = self.algo.Securities[condor.Put_Buy.Symbol].AskPrice legs = [] key = 1 if call_buy_quantity == quantity and call_sell_quantity == -quantity and put_sell_quantity == -quantity and put_buy_quantity == quantity and ps_premium < 0.1: self.algo.Debug("IN ENTRY NORMAL NO PUTS") condor.partial = False legs.append(Leg.Create(condor.Call_Buy.Symbol, 1)) legs.append(Leg.Create(condor.Call_Sell.Symbol, -1)) order_trim = self.algo.ComboMarketOrder(legs, quantity, tag="REGULAR TRADING ENTRY") for order in order_trim: # Append each 'OrderId' to the 'condor.entry_orders' list condor.entry_orders.append(order.OrderId) return (self.complete_trade(layer, condor, quantity, key), []) elif call_buy_quantity == quantity and call_sell_quantity == -quantity and put_sell_quantity == -quantity and put_buy_quantity == quantity and cs_premium < 0.1: self.algo.Debug("IN ENTRY NORMAL NO CALLS") condor.partial = False legs.append(Leg.Create(condor.Put_Sell.Symbol, -1)) legs.append(Leg.Create(condor.Put_Buy.Symbol, 1)) order_trim = self.algo.ComboMarketOrder(legs, quantity, tag="REGULAR TRADING ENTRY") for order in order_trim: # Append each 'OrderId' to the 'condor.entry_orders' list condor.entry_orders.append(order.OrderId) return (self.complete_trade(layer, condor, quantity, key), []) elif call_buy_quantity == quantity and call_sell_quantity == -quantity and put_sell_quantity == -quantity and put_buy_quantity == quantity: self.algo.Debug("IN ENTRY NORMAL") condor.partial = False legs.append(Leg.Create(condor.Call_Buy.Symbol, 1)) legs.append(Leg.Create(condor.Call_Sell.Symbol, -1)) legs.append(Leg.Create(condor.Put_Sell.Symbol, -1)) legs.append(Leg.Create(condor.Put_Buy.Symbol, 1)) order_trim = self.algo.ComboMarketOrder(legs, quantity, tag="REGULAR TRADING ENTRY") for order in order_trim: # Append each 'OrderId' to the 'condor.entry_orders' list condor.entry_orders.append(order.OrderId) return (self.complete_trade(layer, condor, quantity, key), []) elif call_buy_quantity == quantity and call_sell_quantity == -quantity and put_sell_quantity == -quantity and put_buy_quantity <= 0: self.algo.Debug("IN ENTRY 1") condor.partial = False legs.append(Leg.Create(condor.Call_Buy.Symbol, 1)) legs.append(Leg.Create(condor.Call_Sell.Symbol, -1)) legs.append(Leg.Create(condor.Put_Sell.Symbol, -1)) order_trim = self.algo.ComboMarketOrder(legs, quantity, tag="REGULAR TRADING ENTRY") for order in order_trim: # Append each 'OrderId' to the 'condor.entry_orders' list condor.entry_orders.append(order.OrderId) return (self.complete_trade(layer, condor, quantity, key), []) elif call_buy_quantity <= 0 and call_sell_quantity == -quantity and put_sell_quantity == -quantity and put_buy_quantity == quantity: self.algo.Debug("IN ENTRY 2") condor.partial = False legs.append(Leg.Create(condor.Call_Sell.Symbol, -1)) legs.append(Leg.Create(condor.Put_Sell.Symbol, -1)) legs.append(Leg.Create(condor.Put_Buy.Symbol, 1)) order_trim = self.algo.ComboMarketOrder(legs, quantity, tag="REGULAR TRADING ENTRY") for order in order_trim: # Append each 'OrderId' to the 'condor.entry_orders' list condor.entry_orders.append(order.OrderId) return (self.complete_trade(layer, condor, quantity, key), []) elif call_sell_quantity == -quantity and put_sell_quantity == -quantity and call_buy_quantity == 0 and put_buy_quantity == 0: self.algo.Debug("IN ENTRY 3") condor.partial = False legs.append(Leg.Create(condor.Call_Sell.Symbol, -1)) legs.append(Leg.Create(condor.Put_Sell.Symbol, -1)) order_trim = self.algo.ComboMarketOrder(legs, quantity, tag="REGULAR TRADING SELL LEGS ENTRY") for order in order_trim: # Append each 'OrderId' to the 'condor.entry_orders' list condor.entry_orders.append(order.OrderId) return (self.complete_trade(layer, condor, quantity, key), []) # trim later elif call_sell_quantity == -quantity and put_sell_quantity == -quantity and call_buy_quantity < 0 and put_buy_quantity == 0: self.algo.Debug("IN TRIM 1") condor.partial = False legs.append(Leg.Create(condor.Call_Sell.Symbol, -1)) legs.append(Leg.Create(condor.Put_Sell.Symbol, -1)) order_trim = self.algo.ComboMarketOrder(legs, quantity, tag="REGULAR TRADING ENTRY TRIM CALL BUY LATER") for order in order_trim: # Append each 'OrderId' to the 'condor.entry_orders' list condor.entry_orders.append(order.OrderId) return (self.complete_trade(layer, condor, quantity, key), order_trim) # trim later elif call_sell_quantity == -quantity and put_sell_quantity == -quantity and call_buy_quantity == 0 and put_buy_quantity < 0: self.algo.Debug("IN TRIM 2") condor.partial = False legs.append(Leg.Create(condor.Call_Sell.Symbol, -1)) legs.append(Leg.Create(condor.Put_Sell.Symbol, -1)) order_trim = self.algo.ComboMarketOrder(legs, quantity, tag="REGULAR TRADING ENTRY TRIM PUT BUY LATER") for order in order_trim: # Append each 'OrderId' to the 'condor.entry_orders' list condor.entry_orders.append(order.OrderId) return (*self.complete_trade(layer, condor, quantity, key), order_trim) # trim later elif call_sell_quantity == -quantity and put_sell_quantity == -quantity and call_buy_quantity < 0 and put_buy_quantity < 0: self.algo.Debug("IN TRIM 3") condor.partial = False legs.append(Leg.Create(condor.Call_Sell.Symbol, -1)) legs.append(Leg.Create(condor.Put_Sell.Symbol, -1)) order_trim = self.algo.ComboMarketOrder(legs, quantity, tag="REGULAR TRADING ENTRY TRIM BOTH LATER") for order in order_trim: # Append each 'OrderId' to the 'condor.entry_orders' list condor.entry_orders.append(order.OrderId) return (self.complete_trade(layer, condor, quantity, key), order_trim) else: self.algo.Debug("IN ELSE") condor.partial = True if call_buy_quantity != quantity: condor.partial_orders.append(self.algo.MarketOrder(call_buy.Symbol, call_buy_quantity, tag="REGULAR TRADING ENTRY TRIM CB")) if call_sell_quantity != -quantity: condor.partial_orders.append(self.algo.MarketOrder(call_sell.Symbol, call_sell_quantity, tag="REGULAR TRADING ENTRY TRIM Cs")) if put_sell_quantity != -quantity: condor.partial_orders.append(self.algo.MarketOrder(put_sell.Symbol, put_sell_quantity, tag="REGULAR TRADING ENTRY TRIM PS")) if put_buy_quantity != quantity: condor.partial_orders.append(self.algo.MarketOrder(put_buy.Symbol, put_buy_quantity, tag="REGULAR TRADING ENTRY TRIM PB"))
from AlgorithmImports import * import config class ExitManager(): def __init__(self, algorithm, symbol, close_trading_window): self.algo = algorithm self.symbol = symbol self.avg_down_manager = None ################################################################################################### self.is_exit_process = False self.is_safety_legs_exit_process = False self.last_action_time = None #self.algorithm.Schedule.On(self.algorithm.DateRules.EveryDay(), self.algorithm.TimeRules.BeforeMarketClose(self.symbol, 1), close_trading_window) def check_stop_loss(self, condor): if self.stop_loss_mode == "FIRST ENTRY": if condor.Calculate_Premium >= condor.first_entry * self.stop_loss_multiplier: self.algo.Liquidate(condor.Call_Sell.Symbol) self.algo.Liquidate(condor.Call_Buy.Symbol) self.algo.Liquidate(condor.Put_Sell.Symbol) self.algo.Liquidate(condor.Put_Buy.Symbol) elif self.stop_loss_mode == "MOST RECENT ENTRY": if condor.Calculate_Premium >= condor.entry_premium * self.stop_loss_multiplier: self.algo.Liquidate(condor.Call_Sell.Symbol) self.algo.Liquidate(condor.Call_Buy.Symbol) self.algo.Liquidate(condor.Put_Sell.Symbol) self.algo.Liquidate(condor.Put_Buy.Symbol) elif self.stop_loss_mode == "AVG ENTRY": if condor.Calculate_Premium >= condor.avg_entry * self.stop_loss_multiplier: self.algo.Liquidate(condor.Call_Sell.Symbol) self.algo.Liquidate(condor.Call_Buy.Symbol) self.algo.Liquidate(condor.Put_Sell.Symbol) self.algo.Liquidate(condor.Put_Buy.Symbol) else: raise Exception(f"Please input a correct STOP LOSS MODE in the config file. {config.STOP_LOSS_MODE} is incorrect") def check_absolute_stop_loss(self, condor): if self.stop_loss_mode == "FIRST ENTRY": if condor.Calculate_Premium() >= condor.first_entry + self.absolute_stop_loss: self.algo.Liquidate(condor.Call_Sell.Symbol) self.algo.Liquidate(condor.Call_Buy.Symbol) self.algo.Liquidate(condor.Put_Sell.Symbol) self.algo.Liquidate(condor.Put_Buy.Symbol) elif self.stop_loss_mode == "MOST RECENT ENTRY": if condor.Calculate_Premium() >= condor.entry_premium + self.absolute_stop_loss: self.algo.Liquidate(condor.Call_Sell.Symbol) self.algo.Liquidate(condor.Call_Buy.Symbol) self.algo.Liquidate(condor.Put_Sell.Symbol) self.algo.Liquidate(condor.Put_Buy.Symbol) elif self.stop_loss_mode == "AVG ENTRY": if condor.Calculate_Premium() >= condor.avg_entry + self.absolute_stop_loss: self.algo.Liquidate(condor.Call_Sell.Symbol) self.algo.Liquidate(condor.Call_Buy.Symbol) self.algo.Liquidate(condor.Put_Sell.Symbol) self.algo.Liquidate(condor.Put_Buy.Symbol) def check_regular_profit_take(self, condor): if not self.use_trading_quantity: if self.profit_taking_mode == "FIRST ENTRY": if condor.Calculate_Premium() <= condor.first_entry * self.regular_profit_percentage: self.algo.Liquidate(condor.Call_Sell.Symbol, tag=f"REGULAR PROFIT TAKE at {config.REGULAR_PROFIT_PERCENTAGE}% from FIRST ENTRY Premium {condor.Premium}") self.algo.Liquidate(condor.Call_Buy.Symbol, tag=f"REGULAR PROFIT TAKE at {config.REGULAR_PROFIT_PERCENTAGE}% from FIRST ENTRY Premium {condor.Premium}") self.algo.Liquidate(condor.Put_Sell.Symbol, tag=f"REGULAR PROFIT TAKE at {config.REGULAR_PROFIT_PERCENTAGE}% from FIRST ENTRY Premium {condor.Premium}") self.algo.Liquidate(condor.Put_Buy.Symbol, tag=f"REGULAR PROFIT TAKE at {config.REGULAR_PROFIT_PERCENTAGE}% from FIRST ENTRY Premium {condor.Premium}") elif self.profit_taking_mode == "MOST RECENT ENTRY": if condor.Calculate_Premium() <= condor.entry_premium * self.regular_profit_percentage: self.algo.Liquidate(condor.Call_Sell.Symbol, tag=f"REGULAR PROFIT TAKE at {config.REGULAR_PROFIT_PERCENTAGE}% from MOST RECENT ENTRY Premium {condor.Premium}") self.algo.Liquidate(condor.Call_Buy.Symbol, tag=f"REGULAR PROFIT TAKE at {config.REGULAR_PROFIT_PERCENTAGE}% from MOST RECENT ENTRY Premium {condor.Premium}") self.algo.Liquidate(condor.Put_Sell.Symbol, tag=f"REGULAR PROFIT TAKE at {config.REGULAR_PROFIT_PERCENTAGE}% from MOST RECENT ENTRY Premium {condor.Premium}") self.algo.Liquidate(condor.Put_Buy.Symbol, tag=f"REGULAR PROFIT TAKE at {config.REGULAR_PROFIT_PERCENTAGE}% from MOST RECENT ENTRY Premium {condor.Premium}") elif self.profit_taking_mode == "AVG ENTRY": if condor.Calculate_Premium() <= condor.avg_entry * self.regular_profit_percentage: self.algo.Liquidate(condor.Call_Sell.Symbol, tag=f"REGULAR PROFIT TAKE at {config.REGULAR_PROFIT_PERCENTAGE}% from AVG ENTRY Premium {condor.Premium}") self.algo.Liquidate(condor.Call_Buy.Symbol, tag=f"REGULAR PROFIT TAKE at {config.REGULAR_PROFIT_PERCENTAGE}% from AVG ENTRY Premium {condor.Premium}") self.algo.Liquidate(condor.Put_Sell.Symbol, tag=f"REGULAR PROFIT TAKE at {config.REGULAR_PROFIT_PERCENTAGE}% from AVG ENTRY Premium {condor.Premium}") self.algo.Liquidate(condor.Put_Buy.Symbol, tag=f"REGULAR PROFIT TAKE at {config.REGULAR_PROFIT_PERCENTAGE}% from AVG ENTRY Premium {condor.Premium}") ############################################################################################################# if self.use_trading_quantity and condor: premium = condor.calculate_premium() condor.calculate_trading_entry_avg_premium() if condor.avg_trading_premium != 0: current_profit = (condor.avg_trading_premium - premium) self.algo.Debug(current_profit) if self.trading_profit_taking_mode == "FIXED" and current_profit >= self.trading_quantity_fixed_profit: # for layer in condor.active_layers: # self.avg_down_manager.avg_down_layers[layer][1] = False # condor.active_layers.clear() # condor.current_trading_quantity = 0 self.is_exit_process = True quantity_held = self.algo.Portfolio[condor.Call_Sell.Symbol].Quantity if quantity_held != 0: quantity_send = -quantity_held exit_1 = self.algo.MarketOrder(condor.Call_Sell.Symbol, quantity_send, tag="REGULAR PROFIT TAKE") condor.exits["cs"] = exit_1.OrderId quantity_held_2 = self.algo.Portfolio[condor.Put_Sell.Symbol].Quantity if quantity_held_2 != 0: quantity_send_2 = -quantity_held_2 exit_3 = self.algo.MarketOrder(condor.Put_Sell.Symbol, quantity_send_2, tag="REGULAR PROFIT TAKE") condor.exits["ps"] = exit_3.OrderId profit_percentage = self.trading_profit_percentage # if self.use_adjusted_profit_take: # start_day_con = True if int( # self.spx_info.time.replace(":", "")) >= self.profit_adjust_time else False # profit_percentage = self.adjusted_profit if self.trading_profit_taking_mode == "PERCENTAGE" and current_profit >= (condor.avg_trading_premium * profit_percentage): # for layer in condor.active_layers: # self.avg_down_manager.avg_down_layers[layer][1] = False self.is_exit_process = True quantity_held = self.algo.Portfolio[condor.Call_Sell.Symbol].Quantity if quantity_held != 0: quantity_send = -quantity_held exit_1 = self.algo.MarketOrder(condor.Call_Sell.Symbol, quantity_send, tag="REGULAR PROFIT TAKE") condor.exits["cs"] = exit_1.OrderId quantity_held_2 = self.algo.Portfolio[condor.Put_Sell.Symbol].Quantity if quantity_held_2 != 0: quantity_send_2 = -quantity_held_2 exit_3 = self.algo.MarketOrder(condor.Put_Sell.Symbol, quantity_send_2, tag="REGULAR PROFIT TAKE") condor.exits["ps"] = exit_3.OrderId # condor.active_layers.clear() # condor.entry_premiums_trading.clear() # condor.current_trading_quantity = 0 # quantity_held = self.algo.Portfolio[condor.Call_Sell.Symbol].Quantity # if quantity_held != 0: # quantity_send = -quantity_held # exit_1 = self.algo.MarketOrder(condor.Call_Sell.Symbol, quantity_send, tag="REGULAR PROFIT TAKE") # self.algo.Transactions.WaitForOrder(exit_1.OrderId) # if self.algo.Portfolio[condor.Call_Buy.Symbol].Quantity != 0: # exit_2 = self.algo.Liquidate(condor.Call_Buy.Symbol, tag="REGULAR PROFIT TAKE") # quantity_held_2 = self.algo.Portfolio[condor.Put_Sell.Symbol].Quantity # if quantity_held_2 != 0: # quantity_send_2 = -quantity_held_2 # exit_3 = self.algo.MarketOrder(condor.Put_Sell.Symbol, quantity_send_2, tag="REGULAR PROFIT TAKE") # self.algo.Transactions.WaitForOrder(exit_3.OrderId) # if self.algo.Portfolio[condor.Put_Buy.Symbol].Quantity != 0: # exit_4 = self.algo.Liquidate(condor.Put_Buy.Symbol, tag="REGULAR PROFIT TAKE") def check_timed_profit_exit(self, condor): if self.algo.Time <= condor.entry_time + self.minutes_after_entry: if self.profit_taking_mode == "FIRST ENTRY": if condor.Calculate_Premium() <= condor.first_entry * self.timed_profit_percentage: exit_1 = self.algo.Liquidate(condor.Call_Sell.Symbol) exit_2 = self.algo.Liquidate(condor.Call_Buy.Symbol) exit_3 = self.algo.Liquidate(condor.Put_Sell.Symbol) exit_4 = self.algo.Liquidate(condor.Put_Buy.Symbol) elif self.profit_taking_mode == "MOST RECENT ENTRY": if condor.Calculate_Premium() <= condor.entry_premium * self.timed_profit_percentage: exit_1 = self.algo.Liquidate(condor.Call_Sell.Symbol) exit_2 = self.algo.Liquidate(condor.Call_Buy.Symbol) exit_3 = self.algo.Liquidate(condor.Put_Sell.Symbol) exit_4 = self.algo.Liquidate(condor.Put_Buy.Symbol) elif self.profit_taking_mode == "AVG ENTRY": if condor.Calculate_Premium() <= condor.avg_entry * self.timed_profit_percentage: self.algo.Liquidate(condor.Call_Sell.Symbol) self.algo.Liquidate(condor.Call_Buy.Symbol) self.algo.Liquidate(condor.Put_Sell.Symbol) self.algo.Liquidate(condor.Put_Buy.Symbol) def exit_now(self, condor): self.is_exit_process = True quantity_held = self.algo.Portfolio[condor.Call_Sell.Symbol].Quantity if quantity_held != 0: quantity_send = -quantity_held exit_1 = self.algo.MarketOrder(condor.Call_Sell.Symbol, quantity_send, tag="TIME EXIT") condor.exits["cs"] = exit_1.OrderId quantity_held_2 = self.algo.Portfolio[condor.Put_Sell.Symbol].Quantity if quantity_held_2 != 0: quantity_send_2 = -quantity_held_2 exit_3 = self.algo.MarketOrder(condor.Put_Sell.Symbol, quantity_send_2, tag="TIME EXIT") condor.exits["ps"] = exit_3.OrderId def check_timed_exit(self, condor): if condor.entry_time_algo is not None: if self.timed_exit_mode == "SIMPLE" and condor.time_exit: self.algo.Debug(f"Entry {condor.entry_time_algo} {self.timed_Exit_minutes}") if condor.entry_time_algo + self.timed_Exit_minutes <= self.algo.Time: self.is_exit_process = True quantity_held = self.algo.Portfolio[condor.Call_Sell.Symbol].Quantity if quantity_held != 0: quantity_send = -quantity_held exit_1 = self.algo.MarketOrder(condor.Call_Sell.Symbol, quantity_send, tag="TIME EXIT") condor.exits["cs"] = exit_1.OrderId quantity_held_2 = self.algo.Portfolio[condor.Put_Sell.Symbol].Quantity if quantity_held_2 != 0: quantity_send_2 = -quantity_held_2 exit_3 = self.algo.MarketOrder(condor.Put_Sell.Symbol, quantity_send_2, tag="TIME EXIT") condor.exits["ps"] = exit_3.OrderId # for layer in condor.active_layers: # self.avg_down_manager.avg_down_layers[layer][1] = False # condor.active_layers.clear() # condor.entry_premiums_trading.clear() # condor.current_trading_quantity = 0 # quantity_held = self.algo.Portfolio[condor.Call_Sell.Symbol].Quantity # if quantity_held != 0: # quantity_send = -quantity_held # exit_1 = self.algo.MarketOrder(condor.Call_Sell.Symbol, quantity_send, tag="TIMED EXIT") # self.algo.Transactions.WaitForOrder(exit_1.OrderId) # if self.algo.Portfolio[condor.Call_Buy.Symbol].Quantity != 0: # exit_2 = self.algo.Liquidate(condor.Call_Buy.Symbol, tag="TIMED EXIT") # quantity_held_2 = self.algo.Portfolio[condor.Put_Sell.Symbol].Quantity # if quantity_held_2 != 0: # quantity_send_2 = -quantity_held_2 # exit_3 = self.algo.MarketOrder(condor.Put_Sell.Symbol, quantity_send_2, tag="TIMED EXIT") # self.algo.Transactions.WaitForOrder(exit_3.OrderId) # if self.algo.Portfolio[condor.Put_Buy.Symbol].Quantity != 0: # exit_4 = self.algo.Liquidate(condor.Put_Buy.Symbol, tag="TIMED EXIT") # condor.entry_time_algo = self.algo.Time # condor_time_exit = False elif self.timed_exit_mode == "FIXED TIME": if self.timed_exit_fixed.strftime("%H%M") >= self.algo.Time.strftime("%H%M"): self.algo.Liquidate(condor.Call_Sell.Symbol) self.algo.Liquidate(condor.Call_Buy.Symbol) self.algo.Liquidate(condor.Put_Sell.Symbol) self.algo.Liquidate(condor.Put_Buy.Symbol)
# region imports from AlgorithmImports import * # endregion # Your New Python File
# region imports from AlgorithmImports import * # endregion import numpy as np import pandas as pd from numba import njit #@njit(cache=True) def calculate_pd_v1(premium, current_average_premium, lower_threshold, upper_threshold): # Calculate the percentage difference percentage_difference = (premium - current_average_premium) / current_average_premium # Linearly scale the percentage difference between -1 and 1 scaled_difference = (percentage_difference - lower_threshold) / (upper_threshold - lower_threshold) * 2 - 1 # Clamp the scaled difference between -1 and 1 scaled_difference = np.clip(scaled_difference, -1, 1) return scaled_difference #@njit(cache=True) def calculate_pf_v1(premium, avg_premium_10_days, premium_baseline): PF = np.zeros_like(premium) # Linear scale part mask_linear = (premium > premium_baseline) & (premium <= avg_premium_10_days) linear_scale = (premium[mask_linear] - premium_baseline) / (avg_premium_10_days[mask_linear] - premium_baseline) PF[mask_linear] = linear_scale * 0.5 # Scale to fit in the first half of 0 to 1 range # Exponential scale part mask_exponential = premium > avg_premium_10_days capped_premium = np.minimum(premium[mask_exponential], 30) # Cap premium at $30 exponential_scale = (capped_premium - avg_premium_10_days[mask_exponential]) / ( 30 - avg_premium_10_days[mask_exponential]) exponential_scale = np.exp(exponential_scale) - 1 # Exponential function max_exponential = np.exp(1) - 1 # Maximum of exponential function from 0 to 1 PF[mask_exponential] = 0.5 + ( 0.5 * exponential_scale / max_exponential) # Scale to fit in the second half of 0 to 1 range return PF #@njit(cache=True) def calculate_pf_v2(premium, avg_premium_10_days, premium_baseline): PF = np.zeros_like(premium) # Linear scale part below the baseline (scaling from -1 to 0) mask_below_baseline = premium <= premium_baseline linear_scale_below = (premium[mask_below_baseline] - premium_baseline) / premium_baseline PF[mask_below_baseline] = np.clip(-1 + linear_scale_below, -1, 0) # Scale to fit in the range of -1 to 0 # Linear scale part above the baseline (scaling from 0 to 1) mask_above_baseline_linear = (premium > premium_baseline) & (avg_premium_10_days <= premium_baseline) linear_scale_above = (premium[mask_above_baseline_linear] - premium_baseline) / premium_baseline PF[mask_above_baseline_linear] = np.clip(linear_scale_above, 0, 1) # Scale to fit in the range of 0 to 1 # Exponential scale part (scaling from 0 to 1) if avg_premium_10_days is higher than the baseline mask_above_baseline_exponential = (premium > premium_baseline) & (avg_premium_10_days > premium_baseline) capped_premium = np.minimum(premium[mask_above_baseline_exponential], 30) # Cap premium at $30 exponential_scale = (capped_premium - premium_baseline) / (30 - premium_baseline) exponential_scale = np.exp(exponential_scale) - 1 # Exponential function max_exponential = np.exp(1) - 1 # Maximum of exponential function from 0 to 1 PF[mask_above_baseline_exponential] = np.clip(exponential_scale / max_exponential, 0, 1) # Scale to fit in the range of 0 to 1 return PF #@njit(cache=True) import numpy as np def calculate_pf_v3(algo, premium, avg_premium_10_days, premium_baseline): # Set up PF and masks PF = np.zeros_like(premium) # Calculate masks and intermediate values mask_premium_low = premium <= 2 PF[mask_premium_low] = -10 mask_below_baseline = (premium <= premium_baseline) & ~mask_premium_low linear_scale_below = (premium[mask_below_baseline] - premium_baseline) / premium_baseline PF[mask_below_baseline] = np.clip(-1 + linear_scale_below, -1, 0) mask_above_baseline_linear = (premium > premium_baseline) & (avg_premium_10_days <= premium_baseline) & ~mask_premium_low linear_scale_above = (premium[mask_above_baseline_linear] - premium_baseline) / premium_baseline PF[mask_above_baseline_linear] = np.clip(linear_scale_above, 0, 1) mask_above_baseline_exponential = (premium > premium_baseline) & (avg_premium_10_days > premium_baseline) & ~mask_premium_low capped_premium = np.minimum(premium[mask_above_baseline_exponential], 30) exponential_scale = (capped_premium - premium_baseline) / (30 - premium_baseline) # This is the critical check if exponential_scale.size > 0 and not np.issubdtype(exponential_scale.dtype, np.number): algo.debug("Debugging Info:") algo.debug(f"premium: {premium}") algo.debug(f"avg_premium_10_days: {avg_premium_10_days}") algo.debug(f"premium_baseline: {premium_baseline}") algo.debug(f"PF (partial): {PF}") algo.debug(f"mask_premium_low: {mask_premium_low}") algo.debug(f"mask_below_baseline: {mask_below_baseline}") algo.debug(f"mask_above_baseline_linear: {mask_above_baseline_linear}") algo.debug(f"mask_above_baseline_exponential: {mask_above_baseline_exponential}") algo.debug(f"exponential_scale: {exponential_scale}") # Now, let the error happen naturally by proceeding with np.exp exponential_scale = np.exp(exponential_scale) - 1 # This line may cause the error max_exponential = np.exp(1) - 1 PF[mask_above_baseline_exponential] = np.clip(exponential_scale / max_exponential, 0, 1) return PF #@njit(cache=True) def scaled_rss_v1(risk_score, midpoint, scale): return -1 / (1 + np.exp(-scale * (risk_score - midpoint))) def calculate_rsd_v1(max_risk_score, ma20_max_risk_score, lower_threshold, upper_threshold): def scale_risk_score(x, lower_threshold, upper_threshold): """ This function scales the risk score based on the provided thresholds. It scales negatively exponentially below zero and linearly above zero. Parameters: x (float): The risk score difference. lower_threshold (float): The lower threshold for scaling. upper_threshold (float): The upper threshold for scaling. Returns: float: The scaled risk score. """ if x < lower_threshold: return -1.0 elif x > upper_threshold: return 1.0 elif x < 0: # Exponential scaling for negative values return -np.exp(x / abs(lower_threshold)) else: # Linear scaling for non-negative values return x / upper_threshold risk_score_diff = -((max_risk_score - ma20_max_risk_score) / max_risk_score) scaled_risk_score_diff = np.vectorize(lambda x: scale_risk_score(x, lower_threshold, upper_threshold))(risk_score_diff) return scaled_risk_score_diff def calculate_rsd_v2(max_risk_score, ma20_max_risk_score, lower_threshold, upper_threshold): # Calculate risk score difference vectorized risk_score_diff = -(max_risk_score - ma20_max_risk_score) / max_risk_score # Initialize scaled risk score difference array scaled_risk_score_diff = np.zeros_like(risk_score_diff) # Apply vectorized conditional logic # Case 1: x < lower_threshold scaled_risk_score_diff[risk_score_diff < lower_threshold] = -1.0 # Case 2: x > upper_threshold scaled_risk_score_diff[risk_score_diff > upper_threshold] = 1.0 # Case 3: 0 <= x <= upper_threshold (Linear scaling) mask_linear = (risk_score_diff >= 0) & (risk_score_diff <= upper_threshold) scaled_risk_score_diff[mask_linear] = risk_score_diff[mask_linear] / upper_threshold @staticmethod def calculate_rtf_v1(last_risk_score, risk_score, lower_threshold, upper_threshold): # Calculate the difference difference = risk_score - last_risk_score # Linearly scale the difference between -1 and 1 scaled_difference = (difference - lower_threshold) / (upper_threshold - lower_threshold) * 2 - 1 # Clamp the scaled difference between -1 and 1 scaled_difference = max(-1, min(1, scaled_difference)) return scaled_difference @staticmethod def calculate_cf_v1(prediction): return prediction.apply(lambda x: 0 if x == "Winning" else -1) @staticmethod def calculate_cf_v2(prediction): # Use numpy where for vectorized conditional logic return np.where(prediction == "Winning", 0, -1) @staticmethod def calculate_cef_v1(max_contracts, contracts_entered): return (max_contracts - contracts_entered) / max_contracts @njit(cache=True) def fermi_dirac(x, mu, k): return 1 / (1 + np.exp((x - mu) / k)) @njit(cache=True) def calculate_itmf_v1(current_price, call_strike, put_strike): k1 = 3.0 # Slope parameter for the left side k2 = -3.0 # Slope parameter for the right side mu = (call_strike + put_strike) / 2 k = np.where(current_price < mu, k1, k2) mu = np.where(k > 0, mu-53, mu+53) return -fermi_dirac(current_price, mu, k) @staticmethod def calculate_npf_v1(contracts_entered, entry_premiums, new_premium, strike_difference=30): if contracts_entered > 0: current_avg_premium = np.sum(entry_premiums) / contracts_entered difference = new_premium - current_avg_premium if difference < 0: # Apply exponential scaling for negative differences scaled_difference = -np.exp(-difference / (strike_difference / 2)) + 1 else: # Apply linear scaling for positive differences scaled_difference = difference / (strike_difference / 2) scaled_difference = np.clip(scaled_difference, -1, 1) return scaled_difference return 0 #@njit(cache=True) def calculate_ret_v1(time_since_last_entry_minutes, trading_minutes_per_day): normalized_time = time_since_last_entry_minutes / trading_minutes_per_day return np.exp(normalized_time) - 1 #@njit(cache=True) def calculate_ret_v2(time_since_last_entry_minutes, trading_minutes_per_day): threshold = 30 if time_since_last_entry_minutes <= threshold: normalized_time = time_since_last_entry_minutes / trading_minutes_per_day return (1 - normalized_time) * -1 else: # Scale to stay within -1 and 0 after the threshold remaining_time = trading_minutes_per_day - threshold normalized_time = (time_since_last_entry_minutes - threshold) / remaining_time return (1 - (np.exp(normalized_time) - 1) / (np.exp(1) - 1)) * -1 # Note: The `calculate_cnnf_v1` function is not compatible with Numba's njit due to its reliance on Pandas operations. @staticmethod def calculate_cnnf_v1(df, rank_limit): # Validate the rank_limit if rank_limit < 1 or rank_limit > 10: raise ValueError("rank_limit must be between 1 and 10") # Generate the list of rank columns to sum based on rank_limit rank_columns = [f'rank_{i}' for i in range(1, rank_limit + 1)] # Sum the ranks rank_sum = df[rank_columns].sum(axis=1) # Scale the sum between 0 and -1 min_val = rank_sum.min() max_val = rank_sum.max() scaled_rank_sum = -1 * ((rank_sum - min_val) / (max_val - min_val)) return scaled_rank_sum def convert_and_calculate(func, *args): numpy_args = [arg.to_numpy() if isinstance(arg, pd.Series) else arg for arg in args] return func(*numpy_args) function_dict = { "pd_v1": "calculate_pd_v1", "pf_v2": "calculate_pf_v2", "pf_v3": "calculate_pf_v3", "rss_v1": "scaled_rss_v1", "rsd_v1": "calculate_rsd_v1", "rsd_v2": "calculate_rsd_v2", "rtf_v1": "calculate_rtf_v1", "cef_v1": "calculate_cef_v1", "itmf_v1": "calculate_itmf_v1", "npf_v1": "calculate_npf_v1", "ret_v1": "calculate_ret_v1", "ret_v2": "calculate_ret_v2", "cf_v1": "calculate_cf_v1", "cf_v2": "calculate_cf_v2", "cnnf_v1": "calculate_cnnf_v1", # Not compatible with Numba }
# region imports from AlgorithmImports import * import datetime from datetime import timedelta import pytz # endregion # Your New Python File def get_historic_ranges(algo, range_selector, history_period, df, condor_selector): # Timezones ny_tz = pytz.timezone('America/New_York') central_tz = pytz.timezone('America/Chicago') # Central Time (1 hour behind New York) # Determine the history to be used history = algo.History[TradeBar](range_selector.symbol, timedelta(20), Resolution.Daily) # Fetch minute-level history, enough to cover 9:30 AM candles history_minute = algo.History[TradeBar](range_selector.symbol, timedelta(20), Resolution.Minute) history_minute_vix = algo.History[TradeBar](range_selector.vix, timedelta(20), Resolution.Minute) # Iterate over each bar in the historical daily data for bar in history: # Localize the naive datetime to Central Time, then convert to New York Time bar_date = pd.Timestamp(bar.EndTime).tz_localize(central_tz).astimezone(ny_tz).date() # Initialize the 9:30 AM candle's open price as None vix_open = None # Iterate over the minute-level data to find the 9:30 AM candle for minute_bar in history_minute_vix: minute_bar_time = pd.Timestamp(minute_bar.EndTime).tz_localize(central_tz).astimezone(ny_tz) # algo.Debug(f"Last bar_timestamp: {repr(minute_bar_time.time())}") if minute_bar_time.date() == bar_date and minute_bar_time.time() == datetime.time(9, 31): vix_open = minute_bar.Open break if vix_open is not None: # Update the range_selector.vix_open with the open price of the 9:30 AM candle range_selector.vix_open = vix_open # Consolidate the daily bar into the range selector range_selector.bar_consolidator.Update(bar) range_today = range_selector.set_range_today() # Calculate the range for this specific day # Update the df with the range values for the corresponding bar_date df_date_mask = df['date'] == bar_date if df_date_mask.any(): # Ensure the date exists in the DataFrame df.loc[df_date_mask, 'range_used'] = range_today df.loc[df_date_mask, 'daily_range_used'] = range_today # Call the select_strikes function from the condor_selector open_price = bar.Open # You may want to replace this with the correct open price if it's different df = condor_selector.select_strikes(range_selector.symbol, open_price, range_today, df, bar_date, bar.EndTime) history_minute_list = list(history_minute) for index, bar in enumerate(history_minute_list): # Localize the naive datetime to Central Time, then convert to New York Time, and strip the timezone bar_timestamp = pd.Timestamp(bar.EndTime).tz_localize(central_tz).astimezone(ny_tz).tz_localize(None) # Skip bars after 16:00 (4:00 PM) if bar_timestamp.time() > datetime.time(16, 0): continue # Extracting OHLC data open_price = bar.Open high_price = bar.High low_price = bar.Low close_price = bar.Close # Update the DataFrame with OHLC data where the timestamp matches df_timestamp_mask = df['timestamp'] == bar_timestamp if df_timestamp_mask.any(): # Ensure the timestamp exists in the DataFrame df.loc[df_timestamp_mask, 'open'] = open_price df.loc[df_timestamp_mask, 'high'] = high_price df.loc[df_timestamp_mask, 'low'] = low_price df.loc[df_timestamp_mask, 'close'] = close_price # Print the bar_timestamp repr if this is the last iteration if index == len(history_minute_list) - 1: algo.Debug(f"Last bar_timestamp: {repr(bar_timestamp)}") return df def get_historic_options_data(algo, df): # Get unique dates from the DataFrame unique_dates = df['date'].unique() # Iterate over each unique date for current_date in unique_dates: # Filter the DataFrame for the current date date_df = df[df['date'] == current_date] #algo.Debug(f"{repr(current_date)}") # Since the symbols are the same for the same date, we can use the first row's symbols put_buy_symbol = date_df.iloc[0]['put_buy_symbol'] put_sell_symbol = date_df.iloc[0]['put_sell_symbol'] call_sell_symbol = date_df.iloc[0]['call_sell_symbol'] call_buy_symbol = date_df.iloc[0]['call_buy_symbol'] #algo.Debug(f"{put_buy_symbol} {put_sell_symbol} {call_sell_symbol} {call_buy_symbol}") # Request historical data for the specific date put_buy_history = algo.History([put_buy_symbol.Symbol], current_date, current_date, Resolution.Minute) put_sell_history = algo.History([put_sell_symbol.Symbol], current_date, current_date, Resolution.Minute) call_sell_history = algo.History([call_sell_symbol.Symbol], current_date, current_date, Resolution.Minute) call_buy_history = algo.History([call_buy_symbol.Symbol], current_date, current_date, Resolution.Minute) # Get the latest price (premium) from the historical data for the current date put_buy_premium = put_buy_history['close'].iloc[-1] if not put_buy_history.empty else 0 put_sell_premium = put_sell_history['close'].iloc[-1] if not put_sell_history.empty else 0 call_sell_premium = call_sell_history['close'].iloc[-1] if not call_sell_history.empty else 0 call_buy_premium = call_buy_history['close'].iloc[-1] if not call_buy_history.empty else 0 # Calculate the total premium (credit received) premium = put_sell_premium + call_sell_premium - put_buy_premium - call_buy_premium premium = max(0.05, premium) # Update all rows for the current date with the premium values df.loc[df['date'] == current_date, 'put_buy_premium'] = put_buy_premium df.loc[df['date'] == current_date, 'put_sell_premium'] = put_sell_premium df.loc[df['date'] == current_date, 'call_sell_premium'] = call_sell_premium df.loc[df['date'] == current_date, 'call_buy_premium'] = call_buy_premium df.loc[df['date'] == current_date, 'premium'] = premium return df
#region imports from AlgorithmImports import * from collections import deque #endregion import config import statistics as stats class IronCondor(): def __init__(self, put_buy, put_sell, call_buy, call_sell): self.Put_Buy = put_buy self.Put_Sell = put_sell self.Call_Buy = call_buy self.Call_Sell = call_sell self.time_exit = False self.Sent_Already = False self.Premium = None self.entry_premium = None self.Entry_Fees = None self.partial = False self.partial_orders = [] self.first_entry = None self.entry_premiums = [] self.avg_entry = None self.Lowest_Premium = None self.Scanned_Already = False self.entry_orders = [] self.entry_time = None self.entry_time_algo = None self.Reached_Threshold = False self.Profit = None self.Stopped_Out = False self.Profit_Taken = False self.Current_Profit = None self.Margin_Required = None self.Sent_Open_Range = False self.Highest_Premium = -1000000 self.Weight = 0 self.Entered_Increments = [] self.exits = {"pb": None, "ps": None, "cs": None, "cb": None} #self.Avg_Down_Type = config.AVG_DOWN_TYPE #self.Underlying_Increment = config.UNDERLYING_PERCENT_MOVE_INCREMENT/100 self.Pivot_Queue = deque(maxlen=3) ########################################################################################## self.active_layers = [] self.first_entry_time = None self.entry_premiums_trading = {} self.avg_trading_premium = None self.trading_entries = 0 self.max_trading_quantity = 40 self.current_trading_quantity = 0 def check_range(self, min_range, max_range): if self.open_call_range >= min_range and self.open_put_range >= min_range and self.open_call_range <= max_range and self.open_put_range <= max_range: return True else: return False def calculate_trading_entry_avg_premium(self): length = 0 total_quantity = 0 total_value = 0 for entry, info in self.entry_premiums_trading.items(): premium, quantity, is_open = info if is_open: length += 1 total_quantity += quantity total_value += (premium * quantity) if length > 0: self.avg_trading_premium = total_value / total_quantity else: self.avg_trading_premium = 0 self.current_trading_quantity = total_quantity # self.algo.Debug(self.entry_premiums_trading) return self.avg_trading_premium def calculate_avg_entry(self): self.entry_premiums.append(self.entry_premium) self.avg_entry = stats.mean(self.entry_premiums) def calculate_premium(self): cs_premium = self.algo.Securities[self.Call_Sell.Symbol].BidPrice cb_premium = self.algo.Securities[self.Call_Buy.Symbol].AskPrice ps_premium = self.algo.Securities[self.Put_Sell.Symbol].BidPrice pb_premium = self.algo.Securities[self.Put_Buy.Symbol].AskPrice call_premium = cs_premium - cb_premium put_premium = ps_premium - pb_premium list_of_ba = [-pb_premium, ps_premium, -cb_premium, cs_premium] self.Premium = sum(list_of_ba) #self.algo.Debug(f"pb ask {pb_premium} ps bid {ps_premium} cs bid {cs_premium} cb ask {cb_premium}") #self.algo.Debug(f"pb ask {self.Call_Sell.AskPrice} ps bid {self.Put_Sell.BidPrice} cs bid {self.Call_Sell.BidPrice} cb ask {self.Call_Buy.AskPrice}") return self.Premium #def Calculate_Underlying_Re_Entry(self): #if
# region imports from AlgorithmImports import * from select_range import SelectRange import pandas as pd import numpy as np from datetime import datetime, timedelta from utils import columns, load_config from history_warmup import get_historic_ranges, get_historic_options_data from df_utils import compute_base_columns, compute_backtest_columns, TradeUpdater from check_calendar import CheckCalendar from create_condor import CreateCondor from avg_down import AvgDown from entry import EntryManager from exits import ExitManager import config import yfinance as yf import requests # endregion class HipsterFluorescentOrangeAnguilline(QCAlgorithm): def initialize(self): self.set_start_date(2023, 2, 12) self.set_cash(100000) self.set_time_zone(TimeZones.NEW_YORK) self.Portfolio.MarginCallModel = MarginCallModel.Null self.Portfolio.SetPositions(SecurityPositionGroupModel.Null) self.SetSecurityInitializer(self.CustomSecurityInitializer) self.calendar_checker = CheckCalendar(self, self.close_trading_window, self.open_window, self.close_window) self.condor_creator = CreateCondor(self) self.config = load_config() self.symbol = self.AddIndex("SPX", Resolution.Minute).Symbol self.avg_down_manager = AvgDown(self, self.symbol, self.config) self.entry_manager = EntryManager(self, self.symbol, self.avg_down_manager) self.exit_manager = ExitManager(self, self.symbol, self.close_trading_window) self.range_selector = SelectRange(self, self.symbol) self.trade_updater = TradeUpdater(self, self.config, self.entry_manager, self.exit_manager) self.trading_window = False self.open_price = None self.invested_condor = None self.first_data_point = True self.waiting_on_cancel = False self.orders_to_cancel = [] self.order_trim = [] self.reset = False self.id_ = "-723291833" self.token_ = "5616527268:AAGkUitVosgYOntpLb_JU5HK_SSXm86GsVs" self.created_ic_today = False # Define the columns you want to add # Add other columns as needed self.df = pd.DataFrame(columns=columns) # Calculate the last 10 business days relative to self.Time history_period = self.get_last_10_business_days(self.Time) # Populate the DataFrame with rows for each date with timestamps from 9:30 AM to 4:00 PM # Initialize an empty list to collect the rows rows = [] for date in history_period: start_time = datetime.combine(date, datetime.strptime('09:31', '%H:%M').time()) end_time = datetime.combine(date, datetime.strptime('16:00', '%H:%M').time()) time_range = pd.date_range(start=start_time, end=end_time, freq='T') # Minute frequency for timestamp in time_range: # Initialize a row dictionary with all columns set to None row = {col: None for col in columns} # Set the specific values for date and timestamp row['date'] = date row['timestamp'] = timestamp # Append the row to the list rows.append(row) # Assign the created DataFrame from the list of rows to self.df self.df = pd.DataFrame(rows, columns=columns) # Pass the dynamic history_period to the get_historic_ranges function self.df = get_historic_ranges(self, self.range_selector, history_period, self.df, self.condor_creator) self.df = get_historic_options_data(self, self.df) compute_base_columns(self, self.df) self.trade_updater.update(self.df) def get_last_10_business_days(self, current_time): # Create a date range for the last 10 business days up to current_time end_date = (current_time - pd.Timedelta(days=1)).date() business_days = pd.bdate_range(end=end_date, periods=10).to_pydatetime() return business_days def get_open_yfinance(self): try: spx = yf.Ticker('^GSPC') # Get historical data for the SPX historical_data = spx.history(period='1d') # Get the most recent date and open price latest_open_price = historical_data['Open'].iloc[-1] # Get the most recent date and open price latest_date = historical_data.index[-1].strftime('%Y-%m-%d') if latest_date == self.Time.strftime("%Y-%m-%d"): if latest_open_price is not None and latest_open_price != 0 and latest_open_price < 100000: self.open_price = latest_open_price except: self.Debug(f"No yahoo data at {self.Time}") def open_window(self): self.Debug(self.Time) self.open_price = self.Securities[self.symbol].Open #self.get_open_yfinance() self.trading_window = True if self.calendar_checker.check_calendar(): self.trading_window = False self.Debug(f"{self.Time} {self.open_price}") self.reset = False def close_window(self): #self.close_if_at_loss() self.created_combinations_today = False self.trading_window = False self.today_condors = None self.today_keys = None self.todays_premium = None self.todays_min_range = None self.todays_max_range = None self.invested_condor = None self.open_price = None self.avg_down_manager.reset() def CustomSecurityInitializer(self, security: Security) -> None: # Disable trading fees security.SetBuyingPowerModel(BuyingPowerModel.Null) def close_trading_window(self): self.Liquidate() self.created_combinations_today = False self.trading_window = False self.today_condors = None self.today_keys = None self.todays_premium = None self.todays_min_range = None self.todays_max_range = None self.invested_condor = None self.avg_down_manager.reset() def send_message(self, message_): # URL to send the message through Telegram API url = f"https://api.telegram.org/bot{self.token_}/sendMessage" # Payload to send payload = { "chat_id": self.id_, "text": message_, "parse_mode": "HTML" } # Sending the message response = requests.post(url, data=payload) # Check if the request was successful if response.status_code == 200: self.Debug("Message sent successfully.") else: self.Debug(f"Failed to send message. Status code: {response.status_code}") def generate_uncovered_short_message(self, cb_quantity, cs_quantity, ps_quantity, pb_quantity): # Construct the message with a warning about uncovered shorts message_ = (f"Warning: There may be uncovered shorts that require attention." f"Currently held quantities:\n" f"CB Quantity {cb_quantity}\n" f"CS Quantity: {cs_quantity}\n" f"PS Quantity: {ps_quantity}\n" f"PB Quantity: {pb_quantity}\n\n" ) return message_ def reset_options(self): self.send_message("Attempting to reset trading.") self.Liquidate() self.invested_condor.exits = {key: None for key in self.invested_condor.exits} self.exit_manager.is_exit_process = False self.exit_manager.is_safety_legs_exit_process = False self.exit_manager.last_action_time = None for layer in self.invested_condor.active_layers: self.avg_down_manager.avg_down_layers[layer][1] = False self.invested_condor.active_layers.clear() self.invested_condor.current_trading_quantity = 0 self.invested_condor.entry_premiums_trading = {} self.invested_condor = None self.reset = True def check_open_positions(self): reset = False if self.invested_condor is not None: # Retrieve quantities for each option in the condor strategy cb_quantity = self.Portfolio[self.invested_condor.Call_Buy.Symbol].Quantity cs_quantity = self.Portfolio[self.invested_condor.Call_Sell.Symbol].Quantity ps_quantity = self.Portfolio[self.invested_condor.Put_Sell.Symbol].Quantity pb_quantity = self.Portfolio[self.invested_condor.Put_Buy.Symbol].Quantity #self.Debug(f"cb {cb_quantity} cs {cs_quantity} ps {ps_quantity} pb {pb_quantity}") # Collect symbols from the invested condor invested_condor_symbols = [self.invested_condor.Call_Buy.Symbol, self.invested_condor.Call_Sell.Symbol, self.invested_condor.Put_Sell.Symbol, self.invested_condor.Put_Buy.Symbol] # Check if Call Sell position is covered by Call Buy position if abs(cs_quantity) > cb_quantity: self.send_message(self.generate_uncovered_short_message(cb_quantity, cs_quantity, ps_quantity, pb_quantity)) #self.Debug("Call Sell position is not fully covered by Call Buy.") self.liquidate_uncovered_short(self.invested_condor.Call_Sell.Symbol) self.liquidate_uncovered_short(self.invested_condor.Put_Sell.Symbol) reset = True # Check if Put Sell position is covered by Put Buy position if abs(ps_quantity) > pb_quantity: self.send_message(self.generate_uncovered_short_message(cb_quantity, cs_quantity, ps_quantity, pb_quantity)) #self.Debug("Put Sell position is not fully covered by Put Buy.") self.liquidate_uncovered_short(self.invested_condor.Put_Sell.Symbol) self.liquidate_uncovered_short(self.invested_condor.Call_Sell.Symbol) reset = True # Find all invested option symbols option_invested = [x.Key for x in self.Portfolio if x.Value.Invested and x.Value.Type == SecurityType.IndexOption] # Check for any option invested symbols not in the invested_condor_symbols uncovered_options = [symbol for symbol in option_invested if symbol not in invested_condor_symbols] if uncovered_options: self.send_message(self.generate_uncovered_short_message(cb_quantity, cs_quantity, ps_quantity, pb_quantity)) #self.Debug("There are invested option symbols not covered by the condor strategy:", uncovered_options) # Check for short positions among uncovered options short_uncovered_options = [symbol for symbol in uncovered_options if self.Portfolio[symbol].Quantity < 0] if short_uncovered_options: #self.Debug("There are uncovered options with short positions:", short_uncovered_options) for option in short_uncovered_options: self.liquidate_uncovered_short(option) reset = True if reset: self.reset_options() def liquidate_uncovered_short(self, symbol): # Check if the symbol is in the portfolio and has a negative quantity if symbol in self.Portfolio and self.Portfolio[symbol].Quantity < 0: # Calculate the quantity needed to liquidate (close) the short position quantity_to_buy = abs(self.Portfolio[symbol].Quantity) # Place a buy order for the quantity needed to liquidate the position order = self.MarketOrder(symbol, quantity_to_buy) self.Transactions.WaitForOrder(order.OrderId) #self.Debug(f"Liquidated short position for {symbol} by buying {quantity_to_buy} units.") def OnOrderEvent(self, orderEvent: OrderEvent) -> None: order = self.Transactions.GetOrderById(orderEvent.OrderId) if orderEvent.Status == OrderStatus.Filled: if self.invested_condor is not None: # Check if the order is an entry order if orderEvent.OrderId in self.invested_condor.entry_orders: # Remove the filled order from the entry orders list self.invested_condor.entry_orders.remove(orderEvent.OrderId) # Check if all entry orders have been processed if len(self.invested_condor.entry_orders) == 0 and not self.invested_condor.partial and not self.exit_manager.is_exit_process and not self.reset and len(self.Transactions.GetOpenOrders()) == 0: # Check that no exit orders are active before printing self.invested_condor.all_entries_filled = True self.trade_updater.lock = False #self.Debug("All entry orders have been filled and no exits are pending.") if orderEvent.Status == OrderStatus.Filled: if self.invested_condor is not None and orderEvent.OrderId in self.invested_condor.partial_orders: self.invested_condor.partial_orders.remove(orderEvent.OrderId) if self.order_trim and order.Id in [x.OrderId for x in self.order_trim]: self.order_trim = [x for x in self.order_trim if x.OrderId != order.Id] if len(self.order_trim) == 0: self.entry_manager.trim_condor(self.invested_condor) if self.invested_condor is not None and len(self.invested_condor.partial_orders) == 0 and self.invested_condor.partial: self.invested_condor.partial = False self.entry_manager.check_avg_down(self.today_condors, self.today_keys, self.todays_min_range, self.todays_max_range, self.todays_premium) order_id = orderEvent.OrderId if self.invested_condor is not None and order_id in self.invested_condor.exits.values(): if orderEvent.Status == OrderStatus.Filled: if order_id == self.invested_condor.exits["ps"]: self.invested_condor.exits["ps"] = None quantity_held = self.Portfolio[self.invested_condor.Put_Buy.Symbol].Quantity if quantity_held != 0: quantity_send = -quantity_held exit_1 = self.MarketOrder(self.invested_condor.Put_Buy.Symbol, quantity_send, tag="TIMED EXIT") self.invested_condor.exits["pb"] = exit_1.OrderId self.exit_manager.last_action_time = self.Time self.exit_manager.is_safety_legs_exit_process = True if order_id == self.invested_condor.exits["cs"]: self.invested_condor.exits["cs"] = None quantity_held = self.Portfolio[self.invested_condor.Call_Buy.Symbol].Quantity if quantity_held != 0: quantity_send = -quantity_held exit_2 = self.MarketOrder(self.invested_condor.Call_Buy.Symbol, quantity_send, tag="TIMED EXIT") self.invested_condor.exits["cb"] = exit_2.OrderId self.exit_manager.last_action_time = self.Time self.exit_manager.is_safety_legs_exit_process = True if self.invested_condor is not None and order_id in self.invested_condor.exits.values(): if orderEvent.Status == OrderStatus.Filled or orderEvent.Status == OrderStatus.Canceled: if self.invested_condor.exits["pb"] is not None: pb_ticket = self.Transactions.GetOrderTicket(self.invested_condor.exits["pb"]).Status if order_id == self.invested_condor.exits["pb"] and pb_ticket == OrderStatus.Filled or pb_ticket == OrderStatus.Canceled: self.invested_condor.exits["pb"] = None if self.invested_condor.exits["cb"] is not None: cb_ticket = self.Transactions.GetOrderTicket(self.invested_condor.exits["cb"]).Status if order_id == self.invested_condor.exits["cb"] and cb_ticket == OrderStatus.Filled or cb_ticket == OrderStatus.Canceled: self.invested_condor.exits["cb"] = None if self.invested_condor is not None and all(x is None for x in self.invested_condor.exits.values()) and self.exit_manager.is_exit_process and self.exit_manager.is_safety_legs_exit_process: self.invested_condor.exits = {key: None for key in self.invested_condor.exits} self.exit_manager.is_exit_process = False self.exit_manager.is_safety_legs_exit_process = False self.exit_manager.last_action_time = None for layer in self.invested_condor.active_layers: self.avg_down_manager.avg_down_layers[layer][1] = False self.invested_condor.active_layers.clear() self.invested_condor.current_trading_quantity = 0 self.invested_condor.entry_premiums_trading = {} self.trade_updater.lock = False if self.exit_manager.last_action_time is not None and self.Time >= self.exit_manager.last_action_time + timedelta(minutes=5): if self.invested_condor is not None: if self.invested_condor.exits["cb"] is not None: #self.Debug(f"Canceling CB {self.Time}") self.Transactions.CancelOrder(self.invested_condor.exits["cb"]) if self.invested_condor.exits["pb"] is not None: #self.Debug(f"Canceling PB {self.Time}") self.Transactions.CancelOrder(self.invested_condor.exits["pb"]) def print_non_unique_index(self): # Check if the index of the DataFrame is unique if not self.df.index.is_unique: # Find the duplicate index values duplicate_indices = self.df.index[self.df.index.duplicated()].unique() self.Debug(f"Non-unique index values found: {duplicate_indices}") else: self.Debug("No non-unique index values found.") # Call this method after any operation where you suspect index issues. def initialize_day_data(self, date): try: # Define the timestamp for the current time current_timestamp = datetime.combine(date, self.time.time()) # Initialize a dictionary to store the new data new_data = {col: None for col in self.df.columns if col not in ['date']} # Set 'timestamp' as the index if it isn't already if 'timestamp' not in self.df.index.names: self.df.set_index('timestamp', inplace=True) # Check if the current timestamp is already in the DataFrame if current_timestamp in self.df.index: self.Debug(f"Timestamp {current_timestamp} already exists in DataFrame.") else: # If timestamp is not present, create a new row new_row = {'date': date} new_row.update(new_data) new_row['range_used'] = self.todays_min_range new_row['daily_range_used'] = self.todays_min_range # Append the new row to the DataFrame with the timestamp as the index self.df.loc[current_timestamp] = new_row # Sort by index (timestamp) self.df.sort_index(inplace=True) #self.Debug(f"Added new row for timestamp {current_timestamp}.") # Reset index to have 'timestamp' as a column if needed self.df.reset_index(inplace=True) # Debug information #self.Debug(f"DataFrame shape after initialization: {self.df.shape}") #self.Debug(f"DataFrame columns: {self.df.columns}") #self.Debug(f"DataFrame dtypes:\n{self.df.dtypes}") #self.Debug(f"Sample of DataFrame:\n{self.df.head()}") except Exception as e: self.Debug(f"Error in initialize_day_data: {str(e)}") self.Debug(f"DataFrame before error:\n{self.df.head()}") self.Debug(f"DataFrame info:\n{self.df.info()}") raise def on_data(self, data: Slice): self.check_open_positions() if self.trading_window: if not self.created_ic_today: min_range = self.range_selector.set_range_today() self.todays_min_range = min_range if self.open_price is None: self.open_price = self.Securities[self.symbol].Open current_date = self.time.date() self.initialize_day_data(current_date) #self.debug("PAST INIT") self.df = self.condor_creator.select_strikes(self.symbol, self.open_price, min_range, self.df, current_date, self.time) # Find the existing row by date and timestamp #self.debug(f"{repr(self.df['timestamp'].iloc[-1])} {self.time.time()}") row_index = self.df[(self.df['date'] == self.Time.date()) & (self.df['timestamp'] == self.Time)].index if not row_index.empty: # Retrieve the correct row index row_index = row_index[0] # Retrieve the option symbols for the current row put_buy_symbol = self.df.at[row_index, 'put_buy_symbol'] put_sell_symbol = self.df.at[row_index, 'put_sell_symbol'] call_sell_symbol = self.df.at[row_index, 'call_sell_symbol'] call_buy_symbol = self.df.at[row_index, 'call_buy_symbol'] # Check if the symbols are missing in the current row and carry forward from the previous row if pd.isna(put_buy_symbol) and row_index > 0: put_buy_symbol = self.df.at[row_index - 1, 'put_buy_symbol'] if pd.isna(put_sell_symbol) and row_index > 0: put_sell_symbol = self.df.at[row_index - 1, 'put_sell_symbol'] if pd.isna(call_sell_symbol) and row_index > 0: call_sell_symbol = self.df.at[row_index - 1, 'call_sell_symbol'] if pd.isna(call_buy_symbol) and row_index > 0: call_buy_symbol = self.df.at[row_index - 1, 'call_buy_symbol'] # Retrieve the premiums for each option using the symbols from the current row put_buy_premium = self.Securities[put_buy_symbol.Symbol].Price if put_buy_symbol.Symbol in self.Securities else 0 put_sell_premium = self.Securities[put_sell_symbol.Symbol].Price if put_sell_symbol.Symbol in self.Securities else 0 call_sell_premium = self.Securities[call_sell_symbol.Symbol].Price if call_sell_symbol.Symbol in self.Securities else 0 call_buy_premium = self.Securities[call_buy_symbol.Symbol].Price if call_buy_symbol.Symbol in self.Securities else 0 # Calculate the total premium for the Iron Condor total_condor_premium = (put_sell_premium - put_buy_premium) + (call_sell_premium - call_buy_premium) total_condor_premium = max(0.05, total_condor_premium) # Update the existing row with all relevant data self.df.at[row_index, 'open'] = self.Securities[self.symbol].open self.df.at[row_index, 'high'] = self.Securities[self.symbol].high self.df.at[row_index, 'low'] = self.Securities[self.symbol].low self.df.at[row_index, 'close'] = self.Securities[self.symbol].close self.df.at[row_index, 'put_buy_premium'] = put_buy_premium self.df.at[row_index, 'put_sell_premium'] = put_sell_premium self.df.at[row_index, 'call_sell_premium'] = call_sell_premium self.df.at[row_index, 'call_buy_premium'] = call_buy_premium self.df.at[row_index, 'premium'] = total_condor_premium self.df.at[row_index, 'realized_pnl'] = 0 # or update with actual value self.df.at[row_index, 'unrealized_pnl'] = 0 # or update with actual value self.df.at[row_index, 'account_equity'] = 0 # or update with actual value self.df.at[row_index, 'contracts_entered'] = 0 # Initialize or update self.df.at[row_index, 'exited_contracts'] = 0 # Initialize or update self.df.at[row_index, 'profit_exit_premium'] = 0 # Initialize or update self.df.at[row_index, 'new_contracts'] = 0 # Initialize or update self.df.at[row_index, 'average_positional_premium'] = 0 # Initialize or update self.df.at[row_index, 'NPF'] = 0 # Initialize or update self.df.at[row_index, 'NPF_score'] = 0 # Initialize or update self.df.at[row_index, 'RTF'] = 0 # Initialize or update self.df.at[row_index, 'RTF_score'] = 0 # Initialize or update self.df.at[row_index, 'RET'] = 0 # Initialize or update self.df.at[row_index, 'CEF'] = 0 # Initialize or update self.df.at[row_index, 'CEF_score'] = 0 # Initialize or update self.df.at[row_index, 'DEF_score'] = 0 # Initialize or update self.df.at[row_index, 'RET_score'] = 0 # Initialize or update self.df.at[row_index, 'score'] = 0 # Initialize or update self.df.at[row_index, 'config_to_use'] = None # Initialize or update self.df.at[row_index, 'outcome'] = None # Initialize or update # Fallback in case the row doesn't exist, though with proper pre-population this should not happen #self.Debug(f"Row for {self.Time} not found, this should not happen if pre-population is correct.") # You can handle this scenario if it actually occurs # Compute base columns on the updated DataFrame compute_base_columns(self, self.df) # Optionally, apply backtest computations compute_backtest_columns(self.df, self.config) self.trade_updater.update(self.df) self.df, invested_condor, orders_trim = self.trade_updater.calculate_dict(self.df, self.config, self.invested_condor) # Implement your logic based on the updated DataFrame
from AlgorithmImports import * import config class SelectRange(): def __init__(self, algorithm, symbol, history_period=None): self.algorithm = algorithm self.symbol = symbol self.range_mode = "AUTOMATIC" self.range_max_today = None self.range_min_today = None self.automatic_range_percentages = [1, 1.5, 2, 2.5, 3] self.automatic_range_max_add = 0.3 self.vix_threshold_1 = 30 self.vix_threshold_2 = 25 self.vix_threshold_3 = 20 self.atr_threshold_1 = 0.02 self.atr_threshold_2 = 0.02 self.atr_21_indie = NormalizedAverageTrueRange(21) self.atr_21 = None self.vix = self.algorithm.add_index("VIX", Resolution.MINUTE).symbol # Initialize VIX symbol self.vix_open = None self.use_1 = False self.use_1_5 = False self.use_2 = False self.use_2_5 = False self.use_3 = False self.bar_consolidator = TradeBarConsolidator(timedelta(days=1)) self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.bar_consolidator) self.bar_consolidator.DataConsolidated += self.temporary_receive_bar self.bar_consolidator_minute = TradeBarConsolidator(timedelta(days=1)) self.algorithm.SubscriptionManager.AddConsolidator(self.symbol, self.bar_consolidator_minute) # self.bar_consolidator_minute.DataConsolidated += self.temporary_receive_bar_minute if history_period: history = self.algorithm.History[TradeBar](self.symbol, history_period, Resolution.Daily) for bar in history: self.bar_consolidator.Update(bar) self.set_range_today() # Calculate and store the range for each historical day else: history = self.algorithm.History[TradeBar](self.symbol, 100, Resolution.Daily) for bar in history: self.bar_consolidator.Update(bar) # Schedule the retrieval of VIX at 9:30 AM (market open) #self.algorithm.Schedule.On(self.algorithm.DateRules.EveryDay(), self.algorithm.TimeRules.At(9, 30), self.temporary_get_vix) self.algorithm.Schedule.On(self.algorithm.DateRules.EveryDay(), self.algorithm.TimeRules.At(9, 30), self.get_vix_open) if self.range_mode == "AUTOMATIC": if 1 in self.automatic_range_percentages: self.use_1 = True if 1.5 in self.automatic_range_percentages: self.use_1_5 = True if 2 in self.automatic_range_percentages: self.use_2 = True if 2.5 in self.automatic_range_percentages: self.use_2_5 = True if 3 in self.automatic_range_percentages: self.use_3 = True else: raise Exception(f"Selected RANGE MODE {self.range_mode} is incorrect, please check the input and either choose 'AUTOMATIC' or 'FIXED'") def temporary_get_vix(self): # Get the VIX open price at 9:30 AM vix_history = self.algorithm.History([self.vix], 1, Resolution.Minute) vix_open_price = vix_history.loc[self.vix].iloc[0]['open'] self.vix_open = vix_open_price self.algorithm.Debug(f"VIX Open Price at 9:30 AM: {self.vix}") def temporary_receive_bar(self, sender, bar): self.atr_21_indie.Update(bar) if self.atr_21_indie.IsReady: self.atr_21 = self.atr_21_indie.Current.Value / 100 def select_range_callable(self, atr_21, vix): self.atr_21 = atr_21 / 100 self.vix = vix def get_vix_open(self): # Get the VIX open price from the security at 9:30 AM vix_security = self.algorithm.Securities[self.vix] vix_open_price = vix_security.Open self.vix_open = vix_open_price self.algorithm.Debug(f"VIX Open Price at 9:30 AM: {self.vix}") def set_range_today(self): if self.range_mode == "AUTOMATIC": if self.use_1_5 and self.atr_21 <= self.atr_threshold_1 and self.vix_open <= self.vix_threshold_2: self.range_max_today = 0.015 + self.automatic_range_max_add self.range_min_today = 0.015 elif self.use_2 and self.atr_21 <= self.atr_threshold_1: self.range_max_today = 0.02 + self.automatic_range_max_add self.range_min_today = 0.02 elif self.use_2 and self.vix_open <= self.vix_threshold_2: self.range_max_today = 0.02 + self.automatic_range_max_add self.range_min_today = 0.02 elif self.use_2_5: self.range_max_today = 0.025 + self.automatic_range_max_add self.range_min_today = 0.025 if self.use_3 and self.vix_open >= self.vix_threshold_1: self.range_max_today = 0.03 + self.automatic_range_max_add self.range_min_today = 0.03 if self.use_1 and self.atr_21 < self.atr_threshold_2 and self.vix_open < self.vix_threshold_3: self.range_max_today = 0.01 + self.automatic_range_max_add self.range_min_today = 0.01 elif self.range_mode == "FIXED": pass else: raise Exception(f"Selected RANGE MODE {self.range_mode} is incorrect, please check the input and either choose 'AUTOMATIC' or 'FIXED'") self.algorithm.Plot("Range Used", "Range", self.range_min_today) return self.range_min_today
# region imports from AlgorithmImports import * from config import config as cfg # endregion # Your New Python File columns = [ 'date', 'timestamp', 'open', 'high', 'low', 'close', 'put_buy_strike', 'put_buy_strike', 'call_sell_strike', 'call_buy_strike', 'put_buy_symbol', 'put_sell_symbol', 'call_sell_symbol', 'call_buy_symbol' , 'put_buy_premium', 'put_sell_premium', 'call_sell_premium', 'call_buy_premium', 'premium', 'proximity_put', 'proximity_call', 'normalized_proximity_put', 'normalized_proximity_call', 'minutes_since_open', 'normalized_minutes', 'time_elapsed', 'risk_score_put', 'risk_score_call', 'max_risk_score', 'ma20_max_risk_score', 'premium_threshold', 'contracts_entered', 'recent_entry_time', 'average_premium', 'range_used', 'daily_range_used', 'score', 'cef', 'ret', 'tf', 'scaled_risk_score', 'rtf', 'risk_score_diff', 'itmf', 'premium_factor', 'npf', 'pf', 'total_premium', 'cumulative_premium', 'score_threshold', 'max_contracts', 'contracts_to_buy', 'new_contracts', 'temp_mult', 'time_now', 'trading_minutes_per_day', 'current_average_premium', 'avg_premium_10_days', 'average_positional_premium', 'exited_contracts', 'profit_exit_premium', 'realized_pnl', 'unrealized_pnl', 'itmf_score', 'pf_score', 'premium_factor_score', 'scaled_risk_score_weighted', 'risk_score_diff_score', 'npf_score', 'rtf_score', 'cef_score', 'def_score', 'ret_score', 'account_equity', 'cf', 'cnnf', 'cf_score', 'cnnf_score', 'cf_risk', 'cf_risk_score', 'cnnf_risk', 'cnnf_risk_score', 'itmf_risk', 'itmf_risk_score', 'risk_score', 'adjusted_profit_percentage', 'pnl_diff', 'drawdown', 'premium_max_rolling', 'premium_mean_rolling', 'max_risk_score_max_rolling', 'max_risk_score_mean_rolling', 'volatility_mean_rolling', 'true_range_mean_rolling', 'atr_percent_mean_rolling', 'volatility_spike_sum_rolling', 'rolling_max_exceeded_percentage', 'prediction', 'rank_1', 'rank_2', 'rank_3', 'rank_4', 'rank_5', 'rank_6', 'rank_7', 'rank_8', 'rank_9', 'rank_10', 'outcome', 'year', 'winning_trade', 'losing_trade', 'date_found'] def load_config(config_to_load=None): config = cfg # Convert dates config['start_date'] = pd.to_datetime(config['start_date']).strftime('%Y-%m-%d') config['end_date'] = pd.to_datetime(config['end_date']).strftime('%Y-%m-%d') # logging.info(config) # Convert integers config['contracts_to_buy_per_signal'] = int(float(config['contracts_to_buy_per_signal'])) config['max_contracts'] = int(config['max_contracts']) config['strike_difference'] = int(config['strike_difference']) config['rank_limit'] = int(config['rank_limit']) # Convert cut-out periods if they exist if 'cut_out_periods' in config: for period in config['cut_out_periods']: period['start_date'] = pd.to_datetime(period['start_date']).strftime('%Y-%m-%d') period['end_date'] = pd.to_datetime(period['end_date']).strftime('%Y-%m-%d') # Convert floats config['score_threshold'] = float(config['score_threshold']) config['weight_premium_comparison'] = float(config['weight_premium_comparison']) config['premium_comparison_lower_threshold'] = float(config['premium_comparison_lower_threshold']) config['premium_comparison_upper_threshold'] = float(config['premium_comparison_upper_threshold']) config['weight_cef'] = float(config['weight_cef']) config['weight_ret'] = float(config['weight_ret']) config['weight_pf'] = float(config['weight_pf']) config['weight_tf'] = float(config['weight_tf']) config['weight_scaled_risk_score'] = float(config['weight_scaled_risk_score']) config['midpoint'] = float(config['midpoint']) config['scale_factor'] = float(config['scale_factor']) config['weight_rtf'] = float(config['weight_rtf']) config['rtf_lower_threshold'] = float(config['rtf_lower_threshold']) config['rtf_upper_threshold'] = float(config['rtf_upper_threshold']) config['weight_risk_score_diff'] = float(config['weight_risk_score_diff']) config['rsd_lower_threshold'] = float(config['rsd_lower_threshold']) config['rsd_upper_threshold'] = float(config['rsd_upper_threshold']) config['weight_npf'] = float(config['weight_npf']) config['weight_itmf'] = float(config['weight_itmf']) config['weight_cf'] = float(config['weight_cf']) config['weight_cnnf'] = float(config['weight_cnnf']) config['pf_baseline'] = float(config['pf_baseline']) config['profit_percentage'] = float(config['profit_percentage']) config['cf_risk_weight'] = float(config['cf_risk_weight']) config['cnnf_risk_weight'] = float(config['cnnf_risk_weight']) config['itmf_risk_weight'] = float(config['itmf_risk_weight']) config['score_exit_threshold'] = float(config.get('score_exit_threshold', 0)) config['risk_score_threshold'] = float(config['risk_score_threshold']) # Convert booleans config['use_plot'] = bool(config['use_plot']) config['use_print'] = bool(config['use_print']) config['enable_premium_comparison'] = bool(config['enable_premium_comparison']) config['enable_cef'] = bool(config['enable_cef']) config['enable_ret'] = bool(config['enable_ret']) config['enable_pf'] = bool(config['enable_pf']) config['enable_tf'] = bool(config['enable_tf']) config['enable_scaled_risk_score'] = bool(config['enable_scaled_risk_score']) config['enable_rtf'] = bool(config['enable_rtf']) config['enable_risk_score_diff'] = bool(config['enable_risk_score_diff']) config['enable_npf'] = bool(config['enable_npf']) config['enable_itmf'] = bool(config['enable_itmf']) config['enable_cf'] = bool(config['enable_cf']) config['enable_cf_risk'] = bool(config['enable_cf_risk']) config['enable_cnnf_risk'] = bool(config['enable_cnnf_risk']) config['enable_itmf_risk'] = bool(config['enable_itmf_risk']) config['enable_cnnf'] = bool(config['enable_cnnf']) config['enable_score_exit'] = bool(config.get('enable_score_exit', False)) return config