Overall Statistics |
Total Orders 169 Average Win 1.88% Average Loss -1.62% Compounding Annual Return 99.685% Drawdown 18.500% Expectancy 0.544 Start Equity 100000 End Equity 201458.90 Net Profit 101.459% Sharpe Ratio 2.843 Sortino Ratio 3.056 Probabilistic Sharpe Ratio 96.175% Loss Rate 29% Win Rate 71% Profit-Loss Ratio 1.16 Alpha 0.433 Beta 1.24 Annual Standard Deviation 0.209 Annual Variance 0.044 Information Ratio 2.638 Tracking Error 0.176 Treynor Ratio 0.479 Total Fees $919.80 Estimated Strategy Capacity $950000.00 Lowest Capacity Asset SVXY V0H08FY38ZFP Portfolio Turnover 15.26% |
#region imports
try:
    from AlgorithmImports import *
except ImportError:
    # This scanner only needs os/re; keep it runnable from a plain terminal
    # where the LEAN runtime is not installed.
    pass
#endregion
import os
import re

# Quoted upper-case strings that are indicator keys handed to Sort/GroupSort
# rather than tickers. Without this filter the scanner reports e.g. 'RSI' as a
# stock symbol.
INDICATOR_NAMES = frozenset({"EMA", "RSI", "STD", "SMA", "IV"})

# A single-quoted run of 2-5 upper-case letters, e.g. 'SPY' or 'TQQQ'.
_QUOTED_TICKER = re.compile(r"'\b([A-Z]{2,5})\b'")


def extract_symbols(text, exclude=INDICATOR_NAMES):
    """Return the unique single-quoted 2-5 letter upper-case names in *text*.

    Args:
        text: Source code to scan.
        exclude: Names to drop from the result. Defaults to the indicator keys
            that look like tickers; pass ``()`` to keep every match.

    Returns:
        A list of unique names, in no particular order.
    """
    found = set(_QUOTED_TICKER.findall(text))
    return list(found.difference(exclude))


def extract_stock_symbols(project_dir):
    """Print every unique ticker found in the ``.py`` files under *project_dir*.

    The directory tree is walked recursively; a missing directory simply
    yields an empty report.
    """
    symbols_set = set()

    for root, _dirs, files in os.walk(project_dir):
        for file_name in files:
            if not file_name.endswith(".py"):
                continue
            file_path = os.path.join(root, file_name)
            # Explicit encoding so the result does not depend on the locale.
            with open(file_path, "r", encoding="utf-8") as f:
                symbols_set.update(extract_symbols(f.read()))

    # Printed as a quoted, comma-separated list ready to paste into a
    # Python list literal.
    print("Unique Stock Symbols:")
    print("'" + "','".join(sorted(symbols_set)) + "'")


# Provide the path to your project directory
project_directory = "Strategies"

# Run the script
extract_stock_symbols(project_directory)
#region imports
try:
    from AlgorithmImports import *
except ImportError:
    # This rewrite tool only needs os/re; keep it runnable from a plain
    # terminal where the LEAN runtime is not installed.
    pass
#endregion
import os
import re

# Matches "CumReturn(self.algo, 'TICKER', period) <op> number".
# Groups: 1 ticker, 2 period, 3 comparison operator, 4 threshold literal.
CUMRETURN_THRESHOLD_RE = re.compile(
    r"CumReturn\(self\.algo,\s*'([A-Z]+)',\s*(\d+)\)\s*([<>]=?)\s*([-+]?\d+(?:\.\d+)?)"
)


def _fraction_from_literal(literal):
    """Convert a threshold literal to a fraction.

    Values with magnitude >= 1 are treated as percentages and divided by 100;
    smaller values are assumed to be fractions already. The quotient is rounded
    to the literal's own precision (+2 digits for the /100) so binary-float
    noise such as 0.055999999999999994 is never written into source files.
    """
    number = float(literal)
    if abs(number) < 1:
        return number
    _, _, decimals = literal.partition(".")
    return round(number / 100, len(decimals) + 2)


def replace_match(match):
    """Return the normalized comparison text for one regex *match*.

    Args:
        match: A match object produced by the CumReturn threshold pattern.

    Returns:
        ``"CumReturn(self.algo, '<ticker>', <period>) <op> <fraction>"``.
    """
    ticker = match.group(1)
    period = match.group(2)
    operator = match.group(3)
    number = _fraction_from_literal(match.group(4))
    return f"CumReturn(self.algo, '{ticker}', {period}) {operator} {number}"


def replace_pattern(project_dir):
    """Rewrite percentage CumReturn thresholds as fractions, in place.

    Every ``.py`` file under *project_dir* is scanned; comparisons whose
    threshold has magnitude >= 1 are rewritten and each change is printed.
    Files without changes are not touched.

    NOTE: a threshold of exactly 100 becomes 1.0, which a second run would
    treat as a percentage again, so the tool is not idempotent for that value.
    """
    for root, _dirs, files in os.walk(project_dir):
        for file_name in files:
            if not file_name.endswith(".py"):
                continue
            file_path = os.path.join(root, file_name)
            with open(file_path, "r", encoding="utf-8") as f:
                content = f.read()

            modified_content = content
            # Walk the matches back-to-front so earlier offsets stay valid
            # while later spans are being replaced.
            for match in reversed(list(CUMRETURN_THRESHOLD_RE.finditer(content))):
                if abs(float(match.group(4))) < 1:
                    continue
                found = match.group()
                fixed = replace_match(match)
                modified_content = (
                    modified_content[:match.start()]
                    + fixed
                    + modified_content[match.end():]
                )
                print(f"File: {file_path}")
                print(f"Found: {found}")
                print(f"Fixed: {fixed}")
                print("---")

            if modified_content != content:
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(modified_content)


