Overall Statistics |
Total Trades 24 Average Win 0.15% Average Loss -0.15% Compounding Annual Return 0.723% Drawdown 1.100% Expectancy 0.008 Net Profit 0.012% Sharpe Ratio 0.116 Probabilistic Sharpe Ratio 45.587% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 1.02 Alpha 0.497 Beta -0.382 Annual Standard Deviation 0.067 Annual Variance 0.005 Information Ratio -12.432 Tracking Error 0.102 Treynor Ratio -0.02 Total Fees $24.00 |
import scipy as sp from sklearn.linear_model import LinearRegression import numpy as np class UncoupledTachyonContainmentField(QCAlgorithm): def Initialize(self): self.SetStartDate(2017, 1, 1) # Set Start Date self.SetEndDate(2017, 1, 7) # Set End Date self.SetCash(100000) # Set Strategy Cash self.__numberOfSymbols = 3 #600 self.SetUniverseSelection(FineFundamentalUniverseSelectionModel(self.CoarseSelectionFunction, self.FineSelectionFunction)) #Settings self.UniverseSettings.Resolution = Resolution.Minute self.UniverseSettings.ExtendedMarketHours = False self.SetSecurityInitializer(lambda x: x.SetDataNormalizationMode(DataNormalizationMode.Raw)) self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Cash) #self.SetBenchmark('SPY') # Define lists, dictionaries needed for storing values self.slope_dict = {} self.ranked_ra = {} self.volume_profiles = {} self.val = {} self.vah = {} self.pct_change = {} self.top_stocks = {} self.number_stocks = 3 self.purchased_stocks = [] self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(3, 10), self.DataGenerator) def OnData(self, data): '''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here. Arguments: data: Slice object keyed by symbol containing the stock data ''' if self.Time.hour == 10 and self.Time.minute == 30: for key, value in self.ranked_ra.items(): if data.ContainsKey(key) and data[key] is not None: self.pct_change[key] = data[key].Price # some calculation with price at 10:30 AM; resulting value is stored in a dictionary as value with according key pair (I removed the calculation on purpose) self.pct_change = dict(sorted(self.pct_change.items(), key=lambda x: x[1], reverse=True)) ranked_pct_change = {item:index for index, item in enumerate(self.pct_change,1)} #Combine ranking of both ranked_ra & val_pct_change; Data is pulled from dictionaries stored in Initialize and they according was generated with DataGenerator-Function for key, value in self.ranked_ra.items(): if key not in ranked_pct_change: continue value_2 = ranked_pct_change[key] self.top_stocks[key] = value + value_2 #Rank stocks according to total rank self.top_stocks = dict(sorted(self.top_stocks.items(), key=lambda x: x[1], reverse=False)) counter = 0 self.purchased_stocks = [] if counter < self.number_stocks: for ticker in self.top_stocks.keys(): self.SetHoldings(ticker, 0.5/self.number_stocks) counter += 1 self.purchased_stocks.append(ticker) if self.Time.hour == 15 and self.Time.minute == 00: if self.Portfolio.Invested: self.Liquidate() #self.Log(str(f"Number of stocks in Universe: {len(self.ranked_ra)}, Bought these tickers: {self.purchased_stocks}")) def CoarseSelectionFunction(self, coarse): # sort descending by daily dollar volume sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse = True) # select only stocks with fundamental data cleaned_selection = [x for x in sortedByDollarVolume if x.HasFundamentalData] # return the symbol objects of the top entries from our cleaned collection return [ x.Symbol for x in cleaned_selection[:self.__numberOfSymbols] ] def FineSelectionFunction(self, fine): def adder(x): if x == None: x = 0.01 else: x return x filtered_stocks = [x for x in fine if x.AssetClassification.MorningstarIndustryGroupCode != MorningstarIndustryGroupCode.Banks and adder(x.FinancialStatements.IncomeStatement.InterestIncome.ThreeMonths) !=0 and adder(x.FinancialStatements.IncomeStatement.EBIT.ThreeMonths) !=0 and adder(x.FinancialStatements.BalanceSheet.CurrentDebt.ThreeMonths) !=0 and x.CompanyReference.PrimaryExchangeID in ["NYS","NAS"] and x.CompanyReference.CountryId == "USA" ] # take all entries from the filtered collection return [x.Symbol for x in filtered_stocks] def DataGenerator(self): # Clear all the data from the past before starting the calculation for this day again self.slope_dict.clear() self.ranked_ra.clear() self.volume_profiles.clear() self.val.clear() self.vah.clear() slices_ra = self.History(self.Securities.Keys, 10, Resolution.Daily) for i in self.Securities.Keys: price_data = slices_ra.loc[str(i)]["close"].iloc[-10:] y = np.log(price_data) x = np.arange(len(y)) slope, intercept, r_value, p_value, std_err = sp.stats.linregress(x,y) self.slope_dict[str(i)] = {"Slope":slope, "R²":r_value**2} self.slope_dict = dict(sorted(self.slope_dict.items(), key=lambda x: x[1]["R²"], reverse=True)) self.ranked_ra = {} for index, (key, item) in enumerate(self.slope_dict.items(),1): #if index >= len(self.slope_dict.keys())*0.1: # break self.ranked_ra[key] = index #self.ranked_ra = {key:index for index, (key, item) in enumerate(self.slope_dict.items(),1) if index < len(self.slope_dict.keys())*0.1} slices_vp = self.History(list(self.ranked_ra.keys()), 15, Resolution.Minute) #Code to create volume profiles for all requested stocks for i in self.ranked_ra.keys(): #create stock keys in volume profile self.volume_profiles[str(i)] = {} low = round(min(slices_vp.loc[str(i)]["close"]), 2) high = round(max(slices_vp.loc[str(i)]["close"]), 2) price_range = high - low total_volume = sum(slices_vp.loc[str(i)]["volume"]) #create volume profile for every stock in keys for row in slices_vp.loc[str(i)].itertuples(): if row.volume > 0: key = round(row.close, 2) volume_per_level = row.volume if key not in self.volume_profiles[str(i)].keys(): self.volume_profiles[str(i)][str(key)] = volume_per_level else: self.volume_profiles[str(i)][str(key)] += volume_per_level # Set target volume - 70% of daily volume target_vol = sum(self.volume_profiles[str(i)].values()) * 0.7 # Get the price level that had the largest volume max_vol_price = max(self.volume_profiles[str(i)], key=self.volume_profiles[str(i)].get) # Setup a window to capture the POC, centered at the most liquid level curr_max_price = float(max_vol_price) curr_min_price = float(max_vol_price) curr_vol = self.volume_profiles[str(i)][max_vol_price] price_levels = sorted([float(price_level) for price_level, vol in self.volume_profiles[str(i)].items() if vol > 0]) # Grow window bounds until we have 70% of the day's volume captured while curr_vol < target_vol: # Price one level up price_up = None price_up_vol = 0 up_prices = [price for price in price_levels if price > curr_max_price] if len(up_prices) > 0: price_up = up_prices[0] price_up_vol = self.volume_profiles[str(i)][str(price_up)] # Price one level down price_down = None price_down_vol = 0 down_prices = [price for price in price_levels if price < curr_min_price] if len(down_prices) > 0: price_down = down_prices[-1] price_down_vol = self.volume_profiles[str(i)][str(price_down)] #Grow windows in the direction of more volume if price_up is not None and (price_up_vol > price_down_vol): curr_max_price = round(price_up, 2) curr_vol += price_up_vol else: curr_min_price = round(price_down, 2) curr_vol += price_down_vol # Save VAL, value area low & VAH, value area high for each stock self.val[str(i)] = curr_min_price self.vah[str(i)] = curr_max_price