Overall Statistics |
Total Orders 859 Average Win 0.07% Average Loss -0.06% Compounding Annual Return 2.442% Drawdown 4.100% Expectancy 0.096 Start Equity 1000000.00 End Equity 1024535.86 Net Profit 2.454% Sharpe Ratio -0.909 Sortino Ratio -1.205 Probabilistic Sharpe Ratio 27.621% Loss Rate 52% Win Rate 48% Profit-Loss Ratio 1.30 Alpha -0.035 Beta -0.018 Annual Standard Deviation 0.038 Annual Variance 0.001 Information Ratio 0.071 Tracking Error 0.072 Treynor Ratio 1.874 Total Fees $0.00 Estimated Strategy Capacity $42000000.00 Lowest Capacity Asset GBPUSD 8G Portfolio Turnover 31.02% |
#region imports
from AlgorithmImports import *

import pywt
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV
#endregion


class SVMWavelet:
    """One-step-ahead forecaster combining a wavelet decomposition with SVR."""

    def forecast(self, data):
        '''
        Decomposes 1-D array "data" into multiple components using the Discrete
        Wavelet Transform, denoises the detail components by thresholding, uses
        Support Vector Regression (SVR) to forecast each component, then
        recombines the components into an aggregate forecast.

        returns: the last value of the reconstructed series, i.e. the aggregate
                 forecast 1 time-step into the future
        '''
        w = pywt.Wavelet('sym10')  # Daubechies/Symlets are good choices for denoising
        threshold = 0.5

        # Decompose into wavelet components
        coeffs = pywt.wavedec(data, w)

        # if we want at least 3 levels (components), solve for:
        #   log2(len(data) / wave_length - 1) >= 3
        # in this case, since wave_length(sym10) == 20, after solving we get
        # len(data) >= 152, hence why our RollingWindow is of length 152 in main.py

        for i in range(len(coeffs)):
            if i > 0:
                # we don't want to threshold the approximation coefficients
                # NOTE(review): the threshold scales with the signed maximum of
                # the component, not its largest magnitude -- confirm intended.
                coeffs[i] = pywt.threshold(coeffs[i], threshold*max(coeffs[i]))
            forecasted = self._svm_forecast(coeffs[i])
            # Shift the component left by one and place the forecast at the end
            coeffs[i] = np.roll(coeffs[i], -1)
            coeffs[i][-1] = forecasted

        datarec = pywt.waverec(coeffs, w)
        return datarec[-1]

    def _svm_forecast(self, data, sample_size=10):
        '''
        Partitions "data" into rolling windows of "sample_size" values, fits an
        SVR model to them (C and epsilon chosen by grid search scored on
        negative mean squared error), then forecasts the value one time-step
        into the future from the last "sample_size" values.
        '''
        X, y = self._partition_array(data, size=sample_size)

        param_grid = {'C': [.05, .1, .5, 1, 5, 10], 'epsilon': [0.001, 0.005, 0.01, 0.05, 0.1]}
        gsc = GridSearchCV(SVR(), param_grid, scoring='neg_mean_squared_error')

        model = gsc.fit(X, y).best_estimator_

        return model.predict(data[np.newaxis, -sample_size:])[0]

    def _partition_array(self, arr, size=None, splits=None):
        '''
        Partitions 1-D array "arr" in a rolling fashion if "size" is specified,
        else divides it into chunks of len(arr) // splits values, dropping any
        leading values needed to make room for each chunk's target.

        returns: array of partitioned arrays, array of the values 1 step ahead
                 of each partitioned array
        raises:  ValueError if both or neither of "size" and "splits" are given,
                 or if "splits" exceeds len(arr)
        '''
        if not (bool(size is None) ^ bool(splits is None)):
            raise ValueError('Size XOR Splits should not be None')

        arrs = []
        values = []

        if size:
            arrs = [arr[i:i + size] for i in range(len(arr) - size)]
            values = [arr[i] for i in range(size, len(arr))]

        elif splits:
            size = len(arr) // splits
            if size == 0:
                # Previously surfaced as a ZeroDivisionError from the modulo below.
                raise ValueError('splits must not exceed len(arr)')
            if len(arr) % size == 0:
                arrs = [arr[i:i + size] for i in range(size - 1, len(arr) - 1, size)]
                values = [arr[i] for i in range(2 * size - 1, len(arr), size)]
            else:
                arrs = [arr[i:i + size] for i in range(len(arr) % size - 1, len(arr) - 1, size)]
                # Fixed: this used to read arr[value].iloc[i], where "value" is
                # undefined (NameError). The target is the element right after
                # each chunk, as in the branches above.
                values = [arr[i] for i in range(len(arr) % size + size - 1, len(arr), size)]

        return np.array(arrs), np.array(values)
#region imports
from AlgorithmImports import *

from SVMWavelet import SVMWavelet
#endregion


