Overall Statistics |
Total Trades 586 Average Win 0.00% Average Loss 0.00% Compounding Annual Return -0.430% Drawdown 0.000% Expectancy -0.255 Net Profit -0.009% Sharpe Ratio -2.837 Probabilistic Sharpe Ratio 22.922% Loss Rate 54% Win Rate 46% Profit-Loss Ratio 0.63 Alpha 0.001 Beta 0.006 Annual Standard Deviation 0.001 Annual Variance 0 Information Ratio 3.925 Tracking Error 0.176 Treynor Ratio -0.602 Total Fees $1259.90 Estimated Strategy Capacity $0 Lowest Capacity Asset ES XKGCMV4QK9VL |
#region imports from AlgorithmImports import * # from QuantConnect.Api import Api from datetime import datetime, timedelta, timezone import numpy as np import pandas as pd import warnings import math import matplotlib import matplotlib.pyplot as plt import matplotlib.dates as mdates from mpl_toolkits.mplot3d import Axes3D from matplotlib.colors import LightSource from mpl_toolkits.mplot3d.art3d import Poly3DCollection import matplotlib.tri as mtri from matplotlib import cm import matplotlib.dates as dates import dateutil import random from random import randrange from scipy.interpolate import UnivariateSpline from scipy.interpolate import interp1d from scipy.interpolate import make_interp_spline warnings.filterwarnings('ignore') #endregion PROJECT_ID = 12797570 # one day algo is aa856e5f51a79fd3f57aacdbccb30202 #2k backtest is 6aabfd389d5ce3d779512e4412e16bc9 class EntryModel: def __init__(self, api: Api, projectId: int = 12797570, backtestId: str = "aa856e5f51a79fd3f57aacdbccb30202"): """ Initializes an instance of EntryModel for a given Project ID. 
Args: api (Api): The instance of QuantConnect.Api projectId (int): The ID of the project the backtest is associated with backtestId (str): The ID of the backtest to run the entry model analysis for """ self.qb = QuantBook() self.api = api if projectId is None: projectId = PROJECT_ID if projectId is None: print(f"Please pass a Project ID or assign it to the variable PROJECT_ID in entry_analyzer.py.") return self.projectId = projectId if backtestId is None: backtestId = sorted([b for b in api.ListBacktests(projectId).Backtests if b.Completed], key=lambda b: b.Created, reverse=True)[0].BacktestId self.backtestId = backtestId self.backtest = api.ReadBacktest(projectId, backtestId) self.backtest_orders = FetchBacktestOrders(api, projectId, backtestId) self.trades_df = pd.DataFrame([(order.Symbol.Canonical.Value, order.Time, order.Price, order.Quantity) for order in self.backtest_orders], columns=['symbol', 'entry_time', 'entry_price', 'quantity']) self.trades_df.loc[:,'entry_time'] = pd.to_datetime(self.trades_df.entry_time, utc=True) \ .dt.tz_convert('US/Eastern') \ .dt.tz_localize(None) qb = self.qb canonical_ticker = self.trades_df.symbol.unique().tolist()[0].lstrip('/') self.future = qb.AddFuture(canonical_ticker, Resolution.Tick, dataNormalizationMode=DataNormalizationMode.BackwardsRatio, dataMappingMode=DataMappingMode.OpenInterest, contractDepthOffset=0) qb.SetStartDate(qb.Time - timedelta(1)) self.future.SetMarketPrice(qb.GetLastKnownPrice(self.future)) self.lastPrice = self.future.Price if self.future.Price == 0: prev_trading_day = self.future.Exchange.Hours.GetPreviousTradingDay(qb.Time - timedelta(1)) _end = self.future.Exchange.Hours.GetNextMarketClose(prev_trading_day, False) _start = _end - timedelta(minutes=1) lastPrice = qb.History(Tick, self.future.Symbol, _start, _end).lastprice.dropna().iloc[-1] self.lastPrice = lastPrice print(f"Entry Model initialized for backtest '{self.backtest.Name}' requested at {self.backtest.Created} with Algorithm ID: 
{self.backtest.BacktestId}.") def generate_random_entries(self, ignore_fees: bool = False, fwd_period: timedelta = timedelta(minutes=2), resolution: Resolution = Resolution.Second, ignore_overnight_returns: bool = True, figscale: float = 1.0) -> None: global trades_df_r global random_histogram trades_df_r = self.trades_df.copy() qb = QuantBook() future = qb.AddFuture("ES", Resolution.Second, dataNormalizationMode=DataNormalizationMode.BackwardsRatio, dataMappingMode=DataMappingMode.OpenInterest, contractDepthOffset=0) printcount = 0 marketclose = 16*60 d1 = datetime.strptime(str(trades_df_r.min(axis=0)['entry_time']), '%Y-%m-%d %H:%M:%S.%f').replace(hour=0, minute=0, second=0, microsecond=0) #need to get the hour minutes to 0 d2 = datetime.strptime(str(trades_df_r.max(axis=0)['entry_time']), '%Y-%m-%d %H:%M:%S.%f').replace(hour=0, minute=0, second=0, microsecond=0) delta = d2 - d1 day_delta = (delta.days) #+ delta.seconds trades_df_r = pd.DataFrame(columns=['symbol', 'entry_time', 'entry_price', 'quantity']) fwd_minutes = int(fwd_period.total_seconds())/60 while printcount < 5000: random_day = randrange(day_delta) random_hour = random.randint(9,15) if random_hour == 9: random_minute = random.randint(30, 59) else: random_minute = random.randint(0, 59) random_second = random.randint(0, 60) random_milli = random.randint(0, 999) day = d1 + timedelta(days=random_day, hours=random_hour, minutes=random_minute, seconds=random_second, milliseconds=random_milli) #day is the random timestamp hourmin = (int(day.strftime('%H'))*60)+int(day.strftime('%M')) #extract hourmin from timestamp for checks if np.is_busday(day.strftime("%Y-%m-%d")) == True and hourmin <= marketclose - fwd_minutes: #print('date chosen is:',day) start = day end = start + timedelta(seconds=1) #ticks = list(qb.History[Tick](future.Symbol, start, end, Resolution.Tick)) #ticks = list(qb.History(future.Symbol, start, end, Resolution.Second)) if not qb.History(future.Symbol, start,end,Resolution.Second).empty: 
price = qb.History(future.Symbol, start,end,Resolution.Second).close if price[0] == 0: print('error, price is 0!:', day) printcount += 1 trades_df2 = pd.DataFrame([(future.Symbol.Canonical.Value, day, price[0], 0)], columns=['symbol', 'entry_time', 'entry_price', 'quantity']) trades_df_r = trades_df_r.append(trades_df2) #this will be deprecated in future version trades_df_r.loc[:,'entry_time'] = pd.to_datetime(trades_df_r.entry_time, utc=True) \ .dt.tz_convert('US/Eastern') \ .dt.tz_localize(None) """ Histogram section """ trades_df = trades_df_r qb = self.qb trades_df.loc[:,'exit_time'] = trades_df.entry_time + fwd_period future = self.future intervals = [*map(tuple, trades_df.loc[:,['entry_time', 'exit_time']].to_numpy().astype('datetime64[ms]').astype(datetime).tolist())] df_list = [] prev_end = datetime.min for i, interval in enumerate(intervals): start, end = interval start = start.replace(microsecond=0) - timedelta(seconds=1) end = end.replace(microsecond=0) + timedelta(seconds=1) start = max(start, prev_end) if abs((end-start).total_seconds()) <= 1: continue ticks = qb.History[Tick](future.Symbol, start, end, Resolution.Tick) df = pd.DataFrame([(future.Symbol.Value, tick.Time, tick.Price) for tick in ticks if not tick.Suspicious and tick.TickType is TickType.Trade], columns=['symbol', 'time', 'price']) if df.empty: continue df.loc[:,'time'] = pd.to_datetime(df.time) df.set_index(['symbol', 'time'], inplace=True) df_list.append(df) prev_end = min(end, df.droplevel(0).index[-1].to_pydatetime().replace(microsecond=0) + timedelta(seconds=1)) data = pd.concat(df_list, axis=0, sort=True) data = data.loc[~data.index.duplicated(keep='first')] price_ranges_long = [] price_ranges_short = [] ignore_overnight_returns = True fwd_minutes = int(fwd_period.total_seconds())/60 for trade in trades_df.itertuples(): #remove head path = data.loc[future.Symbol.Value].loc[trade.entry_time:trade.exit_time].pct_change().fillna(0).add(1).cumprod() - 1 if ignore_overnight_returns: s = 
path.groupby(path.index.date).head(1) path.loc[s.index] = 0 if path.empty: continue if trade.quantity == 0: price_range_pct = (min(math.ceil(path.min().iloc[0]*2e3)/2e3, 0), max(math.floor(path.max().iloc[0]*2e3)/2e3, 0)) #2e3 (200) is just making number bigger, and you need to times by 5 to price_ranges_long.append(price_range_pct) bins = np.linspace(-.01, .01, 41) #change this too, defaut is (-0.1, .01, 21) binsarr = [round(i,8) for i in bins.tolist()] ranges_long_df = pd.DataFrame(price_ranges_long, columns=['low', 'high']).clip(lower=bins[0], upper=bins[-1]) #### NEW ######## ranges_long_flat = ranges_long_df.to_numpy().flatten() binlocation = [binsarr.index(i.item()) for i in ranges_long_flat] counts_long = pd.Series(bins[binlocation]).value_counts().sort_index().reindex(bins).fillna(0) counts_long.loc[counts_long.index >= 0] = counts_long.loc[counts_long.index >= 0].sort_index(ascending=False).cumsum() #need to subtract a 1 here counts_long.loc[counts_long.index <= 0] = counts_long.loc[counts_long.index <= 0].cumsum() counts_long /= counts_long.loc[0] random_histogram = counts_long def plot_avg_returns(self, ignore_fees: bool = False, fwd_period: timedelta = timedelta(minutes=30), resolution: Resolution = Resolution.Second, ignore_overnight_returns: bool = True, figscale: float = 1.0) -> None: """ Visualizes the average forward returns for the given entry model Args: ignore_fees (bool): Specifies whether fees should be ignored when computing returns. Defaults to `False`. fwd_period (timedelta): Specifies the forward-looking period to compute the returns for. Default is `timedelta(minutes=10)`. resolution (Resolution): The data resolution in use when computing returns. Defaults to `Resolution.Second`. ignore_overnight_returns (bool): Specifies whether the first data point after market open should be ignored (to avoid outliers from gaps). Defaults to `True`. figscale (float): The scaling factor of the figure to plot. Default is 1.0. 
Returns: None """ trades_df = self.trades_df.copy() qb = self.qb if resolution is Resolution.Second: trades_df.loc[:,'entry_time'] = trades_df.entry_time.astype('datetime64[s]') + timedelta(seconds=1) future = self.future min_pct_change = future.SymbolProperties.MinimumPriceVariation / self.lastPrice shift = timedelta(seconds=1) if resolution is Resolution.Second else timedelta(seconds=0) intervals = [(entry_time.to_pydatetime().replace(microsecond=0) - shift, entry_time.to_pydatetime().replace(microsecond=0) + timedelta(seconds=1) + fwd_period) for entry_time in trades_df.entry_time] df_list = [] prev_end = datetime.min for interval in intervals: start, end = interval start = max(start, prev_end) if abs((end - start).total_seconds()) <= 1: continue if resolution is Resolution.Tick: ticks = qb.History[Tick](future.Symbol, start, end, Resolution.Tick) df = pd.DataFrame([(future.Symbol.Value, tick.Time, tick.Price) for tick in ticks if not tick.Suspicious and tick.TickType is TickType.Trade], columns=['symbol', 'time', 'close']) if df.empty: continue df.loc[:,'time'] = pd.to_datetime(df.time) df = df.set_index(['symbol', 'time']) elif resolution is Resolution.Second: df = qb.History(future.Symbol, start, end, resolution, extendedMarket=False) if df.empty: continue df = df.droplevel(0).loc[:,['close']].rename(lambda x: future.Symbol.Value, level=0) df_list.append(df) prev_end = min(end, df.droplevel(0).index[-1]) if len(df_list) == 0: print(f"No historical data found for the specified period.") return history = pd.concat(df_list, axis=0, sort=True) history = history.loc[~history.index.duplicated(keep='first')] if history.empty: print(f"No historical data found for the specified period.") return returns = history.groupby(level=0).close.pct_change().fillna(0) if ignore_overnight_returns: s = returns.groupby(returns.index.get_level_values(1).date).head(1) returns.loc[s.index] = 0 returns.clip(lower=-min_pct_change, upper=min_pct_change, inplace=True) paths_long = [] 
paths_short = [] fees_pct = 0 shortcount = 0 longcount = 0 allcount = 0 yy_shorts = [] zz_shorts = [] max_shorts = [] min_shorts = [] high_shorts = [] minutes = 600 printtrue = True aa_array = [] bb_array = [] cc_array = [] fwd_minutes = int(fwd_period.total_seconds())/60 if not ignore_fees: scaling_factor = max(qb.Securities[self.future.Symbol].SymbolProperties.ContractMultiplier , 1) fees_amount = GetOrderFeeAmount(qb, self.backtest_orders[0]) fees_pct = fees_amount / trades_df.entry_price.mean() / scaling_factor for trade in trades_df.itertuples(): path = returns.loc[trade.symbol].loc[trade.entry_time : trade.entry_time + fwd_period] if printtrue == True: print('entry price',trade.entry_price) printtrue = False if path.empty: continue if trade.quantity >= 0: paths_long.append(path.reset_index(drop=True)) else: paths_short.append(path.reset_index(drop=True)) #path.mul(-1) is default flag_long = False flag_short = False if len(paths_long) > 1: flag_long = True paths_df_long = pd.concat(paths_long, axis=1, ignore_index=True).fillna(0) paths_df_long = pd.concat([pd.DataFrame(np.zeros((1, paths_df_long.columns.size)), columns=paths_df_long.columns), paths_df_long], ignore_index=True, axis=0) paths_df_long.loc[:,'mean'] = paths_df_long.mean(axis=1) if ignore_overnight_returns: min_pct_change /= future.SymbolProperties.ContractMultiplier paths_df_long.loc[:,'mean'].iloc[1] = paths_df_long.loc[:,'mean'].clip(lower=-min_pct_change, upper=min_pct_change).iloc[1] paths_df_long.loc[:,'stdev'] = paths_df_long.std(axis=1) td_idx_long = pd.TimedeltaIndex(data=[timedelta(seconds=i) for i in paths_df_long.index]) paths_df_long.index = td_idx_long.seconds post_entry_mean_returns_long = paths_df_long.loc[:,'mean'].add(1).cumprod() - fees_pct - 1 post_entry_stdev_long = paths_df_long.loc[:,'stdev'].iloc[1:].rolling(100).mean().fillna(method='bfill') if len(paths_short) > 1: flag_short = True paths_df_short = pd.concat(paths_short, axis=1, ignore_index=True).fillna(0) paths_df_short 
= pd.concat([pd.DataFrame(np.zeros((1, paths_df_short.columns.size)), columns=paths_df_short.columns), paths_df_short], ignore_index=True, axis=0) paths_df_short.loc[:,'mean'] = paths_df_short.mean(axis=1) if ignore_overnight_returns: min_pct_change /= future.SymbolProperties.ContractMultiplier paths_df_short.loc[:,'mean'].iloc[1] = paths_df_short.loc[:,'mean'].clip(lower=-min_pct_change,upper=min_pct_change).iloc[1] paths_df_short.loc[:,'stdev'] = paths_df_short.std(axis=1) td_idx_short = pd.TimedeltaIndex(data=[timedelta(seconds=i) for i in paths_df_short.index]) paths_df_short.index = td_idx_short.seconds post_entry_mean_returns_short = paths_df_short.loc[:,'mean'].add(1).cumprod() - fees_pct - 1 post_entry_stdev_short = paths_df_short.loc[:,'stdev'].iloc[1:].rolling(100).mean().fillna(method='bfill') maxls = max(post_entry_mean_returns_long.max(),post_entry_mean_returns_short.max())*1.05 minls = min(post_entry_mean_returns_long.min(),post_entry_mean_returns_short.min())*1.05 figsize_x, figsize_y = 15*figscale, 10*figscale fig, ax = plt.subplots(ncols=1, nrows=2, figsize=(figsize_x, figsize_y), sharex=False) if flag_long: post_entry_mean_returns_long.plot(ax=ax[0], color='tab:purple', lw=2) post_entry_mean_returns_long.add(2*post_entry_stdev_long).plot(ax=ax[0], alpha=.5, color='y', lw=1, ls=':') post_entry_mean_returns_long.sub(2*post_entry_stdev_long).plot(ax=ax[0], alpha=.5, color='y', lw=1, ls=':') if flag_short: post_entry_mean_returns_short.plot(ax=ax[1], color='tab:purple', lw=2) post_entry_mean_returns_short.add(2*post_entry_stdev_short).plot(ax=ax[1], alpha=.5, color='y', lw=1, ls=':') post_entry_mean_returns_short.sub(2*post_entry_stdev_short).plot(ax=ax[1], alpha=.5, color='y', lw=1, ls=':') ax[0].yaxis.set_major_formatter(matplotlib.ticker.PercentFormatter(xmax=1.0)) ax[1].yaxis.set_major_formatter(matplotlib.ticker.PercentFormatter(xmax=1.0)) ax[0].set_ylim(ymin=minls,ymax=maxls) ax[1].set_ylim(ymin=minls,ymax=maxls) ax[0].set_title(f'Trade 
Direction Long minutes: {fwd_minutes}') ax[1].set_title(f'Trade Direction Short minutes: {fwd_minutes}') ax[1].set_xlabel('Time after entry (minutes)') ax[0].set_xlabel('Time after entry (minutes)') ax[0].set_ylabel('Average return (%)') ax[1].set_ylabel('Average return (%)') formatter = matplotlib.ticker.FuncFormatter(format_func) ax[0].xaxis.set_major_formatter(formatter) ax[0].xaxis.set_major_locator(matplotlib.ticker.MultipleLocator(base=60)) ax[1].xaxis.set_major_formatter(formatter) ax[1].xaxis.set_major_locator(matplotlib.ticker.MultipleLocator(base=60)) fig.subplots_adjust(hspace=.5) plt.show() def plot_random_avg_returns(self, ignore_fees: bool = False, fwd_period: timedelta = timedelta(minutes=30), resolution: Resolution = Resolution.Second, ignore_overnight_returns: bool = True, figscale: float = 1.0) -> None: """ Visualizes the average forward returns for the given entry model Args: ignore_fees (bool): Specifies whether fees should be ignored when computing returns. Defaults to `False`. fwd_period (timedelta): Specifies the forward-looking period to compute the returns for. Default is `timedelta(minutes=10)`. resolution (Resolution): The data resolution in use when computing returns. Defaults to `Resolution.Second`. ignore_overnight_returns (bool): Specifies whether the first data point after market open should be ignored (to avoid outliers from gaps). Defaults to `True`. figscale (float): The scaling factor of the figure to plot. Default is 1.0. 
Returns: None """ trades_df = trades_df_r qb = QuantBook() future = qb.AddFuture("ES", Resolution.Second, dataNormalizationMode=DataNormalizationMode.BackwardsRatio, dataMappingMode=DataMappingMode.OpenInterest, contractDepthOffset=0) printcount = 0 marketclose = 16*60 #for futures qb = self.qb if resolution is Resolution.Second: trades_df.loc[:,'entry_time'] = trades_df.entry_time.astype('datetime64[s]') + timedelta(seconds=1) future = self.future min_pct_change = future.SymbolProperties.MinimumPriceVariation / self.lastPrice shift = timedelta(seconds=1) if resolution is Resolution.Second else timedelta(seconds=0) intervals = [(entry_time.to_pydatetime().replace(microsecond=0) - shift, entry_time.to_pydatetime().replace(microsecond=0) + timedelta(seconds=1) + fwd_period) for entry_time in trades_df.entry_time] df_list = [] prev_end = datetime.min for interval in intervals: start, end = interval start = max(start, prev_end) if abs((end - start).total_seconds()) <= 1: continue if resolution is Resolution.Tick: ticks = qb.History[Tick](future.Symbol, start, end, Resolution.Tick) df = pd.DataFrame([(future.Symbol.Value, tick.Time, tick.Price) for tick in ticks if not tick.Suspicious and tick.TickType is TickType.Trade], columns=['symbol', 'time', 'close']) if df.empty: continue df.loc[:,'time'] = pd.to_datetime(df.time) df = df.set_index(['symbol', 'time']) elif resolution is Resolution.Second: df = qb.History(future.Symbol, start, end, resolution, extendedMarket=False) if df.empty: continue df = df.droplevel(0).loc[:,['close']].rename(lambda x: future.Symbol.Value, level=0) df_list.append(df) prev_end = min(end, df.droplevel(0).index[-1]) if len(df_list) == 0: print(f"No historical data found for the specified period.") return history = pd.concat(df_list, axis=0, sort=True) history = history.loc[~history.index.duplicated(keep='first')] if history.empty: print(f"No historical data found for the specified period.") return returns = 
history.groupby(level=0).close.pct_change().fillna(0) if ignore_overnight_returns: s = returns.groupby(returns.index.get_level_values(1).date).head(1) returns.loc[s.index] = 0 returns.clip(lower=-min_pct_change, upper=min_pct_change, inplace=True) fwd_minutes = int(fwd_period.total_seconds())/60 paths_long = [] paths_short = [] fees_pct = 0 shortcount = 0 longcount = 0 allcount = 0 yy_shorts = [] zz_shorts = [] max_shorts = [] min_shorts = [] high_shorts = [] minutes = 600 printtrue = True aa_array = [] bb_array = [] cc_array = [] if not ignore_fees: scaling_factor = max(qb.Securities[self.future.Symbol].SymbolProperties.ContractMultiplier , 1) fees_amount = GetOrderFeeAmount(qb, self.backtest_orders[0]) fees_pct = fees_amount / trades_df.entry_price.mean() / scaling_factor for trade in trades_df.itertuples(): path = returns.loc[trade.symbol].loc[trade.entry_time : trade.entry_time + fwd_period] if printtrue == True: print('entry price',trade.entry_price) printtrue = False if path.empty: continue if trade.quantity == 0: paths_long.append(path.reset_index(drop=True)) paths_short.append(path.reset_index(drop=True)) #path.mul(-1) is default flag_long = False flag_short = False if len(paths_long) > 1: flag_long = True paths_df_long = pd.concat(paths_long, axis=1, ignore_index=True).fillna(0) paths_df_long = pd.concat([pd.DataFrame(np.zeros((1, paths_df_long.columns.size)), columns=paths_df_long.columns), paths_df_long], ignore_index=True, axis=0) paths_df_long.loc[:,'mean'] = paths_df_long.mean(axis=1) if ignore_overnight_returns: min_pct_change /= future.SymbolProperties.ContractMultiplier paths_df_long.loc[:,'mean'].iloc[1] = paths_df_long.loc[:,'mean'].clip(lower=-min_pct_change, upper=min_pct_change).iloc[1] paths_df_long.loc[:,'stdev'] = paths_df_long.std(axis=1) td_idx_long = pd.TimedeltaIndex(data=[timedelta(seconds=i) for i in paths_df_long.index]) paths_df_long.index = td_idx_long.seconds post_entry_mean_returns_long = 
paths_df_long.loc[:,'mean'].add(1).cumprod() - fees_pct - 1 post_entry_stdev_long = paths_df_long.loc[:,'stdev'].iloc[1:].rolling(100).mean().fillna(method='bfill') #print('paths long first 10',post_entry_mean_returns_long.head(20)) #print('paths max',post_entry_mean_returns_long.max()) if len(paths_short) > 1: flag_short = True paths_df_short = pd.concat(paths_short, axis=1, ignore_index=True).fillna(0) paths_df_short = pd.concat([pd.DataFrame(np.zeros((1, paths_df_short.columns.size)), columns=paths_df_short.columns), paths_df_short], ignore_index=True, axis=0) paths_df_short.loc[:,'mean'] = paths_df_short.mean(axis=1) if ignore_overnight_returns: min_pct_change /= future.SymbolProperties.ContractMultiplier paths_df_short.loc[:,'mean'].iloc[1] = paths_df_short.loc[:,'mean'].clip(lower=-min_pct_change,upper=min_pct_change).iloc[1] paths_df_short.loc[:,'stdev'] = paths_df_short.std(axis=1) td_idx_short = pd.TimedeltaIndex(data=[timedelta(seconds=i) for i in paths_df_short.index]) paths_df_short.index = td_idx_short.seconds post_entry_mean_returns_short = paths_df_short.loc[:,'mean'].add(1).cumprod() - fees_pct - 1 post_entry_stdev_short = paths_df_short.loc[:,'stdev'].iloc[1:].rolling(100).mean().fillna(method='bfill') figsize_x, figsize_y = 15*figscale, 10*figscale fig, ax = plt.subplots(ncols=1, nrows=2, figsize=(figsize_x, figsize_y), sharex=False) if flag_long: post_entry_mean_returns_long.plot(ax=ax[0], color='tab:purple', lw=2) #post_entry_mean_returns_long.add(2*post_entry_stdev_long).plot(ax=ax[0], alpha=.5, color='y', lw=1, ls=':') #post_entry_mean_returns_long.sub(2*post_entry_stdev_long).plot(ax=ax[0], alpha=.5, color='y', lw=1, ls=':') if flag_short: post_entry_mean_returns_short.plot(ax=ax[1], color='tab:purple', lw=2) #post_entry_mean_returns_short.add(2*post_entry_stdev_short).plot(ax=ax[1], alpha=.5, color='y', lw=1, ls=':') #post_entry_mean_returns_short.sub(2*post_entry_stdev_short).plot(ax=ax[1], alpha=.5, color='y', lw=1, ls=':') 
ax[0].yaxis.set_major_formatter(matplotlib.ticker.PercentFormatter(xmax=1.0)) ax[1].yaxis.set_major_formatter(matplotlib.ticker.PercentFormatter(xmax=1.0)) ax[0].set_title(f'Trade Direction Long period: {fwd_minutes}') ax[1].set_title(f'Trade Direction Short period: {fwd_minutes}') ax[1].set_xlabel('Time after entry (minutes)') ax[0].set_xlabel('Time after entry (minutes)') ax[0].set_ylabel('Average return (%)') ax[1].set_ylabel('Average return (%)') formatter = matplotlib.ticker.FuncFormatter(format_func) ax[0].xaxis.set_major_formatter(formatter) ax[0].xaxis.set_major_locator(matplotlib.ticker.MultipleLocator(base=60)) ax[1].xaxis.set_major_formatter(formatter) ax[1].xaxis.set_major_locator(matplotlib.ticker.MultipleLocator(base=60)) #ax[0].ticklabel_format(axis='y',style='plain') #ax[1].ticklabel_format(axis='y',style='plain') #yticklabels = [f"{float(item.get_text()):.3%}" for item in ax[1].get_yticklabels()] #ax[1].set_yticklabels(yticklabels) fig.subplots_adjust(hspace=.5) plt.show() def plot_3d_avg_returns(self, ignore_fees: bool = False, fwd_period: timedelta = timedelta(minutes=10), resolution: Resolution = Resolution.Second, ignore_overnight_returns: bool = True, figscale: float = 1.3) -> None: """ Visualizes the average forward returns for the given entry model and turns it into a 3d chart """ trades_df = self.trades_df.copy() qb = self.qb if resolution is Resolution.Second: trades_df.loc[:,'entry_time'] = trades_df.entry_time.astype('datetime64[s]') + timedelta(seconds=1) future = self.future min_pct_change = future.SymbolProperties.MinimumPriceVariation / self.lastPrice shift = timedelta(seconds=1) if resolution is Resolution.Second else timedelta(seconds=0) intervals = [(entry_time.to_pydatetime().replace(microsecond=0) - shift, entry_time.to_pydatetime().replace(microsecond=0) + timedelta(seconds=1) + fwd_period) for entry_time in trades_df.entry_time] df_list = [] prev_end = datetime.min for interval in intervals: start, end = interval start = 
max(start, prev_end) if abs((end - start).total_seconds()) <= 1: continue if resolution is Resolution.Tick: ticks = qb.History[Tick](future.Symbol, start, end, Resolution.Tick) df = pd.DataFrame([(future.Symbol.Value, tick.Time, tick.Price) for tick in ticks if not tick.Suspicious and tick.TickType is TickType.Trade], columns=['symbol', 'time', 'close']) if df.empty: continue df.loc[:,'time'] = pd.to_datetime(df.time) df = df.set_index(['symbol', 'time']) elif resolution is Resolution.Second: df = qb.History(future.Symbol, start, end, resolution, extendedMarket=False) if df.empty: continue df = df.droplevel(0).loc[:,['close']].rename(lambda x: future.Symbol.Value, level=0) df_list.append(df) prev_end = min(end, df.droplevel(0).index[-1]) if len(df_list) == 0: print(f"No historical data found for the specified period.") return history = pd.concat(df_list, axis=0, sort=True) history = history.loc[~history.index.duplicated(keep='first')] if history.empty: print(f"No historical data found for the specified period.") return returns = history.groupby(level=0).close.pct_change().fillna(0) if ignore_overnight_returns: s = returns.groupby(returns.index.get_level_values(1).date).head(1) returns.loc[s.index] = 0 returns.clip(lower=-min_pct_change, upper=min_pct_change, inplace=True) paths_long = [] paths_short = [] fees_pct = 0 shortcount = 0 longcount = 0 allcount = 0 yy_longs = [] zz_longs = [] yy_shorts = [] zz_shorts = [] seconds = 900 # mean 15 minutes before and after entry printtrue = True aa_array = [] bb_array = [] cc_array = [] aa2_array = [] bb2_array = [] cc2_array = [] shortpath = 0 path600 = 0 path601 = 0 fpsec = int(fwd_period.total_seconds()) fwd_minutes = int(fwd_period.total_seconds())/60 if not ignore_fees: scaling_factor = max(qb.Securities[self.future.Symbol].SymbolProperties.ContractMultiplier , 1) fees_amount = GetOrderFeeAmount(qb, self.backtest_orders[0]) fees_pct = fees_amount / trades_df.entry_price.mean() / scaling_factor for trade in 
trades_df.itertuples(): path = returns.loc[trade.symbol].loc[trade.entry_time : trade.entry_time + fwd_period] allcount += 1 ttime = int(datetime.strftime(trade.entry_time, '%M')) + (int(datetime.strftime(trade.entry_time, '%H'))*60) zz = trade.entry_time ztime = (int(datetime.strftime(zz, '%H'))*60*60 + int(datetime.strftime(zz, '%M'))*60 + int(datetime.strftime(zz, '%S'))) if path.empty: continue if trade.quantity >= 0 and len(path) >= fpsec-1 and ttime < 950: paths_long.append(path.reset_index(drop=True)) yy_long_path = path.add(1).cumprod() - 1 yy_longs.append(yy_long_path.values.tolist()[:fpsec]) zz_longs.append(ztime) longcount += 1 elif trade.quantity < 0 and len(path) >= fpsec-1 and ttime < 950: paths_short.append(path.mul(-1).reset_index(drop=True)) yy_short_path = path.add(1).cumprod() - 1 #does not include fees yy_shorts.append(yy_short_path.values.tolist()[:fpsec]) zz_shorts.append(ztime) shortcount += 1 elif trade.quantity >= 0 and len(path) < fpsec-1 and ttime < 950: shortpath += 1 elif trade.quantity < 0 and len(path) < fpsec-1 and ttime < 950: shortpath += 1 print('count of long trades',longcount) print('count of short trades',shortcount) print('count of paths with 600', path600) print('count of paths < 600', shortpath) flag_long = False flag_short = False yy_longs2 = np.array(yy_longs) yy = yy_longs2.flatten() #dont need? zz_longs2 = np.array(zz_longs) yy_shorts2 = np.array(yy_shorts) yy = yy_shorts2.flatten() #dont need? 
zz_shorts2 = np.array(zz_shorts) for v in zz_longs2: lookup = ([]) for d in zz_longs2: if v - d <= seconds and d - v <= seconds: #can encounter duplicates, if its duplicate it should do each one individually if np.where(zz_longs2 == d)[0].size > 1: multilook = np.array(np.where(zz_longs2 == d)[0]) for i in multilook: lookup.append(i) else: lookupx = np.where(zz_longs2 == d)[0] #if single lookup.append(lookupx) lookup_np = np.array(lookup) # convert array into numpy array rows_looked_up_already = [] for r in lookup_np: #Now we have the lookup value for rows, find all matching y columns for those rows i = r.item() # get number value if len(yy_longs2[i]) == fpsec: # remove this logic later on rows_looked_up_already.append(yy_longs2[i]) elif len(yy_longs2[i]) != fpsec: print('wrong yy_longs2 len',len(yy_longs2[i])) ox = np.mean(rows_looked_up_already, axis=0) #sum temp rows, axis=0 is default xx = [1*i for i in range(len(ox))] #create data points for x for every value in y zz = [v for i in range(len(ox))] #same with timestamp of original trade aa_array.append(xx) bb_array.append(ox) cc_array.append(zz) a = np.array(aa_array) b = np.array(bb_array) c = np.array(cc_array) aa=a.flatten() bb=b.flatten() cc=c.flatten() for v in zz_shorts2: lookup = ([]) lookup_new = np.array(lookup) for d in zz_shorts2: if v - d <= seconds and d - v <= seconds: #can encounter duplicates, if its duplicate it should do each one individually if np.where(zz_shorts2 == d)[0].size > 1: multilook = np.array(np.where(zz_shorts2 == d)[0]) for i in multilook: lookup.append(i) else: lookupx = np.where(zz_shorts2 == d)[0] #if single lookup.append(lookupx) np.append(lookup_new,lookupx) #add single to a lookup array lookup_np = np.array(lookup) # convert array into numpy array rows_looked_up_already = [] for r in lookup_np: #Now we have the lookup value for rows, find all matching y columns for those rows i = r.item() # get number value rows_looked_up_already.append(yy_shorts2[i]) ox = 
np.mean(rows_looked_up_already, axis=0) #sum temp rows xx = [1*i for i in range(len(ox))] zz = [v for i in range(len(ox))] aa2_array.append(xx) bb2_array.append(ox) cc2_array.append(zz) a2 = np.array(aa2_array) b2 = np.array(bb2_array) c2 = np.array(cc2_array) aa2=a2.flatten() bb2=b2.flatten() cc2=c2.flatten() maxbb = max(bb.max(), bb2.max(), bb.min()*-1, bb2.min()*-1) ######LOGIC TO COUNT ENTRIES OVER TIME mtime = np.arange(34200, 57600, 60,dtype='int64') countlong = [] countlong2 = [] for item in mtime: count = np.where((cc<(item+300))&(cc>(item-300)))[0].size countx = count/fpsec countlong.append(item) countlong2.append(countx) countshort = [] countshort2 = [] for item in mtime: count = np.where((cc2<(item+60))&(cc2>(item-60)))[0].size countx = count/fpsec countshort.append(item) countshort2.append(countx) bbend = bb[::(fpsec-1)] # do I need the -1??? ccend = cc[::(fpsec-1)] bb2end = bb2[::(fpsec-1)] cc2end = cc2[::(fpsec-1)] yyend = yy_longs[::(fpsec-1)] zzend = zz_longs[::(fpsec-1)] cs, bs = zip(*sorted(zip(ccend, bbend))) cs2, bs2 = zip(*sorted(zip(cc2end, bb2end))) #print('zzend len',len(zzend)) #print('yyend len',len(yyend)) df = pd.DataFrame([ccend,bbend]) #print('df before transpose',df) df = df.T df.columns = ['entry_time', 'gain'] df = df.groupby(['entry_time']).mean().reset_index() xnp = df['entry_time'].to_numpy() ynp = df['gain'].to_numpy() X_=np.linspace(xnp.min(), xnp.max(), 200) #spl = UnivariateSpline(cs, bs) #Y_=spl(X_) ispl = make_interp_spline(xnp, ynp, k=1) Y_ = ispl(X_) #spl.set_smoothing_factor(0.001) # cubic method cubic_interploation_model = interp1d(xnp,ynp, kind = "linear") X2_=np.linspace(xnp.min(), xnp.max(), 175) Y2_=cubic_interploation_model(X2_) #ax5.plot(X_, Y_) def HMSFormatter(value, loc): h = value // 3600 m = (value - h * 3600) // 60 s = value % 60 #return "%02d:%02d:%02d" % (h,m,s) return "%02d:%02d" % (h,m) fig = plt.figure(plt.figure(figsize=plt.figaspect(0.5)*3)) ax1 = fig.add_subplot(2,2,1,projection='3d') ax2 = 
fig.add_subplot(2,2,2,projection='3d') ax3 = fig.add_subplot(4,2,5) ax4 = fig.add_subplot(4,2,6) ax5 = fig.add_subplot(4,2,7) ax6 = fig.add_subplot(4,2,8) ax5.yaxis.set_major_formatter(matplotlib.ticker.PercentFormatter(xmax=1.0)) ax6.yaxis.set_major_formatter(matplotlib.ticker.PercentFormatter(xmax=1.0)) major_ticks = np.arange(34200, 57600, 3600) ax3.plot(countlong,countlong2, color='tab:purple') ax4.plot(countshort,countshort2, color='tab:purple') ax5.plot(X_,Y_, color='tab:purple') ax6.plot(X2_,Y2_, color='tab:purple') #ax6.plot(cs2,bs2, color='tab:purple') ax3.set_title('Long Entries Count') ax3.set_xlim([34200, 57600]) ax3.set_xticks(major_ticks) ax3.xaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(HMSFormatter)) ax4.set_title('Short Entries Count') ax4.set_xlim([34200, 57600]) ax4.set_xticks(major_ticks) ax4.xaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(HMSFormatter)) ax5.set_title('Long Final Value') ax5.set_xlim([34200, 57600]) ax5.set_xticks(major_ticks) ax5.xaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(HMSFormatter)) ax5.set_ylim([maxbb*-1, maxbb]) ax6.set_title('Short Final Value') ax6.set_xlim([34200, 57600]) ax6.set_xticks(major_ticks) ax6.xaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(HMSFormatter)) ax6.set_ylim([maxbb*-1, maxbb]) ax1.plot_trisurf(aa, cc, bb, cmap=cm.plasma, edgecolor='none') #switch z and y values so the chart looks cleaner ax1.view_init(elev=41., azim=353) ax1.set_box_aspect(aspect = (1,3,1)) ax1.set_title(f'Trade Direction Long period: {fwd_minutes}') ax1.set_xlabel('Seconds after entry') ax1.set_ylabel('Entry Time',labelpad=20) ax1.set_zlabel('Percent Gain',labelpad=20) ax1.set_yticks(major_ticks) ax1.set_zlim([maxbb*-1, maxbb]) ax1.yaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(HMSFormatter)) #ax1.invert_xaxis() ax2.plot_trisurf(aa2, cc2, bb2, cmap=cm.plasma, edgecolor='none') ax2.view_init(elev=41., azim=345) ax2.set_box_aspect(aspect = (1,3,1)) ax2.set_title(f'Trade 
Direction short period: {fwd_minutes}') ax2.set_xlabel('Seconds after entry') ax2.set_ylabel('Entry Time',labelpad=20) ax2.set_zlabel('Percent Gain',labelpad=20) ax2.set_yticks(major_ticks) ax2.set_zlim([maxbb*-1, maxbb]) ax1.zaxis.set_major_formatter(matplotlib.ticker.PercentFormatter(xmax=1.0)) ax2.zaxis.set_major_formatter(matplotlib.ticker.PercentFormatter(xmax=1.0)) ax2.yaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(HMSFormatter)) ax2.invert_xaxis() plt.show() def plot_histogram(self, fwd_period: timedelta = timedelta(minutes=2), ignore_overnight_returns: bool = True, figscale: float = 1.0) -> None: """ Visualizes the chance to hit a percent away from entry price. Args: fwd_period (timedelta): Specifies the forward-looking period to compute the returns for. Default is `timedelta(minutes=30)`. ignore_overnight_returns (bool): Specifies whether the first data point after market open should be ignored (to avoid outliers from gaps). Defaults to `True`. figscale (float): The scaling factor of the figure to plot. Default is 1.0. 
Returns: None """ trades_df = self.trades_df.copy() qb = self.qb trades_df.loc[:,'exit_time'] = trades_df.entry_time + fwd_period future = self.future intervals = [*map(tuple, trades_df.loc[:,['entry_time', 'exit_time']].to_numpy().astype('datetime64[ms]').astype(datetime).tolist())] df_list = [] prev_end = datetime.min for i, interval in enumerate(intervals): start, end = interval start = start.replace(microsecond=0) - timedelta(seconds=1) end = end.replace(microsecond=0) + timedelta(seconds=1) start = max(start, prev_end) if abs((end-start).total_seconds()) <= 1: continue ticks = qb.History[Tick](future.Symbol, start, end, Resolution.Tick) df = pd.DataFrame([(future.Symbol.Value, tick.Time, tick.Price) for tick in ticks if not tick.Suspicious and tick.TickType is TickType.Trade], columns=['symbol', 'time', 'price']) if df.empty: continue df.loc[:,'time'] = pd.to_datetime(df.time) df.set_index(['symbol', 'time'], inplace=True) df_list.append(df) prev_end = min(end, df.droplevel(0).index[-1].to_pydatetime().replace(microsecond=0) + timedelta(seconds=1)) data = pd.concat(df_list, axis=0, sort=True) data = data.loc[~data.index.duplicated(keep='first')] price_ranges_long = [] price_ranges_short = [] ignore_overnight_returns = True printcount = 0 fwd_minutes = int(fwd_period.total_seconds())/60 min_tick = 0.25 printonce = True for trade in trades_df.itertuples(): #remove head path = data.loc[future.Symbol.Value].loc[trade.entry_time:trade.exit_time].pct_change().fillna(0).add(1).cumprod() - 1 if ignore_overnight_returns: s = path.groupby(path.index.date).head(1) path.loc[s.index] = 0 if path.empty: continue if trade.quantity >= 0: printcount += 1 price_range_pct = (min(math.ceil(path.min().iloc[0]*2e3)/2e3, 0), max(math.floor(path.max().iloc[0]*2e3)/2e3, 0)) #2e3 (200) is just making number bigger, and you need to times by 5 to price_ranges_long.append(price_range_pct) #New calc here to find the +10/-10 ticks around entry price in df_list if printonce: print('trade 
price',trade['price']) printonce = False #tprice = trade['price'] #10tick = np.arange(tprice+(10*min_tick),tprice-(10*min_tick),min_tick) #path_ticks = np.arange(path.min,path.max,min_tick) #new_list = [p for p in path_ticks if p in 10tick] else: price_range_pct = (min(math.ceil(path.min().iloc[0]*2e3)/2e3, 0), max(math.floor(path.max().iloc[0]*2e3)/2e3, 0)) price_ranges_short.append(price_range_pct) bins = np.linspace(-.01, .01, 41) #change this too, defaut is (-0.1, .01, 21) binsarr = [round(i,8) for i in bins.tolist()] ranges_long_df = pd.DataFrame(price_ranges_long, columns=['low', 'high']).clip(lower=bins[0], upper=bins[-1]) #### NEW ######## ranges_long_flat = ranges_long_df.to_numpy().flatten() binlocation = [binsarr.index(i.item()) for i in ranges_long_flat] counts_long = pd.Series(bins[binlocation]).value_counts().sort_index().reindex(bins).fillna(0) ranges_short_df = pd.DataFrame(price_ranges_short, columns=['low', 'high']).clip(lower=bins[0], upper=bins[-1]) #### NEW ######## ranges_short_flat = ranges_short_df.to_numpy().flatten() binlocation = [binsarr.index(i.item()) for i in ranges_short_flat] counts_short = pd.Series(bins[binlocation]).value_counts().sort_index().reindex(bins).fillna(0) counts_long.loc[counts_long.index >= 0] = counts_long.loc[counts_long.index >= 0].sort_index(ascending=False).cumsum() #need to subtract a 1 here counts_long.loc[counts_long.index <= 0] = counts_long.loc[counts_long.index <= 0].cumsum() #print('ascending cumsum',counts_long.loc[counts_long.index <= 0]) counts_long /= counts_long.loc[0] counts_short.loc[counts_short.index >= 0] = counts_short.loc[counts_short.index >= 0].sort_index(ascending=False).cumsum() counts_short.loc[counts_short.index <= 0] = counts_short.loc[counts_short.index <= 0].cumsum() counts_short /= counts_short.loc[0] major_ticks = np.linspace(0, 1, 21) ##will this work? 
figsize_x, figsize_y = (figscale*15, figscale*10) fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, figsize=(figsize_x,figsize_y)) fig.suptitle(f'Chance to hit % away', fontsize=16) if 'random_histogram' in globals(): random_histogram.plot.barh(ax=ax1, title=f'Trade Direction Long period: {fwd_minutes}', color='tab:blue', width=1, lw=.5, alpha=0.4) random_histogram.plot.barh(ax=ax2, title=f'Trade Direction Short period: {fwd_minutes}', color='tab:blue', width=1, lw=.5, alpha=0.4) counts_long.plot.barh(ax=ax1, title=f'Trade Direction Long period: {fwd_minutes}', color='tab:purple', width=1, lw=.5, alpha=0.7) #counts_long.plot.barh(ax=ax1, title=f'Trade Direction Long period: {fwd_minutes}', color='tab:purple', width=1, edgecolor='k', lw=.5) #plt.yticks(fontsize=6) counts_short.plot.barh(ax=ax2, title=f'Trade Direction Short period: {fwd_minutes}', color='tab:purple', width=1, lw=.5, alpha=0.7) #counts_short.plot.barh(ax=ax2, title=f'Trade Direction Short period: {fwd_minutes}', color='tab:purple', width=1, edgecolor='k', lw=.5) fig.canvas.draw() ##xticklabels = [f"{float(item.get_text()):.0%}" for item in ax1.get_xticklabels()] yticklabels = [f"{float(item.get_text()):.3%}" for item in ax1.get_yticklabels()] #need to change this too, default is [f"{round(float(item.get_text()),3):.1%}" for item in ax1.get_yticklabels()] ax1.set_xticks(major_ticks) # will these work? 
ax2.set_xticks(major_ticks) ax1.set_yticklabels(yticklabels) ax2.set_yticklabels(yticklabels) ##ax1.set_xticklabels(xticklabels) ##ax2.set_xticklabels(xticklabels) ##ax1.set_xticklabels(xlabels) ##ax2.set_xticklabels(xlabels) fig.subplots_adjust(hspace=.3) ax1.set_axisbelow(True) ax2.set_axisbelow(True) #plt.grid(visible=True, which='major', color='grey', linestyle='--') #plt.yticks(fontsize=6) ax1.tick_params(axis='y', which='major', labelsize=6) ax1.tick_params(axis='y', which='minor', labelsize=6) ax2.tick_params(axis='y', which='major', labelsize=6) ax2.tick_params(axis='y', which='minor', labelsize=6) ax1.xaxis.grid(lw=.1, color='grey') ax2.xaxis.grid(lw=.1, color='grey') plt.show() def plot_volume_profile(self, fwd_period: timedelta = timedelta(minutes=5), ignore_overnight_returns: bool = True, figscale: float = 1.0) -> None: """ Visualizes the volume profile after entry for the fwd_period Args: fwd_period (timedelta): Specifies the forward-looking period to compute the returns for. Default is `timedelta(minutes=30)`. ignore_overnight_returns (bool): Specifies whether the first data point after market open should be ignored (to avoid outliers from gaps). Defaults to `True`. figscale (float): The scaling factor of the figure to plot. Default is 1.0. 
Returns: None """ trades_df = self.trades_df.copy() qb = self.qb trades_df.loc[:,'exit_time'] = trades_df.entry_time + fwd_period future = self.future intervals = [*map(tuple, trades_df.loc[:,['entry_time', 'exit_time']].to_numpy().astype('datetime64[ms]').astype(datetime).tolist())] df_list = [] prev_end = datetime.min for i, interval in enumerate(intervals): start, end = interval start = start.replace(microsecond=0) - timedelta(seconds=1) end = end.replace(microsecond=0) + timedelta(seconds=1) start = max(start, prev_end) if abs((end-start).total_seconds()) <= 1: continue ticks = qb.History[Tick](future.Symbol, start, end, Resolution.Tick) #ticklist = list(qb.History[Tick](future.Symbol, start, end, Resolution.Tick)) df = pd.DataFrame([(future.Symbol.Value, tick.Time, tick.Price, tick.Quantity) for tick in ticks if not tick.Suspicious and tick.TickType is TickType.Trade], columns=['symbol', 'time', 'price', 'quantity']) if df.empty: continue df.loc[:,'time'] = pd.to_datetime(df.time) df.set_index(['symbol', 'time'], inplace=True) df_list.append(df) prev_end = min(end, df.droplevel(0).index[-1].to_pydatetime().replace(microsecond=0) + timedelta(seconds=1)) data = pd.concat(df_list, axis=0, sort=True) data = data.loc[~data.index.duplicated(keep='first')] price_ranges_long = [] price_ranges_short = [] ignore_overnight_returns = True l_paths_sum = pd.DataFrame(columns=['price', 'quantity']) s_paths_sum = pd.DataFrame(columns=['price', 'quantity']) paths_sum = pd.DataFrame(columns=['price', 'quantity']) fpaths_sum = pd.DataFrame(columns=['price', 'quantity']) p_emptycount = 0 bins = np.linspace(-.01, .01, 201) #should i have logic do sorted or rounding binsarr = [round(i,8) for i in bins.tolist()] fbins = np.linspace(-.01, .01, 21) #should i have logic do sorted or rounding fbinsarr = [round(i,8) for i in fbins.tolist()] fwd_minutes = int(fwd_period.total_seconds())/60 for trade in trades_df.itertuples(): #remove head path = 
data.loc[future.Symbol.Value].loc[trade.entry_time:trade.exit_time] path2 = path.price.pct_change().fillna(0).add(1).cumprod() - 1 trades_path = pd.concat([path2,path.quantity], axis=1) trades_path_sum = trades_path.groupby(['price']).quantity.sum().reset_index().sort_values(by=['price'], ascending=False) if len(trades_path_sum) > 0: yb=1 path = data.loc[future.Symbol.Value].loc[trade.entry_time:trade.exit_time].pct_change().fillna(0).add(1).cumprod() - 1 if ignore_overnight_returns: #how does this affect it?? s = path.groupby(path.index.date).head(1) path.loc[s.index] = 0 if path.empty: p_emptycount += 1 continue if trade.quantity >= 0 and len(trades_path_sum) > 0: l_paths_sum = l_paths_sum.append(trades_path_sum, ignore_index = True) if trade.quantity < 0 and len(trades_path_sum) > 0: s_paths_sum = s_paths_sum.append(trades_path_sum, ignore_index = True) """ No need to split by >0 and <0 here as the rounding function will work the same for both """ l_paths_sum['price'] = l_paths_sum['price'].apply(lambda x: round(x,4)) paths_sum = l_paths_sum.groupby(['price']).quantity.sum().reset_index().sort_values(by=['price'], ascending=False) binsdf = pd.DataFrame([(i,0) for i in binsarr],columns=['price', 'quantity']) paths_sum = binsdf.append(paths_sum, ignore_index = True) paths_sum = paths_sum.groupby(['price']).quantity.sum().reset_index().sort_values(by=['price'], ascending=False) paths_sum = paths_sum.set_index(['price']) s_paths_sum['price'] = s_paths_sum['price'].apply(lambda x: round(x,3)) fpaths_sum = s_paths_sum.groupby(['price']).quantity.sum().reset_index().sort_values(by=['price'], ascending=False) fbinsdf = pd.DataFrame([(i,0) for i in fbinsarr],columns=['price', 'quantity']) fpaths_sum = fbinsdf.append(fpaths_sum, ignore_index = True) fpaths_sum = fpaths_sum.groupby(['price']).quantity.sum().reset_index().sort_values(by=['price'], ascending=False) fpaths_sum = fpaths_sum.set_index(['price']) figscale = float(1.0) figsize_x, figsize_y = (figscale*15, 
figscale*10) fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, figsize=(figsize_x,figsize_y)) fig.suptitle(f'Volume Profile %', fontsize=16) paths_sum.plot.barh(ax=ax1, title=f'Trade Direction Long period: {fwd_minutes}', color='tab:purple', width=1, edgecolor='k', lw=.5) fpaths_sum.plot.barh(ax=ax2, title=f'Trade Direction Short period: {fwd_minutes}', color='tab:purple', width=1, edgecolor='k', lw=.5) fig.canvas.draw() yticklabels = [f"{float(item.get_text()):.4%}" for item in ax1.get_yticklabels()] ax1.set_yticklabels(yticklabels) for index, label in enumerate(ax1.yaxis.get_ticklabels()): if index % 10 != 0: label.set_visible(False) yticklabels = [f"{float(item.get_text()):.4%}" for item in ax2.get_yticklabels()] ax2.set_yticklabels(yticklabels) ax1.invert_yaxis() ax1.set_axisbelow(True) ax1.xaxis.grid(lw=.1, color='grey') ax2.invert_yaxis() ax2.set_axisbelow(True) ax2.xaxis.grid(lw=.1, color='grey') #plt.yticks(fontsize=6) ax1.tick_params(axis='y', which='major', labelsize=6) ax1.tick_params(axis='y', which='minor', labelsize=6) ax2.tick_params(axis='y', which='major', labelsize=6) ax2.tick_params(axis='y', which='minor', labelsize=6) plt.show() def FetchBacktestOrders(api: Api, projectId: int, backtestId: str) -> list: '''Fetches the orders of a given backtest using QuantConnect.Api Args: api (Api): The instance of QuantConnect.Api that is automatically created on start of the research environment projectId (int): The ID of the project the backtest is associated with backtestId (str): The ID of the backtest fetch the orders for Returns: backtestOrders (list): The list of orders ''' backtestOrders = [] start = 0 stepsize = 100 end = start + stepsize while True: orders = api.ReadBacktestOrders(projectId, backtestId, start, end) backtestOrders.extend(orders) start += stepsize end = start + stepsize if len(orders) < 100: break return backtestOrders def GetOrderFeeAmount(qb: QuantBook, order: Order) -> float: '''Computes the order fee amount for a given order Args: 
qb (QuantBook): The QuantBook instance order (Order): The order to compute the fees for Returns: fee_amount (float): The cash amount of the order fee ''' symbol = order.Symbol if symbol in qb.Securities.Keys: security = qb.Securities[symbol] else: security = qb.AddFutureContract(order.Symbol) feeModel = security.FeeModel submitOrderRequest = SubmitOrderRequest(OrderType.Market, SecurityType.Future, symbol, 1, 0, 0, 0, qb.Time, "", None) order = Order.CreateOrder(submitOrderRequest) orderFeeParameters = OrderFeeParameters(security, order) fee_amount = feeModel.GetOrderFee(orderFeeParameters).Value.Amount return fee_amount def format_func(x: float, pos) -> None: minutes = int((x%3600)//60) seconds = int(x%60) #return "{:d}:{:02d}".format(minutes, seconds) return "{:d}".format(minutes)
#region imports
from datetime import timedelta
from AlgorithmImports import *
#endregion


class RetrospectiveTanButterfly(QCAlgorithm):
    """Trades one ES contract when the rolling buy/sell volume imbalance reaches 700.

    Trade ticks are split into buy and sell volume, accumulated in five
    30-second buckets per symbol. The buy-minus-sell delta over those buckets
    triggers a one-contract market order in the direction of the imbalance.
    """

    def Initialize(self):
        """Configures the backtest window, cash and the continuous ES subscription."""
        self.SetStartDate(2020, 9, 17)  # Set Start Date
        self.SetEndDate(2020, 9, 24)
        self.SetCash(1000000000)  # Set Strategy Cash
        self.symbolData = {}
        # Re-arm flags: after an entry, that direction stays blocked until the
        # delta has fallen back (see OnData).
        self.canLong = True
        self.canShort = True

        self.contract = self.AddFuture(Futures.Indices.SP500EMini,
                                       Resolution.Tick,
                                       extendedMarketHours = False,
                                       dataNormalizationMode = DataNormalizationMode.BackwardsRatio,
                                       dataMappingMode = DataMappingMode.OpenInterest,
                                       contractDepthOffset = 0)
        symbol = self.contract.Symbol

        symbolData = SymbolData()
        symbolData.bidPrice = self.Securities[symbol].BidPrice
        symbolData.askPrice = self.Securities[symbol].AskPrice
        self.symbolData[symbol] = symbolData

    def OnData(self, data):
        """Updates the volume buckets from the slice's ticks and places entries.

        Args:
            data: The slice delivered by the engine; only `data.Ticks` is read.
        """
        for symbol, symbolData in self.symbolData.items():
            if not data.Ticks.ContainsKey(symbol):
                continue

            # Shift the window once per elapsed 30-second bucket. Shifting on
            # every call whose clock second is 0 or 30 would wipe the window
            # whenever several slices arrive within that second.
            symbolData.Roll(self.Time)

            for tick in data.Ticks[symbol]:
                if tick.TickType == TickType.Quote:
                    # A zero quote side means "no update": keep the last known value.
                    symbolData.bidPrice = tick.BidPrice if tick.BidPrice != 0 else symbolData.bidPrice
                    symbolData.askPrice = tick.AskPrice if tick.AskPrice != 0 else symbolData.askPrice
                elif tick.TickType == TickType.Trade:
                    # NOTE(review): prints nearer the ask are booked as SELL volume,
                    # unchanged from the original; confirm this is the intended convention.
                    if tick.Price - symbolData.bidPrice > symbolData.askPrice - tick.Price:
                        symbolData.sellVolume += tick.Quantity
                        symbolData.sellRollingVolume1 += tick.Quantity
                    else:
                        symbolData.buyVolume += tick.Quantity
                        symbolData.buyRollingVolume1 += tick.Quantity

            buyDelta = symbolData.buyRollingVolume - symbolData.sellRollingVolume
            sellDelta = -buyDelta

            # NOTE(review): because of the elif, canShort is only re-armed while
            # buyDelta >= 5; kept exactly as in the original.
            if buyDelta < 5:
                self.canLong = True
            elif sellDelta < 5:
                self.canShort = True

            if buyDelta >= 700 and self.canLong:
                self.Log(f"volume buy Delta: {buyDelta}")
                self.canLong = False
                self.MarketOrder(self.contract.Mapped, 1)
            elif sellDelta >= 700 and self.canShort:
                self.Log(f"volume sell Delta: {sellDelta}")
                self.canShort = False
                self.MarketOrder(self.contract.Mapped, -1)

    def OnEndOfDay(self, symbol):
        """Reports the day's buy/sell volume for a tracked symbol and resets it.

        Args:
            symbol: The symbol whose trading day ended.
        """
        symbolData = self.symbolData.get(symbol)
        if symbolData is None:
            # Not a symbol this algorithm keeps volume for.
            return
        message = f"{symbol.Value}'s buy volume is {symbolData.buyVolume} and sell volume is {symbolData.sellVolume} for today"
        self.Debug(message)
        self.Log(message)
        symbolData.ClearDay()


class SymbolData:
    """Per-symbol quote state plus daily and rolling buy/sell volume counters."""

    # Width of one rolling bucket; five buckets cover 2.5 minutes.
    BUCKET_SECONDS = 30

    def __init__(self):
        # Daily totals, reset by ClearDay().
        self.buyVolume = 0
        self.sellVolume = 0
        # Rolling buckets: 1 is the bucket being filled, 5 is the oldest.
        self.buyRollingVolume1 = 0
        self.sellRollingVolume1 = 0
        self.buyRollingVolume2 = 0
        self.sellRollingVolume2 = 0
        self.buyRollingVolume3 = 0
        self.sellRollingVolume3 = 0
        self.buyRollingVolume4 = 0
        self.sellRollingVolume4 = 0
        self.buyRollingVolume5 = 0
        self.sellRollingVolume5 = 0
        # Last non-zero quote seen.
        self.bidPrice = 0
        self.askPrice = 0
        self.canShort = True
        self.canLong = True
        # Counters reset by ClearIntra(); initialised so reads never fail.
        self.buyIntraVolume = 0
        self.sellIntraVolume = 0
        # Index of the 30-second bucket currently being filled (None until first Roll).
        self.lastBucket = None

    @property
    def buyRollingVolume(self):
        """Buy volume summed over the five rolling buckets."""
        return (self.buyRollingVolume1 + self.buyRollingVolume2 + self.buyRollingVolume3
                + self.buyRollingVolume4 + self.buyRollingVolume5)

    @property
    def sellRollingVolume(self):
        """Sell volume summed over the five rolling buckets."""
        return (self.sellRollingVolume1 + self.sellRollingVolume2 + self.sellRollingVolume3
                + self.sellRollingVolume4 + self.sellRollingVolume5)

    def Roll(self, time):
        """Advances the rolling window to the bucket containing `time`.

        Args:
            time: A `datetime`; the window shifts once for every 30-second
                bucket elapsed since the previous call (at most five shifts,
                after which the window is empty anyway).
        """
        secondsOfDay = time.hour * 3600 + time.minute * 60 + time.second
        bucketsPerDay = 86400 // self.BUCKET_SECONDS
        bucket = time.toordinal() * bucketsPerDay + secondsOfDay // self.BUCKET_SECONDS
        if self.lastBucket is None:
            self.lastBucket = bucket
            return
        elapsed = bucket - self.lastBucket
        if elapsed <= 0:
            return
        for _ in range(min(elapsed, 5)):
            self._Shift()
        self.lastBucket = bucket

    def _Shift(self):
        """Ages every bucket by one slot and opens an empty current bucket."""
        self.buyRollingVolume5 = self.buyRollingVolume4
        self.sellRollingVolume5 = self.sellRollingVolume4
        self.buyRollingVolume4 = self.buyRollingVolume3
        self.sellRollingVolume4 = self.sellRollingVolume3
        self.buyRollingVolume3 = self.buyRollingVolume2
        self.sellRollingVolume3 = self.sellRollingVolume2
        self.buyRollingVolume2 = self.buyRollingVolume1
        self.sellRollingVolume2 = self.sellRollingVolume1
        self.buyRollingVolume1 = 0
        self.sellRollingVolume1 = 0

    def ClearDay(self):
        """Resets the daily buy/sell totals."""
        self.buyVolume = 0
        self.sellVolume = 0

    def ClearIntra(self):
        """Resets the intraday buy/sell counters."""
        self.buyIntraVolume = 0
        self.sellIntraVolume = 0