Overall Statistics |
Total Trades 175 Average Win 18.58% Average Loss -7.00% Compounding Annual Return 178.304% Drawdown 57.000% Expectancy 0.722 Net Profit 2975.910% Sharpe Ratio 2.32 Probabilistic Sharpe Ratio 83.874% Loss Rate 53% Win Rate 47% Profit-Loss Ratio 2.65 Alpha 1.452 Beta 0.062 Annual Standard Deviation 0.628 Annual Variance 0.395 Information Ratio 2.122 Tracking Error 0.648 Treynor Ratio 23.643 Total Fees $26826.03 |
import numpy as np

# matplotlib and TA-Lib are only needed by plot_renko() and by the automatic
# (ATR-based) brick size. The brick logic itself only needs numpy, so a missing
# optional package must not prevent the module from being imported.
try:
    import matplotlib.pyplot as plt
    import matplotlib.patches as patches
except ImportError:
    plt = None
    patches = None

try:
    import talib
except ImportError:
    talib = None


class renko:
    """Incremental Renko brick builder.

    Instance state:
        source_prices:    list of the raw prices fed in so far.
        renko_prices:     brick prices; the first entry is the seed price.
        renko_directions: parallel to ``renko_prices``; 0 for the seed,
                          +1 for an up brick, -1 for a down brick.
        brick_size:       set by :meth:`set_brick_size` (not set in __init__).
    """

    def __init__(self):
        self.source_prices = []
        self.renko_prices = []
        self.renko_directions = []

    def set_brick_size(self, HLC_history=None, auto=True, brick_size=10.0):
        """Set and return the brick size.

        Args:
            HLC_history: frame whose first three columns are read positionally
                and passed to the ATR calculation as high, low, close.
                Required when ``auto`` is true.
            auto: when true, derive the size from the median ATR of
                ``HLC_history``; otherwise use ``brick_size`` as given.
            brick_size: explicit size used when ``auto`` is false.

        Returns:
            The brick size now stored on the instance. In auto mode this is
            0.0 when the history is not longer than the ATR period.

        Raises:
            ValueError: ``auto`` is true but no history was supplied.
        """
        if auto:
            if HLC_history is None:
                raise ValueError("HLC_history is required when auto=True")
            self.brick_size = self.__get_optimal_brick_size(
                HLC_history.iloc[:, [0, 1, 2]]
            )
        else:
            self.brick_size = brick_size
        return self.brick_size

    def __renko_rule(self, last_price):
        """Append the bricks implied by ``last_price``.

        Returns:
            Signed number of bricks appended (positive up, negative down),
            or 0 when the move is too small to form a brick.

        Raises:
            ValueError: the brick size is not a positive number.
        """
        # `not x > 0` also rejects NaN, which a plain `x <= 0` would let through.
        if not self.brick_size > 0:
            raise ValueError(
                "brick size must be positive, got {!r}".format(self.brick_size)
            )

        # Whole number of bricks between the new price and the last brick,
        # truncated toward zero.
        gap_div = int(float(last_price - self.renko_prices[-1]) / self.brick_size)
        if gap_div == 0:
            return 0

        step = np.sign(gap_div)
        if self.renko_directions[-1] * step >= 0:
            # Continuation (or first move after the seed): one brick per gap unit.
            first_jump = 1
            num_new_bars = gap_div
        elif abs(gap_div) >= 2:
            # Reversal: the price must travel at least two brick sizes, and the
            # first reversal brick is placed two sizes away from the last brick.
            first_jump = 2
            num_new_bars = gap_div - step
        else:
            # Single-unit move against the trend: not enough to reverse.
            return 0

        self.renko_prices.append(
            self.renko_prices[-1] + first_jump * self.brick_size * step
        )
        self.renko_directions.append(step)
        for _ in range(abs(num_new_bars) - 1):
            self.renko_prices.append(self.renko_prices[-1] + self.brick_size * step)
            self.renko_directions.append(step)

        return num_new_bars

    def build_history(self, prices):
        """Seed the chart with ``prices[0]`` and feed the remaining prices.

        Args:
            prices: one-dimensional sequence of prices (e.g. list or
                pandas Series). Nothing happens when it is empty.

        Returns:
            Total number of entries in ``renko_prices`` afterwards.
        """
        if len(prices) > 0:
            # Keep our own list copy: do_next() appends to source_prices, which
            # does not work in place on a pandas Series.
            self.source_prices = list(prices)
            self.renko_prices.append(self.source_prices[0])
            self.renko_directions.append(0)
            for price in self.source_prices[1:]:
                self.__renko_rule(price)
        return len(self.renko_prices)

    def do_next(self, last_price):
        """Feed one new price.

        Returns:
            1 when the price became the seed of an empty chart, otherwise the
            signed number of bricks created (0 if none).
        """
        self.source_prices.append(last_price)
        if not self.renko_prices:
            self.renko_prices.append(last_price)
            self.renko_directions.append(0)
            return 1
        return self.__renko_rule(last_price)

    def __get_optimal_brick_size(self, HLC_history, atr_timeperiod=14):
        """Return the median ATR of ``HLC_history`` as brick size.

        Columns 0, 1, 2 are passed to ``talib.ATR`` as high, low, close.
        Returns 0.0 when there are not more rows than ``atr_timeperiod``.

        Raises:
            ImportError: TA-Lib is not installed and enough data was supplied.
        """
        brick_size = 0.0
        if HLC_history.shape[0] > atr_timeperiod:
            if talib is None:
                raise ImportError("talib is required for automatic brick size")
            atr = talib.ATR(
                high=np.double(HLC_history.iloc[:, 0]),
                low=np.double(HLC_history.iloc[:, 1]),
                close=np.double(HLC_history.iloc[:, 2]),
                timeperiod=atr_timeperiod,
            )
            # Skip the first `atr_timeperiod` outputs before taking the median.
            brick_size = np.median(atr[atr_timeperiod:])
        return brick_size

    def evaluate(self, method='simple'):
        """Score the current chart.

        For ``method='simple'``: every brick that continues the previous
        direction adds 1 to ``balance``; every direction change subtracts 2
        and counts as a sign change. ``score`` is
        ``log(balance / sign_changes + 1) * log(price_ratio)`` when that
        quotient is non-negative and ``price_ratio >= 1``, otherwise -1.0.

        Returns:
            dict with keys ``'balance'``, ``'sign_changes:'``,
            ``'price_ratio'`` and ``'score'``.

        Raises:
            ValueError: ``method`` is not ``'simple'``.
            ZeroDivisionError: no renko prices have been built yet.
        """
        if method != 'simple':
            raise ValueError("unknown evaluation method: {!r}".format(method))

        price_ratio = len(self.source_prices) / len(self.renko_prices)

        balance = 0
        sign_changes = 0
        # Start at index 2: index 0 is the seed (direction 0) and index 1 is
        # the first real brick, which has nothing to be compared with.
        for i in range(2, len(self.renko_directions)):
            if self.renko_directions[i] == self.renko_directions[i - 1]:
                balance += 1
            else:
                balance -= 2
                sign_changes += 1

        # Avoid dividing by zero for a chart that never changed direction.
        if sign_changes == 0:
            sign_changes = 1

        score = balance / sign_changes
        if score >= 0 and price_ratio >= 1:
            score = np.log(score + 1) * np.log(price_ratio)
        else:
            score = -1.0

        # NOTE: the trailing colon in 'sign_changes:' is kept on purpose so
        # existing consumers of this dict keep working.
        return {
            'balance': balance,
            'sign_changes:': sign_changes,
            'price_ratio': price_ratio,
            'score': score,
        }

    def get_renko_prices(self):
        """Return the list of brick prices (the live list, not a copy)."""
        return self.renko_prices

    def get_renko_directions(self):
        """Return the list of brick directions (the live list, not a copy)."""
        return self.renko_directions

    def plot_renko(self, col_up='g', col_down='r'):
        """Draw the bricks with matplotlib and show the figure.

        Args:
            col_up: face colour of bricks whose direction is 1.
            col_down: face colour of all other bricks.

        Raises:
            ImportError: matplotlib is not installed.
        """
        if plt is None or patches is None:
            raise ImportError("matplotlib is required for plot_renko()")

        fig, ax = plt.subplots(1, figsize=(20, 10))
        ax.set_title('Renko chart')
        ax.set_xlabel('Renko bars')
        ax.set_ylabel('Price')

        # Axis limits: one slot per brick, three brick sizes of vertical margin.
        ax.set_xlim(0.0, len(self.renko_prices) + 1.0)
        ax.set_ylim(
            np.min(self.renko_prices) - 3.0 * self.brick_size,
            np.max(self.renko_prices) + 3.0 * self.brick_size,
        )

        # Index 0 is the seed price, not a brick, so drawing starts at 1.
        for i in range(1, len(self.renko_prices)):
            is_up = self.renko_directions[i] == 1
            col = col_up if is_up else col_down
            # An up brick ends at its price, a down brick starts at it.
            y = self.renko_prices[i] - self.brick_size if is_up else self.renko_prices[i]
            ax.add_patch(
                patches.Rectangle(
                    (i, y),           # (x, y)
                    1.0,              # width
                    self.brick_size,  # height
                    facecolor=col,
                )
            )

        plt.show()
