Overall Statistics |
Total Trades 1354 Average Win 0.66% Average Loss -0.90% Compounding Annual Return 93.321% Drawdown 61.000% Expectancy 0.200 Net Profit 136.642% Sharpe Ratio 1.436 Probabilistic Sharpe Ratio 47.168% Loss Rate 31% Win Rate 69% Profit-Loss Ratio 0.73 Alpha 1.302 Beta -1.159 Annual Standard Deviation 0.819 Annual Variance 0.671 Information Ratio 1.257 Tracking Error 0.85 Treynor Ratio -1.016 Total Fees $0.00 Estimated Strategy Capacity $860000.00 Lowest Capacity Asset EURGBP 8G |
class CustomSlope:
    """Supertrend-style band tracker plus an auxiliary log-price slope estimate.

    Two independent pieces of state live on this object:

    * ``Update`` maintains rolling windows of the final upper/lower bands and
      of the resulting trend line, built from ``(high + low) / 2`` plus or
      minus ``multiplier`` times the current value of the supplied ATR.
    * ``Update2`` maintains a fixed-length queue of values and fits a linear
      regression to their logarithms.

    ``IsReady`` is a read-only property derived from the band windows;
    ``IsSlopeReady`` is the separate readiness flag of the regression queue.
    """

    def __init__(self, algorithm, name, period, symbol, atr):
        """Create the windows and warm them up from 20 daily history bars.

        Args:
            algorithm: Object providing ``History`` and ``Debug``.
            name: Label used in ``__repr__``.
            period: Maximum length of the queue consumed by ``Update2``.
            symbol: Symbol whose history is requested for the warm-up.
            atr: Indicator exposing ``IsReady`` and ``Current.Value``.
        """
        from collections import deque

        # The warm-up below needs the algorithm, so keep a reference to it.
        self.algorithm = algorithm
        self.Name = name
        self.Time = datetime.min
        self.multiplier = 3
        self.symbol = symbol
        self._atr = atr

        # State consumed by Update2 (newest sample is stored at index 0).
        self.queue = deque(maxlen=period)
        self.IsSlopeReady = False

        self._final_upperband = RollingWindow[float](2)
        self._final_lowerband = RollingWindow[float](2)
        self._supertrend = RollingWindow[float](2)
        self._close = RollingWindow[float](3)

        warmUpData = self.algorithm.History(symbol, 20, Resolution.Daily)
        if warmUpData.empty:
            self.algorithm.Debug(f'{str(symbol)} warmUpData is empty')
            return

        # NOTE(review): the ATR is only read here, never advanced, so every
        # warm-up bar is combined with the ATR's *current* value and all bars
        # are skipped (apart from recording the close) while the ATR is not
        # ready - confirm that the caller warms the ATR up first.
        for bar in warmUpData.loc[symbol, :].itertuples():
            self._process_bar(bar.high, bar.low, bar.close)

    def __repr__(self):
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(
            self.Name, self.IsReady, self.Time, self.Value
        )

    @property
    def IsReady(self):
        """True once the close window and both final-band windows are full."""
        return bool(
            self._close.IsReady
            and self._final_lowerband.IsReady
            and self._final_upperband.IsReady
        )

    @property
    def Value(self):
        """Most recent trend-line value, or None before the first one is stored."""
        if self._supertrend.Count > 0:
            return self._supertrend[0]
        return None

    def _process_bar(self, high, low, close):
        """Fold one bar into the close, band and trend-line windows.

        Shared by the constructor warm-up and by ``Update`` so both paths
        seed and advance the windows in exactly the same way.
        """
        self._close.Add(close)
        if not self._atr.IsReady:
            return

        band_offset = self.multiplier * self._atr.Current.Value
        midpoint = (high + low) / 2
        basic_upper = midpoint + band_offset
        basic_lower = midpoint - band_offset

        if self._final_lowerband.IsReady and self._final_upperband.IsReady:
            # The current close was just added at index 0, so index 1 is the
            # previous bar's close.
            prev_close = self._close[1]
            prev_upper = self._final_upperband[0]
            prev_lower = self._final_lowerband[0]

            if basic_upper < prev_upper or prev_close > prev_upper:
                self._final_upperband.Add(basic_upper)
            else:
                self._final_upperband.Add(prev_upper)

            if basic_lower > prev_lower or prev_close < prev_lower:
                self._final_lowerband.Add(basic_lower)
            else:
                self._final_lowerband.Add(prev_lower)
        else:
            # Seed both band windows until they are full.
            self._final_upperband.Add(0.0)
            self._final_lowerband.Add(0.0)

        if not self._supertrend.IsReady:
            # Seeded in lock-step with the bands above.
            self._supertrend.Add(0.0)
            return

        upper_now = self._final_upperband[0]
        lower_now = self._final_lowerband[0]
        # Exactly one value is appended per bar. The boundary case
        # (close equal to the band) keeps the current side so the trend line
        # always equals one of the bands and the next comparison can match.
        if self._supertrend[0] == self._final_upperband[1]:
            trend = upper_now if close <= upper_now else lower_now
        else:
            trend = lower_now if close >= lower_now else upper_now
        self._supertrend.Add(trend)

    def Update(self, high, low, close):
        """Process one new bar and return the band readiness (``IsReady``)."""
        self._process_bar(high, low, close)
        return self.IsReady

    def Update2(self, time, value):
        """Push ``value`` onto the queue and refit the log-linear regression.

        Once the queue holds ``maxlen`` samples this sets ``Slope``
        (scaled by 10000), ``Intercept``, ``R_value`` (scaled by 10),
        ``P_value`` and ``Std_err``.

        Returns:
            bool: ``IsSlopeReady`` - whether the queue is full.
        """
        import numpy as np
        from scipy import stats

        self.queue.appendleft(value)
        self.Time = time
        self.IsSlopeReady = len(self.queue) == self.queue.maxlen

        if self.IsSlopeReady:
            # NOTE(review): appendleft keeps the newest sample at index 0, so
            # the regression x-axis runs from newest to oldest and a rising
            # series yields a negative Slope - confirm the sign convention.
            y = np.log(np.asarray(list(self.queue), dtype=float))
            x = np.arange(len(y))
            slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
            # The raw slope is very small; scale it so it does not display as 0.
            self.Slope = slope * 10000
            self.Intercept = intercept
            self.R_value = r_value * 10
            self.P_value = p_value
            self.Std_err = std_err

        return self.IsSlopeReady
