Overall Statistics
Total Orders
859
Average Win
0.07%
Average Loss
-0.06%
Compounding Annual Return
2.442%
Drawdown
4.100%
Expectancy
0.096
Start Equity
1000000.00
End Equity
1024535.86
Net Profit
2.454%
Sharpe Ratio
-0.909
Sortino Ratio
-1.205
Probabilistic Sharpe Ratio
27.621%
Loss Rate
52%
Win Rate
48%
Profit-Loss Ratio
1.30
Alpha
-0.035
Beta
-0.018
Annual Standard Deviation
0.038
Annual Variance
0.001
Information Ratio
0.071
Tracking Error
0.072
Treynor Ratio
1.874
Total Fees
$0.00
Estimated Strategy Capacity
$42000000.00
Lowest Capacity Asset
GBPUSD 8G
Portfolio Turnover
31.02%
#region imports
from AlgorithmImports import *

import pywt
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV
#endregion


class SVMWavelet:
    '''
    One-step-ahead forecaster that combines a Discrete Wavelet Transform
    (pywt) with Support Vector Regression (sklearn SVR).
    '''

    def forecast(self, data):
        '''
        Decomposes 1-D array "data" into multiple components using Discrete Wavelet Transform,
        denoises each detail component using thresholding,
        uses Support Vector Regression (SVR) to forecast each component,
        and recombines the components into an aggregate forecast

        returns: the value of the aggregate forecast 1 time-step into the future
        '''

        w = pywt.Wavelet('sym10')  # Daubechies/Symlets are good choices for denoising

        # fraction of a component's largest coefficient used as its threshold
        threshold = 0.5

        # Decompose into wavelet components (no level is passed, so pywt picks the depth)
        coeffs = pywt.wavedec(data, w)

        # if we want at least 3 levels (components), solve for:
        #   log2(len(data) / (wave_length - 1)) >= 3
        #   in this case, since wave_length(sym10) == 20, after solving we get len(data) >= 152,
        #   hence why our RollingWindow is of length 152 in main.py

        for level, component in enumerate(coeffs):
            if level > 0:
                # we don't want to threshold the approximation coefficients (index 0)
                # NOTE(review): max() is signed, so a component whose coefficients are
                #   all negative produces a negative threshold -- confirm this is intended
                component = pywt.threshold(component, threshold * max(component))

            forecasted = self._svm_forecast(component)

            # shift the component one step to the left and put the forecast in the
            # freed last slot, so the reconstruction ends on the forecasted step
            component = np.roll(component, -1)
            component[-1] = forecasted
            coeffs[level] = component

        datarec = pywt.waverec(coeffs, w)
        return datarec[-1]

    def _svm_forecast(self, data, sample_size=10):
        '''
        Partitions "data" into rolling windows of "sample_size" values, fits an SVR
        model (hyper-parameters chosen by grid search on negative MSE) that maps each
        window to the value following it, then forecasts the value one time-step
        into the future from the last "sample_size" values of "data"

        returns: the forecasted value
        '''
        X, y = self._partition_array(data, size=sample_size)

        param_grid = {'C': [.05, .1, .5, 1, 5, 10], 'epsilon': [0.001, 0.005, 0.01, 0.05, 0.1]}
        gsc = GridSearchCV(SVR(), param_grid, scoring='neg_mean_squared_error')

        model = gsc.fit(X, y).best_estimator_

        # np.newaxis turns the trailing window into a single-row 2-D sample
        return model.predict(data[np.newaxis, -sample_size:])[0]

    def _partition_array(self, arr, size=None, splits=None):
        '''
        Partitions 1-D array "arr" in a rolling fashion if "size" is specified,
        else divides it into chunks of len(arr) // "splits" values

        Exactly one of "size" and "splits" must be given.

        returns: array of partitioned arrays, array of the values 1 step ahead of each partitioned array
        raises: ValueError if both or neither of "size"/"splits" are given,
                or if "splits" is larger than len(arr)
        '''

        arrs = []
        values = []

        if (size is None) == (splits is None):
            raise ValueError('Size XOR Splits should not be None')

        if size:
            # every window of "size" consecutive values, paired with the value right after it
            arrs = [arr[i:i + size] for i in range(len(arr) - size)]
            values = [arr[i] for i in range(size, len(arr))]

        elif splits:
            size = len(arr) // splits
            if size == 0:
                # previously surfaced as a bare ZeroDivisionError from the modulo below
                raise ValueError('Splits should not exceed the length of the array')

            remainder = len(arr) % size
            if remainder == 0:
                arrs = [arr[i:i + size] for i in range(size - 1, len(arr) - 1, size)]
                values = [arr[i] for i in range(2 * size - 1, len(arr), size)]
            else:
                # start at remainder - 1 so the last chunk ends one step before the final element
                arrs = [arr[i:i + size] for i in range(remainder - 1, len(arr) - 1, size)]
                values = [arr[i] for i in range(remainder + size - 1, len(arr), size)]

        return np.array(arrs), np.array(values)

#region imports
from AlgorithmImports import *

from SVMWavelet import SVMWavelet
#endregion
        