from datetime import datetime, timedelta
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Indicators")
from System import *
from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Data import *
from QuantConnect.Data.Market import *
from QuantConnect.Data.Custom import *
from QuantConnect.Algorithm import *
from QuantConnect.Python import *
import pyrenko
import pandas as pd
import scipy.optimize as opt
from scipy.stats import iqr
import numpy as np


class RenkoStrategy(QCAlgorithm):
    """Trades BTCUSD in the direction of the latest Renko brick.

    Whenever the Renko model is empty it is rebuilt from hourly history with
    an optimised brick size and a position is opened in the direction of the
    last brick. On later bars the model is advanced with the latest daily
    close; when the position opposes the last brick it is liquidated and the
    model is reset so the next bar rebuilds it.
    """

    def Initialize(self):
        """Configure dates, cash, the BTCUSD subscription and the daily check."""
        self.SetStartDate(2017, 1, 1)  # Set Start Date
        self.SetEndDate(2020, 5, 5)    # Set End Date
        self.SetCash(10000)
        self.AddCrypto("BTCUSD", Resolution.Daily)
        self.SetBrokerageModel(BrokerageName.Bitfinex, AccountType.Margin)

        # Create Renko object
        self.renko_obj = pyrenko.renko()
        self.last_brick_size = 0.0

        self.Schedule.On(
            self.DateRules.EveryDay(),
            self.TimeRules.At(12, 0),
            self.LiquidateUnrealizedLosses,
        )

    def OnData(self, data):
        """Rebuild the Renko model when it is empty, otherwise advance it."""
        if len(self.renko_obj.get_renko_prices()) == 0:
            self._rebuild_model()
        else:
            self._advance_model()

    def _score_brick_size(self, brick, history):
        """Return the renko 'score' of `history` for the candidate `brick` size."""
        # Use a throw-away object: scoring a candidate must not touch
        # self.renko_obj, otherwise the final model would be built on top of
        # the bricks left behind by the optimiser's last trial.
        candidate = pyrenko.renko()
        candidate.set_brick_size(brick_size=brick, auto=False)
        candidate.build_history(prices=history)
        return candidate.evaluate()['score']

    def _rebuild_model(self):
        """Optimise the brick size on recent history, build the model, open a position."""
        self.renko_obj = pyrenko.renko()

        # Hourly closes for the last 360 hours (15 days).
        history = self.History(self.Symbol("BTCUSD"), 360, Resolution.Hour)
        if history.empty:
            # No data yet; the model stays empty so the next bar retries.
            return
        history = pd.Series(history.close)

        # Absolute 24-hour price changes.
        diffs = history.diff(24).abs()
        diffs = diffs[~np.isnan(diffs)]
        if diffs.empty:
            # Fewer than 25 hourly closes: no search bounds can be derived.
            return

        # Search bounds: 50th and 80th percentile of the absolute changes.
        bounds = np.percentile(diffs, [50, 80])

        # Find the brick size that maximises the score (fminbound minimises).
        opt_bs = opt.fminbound(
            lambda x: -self._score_brick_size(x, history),
            bounds[0],
            bounds[1],
            disp=0,
        )

        # Build the model
        #self.Debug(str(self.Time) + " " + 'Rebuilding Renko: ' + str(opt_bs))
        self.last_brick_size = opt_bs
        self.renko_obj.set_brick_size(brick_size=opt_bs, auto=False)
        self.renko_obj.build_history(prices=history)

        # Open a position in the direction of the last brick (-1, 0 or 1).
        self.SetHoldings("BTCUSD", self.renko_obj.get_renko_directions()[-1])

    def _advance_model(self):
        """Feed the latest daily close to the model and exit on a reversal."""
        bars = self.History(TradeBar, self.Symbol("BTCUSD"), 10, Resolution.Daily)
        closes = bars['close']
        if closes.empty:
            return
        # Hand the model a plain scalar rather than a one-element Series.
        last_close = float(closes.iloc[-1])

        prev_dir = self.renko_obj.get_renko_directions()[-1]
        num_created_bars = self.renko_obj.do_next(last_close)
        if num_created_bars != 0:
            self.Log('New Renko bars created')
            self.Log('last price: ' + str(last_close))
            self.Log('current Renko price: ' + str(self.renko_obj.get_renko_prices()[-1]))
            self.Log('direction: ' + str(prev_dir))
            self.Log('brick size: ' + str(self.renko_obj.brick_size))

        # Position and last brick point in opposite directions: close the
        # position and reset the model so the next bar rebuilds it.
        if np.sign(self.Portfolio["BTCUSD"].Quantity * self.renko_obj.get_renko_directions()[-1]) == -1:
            self.Liquidate("BTCUSD")
            self.renko_obj = pyrenko.renko()

    def LiquidateUnrealizedLosses(self):
        """Scheduled daily check that liquidates everything and resets the model.

        NOTE(review): the method name and the log message talk about
        unrealized *losses*, but the condition fires when unrealized *profit*
        exceeds 25% of the total portfolio value. The condition is left
        unchanged here; confirm whether `< -0.25` was intended.
        """
        if (self.Portfolio.TotalUnrealizedProfit / self.Portfolio.TotalPortfolioValue) > 0.25:
            self.Log("Liquidated due to unrealized losses at: {0}".format(self.Time))
            self.Liquidate()
            self.renko_obj = pyrenko.renko()