class SVMWaveletAlphaModel(AlphaModel):
    """Alpha model emitting one-day price insights from SVM-wavelet forecasts."""

    def __init__(self, period):
        """Store the look-back length and create the forecaster.

        period: number of closes kept per symbol for forecasting.
        """
        self._period = period
        self._wavelet = SVMWavelet()
        self._symbol_data = {}
        # Day-of-month of the last processed update; -1 means "none yet".
        self._day = -1

    def update(self, algorithm, data):
        """Return new insights, computed at most once per calendar day.

        For each tracked symbol whose market is open and whose price window is
        full, forecast the next close and emit an UP/DOWN insight when the
        forecasted relative move exceeds 0.5% in magnitude. Insights for the
        symbol held by the algorithm's insight manager are cancelled afterwards.
        """
        if self._day == algorithm.time.day:
            return []
        self._day = algorithm.time.day

        insights = []
        # The loop variable is not called "data" so that it does not shadow
        # the slice passed into this method.
        for symbol, symbol_data in self._symbol_data.items():
            if not algorithm.is_market_open(symbol):
                continue

            # Skip until the window is full: forecasting on a partial window
            # changes the decomposition depth, and an empty one breaks the
            # prices[-1] lookup below.
            if not symbol_data.is_ready:
                continue

            prices = symbol_data.prices()
            forecasted_value = self._wavelet.forecast(prices)

            # Original note: if the sums of the weights > 1, IWPCM normalizes
            # the sum to 1, which means we don't need to worry about
            # normalizing them.
            # NOTE(review): confirm this still holds for the portfolio
            # construction model actually wired up by the algorithm.
            weight = (forecasted_value / prices[-1]) - 1

            if weight > 0.005:
                insights.append(Insight.price(symbol, timedelta(1), InsightDirection.UP, weight=abs(weight)))
            elif weight < -0.005:
                insights.append(Insight.price(symbol, timedelta(1), InsightDirection.DOWN, weight=abs(weight)))

            algorithm.insights.cancel([symbol])

        return insights

    def on_securities_changed(self, algorithm, changed):
        """Create per-symbol state for added securities and dispose of removed ones."""
        for security in changed.added_securities:
            symbol = security.symbol
            self._symbol_data[symbol] = SymbolData(algorithm, symbol, self._period)

        for security in changed.removed_securities:
            removed = self._symbol_data.pop(security.symbol, None)
            if removed:
                removed.dispose()


class SymbolData:
    """Rolling window of consolidated closes for a single symbol."""

    def __init__(self, algorithm, symbol, period):
        """Register a one-day quote-bar consolidator and seed it with history."""
        self._algorithm = algorithm
        self._symbol = symbol
        self._close = RollingWindow[float](period)

        self._consolidator = QuoteBarConsolidator(timedelta(1))
        self._consolidator.data_consolidated += self._on_consolidated
        algorithm.subscription_manager.add_consolidator(symbol, self._consolidator)

        # Feed daily history through the consolidator to warm up the window.
        hist = algorithm.history[QuoteBar](symbol, period, Resolution.DAILY)
        for bar in hist:
            self._consolidator.update(bar)

    @property
    def is_ready(self):
        """True once the rolling window holds its full number of closes."""
        return self._close.is_ready

    def _on_consolidated(self, sender, bar):
        """Consolidator callback: record the close of each consolidated bar."""
        self._close.add(bar.close)

    def dispose(self):
        """Clear the window and unregister the consolidator."""
        self._close.reset()
        self._algorithm.subscription_manager.remove_consolidator(self._symbol, self._consolidator)

    def prices(self):
        """Return the stored closes as a NumPy array in reversed window order.

        NOTE(review): presumably the window iterates newest-first, so the
        result is oldest-first -- confirm against the RollingWindow docs.
        """
        return np.array(list(self._close))[::-1]
#region imports
from AlgorithmImports import *

from alpha import SVMWaveletAlphaModel
from portfolio import LeveragedWeightingPortfolioConstructionModel
#endregion


class OptimizedUncoupledRegulators(QCAlgorithm):
    """Forex algorithm wiring the SVM-wavelet alpha to a leveraged portfolio model."""

    def initialize(self):
        """Configure the backtest window, brokerage, universe and framework models."""
        # Backtest window and starting capital
        self.set_start_date(2023, 3, 1)
        self.set_end_date(2024, 3, 1)
        self.set_cash(1000000)

        # Tunable parameters, with the defaults used when none are supplied
        period = self.get_parameter("period", 152)
        leverage = self.get_parameter("leverage", 20)

        self.set_brokerage_model(BrokerageName.OANDA_BROKERAGE, AccountType.MARGIN)
        self.set_benchmark(SecurityType.FOREX, "EURUSD")

        settings = self.universe_settings
        settings.leverage = leverage
        settings.resolution = Resolution.MINUTE

        # Fixed universe of OANDA forex pairs
        tickers = ["EURJPY", "GBPUSD", "AUDCAD", "NZDCHF"]
        symbols = []
        for ticker in tickers:
            symbols.append(Symbol.create(ticker, SecurityType.FOREX, Market.OANDA))
        self.set_universe_selection(ManualUniverseSelectionModel(symbols))

        self.set_alpha(SVMWaveletAlphaModel(period))

        # The rebalance callable always returns None.
        # NOTE(review): presumably this disables time-based rebalancing -- confirm.
        portfolio_model = LeveragedWeightingPortfolioConstructionModel(lambda dt: None, leverage)
        self.set_portfolio_construction(portfolio_model)
#region imports
from AlgorithmImports import *
#endregion


class LeveragedWeightingPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
    """Portfolio model that sizes each target as direction * insight weight * leverage."""

    def __init__(self, rebalance=Resolution.DAILY, leverage=20):
        """Forward the rebalance setting to the base model and keep the leverage factor."""
        super().__init__(rebalance)
        self.leverage = leverage

    def should_create_target_for_insight(self, insight):
        """Only insights that carry a weight produce a target."""
        return insight.weight is not None

    def determine_target_percent(self, activeInsights):
        """Map every active insight to its signed, leveraged target percent.

        An insight that fails the portfolio-bias check is treated as FLAT.
        """
        targets = {}
        for active in activeInsights:
            if self.respect_portfolio_bias(active):
                direction = active.direction
            else:
                direction = InsightDirection.FLAT
            targets[active] = direction * active.weight * self.leverage
        return targets