Hey there,
I currently have some issues with backtesting. I try to backtest my strategy and I get no results from my backtest.
So either I am missing smth very obvious or there as in issue I cannot resolve by myself.
I used both approaches to test my strategy the classic algorithm structure and the algorithm framework and both show no results.
I have attached the code with some comments, I really appreciate your tips.
Classic Algortihm structure:
-----------------------------
import scipy as sp
from sklearn.linear_model import LinearRegression
import numpy as np
class UncoupledTachyonContainmentField(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2017, 1, 1) # Set Start Date
self.SetEndDate(2017, 1, 31) # Set End Date
self.SetCash(100000) # Set Strategy Cash
self.__numberOfSymbols = 600
self.SetUniverseSelection(FineFundamentalUniverseSelectionModel(self.CoarseSelectionFunction, self.FineSelectionFunction))
#Settings
self.UniverseSettings.Resolution = Resolution.Minute
self.UniverseSettings.ExtendedMarketHours = False
self.SetSecurityInitializer(lambda x: x.SetDataNormalizationMode(DataNormalizationMode.Raw))
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Cash)
self.SetBenchmark('SPY')
# Define lists, dictionaries needed for storing values
self.slope_dict = {}
self.ranked_ra = {}
self.volume_profiles = {}
self.val = {}
self.vah = {}
self.pct_change = {}
self.top_stocks = {}
self.number_stocks = 3
self.purchased_stocks = []
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(3, 10), self.DataGenerator)
def OnData(self, data):
'''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
Arguments:
data: Slice object keyed by symbol containing the stock data
'''
if self.Time.hour == 10 and self.Time.minute == 30:
for key, value in self.ranked_ra.items():
self.pct_change[key] = data[key].Price # some calculation with price at 10:30 AM; resulting value is stored in a dictionary as value with according key pair (I removed the calculation on purpose)
self.pct_change = dict(sorted(self.pct_change.items(), key=lambda x: x[1], reverse=True))
ranked_pct_change = {item:index for index, item in enumerate(self.pct_change,1)}
#Combine ranking of both ranked_ra & val_pct_change; Data is pulled from dictionaries stored in Initialize and they according was generated with DataGenerator-Function
for key, value in self.ranked_ra.items():
value_2 = self.ranked_pct_change[key]
self.top_stocks[key] = value + value_2
Rank stocks according to total rank
self.top_stocks = dict(sorted(self.top_stocks.items(), key=lambda x: x[1], reverse=False))
counter = 0
self.purchased_stocks = []
if counter < self.number_stocks:
for ticker in self.top_stocks.keys():
self.SetHoldings(ticker, int(0.5/self.number_stocks))
counter += 1
self.purchased_stocks.append(ticker)
if self.Time.hour == 15 and self.Time.minute == 00:
if self.Portfolio.Invested:
self.Liquidate()
self.Log(str(f"Number of stocks in Universe: {len(self.ranked_ra)}, Bought these tickers: {self.purchased_stocks}"))
def CoarseSelectionFunction(self, coarse):
# sort descending by daily dollar volume
sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse = True)
# select only stocks with fundamental data
cleaned_selection = [x for x in sortedByDollarVolume if x.HasFundamentalData]
# return the symbol objects of the top entries from our cleaned collection
return [ x.Symbol for x in cleaned_selection[:self.__numberOfSymbols] ]
def FineSelectionFunction(self, fine):
def adder(x):
if x == None:
x = 0.01
else:
x
return x
filtered_stocks = [x for x in fine if x.AssetClassification.MorningstarIndustryCode != ['MorningstarIndustryCode.BanksRegionalLatinAmerica',
'MorningstarIndustryCode.BanksRegionalUS']
and adder(x.FinancialStatements.IncomeStatement.InterestIncome.ThreeMonths) !=0
and adder(x.FinancialStatements.IncomeStatement.EBIT.ThreeMonths) !=0
and adder(x.FinancialStatements.BalanceSheet.CurrentDebt.ThreeMonths) !=0
and x.CompanyReference.PrimaryExchangeID in ["NYS","NAS"]
and x.CompanyReference.CountryId == "USA"
]
# take all entries from the filtered collection
return [x.Symbol for x in filtered_stocks]
def DataGenerator(self):
# Clear all the data from the past before starting the calculation for this day again
self.slope_dict.clear()
self.ranked_ra.clear()
self.volume_profiles.clear()
self.val.clear()
self.vah.clear()
slices_ra = self.History(self.Securities.Keys, timedelta(days=90), Resolution.Daily)
for i in self.Securities.Keys:
price_data = slices_ra.loc[str(i)]["close"].iloc[-10:]
y = np.log(price_data)
x = np.arange(len(y))
slope, intercept, r_value, p_value, std_err = sp.stats.linregress(x,y)
self.slope_dict[str(i)] = {"Slope":slope, "R²":r_value**2}
self.slope_dict = dict(sorted(self.slope_dict.items(), key=lambda x: x[1]["R²"], reverse=True))
self.ranked_ra = {item[0]:index for index, item in enumerate(self.slope_dict,1) if index < len(self.slope_dict.keys())*0.1}
slices_vp = self.History(self.Securities.Keys, timedelta(days=14), Resolution.Second)
#Code to create volume profiles for all requested stocks
for i in self.ranked_ra.keys():
#create stock keys in volume profile
self.volume_profiles[str(i)] = {}
low = round(min(slices_vp.loc[str(i)]["close"]), 2)
high = round(max(slices_vp.loc[str(i)]["close"]), 2)
price_range = high - low
total_volume = sum(slices_vp.loc[str(i)]["volume"])
#create volume profile for every stock in keys
for row in slices_vp.loc[str(i)].itertuples():
if row.volume > 0:
key = round(row.close, 2)
volume_per_level = row.volume
if key not in self.volume_profiles[str(i)].keys():
self.volume_profiles[str(i)][str(key)] = volume_per_level
else:
self.volume_profiles[str(i)][str(key)] += volume_per_level
# Set target volume - 70% of daily volume
target_vol = sum(self.volume_profiles[str(i)].values()) * 0.7
# Get the price level that had the largest volume
max_vol_price = max(self.volume_profiles[str(i)], key=self.volume_profiles[str(i)].get)
# Setup a window to capture the POC, centered at the most liquid level
curr_max_price = float(max_vol_price)
curr_min_price = float(max_vol_price)
curr_vol = self.volume_profiles[str(i)][max_vol_price]
# Grow window bounds until we have 70% of the day's volume captured
while curr_vol < target_vol:
# Price one level up
price_up = str(round(curr_max_price + 0.01, 2))
price_up_vol = 0
# Price one level down
price_down = str(round(curr_min_price - 0.01, 2))
price_down_vol = 0
# Get associated volume at new potential levels
if price_up in self.volume_profiles[str(i)].keys():
price_up_vol = self.volume_profiles[str(i)][price_up]
if price_down in self.volume_profiles[str(i)].keys():
price_down_vol = self.volume_profiles[str(i)][price_down]
#Grow windows in the direction of more volume
try:
if price_up_vol > price_down_vol:
curr_max_price = round(float(price_up), 2)
curr_vol += self.volume_profiles[str(i)][price_up]
else:
curr_min_price = round(float(price_down), 2)
curr_vol += self.volume_profiles[str(i)][price_down]
except KeyError:
continue
# Save VAL, value area low & VAH, value area high for each stock
self.val[str(i)] = curr_min_price
self.vah[str(i)] = curr_max_price
---------------------------------------------------------
That is the code for the classic algortihm structure.
(I can share the code for the algorithm framework as well; I think that would be a little bit of an over-kill to do it in this post)
I used the research environment for the DataGenerator-Method and tested it there and it worked well, so I just had to replace "qb" with "self" to get it run for the backtest.
When I run the backtest I only get one warning and my logs are not tracked as well.
Benchmark(SPY): no existing security found, benchmark security will be added with Equity type.
I checked the documentation, I assume I have to add the SPY-data manually with AddEquity() to resolve this problem but I still do not get any results for my backtest.
There must be something I have overlooked.
I attached the result of my backtest as well.
To unlock posting to the community forums please complete at least 30% of Boot Camp.
You can continue your Boot Camp training progress from the terminal. We hope to see you in the community soon!