# Provide the path to your project directory
project_directory = "Strategies"

# Run the script
replace_pattern(project_directory)
#region imports from AlgorithmImports import * #endregion # Your New Python File
#CombinedStrategy1/version1.py
from AlgorithmImports import *
from indicators import *
from main import DogeStrat

# NOTE: main.py regex-scans this file's source text (to discover the strategy
# and to find the slot number used by the allocation calls), so the calls
# below keep the literal `self.algo` first argument and the literal slot 1.


#https://app.composer.trade/symphony/FdIbE2K1ktRIfMQDMJYo/details
class TQQQFTLTStrategy(DogeStrat):
    """Decision tree that allocates slot 1 between leveraged, inverse and cash-like ETFs.

    Each leaf records a holding through AH or Sort on the shared algorithm
    object; the weights of the leaves reached on one pass add up to ~1.
    """

    def __init__(self, algo):
        super().__init__()
        self.algo = algo

    def Execute(self):
        """Entry point called once per rebalance."""
        self.tqqq_ftlt()

    # --- shared predicates -------------------------------------------------

    def _tlt_rsi_above_sqqq(self):
        """True when the 10-day RSI of TLT exceeds that of SQQQ."""
        return RSI(self.algo, 'TLT', 10) > RSI(self.algo, 'SQQQ', 10)

    def _qqq_below_sma20(self):
        """True when QQQ trades below its 20-day simple moving average."""
        return GetCurrentPrice(self.algo, 'QQQ') < SMA(self.algo, 'QQQ', 20)

    # --- tree nodes ----------------------------------------------------------

    def sideways_market_deleverage(self):
        """Two unleveraged quarter-weight sleeves."""
        if GetCurrentPrice(self.algo, 'SPY') > SMA(self.algo, 'SPY', 20):
            first_sleeve = 'SPY'
        else:
            first_sleeve = 'QQQ' if self._tlt_rsi_above_sqqq() else 'PSQ'
        AH(self.algo, first_sleeve, 1, 0.25)

        second_sleeve = 'QQQ' if self._tlt_rsi_above_sqqq() else 'PSQ'
        AH(self.algo, second_sleeve, 1, 0.25)

    def nasdaq_in_crash_territory_time_to_deleverage(self):
        """Half-weight sleeve used when QQQ is down more than 20% over 252 days."""
        if self._qqq_below_sma20():
            if CumReturn(self.algo, 'QQQ', 60) <= -0.12:
                self.sideways_market_deleverage()
            else:
                leg = 'TQQQ' if self._tlt_rsi_above_sqqq() else 'SQQQ'
                AH(self.algo, leg, 1, 0.5)
        elif RSI(self.algo, 'SQQQ', 10) < 31:
            AH(self.algo, 'PSQ', 1, 0.5)
        elif CumReturn(self.algo, 'QQQ', 10) > 0.055:
            AH(self.algo, 'PSQ', 1, 0.5)
        else:
            Sort(self.algo, 'RSI', ('QQQ', 'SMH'), 10, True, 1, 1, 0.5)

    def bear_market_sideways_protection(self):
        """Two half-weight sleeves used when SPY is below its 200-day SMA and nothing is oversold."""
        # Sleeve 1
        if CumReturn(self.algo, 'QQQ', 252) < -0.2:
            self.nasdaq_in_crash_territory_time_to_deleverage()
        elif self._qqq_below_sma20():
            leg = 'TQQQ' if self._tlt_rsi_above_sqqq() else 'SQQQ'
            AH(self.algo, leg, 1, 0.5)
        elif RSI(self.algo, 'SQQQ', 10) < 31:
            AH(self.algo, 'SQQQ', 1, 0.5)
        elif CumReturn(self.algo, 'QQQ', 10) > 0.055:
            AH(self.algo, 'SQQQ', 1, 0.5)
        else:
            Sort(self.algo, 'RSI', ('TQQQ', 'SOXL'), 10, True, 1, 1, 0.5)

        # Sleeve 2
        if self._qqq_below_sma20():
            if CumReturn(self.algo, 'QQQ', 60) <= -0.12:
                self.sideways_market_deleverage()
            else:
                leg = 'TQQQ' if self._tlt_rsi_above_sqqq() else 'SQQQ'
                AH(self.algo, leg, 1, 0.5)
        elif RSI(self.algo, 'SQQQ', 10) < 31:
            AH(self.algo, 'SQQQ', 1, 0.5)
        elif CumReturn(self.algo, 'QQQ', 70) < -0.15:
            Sort(self.algo, 'RSI', ('TQQQ', 'SOXL'), 10, True, 1, 1, 0.5)
        else:
            Sort(self.algo, 'CumReturn', ('SPY', 'QQQ', 'DIA', 'XLP'), 15, True, 2, 1, 0.5)

    def dip_buy_strategy(self):
        """Buy a 3x long fund on an oversold RSI reading, else fall back to the bear-market tree."""
        if RSI(self.algo, 'TQQQ', 10) < 31:
            AH(self.algo, 'TECL', 1, 1)
        elif RSI(self.algo, 'SMH', 10) < 30:
            AH(self.algo, 'SOXL', 1, 1)
        elif RSI(self.algo, 'SPY', 10) < 30:
            AH(self.algo, 'UPRO', 1, 1)
        else:
            self.bear_market_sideways_protection()

    def svxy_ftlt(self):
        """One-third sleeve of the bull-market allocation."""
        if RSI(self.algo, 'QQQ', 10) > 80:
            AH(self.algo, 'VIXY', 1, 0.33)
        elif RSI(self.algo, 'SPY', 10) > 80:
            AH(self.algo, 'VIXY', 1, 0.33)
        elif RSI(self.algo, 'QQQ', 10) < 30:
            AH(self.algo, 'XLK', 1, 0.33)
        elif GetCurrentPrice(self.algo, 'SVXY') > SMA(self.algo, 'SVXY', 24):
            Sort(self.algo, 'SMADayRet', ('SVXY', 'SPY'), 20, True, 1, 1, 0.33)
        else:
            AH(self.algo, 'BTAL', 1, 0.33)

    def spy_cooloff_period(self):
        """Park the whole allocation in SHV."""
        AH(self.algo, 'SHV', 1, 1)

    def bull_market(self):
        """Allocation used while SPY trades above its 200-day SMA."""
        if RSI(self.algo, 'QQQ', 10) > 80:
            AH(self.algo, 'UVXY', 1, 1)
        elif RSI(self.algo, 'SPY', 10) > 80:
            AH(self.algo, 'UVXY', 1, 1)
        elif RSI(self.algo, 'SPY', 60) > 60:
            self.spy_cooloff_period()
        else:
            # 0.67 across the two best leveraged funds + 0.33 volatility sleeve
            Sort(self.algo, 'SMADayRet', ('TQQQ', 'TECL', 'UDOW', 'UPRO'), 21, True, 2, 1, 0.67)
            self.svxy_ftlt()

    def tqqq_ftlt(self):
        """Root of the tree: split on SPY versus its 200-day SMA."""
        if GetCurrentPrice(self.algo, 'SPY') > SMA(self.algo, 'SPY', 200):
            self.bull_market()
        else:
            self.dip_buy_strategy()
