Overall Statistics

| Statistic | Value |
| --- | --- |
| Total Orders | 102 |
| Average Win | 3.36% |
| Average Loss | -9.92% |
| Compounding Annual Return | 1.441% |
| Drawdown | 54.600% |
| Expectancy | 0.103 |
| Start Equity | 100000 |
| End Equity | 140984.11 |
| Net Profit | 40.984% |
| Sharpe Ratio | -0.023 |
| Sortino Ratio | -0.015 |
| Probabilistic Sharpe Ratio | 0.000% |
| Loss Rate | 18% |
| Win Rate | 82% |
| Profit-Loss Ratio | 0.34 |
| Alpha | -0.028 |
| Beta | 0.624 |
| Annual Standard Deviation | 0.127 |
| Annual Variance | 0.016 |
| Information Ratio | -0.437 |
| Tracking Error | 0.098 |
| Treynor Ratio | -0.005 |
| Total Fees | $380.34 |
| Estimated Strategy Capacity | $1200000000.00 |
| Lowest Capacity Asset | SPY R735QTJ8XC9X |
| Portfolio Turnover | 1.16% |
# region imports
from AlgorithmImports import *
import itertools
# endregion


class WalkForwardOptimizationGridSearchAlgorithm(QCAlgorithm):
    """EMA-crossover strategy on SPY re-tuned by a monthly grid search.

    At the start of every month the algorithm scores each
    (short_ema, long_ema) pair in the search space on the trailing 365
    days of daily prices, picks the best-scoring pair, and rebuilds its
    two EMA indicators with those lookbacks. `on_data` then goes fully
    long while the short EMA is above the long EMA and flat while it is
    below.
    """

    def initialize(self):
        """Configure the backtest, the search space, and the training schedule."""
        self.set_start_date(2000, 1, 1)
        self.set_end_date(2024, 1, 1)
        self.set_cash(100_000)
        self.settings.automatic_indicator_warm_up = True
        self._security = self.add_equity("SPY", Resolution.DAILY)
        self._symbol = self._security.symbol
        # The indicators are created by the first optimization session.
        self._short_ema = None
        self._long_ema = None

        # Set the optimization search space.
        self._parameter_sets = self._generate_parameter_sets(
            {
                'short_ema': (10, 50, 10),  # min, max, step
                'long_ema': (60, 200, 10)
            }
        )
        # Define the optimization objective function.
        objective = self._cumulative_return
        # Schedule periodic optimization sessions.
        self.train(
            self.date_rules.month_start(self._symbol),
            self.time_rules.midnight,
            lambda: self._do_wfo(self._optimization_func, max, objective)
        )
        # Set a warm-up period so we hit one of the optimization sessions
        # before we start trading.
        self.set_warm_up(timedelta(45))

    def _generate_parameter_sets(self, search_space):
        """Expand a search space into the list of all parameter combinations.

        Args:
            search_space: Mapping of parameter name to a
                ``(min, max, step)`` tuple. ``max`` is inclusive.

        Returns:
            A list of dicts, one per combination, mapping each parameter
            name to a native Python number.
        """
        # Create ranges for each parameter. `.tolist()` converts the numpy
        # scalars to plain Python numbers so the values log cleanly and
        # can be passed straight to the indicator constructors.
        ranges = {
            parameter_name: np.arange(min_, max_ + step_size, step_size).tolist()
            for parameter_name, (min_, max_, step_size) in search_space.items()
        }
        # Create list of dictionaries for parameter sets.
        return [
            dict(zip(ranges, combination))
            for combination in itertools.product(*ranges.values())
        ]

    def _do_wfo(self, optimization_func, min_max, objective):
        """Run one walk-forward optimization session.

        Args:
            optimization_func: Callable ``(prices, parameter_set, objective)``
                returning the score of one parameter set.
            min_max: Callable that picks the best value from a list of
                scores (for example ``max`` or ``min``).
            objective: Callable passed through to ``optimization_func``.
        """
        # Get the historical data we need to calculate the scores.
        history = self.history(self._symbol, timedelta(365), Resolution.DAILY)
        if history.empty:
            # Nothing to score; keep whatever indicators are already active.
            self.log(f"{self.time}; No history available; skipping optimization")
            return
        prices = history.loc[self._symbol]

        # Calculate the score of each parameter set.
        scores = [
            optimization_func(prices, parameter_set, objective)
            for parameter_set in self._parameter_sets
        ]

        # Record the grid search results.
        for parameter_set, score in zip(self._parameter_sets, scores):
            self.log(
                f"{self.time}; Parameters: {parameter_set}; Score: {score}"
            )

        # Ignore parameter sets that could not be scored (NaN), because
        # NaN makes min()/max() results order-dependent.
        valid_indices = [i for i, score in enumerate(scores) if pd.notna(score)]
        if not valid_indices:
            self.log(f"{self.time}; No valid scores; skipping optimization")
            return

        # Find the parameter set that optimizes the objective function.
        # On ties the first matching parameter set wins.
        best_score = min_max([scores[i] for i in valid_indices])
        best_index = next(i for i in valid_indices if scores[i] == best_score)
        optimal_parameters = self._parameter_sets[best_index]

        for name, value in optimal_parameters.items():
            self.plot('Parameters', name, value)
        # Adjust the algorithm's logic.
        self._update_algorithm_logic(optimal_parameters)

    def _optimization_func(self, data, parameter_set, objective):
        """Score one parameter set on historical prices.

        Args:
            data: DataFrame with ``'open'`` and ``'close'`` columns.
            parameter_set: Dict with ``'short_ema'`` and ``'long_ema'``
                lookback periods.
            objective: Callable that maps the strategy's daily returns to
                a score.

        Returns:
            Whatever ``objective`` returns for the simulated daily returns.
        """
        p1 = parameter_set['short_ema']
        p2 = parameter_set['long_ema']
        close = data['close']
        # `span=N` gives alpha = 2 / (N + 1), the usual N-period EMA. The
        # first positional argument of `ewm` is `com`, which would score a
        # much slower average than the N-period indicators that get traded.
        short_ema = close.ewm(span=p1, min_periods=p1).mean()
        long_ema = close.ewm(span=p2, min_periods=p2).mean()
        # A zero spread carries the previous exposure forward.
        # NOTE(review): exposure here is +1/-1 (long/short) while on_data
        # trades long/flat -- confirm which one is intended.
        exposure = (
            np.sign((short_ema - long_ema).dropna())
            .replace(0, np.nan)
            .ffill()
            .shift(1)
        )
        # ^ shift(1) because we enter the position on the next day.
        asset_daily_returns = data['open'].pct_change().shift(-1)
        # ^ shift(-1) because we want each entry to be the return from
        # the current day to the next day.
        strategy_daily_returns = (exposure * asset_daily_returns).dropna()
        return objective(strategy_daily_returns)

    def _cumulative_return(self, daily_returns):
        """Return the compounded return of a series of daily returns.

        Args:
            daily_returns: pandas Series of simple daily returns.

        Returns:
            The cumulative return as a float, or NaN when the series is
            empty (no evaluable days).
        """
        if daily_returns.empty:
            return float('nan')
        # `.iloc[-1]` is explicitly positional; `series[-1]` on a
        # date-indexed Series relies on a deprecated fallback.
        return (daily_returns + 1).cumprod().iloc[-1] - 1

    def _update_algorithm_logic(self, optimal_parameters):
        """Replace the live EMA indicators with ones using the new lookbacks.

        Args:
            optimal_parameters: Dict with ``'short_ema'`` and ``'long_ema'``
                lookback periods.
        """
        # Remove the old indicators.
        if self._short_ema is not None:
            self.deregister_indicator(self._short_ema)
        if self._long_ema is not None:
            self.deregister_indicator(self._long_ema)
        # Create the new indicators. Each indicator takes its own lookback.
        self._short_ema = self.ema(
            self._symbol, int(optimal_parameters['short_ema']), Resolution.DAILY
        )
        self._long_ema = self.ema(
            self._symbol, int(optimal_parameters['long_ema']), Resolution.DAILY
        )

    def on_data(self, data):
        """Trade the EMA crossover: long above, flat below."""
        if self.is_warming_up:
            return
        # No indicators exist until an optimization session has succeeded.
        if self._short_ema is None or self._long_ema is None:
            return
        if not (self._short_ema.is_ready and self._long_ema.is_ready):
            return

        short_value = self._short_ema.current.value
        long_value = self._long_ema.current.value
        holdings = self._security.holdings
        # Case 1: Short EMA is above long EMA
        if short_value > long_value and not holdings.is_long:
            self.set_holdings(self._symbol, 1)
        # Case 2: Short EMA is below long EMA
        # Only exit when there is a position, so the exit is not re-issued
        # on every bearish bar.
        elif short_value < long_value and holdings.invested:
            self.set_holdings(self._symbol, 0)