class SVMWaveletAlphaModel(AlphaModel):
    '''
    Alpha model that, at most once per day value of algorithm.time, forecasts the
    next price of every tracked symbol with SVMWavelet and emits a price insight
    when the forecasted relative move exceeds 0.5% in either direction.
    '''

    def __init__(self, period):
        '''
        period: number of values kept per symbol (forwarded to SymbolData)
        '''
        self._day = -1
        self._symbol_data = {}
        self._wavelet = SVMWavelet()
        self._period = period

    def update(self, algorithm, data):
        '''
        Builds the insights for the current day.

        returns: list of insights; empty when called again on the same day value
        '''
        today = algorithm.time.day
        if today == self._day:
            return []
        self._day = today

        insights = []
        for symbol, symbol_data in self._symbol_data.items():
            if algorithm.is_market_open(symbol):
                prices = symbol_data.prices()
                forecast = self._wavelet.forecast(prices)

                # forecasted fractional change relative to the last price in the
                #   series; used as the (un-normalized) insight weight
                weight = forecast / prices[-1] - 1

                if abs(weight) > 0.005:
                    direction = InsightDirection.UP if weight > 0 else InsightDirection.DOWN
                    insights.append(
                        Insight.price(symbol, timedelta(1), direction, weight=abs(weight)))

                # cancel this symbol's insights held by the algorithm's insight manager,
                #   whether or not a new insight was produced above
                algorithm.insights.cancel([symbol])

        return insights

    def on_securities_changed(self, algorithm, changed):
        '''
        Creates SymbolData for added securities and disposes the SymbolData of removed ones.
        '''
        for added in changed.added_securities:
            added_symbol = added.symbol
            self._symbol_data[added_symbol] = SymbolData(algorithm, added_symbol, self._period)

        for removed in changed.removed_securities:
            stale = self._symbol_data.pop(removed.symbol, None)
            if stale:
                stale.dispose()


class SymbolData:
    '''
    Per-symbol state: a rolling window of consolidated closes fed by a
    one-day quote-bar consolidator registered with the algorithm.
    '''

    def __init__(self, algorithm, symbol, period):
        '''
        Registers the consolidator for "symbol" and warms it up by pushing
        "period" daily history bars through it.
        '''
        self._algorithm = algorithm
        self._symbol = symbol
        self._close = RollingWindow[float](period)

        consolidator = QuoteBarConsolidator(timedelta(1))
        consolidator.data_consolidated += self._on_consolidated
        self._consolidator = consolidator
        algorithm.subscription_manager.add_consolidator(symbol, consolidator)

        # warm-up: history bars go through the consolidator, not straight into the window
        for bar in algorithm.history[QuoteBar](symbol, period, Resolution.DAILY):
            consolidator.update(bar)

    def _on_consolidated(self, sender, bar):
        '''Consolidator callback: stores the close of the consolidated bar.'''
        self._close.add(bar.close)

    def dispose(self):
        '''Clears the window and unregisters the consolidator from the algorithm.'''
        self._close.reset()
        self._algorithm.subscription_manager.remove_consolidator(self._symbol, self._consolidator)

    def prices(self):
        '''
        returns: 1-D numpy array of the stored closes in reversed iteration order
                 (presumably oldest first, if the window iterates newest first -- confirm)
        '''
        window_values = np.array(list(self._close))
        return window_values[::-1]
#region imports
from AlgorithmImports import *

from alpha import SVMWaveletAlphaModel
from portfolio import LeveragedWeightingPortfolioConstructionModel
#endregion


class OptimizedUncoupledRegulators(QCAlgorithm):
    '''
    Forex algorithm wiring the SVM-wavelet alpha model to the leveraged
    weighting portfolio construction model over four OANDA pairs.
    '''

    def initialize(self):
        '''
        Configures the backtest window, cash, brokerage, benchmark, universe,
        alpha model and portfolio construction model.
        '''
        self.set_start_date(2023, 3, 1)
        self.set_end_date(2024, 3, 1)
        self.set_cash(1_000_000)

        # both values can be overridden through algorithm parameters
        lookback = self.get_parameter("period", 152)
        max_leverage = self.get_parameter("leverage", 20)

        self.set_brokerage_model(BrokerageName.OANDA_BROKERAGE, AccountType.MARGIN)
        self.set_benchmark(SecurityType.FOREX, "EURUSD")

        self.universe_settings.leverage = max_leverage
        self.universe_settings.resolution = Resolution.MINUTE

        forex_symbols = []
        for ticker in ("EURJPY", "GBPUSD", "AUDCAD", "NZDCHF"):
            forex_symbols.append(Symbol.create(ticker, SecurityType.FOREX, Market.OANDA))
        self.set_universe_selection(ManualUniverseSelectionModel(forex_symbols))

        self.set_alpha(SVMWaveletAlphaModel(lookback))

        def no_scheduled_rebalance(dt):
            # rebalance function that always returns None
            return None

        self.set_portfolio_construction(
            LeveragedWeightingPortfolioConstructionModel(no_scheduled_rebalance, max_leverage))
#region imports
from AlgorithmImports import *
#endregion


class LeveragedWeightingPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
    '''
    Portfolio construction model whose target percent for each insight is
    direction * insight weight * leverage.
    '''

    def __init__(self, rebalance=Resolution.DAILY, leverage=20):
        '''
        rebalance: forwarded unchanged to the base class constructor
        leverage: multiplier applied to every insight weight
        '''
        super().__init__(rebalance)
        self.leverage = leverage

    def should_create_target_for_insight(self, insight):
        '''Only insights that carry a weight produce a target.'''
        return insight.weight is not None

    def determine_target_percent(self, activeInsights):
        '''
        returns: dict mapping each active insight to its target percent
        '''
        targets = {}

        for insight in activeInsights:
            # insights rejected by respect_portfolio_bias are treated as FLAT
            if self.respect_portfolio_bias(insight):
                direction = insight.direction
            else:
                direction = InsightDirection.FLAT
            targets[insight] = direction * insight.weight * self.leverage

        return targets