#indicators.py
from AlgorithmImports import *
import math
import pandas as pd
from cmath import sqrt
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data.Custom import *
from QuantConnect.Python import PythonData
import csv
import io
import time
import json
import random
import re

# Every public function below takes the algorithm instance as its first
# argument (named `self` for historical reasons) and a ticker string.


def _price_window(algo, equity, size, history_bars):
    """Return a RollingWindow of *size* closes, newest at index 0.

    The window is filled with *history_bars* daily closes and then padded
    with the security's current price until it is full.
    """
    window = RollingWindow[float](size)
    for bar in algo.History(equity, history_bars, Resolution.Daily):
        window.Add(bar.Close)
    while window.Count < size:
        window.Add(algo.Securities[equity].Price)
    return window


def _daily_returns(window, period):
    """Return the last *period* simple returns, oldest first.

    An entry is None where the earlier close is 0, so each caller can apply
    its own zero-price policy.
    """
    returns = []
    for i in range(period, 0, -1):
        earlier = window[i]
        returns.append(window[i - 1] / earlier - 1 if earlier != 0 else None)
    return returns


def _fill_next_slot(HT, HTS, weight, equity):
    """Store *weight*/*equity* in the first slot whose weight is still 0.

    Returns True when a free slot was found, False when all slots are taken.
    """
    for key in HT:
        if HT[key] == 0:
            HT[key] = weight
            HTS[key].append(equity)
            return True
    return False


def RSI(self, equity, period):
    """Wilder-smoothed RSI over *period* days, warmed up on min(5*period, 250) closes.

    Returns 100 when there were no losses, or None if the window is not ready.
    """
    extension = min(period * 5, 250)
    r_w = _price_window(self, equity, extension, extension - 1)
    if not r_w.IsReady:
        return None

    def change(i):
        # Newer close minus older close. When the period exceeds the window
        # there is no data for i < 1; treat that as "no change" (the original
        # substituted a random integer, making results non-reproducible).
        return r_w[i - 1] - r_w[i] if i >= 1 else 0.0

    # Seed: simple average of the oldest `period` gains/losses.
    gain = 0
    loss = 0
    for i in range(extension - 1, extension - period - 1, -1):
        diff = change(i)
        gain += max(diff, 0)
        loss += abs(min(diff, 0))
    average_gain = gain / period
    average_loss = loss / period

    # Wilder smoothing over the remaining, more recent changes.
    for i in range(extension - period - 1, 0, -1):
        diff = change(i)
        average_gain = (average_gain * (period - 1) + max(diff, 0)) / period
        average_loss = (average_loss * (period - 1) + abs(min(diff, 0))) / period

    if average_loss == 0:
        return 100
    return 100 - (100 / (1 + average_gain / average_loss))


def CumReturn(self, equity, period):
    """Return current price / oldest of the last *period* daily closes - 1.

    Returns 0 when the reference price is 0; with no history the reference
    is the current price itself.
    """
    closes = [bar.Close for bar in self.History(equity, period, Resolution.Daily)]
    current_price = self.Securities[equity].Price
    first_price = closes[0] if closes else current_price
    if first_price == 0:
        return 0
    return (current_price / first_price) - 1


def STD(self, equity, period):
    """Sample standard deviation of the last *period* daily returns.

    A return whose earlier close is 0 counts as 0.
    """
    r_w = _price_window(self, equity, period + 1, period)
    returns = [r if r is not None else 0 for r in _daily_returns(r_w, period)]
    if not r_w.IsReady:
        return 0
    std = pd.Series(returns, dtype=float).std()
    return 0 if std == 0 else std