import re
from datetime import datetime, timedelta
from enum import Enum

import pandas as pd


class ReferenceType(Enum):
    """How the grid's reference (centre) price is chosen."""

    AVERAGE = 1
    PREVIOUS_CLOSE = 2


class VolatilityType(Enum):
    """How the half-width of the grid is measured."""

    STANDARD_DEVIATION = 1
    HIGH_LOW = 2  # Triggers very few orders; ineffective.
    ATR = 3


class PositionAmountType(Enum):
    """How the per-level order size grows with the grid level."""

    AVERAGE = 1
    INCREMENTAL = 2
    DOUBLEUP = 3


class FXMomentumAlgorithm(QCAlgorithm):
    """Grid strategy on one forex pair using limit orders around a reference price.

    ``openMarket`` places ``_grid_number`` buy limits below and the same
    number of sell limits above the reference price, spaced evenly across the
    volatility band ``_std``. ``OnOrderEvent`` answers every filled entry with
    an exit limit one level closer to the reference price, and every filled
    exit with a fresh entry at the original level.
    """

    # Order tags look like 'Long3', 'Short10' or 'Long3-Liquidate'. The side is
    # matched explicitly so that multi-digit levels keep all of their digits.
    _GRID_TAG = re.compile(r'^(Long|Short)(\d+)(.*)$')

    def Initialize(self):
        """Configure dates, cash, strategy parameters, the pair and the schedule."""
        self.SetStartDate(2020, 12, 28)
        self.SetEndDate(datetime.now())
        self.SetCash(100000)

        self.resolution = Resolution.Minute
        self._grid_number = 10
        self._leverage_ratio = 50
        self._reference_price = None
        self._std = None
        self._stop_loss = None
        self._cash_preserve_ratio = 0.99
        self._cash_per_position = None
        self.IS_READY = False

        self.REFERENCE_PRICE_METHOD = ReferenceType.PREVIOUS_CLOSE
        self.POSITION_AMOUNT_METHOD = PositionAmountType.AVERAGE
        self.VOLATILITY_METHOD = VolatilityType.STANDARD_DEVIATION
        self.VOLATILITY_MULTIPLIER = 2
        self.STOP_LOSS_MODE = False
        self.STOP_LOSS_RESCALE = False
        # Stop distance is the volatility band multiplied by this factor.
        self.STOP_LOSS_MULTIPLIER = 1.2

        # Alternative pairs: GBPCHF, AUDCAD, GBPCAD, USDJPY, AUDJPY, GBPUSD.
        self.pair = 'EURGBP'
        self.forex = self.AddForex(
            self.pair,
            self.resolution,
            Market.Oanda,
            True,
            self._leverage_ratio,
        )

        # Build the grid at the start of each month and flatten it at the end.
        self.Schedule.On(
            self.DateRules.MonthStart(self.pair),
            self.TimeRules.AfterMarketOpen(self.pair, 0),
            self.openMarket,
        )
        self.Schedule.On(
            self.DateRules.MonthEnd(self.pair),
            self.TimeRules.BeforeMarketClose(self.pair, 5),
            self.closeMarket,
        )

    def OnData(self, data):
        """Liquidate when the close leaves the stop-loss band (if enabled)."""
        if not self.IS_READY or self.pair not in data:
            return
        if not self.STOP_LOSS_MODE:
            return

        close = data[self.pair].Close
        lower_stop = self._reference_price - self._stop_loss
        upper_stop = self._reference_price + self._stop_loss
        if close < lower_stop or close > upper_stop:
            # Break-out: liquidate the pair, optionally rebuilding the grid.
            self.Liquidate(self.pair)
            if self.STOP_LOSS_RESCALE:
                self.openMarket()

    def setParameters(self):
        """Recompute reference price, cash per level, volatility band and stop."""
        history = self.History([self.pair], 93, Resolution.Daily)
        history = history.droplevel(0)

        # Reference price
        if self.REFERENCE_PRICE_METHOD == ReferenceType.PREVIOUS_CLOSE:
            # Most recent close among the last 2880 minute bars. Positional
            # access goes through .iloc because the Series is indexed by time.
            minute_history = self.History(
                [self.pair], 2880, Resolution.Minute
            ).droplevel(0)
            self._reference_price = minute_history.close.iloc[-1]
        elif self.REFERENCE_PRICE_METHOD == ReferenceType.AVERAGE:
            # Mean of the last 20 daily closes.
            self.Debug('Average mean')
            self._reference_price = history.close.iloc[-20:].mean()

        # Cash per grid level: leveraged buying power split over both sides.
        buying_power = (
            self.Portfolio.MarginRemaining
            * self._leverage_ratio
            * self._cash_preserve_ratio
        )
        if self.POSITION_AMOUNT_METHOD == PositionAmountType.AVERAGE:
            # Equal weight per level.
            self._cash_per_position = buying_power / (self._grid_number * 2)
        elif self.POSITION_AMOUNT_METHOD == PositionAmountType.INCREMENTAL:
            # Level i carries i units, so divide by 1 + 2 + ... + grid_number.
            self._cash_per_position = buying_power / (
                sum(range(1, self._grid_number + 1)) * 2
            )
        elif self.POSITION_AMOUNT_METHOD == PositionAmountType.DOUBLEUP:
            # Level i carries 2**(i - 1) units, so divide by 2**grid_number - 1.
            self._cash_per_position = buying_power / (
                (2 ** self._grid_number - 1) * 2
            )

        # Volatility band
        if self.VOLATILITY_METHOD == VolatilityType.STANDARD_DEVIATION:
            self._std = history.close.std() * self.VOLATILITY_MULTIPLIER
        elif self.VOLATILITY_METHOD == VolatilityType.HIGH_LOW:
            # Range between the highest high and lowest low of the history.
            self._std = abs(history.high.max() - history.low.min())
        elif self.VOLATILITY_METHOD == VolatilityType.ATR:
            # NOTE(review): `ta` is not imported in the visible source - confirm
            # it is provided elsewhere before enabling this method.
            atr = ta.ATR(history.high, history.low, history.close, timeperiod=60)
            self._std = pd.Series(atr).iloc[-1]

        self._stop_loss = self._std * self.STOP_LOSS_MULTIPLIER

    def _grid_price(self, level, direction):
        """Unrounded price of grid `level`; direction -1 is below, +1 above."""
        return self._reference_price + direction * (
            self._std / self._grid_number * level
        )

    def _grid_quantity(self, level, price):
        """Whole number of units affordable at `price` for grid `level`."""
        return int(
            self._cash_per_position * self.get_qty_multiplier(level) / price
        )

    def openMarket(self):
        """Refresh the parameters, liquidate the pair and place the full grid."""
        self.setParameters()

        # Start from a clean slate before creating the new orders.
        self.Liquidate(self.pair)

        for level in range(1, self._grid_number + 1):
            long_price = self._grid_price(level, -1)
            self.LimitOrder(
                self.pair,
                self._grid_quantity(level, long_price),
                round(long_price, 5),
                f'Long{level}',
            )

            short_price = self._grid_price(level, 1)
            self.LimitOrder(
                self.pair,
                -self._grid_quantity(level, short_price),
                round(short_price, 5),
                f'Short{level}',
            )

        self.IS_READY = True

    def closeMarket(self):
        """Scheduled close-out: liquidate the pair."""
        self.Liquidate(self.pair)

    def OnOrderEvent(self, orderEvent):
        """React to fills of grid orders.

        * A filled entry ('Long3' / 'Short3') places the opposite-side exit
          limit one level closer to the reference price, tagged
          '<entry>-Liquidate'.
        * A filled exit ('Long3-Liquidate') re-places the entry limit at its
          original level; on the short side any still-open
          'Short<n>-Liquidate' / 'Short<n>-Stoploss' orders are cancelled.

        Events that are not fills, or whose tag is not a grid tag, are ignored.
        """
        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        if orderEvent.Status != OrderStatus.Filled:
            return

        match = self._GRID_TAG.match(order.Tag)
        if not match:
            return

        side, level_text, suffix = match.groups()
        level = int(level_text)
        # Longs sit below the reference price, shorts above it.
        direction = -1 if side == 'Long' else 1

        if suffix == '-Liquidate':
            # The exit filled: re-arm the entry at the same level.
            price = self._grid_price(level, direction)
            qty = self._grid_quantity(level, price)
            self.LimitOrder(
                self.pair,
                -direction * qty,
                round(price, 5),
                f'{side}{level_text}',
            )
            if side == 'Short':
                stale_tags = (
                    f'Short{level_text}-Liquidate',
                    f'Short{level_text}-Stoploss',
                )
                for open_order in self.Transactions.GetOpenOrders(self.pair):
                    if open_order.Tag in stale_tags:
                        self.Transactions.CancelOrder(open_order.Id)
        elif level >= 1:
            # The entry filled: exit one level closer to the reference price
            # (level 1 exits at the reference price itself).
            exit_price = self._grid_price(level - 1, direction)
            self.LimitOrder(
                self.pair,
                direction * abs(order.Quantity),
                round(exit_price, 5),
                f'{side}{level_text}-Liquidate',
            )

        # NOTE(review): clearing the flag after every grid fill means OnData
        # skips its stop-loss check from the first fill until the next
        # openMarket() - confirm this is intended.
        self.IS_READY = False

    def get_qty_multiplier(self, grid_number):
        """Return the size multiplier for a grid level under the active method."""
        if self.POSITION_AMOUNT_METHOD == PositionAmountType.AVERAGE:
            # Even weighting
            return 1
        elif self.POSITION_AMOUNT_METHOD == PositionAmountType.INCREMENTAL:
            # Linear weighting
            return grid_number
        elif self.POSITION_AMOUNT_METHOD == PositionAmountType.DOUBLEUP:
            # Doubling weighting
            if not isinstance(grid_number, int):
                grid_number = int(grid_number)
            return 2 ** (grid_number - 1)