def MaxDD(self, equity, period):
    """Maximum peak-to-trough drawdown over *period* prices, as a positive fraction.

    The original took `.min()` of the drawdown series, which is always 0
    because every running peak contributes a zero drawdown.
    """
    closes = [bar.Close for bar in self.History(equity, period - 1, Resolution.Daily)]
    closes.append(self.Securities[equity].Price)
    prices = pd.Series(closes, dtype=float)
    running_peak = prices.cummax()
    drawdowns = (running_peak - prices) / running_peak
    max_dd = drawdowns.max()
    # A zero running peak yields NaN; report that as "no drawdown".
    return 0 if pd.isna(max_dd) else max_dd


def SMA(self, equity, period):
    """Simple moving average of *period* prices (period-1 daily closes + current price)."""
    r_w = _price_window(self, equity, period, period - 1)
    if r_w.IsReady:
        return sum(r_w) / period
    return 0


def IV(self, equity, period):
    """Inverse volatility: 1 / sample std of the last *period* daily returns.

    Returns 0 when any reference close is 0 or the std is 0.
    """
    r_w = _price_window(self, equity, period + 1, period)
    returns = _daily_returns(r_w, period)
    if any(r is None for r in returns):
        return 0
    if not r_w.IsReady:
        return 0
    std = pd.Series(returns, dtype=float).std()
    return 0 if std == 0 else 1 / std


def SMADayRet(self, equity, period):
    """Mean of the last *period* daily returns; 0 when any reference close is 0."""
    r_w = _price_window(self, equity, period + 1, period)
    returns = _daily_returns(r_w, period)
    if any(r is None for r in returns):
        return 0  # Return 0 instead of None
    if r_w.IsReady:
        return sum(returns) / period
    return 0


def EMA(self, equity, period):
    """Exponential moving average of *period*, warmed up on period + 50 closes.

    The EMA is seeded with the simple average of the oldest *period* closes
    and then smoothed forward through the remaining 50 closes. The original
    summed period + 1 closes for the seed while dividing by period.
    """
    extension = period + 50
    r_w = _price_window(self, equity, extension, extension - 1)
    if not r_w.IsReady:
        return None

    seed_stop = extension - period  # seed covers indices extension-1 .. seed_stop
    average_price = sum(r_w[i] for i in range(extension - 1, seed_stop - 1, -1)) / period

    alpha = 2 / (period + 1)
    for i in range(seed_stop - 1, -1, -1):
        average_price = r_w[i] * alpha + average_price * (1 - alpha)
    return average_price


# Indicator name -> function; shared by Sort and GroupSort.
_INDICATOR_FUNCTIONS = {
    'EMA': EMA,
    'RSI': RSI,
    'CumReturn': CumReturn,
    'STD': STD,
    'MaxDD': MaxDD,
    'SMA': SMA,
    'IV': IV,
    'SMADayRet': SMADayRet,
    # Add other indicators here as needed
}


def Sort(self, sort_type, equities, period, reverse, num_assets, number, multiplier):
    """Rank *equities* by an indicator and allocate to the top *num_assets*.

    :param sort_type: Indicator name; an unknown name allocates nothing.
    :param equities: Iterable of tickers to rank.
    :param period: Look-back passed to the indicator.
    :param reverse: True to rank highest-first.
    :param num_assets: How many tickers to keep; each gets an equal share.
    :param number: Strategy slot number (selects PT/HT/HTS attributes).
    :param multiplier: Fraction of the strategy's PT to distribute.
    """
    PT = getattr(self, f"PT{number}") * multiplier

    indicator_function = _INDICATOR_FUNCTIONS.get(sort_type)
    returns = {}
    if callable(indicator_function):
        for equity in equities:
            indicator_value = indicator_function(self, equity, period)
            if indicator_value is not None:
                returns[equity] = indicator_value

    ranked = sorted(returns.items(), key=lambda item: item[1], reverse=reverse)

    HT = getattr(self, f"HT{number}")
    HTS = getattr(self, f"HTS{number}")
    for equity, _value in ranked[:num_assets]:
        # Distribute PT evenly among the selected assets
        _fill_next_slot(HT, HTS, PT / num_assets, equity)


def AH(self, equities, PTnumber, multiplier): #AppendHolding
    """Append each ticker in *equities* to strategy *PTnumber* with weight PT * multiplier."""
    if not isinstance(equities, list):
        equities = [equities]
    HT = getattr(self, f"HT{PTnumber}")
    HTS = getattr(self, f"HTS{PTnumber}")
    PT = getattr(self, f"PT{PTnumber}") * multiplier
    for equity in equities:
        _fill_next_slot(HT, HTS, PT, equity)


def GetCurrentPrice(self, symbol):
    """
    Gets the current price of a security.

    :param self: The self instance containing the securities.
    :param symbol: The symbol of the security.
    :return: The current price of the security or None if not available.
    """
    if symbol in self.Securities:
        return self.Securities[symbol].Price
    self.Debug(f"Symbol {symbol} not found in securities.")
    return None


def AHIV(self, assets, period, PTnumber, multiplier):
    """Append *assets* weighted by inverse volatility.

    Each ticker receives PT * multiplier * IV / sum(IV). Tickers whose IV
    cannot be computed are logged and skipped.
    """
    if isinstance(assets, str):
        assets = [assets]

    # One IV evaluation per ticker (called directly; no eval needed).
    iv_by_ticker = {}
    for ticker in assets:
        try:
            iv_by_ticker[ticker] = IV(self, ticker, period)
        except:  # noqa: E722 - deliberately broad, as in the original best-effort design
            self.Log(f"Error calculating IV for ticker: {ticker}")
    sum_IV_assets_value = sum(iv_by_ticker.values())

    HT = getattr(self, f"HT{PTnumber}")
    HTS = getattr(self, f"HTS{PTnumber}")
    PT = getattr(self, f"PT{PTnumber}") * multiplier

    for ticker in assets:
        if ticker not in iv_by_ticker or sum_IV_assets_value == 0:
            self.Log(f"Error calculating portion for ticker: {ticker}")
            continue
        portion = iv_by_ticker[ticker] / sum_IV_assets_value
        _fill_next_slot(HT, HTS, portion * PT, ticker)


def GroupSort(self, filter_type, group_methods, window, select_type, num_assets, number, multiplier):
    """Rank groups by a weighted indicator and allocate the best *num_assets* groups.

    Each entry of *group_methods* names a method on the algorithm that
    returns ``(equities, weights)``. If anything fails, the fallback
    allocates equally between group methods recovered from the argument text.

    NOTE(review): the nesting of this function was reconstructed from a
    whitespace-collapsed source; verify the allocation loop against the
    original file. The primary path normalises by the sum of the selected
    groups' indicator scores and does not use PT - confirm that is intended.
    """
    # Resolved before the try so the fallback can always reach them; the
    # original hit an unbound name when the failure happened early.
    HT = getattr(self, f"HT{number}")
    HTS = getattr(self, f"HTS{number}")

    try:
        if filter_type not in _INDICATOR_FUNCTIONS:
            raise ValueError(f"Unsupported filter type: {filter_type}")
        indicator = _INDICATOR_FUNCTIONS[filter_type]

        # Weighted indicator value per group
        group_indicator_values = {}
        for group_method in group_methods:
            if group_method is None:
                continue  # Skip None values in group_methods
            result = getattr(self, group_method)()
            if isinstance(result, tuple) and len(result) == 2:
                equities, weights = result
                weighted_indicator_value = 0
                for equity, weight in zip(equities, weights):
                    indicator_value = indicator(self, equity, window)
                    if indicator_value is not None:
                        weighted_indicator_value += weight * indicator_value
                group_indicator_values[group_method] = weighted_indicator_value

        sorted_groups = sorted(
            group_indicator_values.items(),
            key=lambda item: item[1],
            reverse=(select_type == 'Top'),
        )[:num_assets]

        adjusted_multiplier = multiplier / num_assets
        total_weight = sum(score for _, score in sorted_groups)
        allocated_tickers = set()  # Track allocated tickers
        total_allocation = 0       # Track total allocation

        for group_method, _score in sorted_groups:
            equities, weights = getattr(self, group_method)()
            for equity, weight in zip(equities, weights):
                if equity in allocated_tickers:
                    continue
                for key in HT:
                    if HT[key] == 0:
                        allocation = (weight / total_weight) * adjusted_multiplier
                        total_allocation += allocation
                        if total_allocation <= 1.0:  # Never allocate past 100%
                            HT[key] = allocation
                            HTS[key].append(equity)
                            allocated_tickers.add(equity)
                        break

        # Scale down the allocations if the total allocation exceeds 100%
        if total_allocation > 1.0:
            scale_factor = 1.0 / total_allocation
            for key in HT:
                HT[key] *= scale_factor

    except:  # noqa: E722 - deliberately broad, as in the original best-effort design
        # Recover group methods written as "self.name()" and split equally.
        captured_group_methods = re.findall(r'self\.(\w+)\(\)', str(group_methods))
        if not captured_group_methods:
            # The original divided by zero here.
            self.Log("GroupSort: primary allocation failed and no fallback groups were found.")
            return
        equal_allocation = 1.0 / len(captured_group_methods)
        for group_method in captured_group_methods:
            group_method_func = getattr(self, group_method)
            if callable(group_method_func):
                equities, weights = group_method_func()
                total_weight = sum(weights)
                for equity, weight in zip(equities, weights):
                    _fill_next_slot(HT, HTS, (weight / total_weight) * equal_allocation, equity)
# main.py
from AlgorithmImports import *
import math
import pandas as pd
from cmath import sqrt
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data.Custom import *
from QuantConnect.Python import PythonData
import ast
import csv
import io
import time
import json
import os
import importlib
import inspect
import re
import numpy as np
import types
from indicators import *


class DogeStrat(QCAlgorithm):
    """Runs the strategy trees shortly before the close and rebalances to their targets.

    Strategies write their desired holdings into per-strategy slot dicts:
    ``HT<n>`` maps slot -> weight and ``HTS<n>`` maps slot -> tickers.
    ``PT<n>`` is the portfolio fraction given to strategy ``n``.
    """

    # Slot keys '01'..'09' used by every HT/HTS dict.
    _SLOT_KEYS = tuple(str(j).zfill(2) for j in range(1, 10))

    # Patterns that pull the strategy slot number out of allocation calls in a
    # strategy's source text (AH, Sort, AHIV, GroupSort, in that order).
    _SLOT_NUMBER_PATTERNS = (
        re.compile(r'AH\(self(?:\.algo)?,\s*[\'\w\(\)\s,]+,\s*(\d+)\s*,'),
        re.compile(r'Sort\(self(?:\.algo)?,\s*[\'\w]+,\s*[\'\w\(\)\s,]+,\s*\d+\s*,\s*\w+\s*,\s*\d+\s*,\s*(\d+)'),
        re.compile(r'AHIV\(self(?:\.algo)?,\s*[\'\w\(\)\s,]+,\s*\d+\s*,\s*(\d+)\s*,\s*[\d\.]+\)'),
        re.compile(r'GroupSort\(self(?:\.algo)?,\s*[\'\w]+,\s*[\'\w\(\)\s,]+,\s*\d+\s*,\s*\w+\s*,\s*\d+\s*,\s*(\d+)'),
    )

    # Long/inverse pairs that are netted against each other (one direction;
    # the reverse direction is added when the table is used).
    _INVERSE_PAIRS = {'SOXL':'SOXS','TQQQ':'SQQQ','SPXL':'SPXS','WEBL':'WEBS','TECL':'TECS','UPRO':'SPXU','QQQ':'PSQ','SPY':'SH','TMV':'TMF','HIBL':'HIBS','BITO':'BITI','TSLA':'TSLS','AAPL':'AAPD','ERX':'ERY','BOIL':'KOLD','LABU':'LABD','JNUG':'JDST','ARKK':'SARK','IBIT':'BITI','TNA':'TZA','UCO':'SCO','FAS':'FAZ','NRGU':'NRGD','DDM':'SDOW','GUSH':'DRIP','TYD':'TYO','UGL':'GLL','DRN':'DRV'}

    # Symbols replaced by a (usually more leveraged) equivalent: the weight
    # is divided by the ratio and moved onto the mapped symbol.
    _TRANSFORM_SYMBOLS = ['PSQ','SH','USDU','SPXU','UPRO','QLD','QID','TSLS','ARKK','FNGU','IBIT','SDS','NUGT','DUST','SPYU','NVDU','SSO']
    _TRANSFORM_MAPPING = {'PSQ':'SQQQ','SH':'SPXS','USDU':'UUP','SPXU':'SPXS','UPRO':'SPXL','QLD':'TQQQ','QID':'SQQQ','TSLS':'TSLQ','ARKK':'TARK','FNGU':'SOXL','GBTC':'BITO','IBIT':'BITO','SDS':'SPXS','NUGT':'JNUG','DUST':'JDST','SPYU':'SPXL','NVDU':'NVDL','SSO':'SPXL'}
    _TRANSFORM_RATIOS = {'PSQ':3,'SH':3,'USDU':1,'SPXU':1,'UPRO':1,'QLD':1.5,'QID':1.5,'TSLS':1,'ARKK':2,'FNGU':1,'GBTC':1,'IBIT':1,'SDS':1.5,'NUGT':1,'DUST':1,'SPYU':0.75,'NVDU':1,'SSO':1.5}

    def _reset_slots(self, strategy_number):
        """Give strategy *strategy_number* fresh, empty HT/HTS slot dicts."""
        setattr(self, f'HT{strategy_number}', {key: 0 for key in self._SLOT_KEYS})
        setattr(self, f'HTS{strategy_number}', {key: [] for key in self._SLOT_KEYS})

    def Initialize(self):
        """Subscribe to the universe, load the strategies and schedule the daily rebalance."""
        self.cash = 100000
        self.SetStartDate(2023,6,25)
        self.SetEndDate(2024,6,28)
        self.SetCash(self.cash)
        # NOTE(review): 'RSI' in this list looks like an indicator name picked
        # up by the symbol scanner rather than an intended holding - confirm.
        self.equities = ['AAPL','ADBE','AGG','AMZN','BA','BIL','BSV','BTAL','CSX','DBC','DIA','DOG','DPST','DRN','DRV','EEM','ERX','ERY','FAS','FAZ','GE','GLD','GOOG','IBM','IEF','INTC','IWM','JPM','LLY','META','MSFT','NAIL','NFLX','NVDA','PSQ','QID','QQQ','RSI','RWM','SH','SHV','SHY','SMH','SOXL','SOXS','SOXX','SPHB','SPLV','SPTL','SPXL','SPXS','SPXU','SPY','SQQQ','SRTY','STX','SVXY','TBF','TECL','TECS','TLH','TLT','TMF','TMV','TNA','TQQQ','TSLA','UCO','UDOW','UPRO','UPS','URTY','USRT','UUP','UVXY','VIXM','VIXY','WDC','WFC','XLE','XLK','XLP','XLRE','YANG','YINN']
        for equity in self.equities:
            self.AddEquity(equity,Resolution.Minute)
            self.Securities[equity].SetDataNormalizationMode(DataNormalizationMode.Adjusted)
        self.AddEquity('BIL',Resolution.Minute)
        self.Securities['BIL'].SetDataNormalizationMode(DataNormalizationMode.TotalReturn)
        # Cash-like symbols: targets in these are never traded by ExecuteTrade.
        self.exclude_symbols = ['BIL','BOXX','BSV','BUCK','CSHI','ICSH','IEI','MINT','SHV','SHY','SOF','TFLO','USFR','UTWO','STIP','VCIT','LQD','VTIP','TLT','BND','IEF','TIP','VGIT','IYK']

        # List of strategy names (you would populate this list dynamically)
        self.strategy_names = [
            "CombinedStrategy1",
        ]

        # Slot dicts HT1..HT499 / HTS1..HTS499
        for i in range(1, 500):
            self._reset_slots(i)

        # Dynamically import strategies and create instances
        self.strategies = {}
        for strategy_name in self.strategy_names:
            strategy_module = importlib.import_module(f"Strategies.{strategy_name}.version1")
            strategy_module_source = inspect.getsource(strategy_module)
            # Every "class ...Strategy" found in the module source is instantiated
            for strategy_class_name in re.findall(r'class\s+(\w+Strategy)', strategy_module_source):
                strategy_class = getattr(strategy_module, strategy_class_name)
                # The algorithm itself is passed as the strategy's context
                self.strategies[strategy_class_name] = strategy_class(self)

        self.PTMaster = 1
        self.filter_value = 0.02
        self.buffer_pct = 0.05

        #Rebalance
        self.number_of_PT = len(self.strategies) + 10
        # Fixed strategies and their percentages
        self.fixed_strategies = {
            1: 0.94,  # Strategy 1 with 94%
        }
        # PT is only defined for strategies that have a fixed percentage
        for i in range(1, self.number_of_PT + 1):
            if i in self.fixed_strategies:
                setattr(self, f'PT{i}', self.fixed_strategies[i] * self.PTMaster)

        # Assign self.algo to self
        self.algo = self

        # Default (empty) group methods for every quoted name that appears in
        # a GroupSort call of a strategy's source.
        self.group_definitions = {}
        for strategy in self.strategies.values():
            strategy_source = inspect.getsource(strategy.__class__)
            for call in re.findall(r"GroupSort\(.*?\)", strategy_source):
                for group_name in re.findall(r"'(.*?)'", call):
                    if group_name not in self.group_definitions:
                        self.group_definitions[group_name] = (
                            f"def {group_name}(self):\n"
                            f"    # Define the logic for the {group_name} method\n"
                            "    # Return a tuple of equities and weights\n"
                            "    equities = []\n"
                            "    weights = []\n"
                            "    return equities, weights\n"
                        )

        # Bind the generated functions as methods. An explicit namespace is
        # used because reading exec() results back through locals() is not
        # reliable (each locals() call may return an independent snapshot).
        for group_name, group_def in self.group_definitions.items():
            namespace = {}
            exec(group_def, globals(), namespace)
            setattr(self, group_name, types.MethodType(namespace[group_name], self))

        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                         self.TimeRules.BeforeMarketClose("SPY",5),
                         self.FunctionBeforeMarketClose)

    def OnData(self, data):
        # This function is called every time new data is received
        pass

    def FunctionBeforeMarketClose(self):
        """Run every strategy, log the result, trade, then clear the slots."""
        for strategy in self.strategies.values():
            strategy.Execute()
        self.LogStrategies()
        self.ExecuteTrade()
        self.SetVarToZero()

    # ------------------------------------------------------------------
    # ExecuteTrade helpers
    # ------------------------------------------------------------------

    def _collect_target_weights(self):
        """Return {symbol key: summed weight} over all strategies in FinalTickers.

        A slot holding one ticker is keyed by that ticker; any other slot is
        keyed by the string form of its ticker list (``"[]"`` for unused slots).
        """
        df_list = []
        for strategy_number in self.FinalTickers:
            HTS_attr = getattr(self, f'HTS{strategy_number}')
            HT_attr = getattr(self, f'HT{strategy_number}')
            group = {
                'HTS': [HTS_attr[i][0] if len(HTS_attr[i]) == 1 else HTS_attr[i] for i in HTS_attr],
                'HT': [HT_attr[i]/1 for i in HT_attr]
            }
            df_list.append(pd.DataFrame(group))
        df_combined = pd.concat(df_list)
        df_combined['HTS'] = df_combined['HTS'].astype(str)
        result = df_combined.groupby(['HTS']).sum().reset_index()
        return dict(zip(result.iloc[:,0],result.iloc[:,1]))

    def _apply_symbol_transforms(self, symbol_dict):
        """Move the weight of transformed symbols onto their mapped symbol, in place."""
        for symbol in self._TRANSFORM_SYMBOLS:
            if symbol in symbol_dict:
                new_symbol = self._TRANSFORM_MAPPING[symbol]
                new_percentage = symbol_dict[symbol] / self._TRANSFORM_RATIOS[symbol]
                if new_symbol in symbol_dict:
                    new_percentage += symbol_dict[new_symbol]
                symbol_dict[new_symbol] = new_percentage
                symbol_dict.pop(symbol, None)

    @staticmethod
    def _tickers_in(symbol_key):
        """Return the tickers behind a key: a stringified list is parsed, anything else is one ticker."""
        if isinstance(symbol_key, str) and symbol_key.startswith("['") and symbol_key.endswith("']"):
            # literal_eval only accepts Python literals, unlike eval
            return ast.literal_eval(symbol_key)
        return [symbol_key]

    def _held_fraction(self, tickers):
        """Fraction of total portfolio value currently held in *tickers*."""
        return sum(self.Portfolio[s].HoldingsValue for s in tickers) / self.Portfolio.TotalPortfolioValue

    def _set_target(self, tickers, weight):
        """Split *weight* equally across *tickers*."""
        for s in tickers:
            self.SetHoldings(s, weight / len(tickers))

    def _liquidate_if_held(self, symbol):
        if self.Portfolio[symbol].HoldStock:
            self.Liquidate(symbol)

    def ExecuteTrade(self):
        """Rebalance towards the combined targets: sells first, then buys.

        Order of operations: log targets, apply symbol transforms, liquidate
        positions no longer targeted, resolve long/inverse pairs, reduce
        overweight positions, then buy the net side of each pair and the
        remaining underweight positions. Changes inside the buffer band
        (``buffer_pct``) and targets below ``filter_value`` are not bought.
        """
        symbol_dict = self._collect_target_weights()

        pairs_dict = dict(self._INVERSE_PAIRS)
        pairs_dict.update({v: k for k,v in pairs_dict.items()}) #ensure both directions are covered

        processed_pairs_selling = set()
        processed_pairs_buying = set()
        liquidated_equities = set()

        # Log output (targets before transforms)
        output = "*****" + "".join(
            "{}: {}% - ".format(symbol, round(percentage*100, 2))
            for symbol, percentage in symbol_dict.items()
        )
        self.Log(output.rstrip(" - "))

        self._apply_symbol_transforms(symbol_dict)

        # Liquidate everything that is no longer targeted
        updated_equities = set(symbol_dict.keys())
        for equity in self.equities:
            if equity not in updated_equities and self.Portfolio[equity].HoldStock and equity not in liquidated_equities:
                self.Liquidate(equity)
                liquidated_equities.add(equity)

        # Pairs, selling side: drop the smaller leg of each targeted pair
        for symbol1,symbol2 in pairs_dict.items():
            if symbol1 in symbol_dict and symbol2 in symbol_dict:
                if symbol_dict[symbol1] >= symbol_dict[symbol2] and self.Portfolio[symbol2].HoldStock:
                    self.Liquidate(symbol2)
                elif symbol_dict[symbol1] <= symbol_dict[symbol2] and self.Portfolio[symbol1].HoldStock:
                    self.Liquidate(symbol1)
                processed_pairs_selling.add(symbol1)
                processed_pairs_selling.add(symbol2)

        # Remaining symbols, selling side: trim positions above target
        for symbol,value in symbol_dict.items():
            if symbol in processed_pairs_selling or value == 0 or symbol in self.exclude_symbols:
                continue
            tickers = self._tickers_in(symbol)
            percentage_equity = self._held_fraction(tickers)
            if value < percentage_equity and abs(value/percentage_equity - 1) > self.buffer_pct:
                self._set_target(tickers, value)

        # Pairs, buying side: hold only the net difference of the two legs
        for symbol1,symbol2 in pairs_dict.items():
            if (symbol1 in symbol_dict and symbol2 in symbol_dict
                    and symbol1 not in processed_pairs_buying
                    and symbol2 not in processed_pairs_buying):
                offset_value = abs(symbol_dict[symbol1] - symbol_dict[symbol2])
                if offset_value > self.filter_value:
                    if symbol_dict[symbol1] > symbol_dict[symbol2]:
                        self.SetHoldings(symbol1,offset_value)
                    else:
                        self.SetHoldings(symbol2,offset_value)
                else:
                    # Net exposure too small to hold: flatten both legs
                    self._liquidate_if_held(symbol1)
                    self._liquidate_if_held(symbol2)
                processed_pairs_buying.add(symbol1)
                processed_pairs_buying.add(symbol2)

        # Remaining symbols, buying side: targets below filter_value are ignored
        for symbol,value in symbol_dict.items():
            if (value < self.filter_value
                    or symbol in processed_pairs_buying
                    or symbol in self.exclude_symbols):
                continue
            tickers = self._tickers_in(symbol)
            percentage_equity = self._held_fraction(tickers)
            if value > percentage_equity and abs(percentage_equity/value - 1) > self.buffer_pct:
                self._set_target(tickers, value)

    def SetVarToZero(self):
        """Clear the HT/HTS slots of strategies 1..number_of_PT for the next run."""
        for strategy_number in range(1, self.number_of_PT + 1):
            self._reset_slots(strategy_number)

    def LogStrategies(self):
        """Select the strategies to trade (the fixed ones) and log their holdings."""
        self.FinalTickers = list(self.fixed_strategies.keys())
        self.Log("Strategies, Percentages and Holdings")
        for index, strategy_number in enumerate(self.FinalTickers, start=1):
            if strategy_number in self.fixed_strategies:
                percentage = self.fixed_strategies[strategy_number] * 100
                strategy_name, holdings_info = self.get_strategy_name_and_holdings(strategy_number)
                self.Log(f"Strategy {index}: {strategy_name}, Percentage: {round(percentage, 2)}%, Holdings: {holdings_info}")
        self.Log(self.FinalTickers)
        self.Log("-" * 50)

    def get_strategy_name_and_holdings(self, strategy_number):
        """Return (class name, holdings text) of the strategy that allocates to *strategy_number*.

        The owning strategy is found by scanning each strategy's source for
        allocation calls that mention the slot number. Returns ("", "") when
        no strategy matches.
        """
        wanted = str(strategy_number)
        for strategy_instance in self.strategies.values():
            source = inspect.getsource(strategy_instance.__class__)
            strategy_numbers = []
            for pattern in self._SLOT_NUMBER_PATTERNS:
                strategy_numbers.extend(pattern.findall(source))
            if wanted not in strategy_numbers:
                continue
            HT_attr = getattr(self, f'HT{strategy_number}')
            HTS_attr = getattr(self, f'HTS{strategy_number}')
            holdings_info = ', '.join(
                f"{ticker}: {round(HT_attr[key] * 100, 2)}%"
                for key, tickers in HTS_attr.items() if tickers
                for ticker in (tickers if isinstance(tickers, list) else [tickers])
            )
            return type(strategy_instance).__name__, holdings_info
        return "", ""