Overall Statistics |
Total Orders 50 Average Win 5.55% Average Loss -1.74% Compounding Annual Return 34.363% Drawdown 14.400% Expectancy 1.009 Start Equity 100000 End Equity 146635.25 Net Profit 46.635% Sharpe Ratio 1.613 Sortino Ratio 2.614 Probabilistic Sharpe Ratio 77.014% Loss Rate 52% Win Rate 48% Profit-Loss Ratio 3.19 Alpha 0.205 Beta 0.195 Annual Standard Deviation 0.203 Annual Variance 0.041 Information Ratio -0.697 Tracking Error 0.429 Treynor Ratio 1.675 Total Fees â‚®14283.54 Estimated Strategy Capacity â‚®1300000.00 Lowest Capacity Asset BTCUSDT 10B Portfolio Turnover 10.22% |
#region imports from AlgorithmImports import * import pandas as pd import copy import pickle import calendar import pytz # import jsonpickle #endregion class ReportManager(): # ========================================== # Constructor. Accepts algo Object # ========================================== def __init__(self, algo): self.algo = algo self.algo.OnEndOfAlgorithm = self.OnEndOfAlgorithm self.projectID = "17556876" # ================================================== def OnEndOfAlgorithm(self): closedTrades = self.algo.TradeBuilder.ClosedTrades # create an empty dataframe df = pd.DataFrame() # loop through the list of Trade objects and add one row for each trade object to the dataframe for trade in closedTrades: # Monday is 0 and Sunday is 6 pctReturn = round( ((trade.ExitPrice - trade.EntryPrice) / trade.EntryPrice),2) # Define the Eastern timezone eastern = pytz.timezone('US/Eastern') # Convert the original datetime to Eastern time tradeEntryTime = trade.EntryTime.astimezone(eastern) # dayOfWeek = tradeEntryTime.weekday() dayOfWeek = tradeEntryTime.strftime('%A') hourOfDay = tradeEntryTime.hour epochExit = calendar.timegm(trade.ExitTime.timetuple()) genericObject = { 'symbol' : str(trade.Symbol), 'entryPrice': str(trade.EntryPrice), 'quantity' : str(trade.Quantity), 'entryPrice': str(trade.EntryPrice), 'exitPrice': str(trade.ExitPrice), 'direction' : str(trade.Direction), 'duration' : str(trade.Duration.total_seconds()), 'pctReturn' : str(pctReturn), 'mae' : str(trade.MAE), 'mfe' : str(trade.MFE), 'exitTime' : str(trade.ExitTime), 'epochExit' : str(epochExit), 'hourOfDay' : str(hourOfDay), 'dayOfWeek' : str(dayOfWeek)} # # use the deepcopy function to clone the Trade object into a generic object # generic_object = copy.deepcopy(trade) # # convert the generic object to a dictionary # dictObject = dict(generic_object) # # create a TradeWithDict object using the properties of the Trade object # trade_with_dict = TradeWithDict(**trade.__dict__) # df = df.append(genericObject, ignore_index=True)'' # df = pd.concat([df, genericObject], ignore_index=True) genericObject_df = pd.DataFrame.from_records([genericObject]) df = pd.concat([df, genericObject_df], ignore_index=True) # JsonPickle Approach # --------------- # serialized = jsonpickle.encode(closedTrades) # self.algo.ObjectStore.SaveBytes(f"{self.projectID}/CompletedTrades", serialized) # storedObject = self.algo.ObjectStore.ReadBytes(f"{self.projectID}/CompletedTrades") # deserialized = jsonpickle.decode(bytearray(storedObject)) # Pickle Approach # --------------- # serialized = pickle.dumps(closedTrades) # self.algo.ObjectStore.SaveBytes(f"{self.projectID}/CompletedTrades", serialized) # storedObject = self.algo.ObjectStore.ReadBytes(f"{self.projectID}/CompletedTrades") # deserialized = pickle.loads(bytearray(storedObject)) # JSON Approach # ------------ # serialized = json.dumps(df) self.algo.ObjectStore.Save(f"{self.projectID}/CompletedTrades", df.to_json(date_unit='ns')) retrieved = self.algo.ObjectStore.Read(f"{self.projectID}/CompletedTrades") restored = pd.read_json(retrieved) pass ''' # Read back data from object store restored_history = pd.read_json(qb.ObjectStore.Read("data")) # Restore the indices restored_history['time'] = pd.to_datetime(restored_history['time']) restored_history['symbol'] = restored_history['symbol'].apply(lambda x: qb.Symbol(x)) restored_history.set_index(['symbol', 'time'], inplace=True) restored_history ''' # deserialized = json.loads(retrieved) # pass # self.algo.ObjectStore.SaveBytes('OS_signal_dens', pickle.dumps(self.signal_dens)) # https://www.quantconnect.com/forum/discussion/14176/object-store-upgrades-and-behavior-change/p1 # Add logic: only do this if we are NOT optimizing, and we are NOT live. # self.algo.ObjectStore.SaveBytes(f"{self.projectID}/CompletedTrades", closedTrades) # else: # if self.algo.ObjectStore.ContainsKey(str(self.spy)): # date = self.algo.ObjectStore.Read(str(self.spy)) # date = parser.parse(date) # that = self.algo.ObjectStore.ReadBytes(f"{self.projectID}/CompletedTrades") # so = 0 # class TradeWithDict(Trade): # @property # def __dict__(self): # # get a dictionary that contains the properties and their values # props = vars(self) # return props
#region imports from AlgorithmImports import * #endregion class TrailingStopHelper(): def __init__(self, algo, symbol, volaIndicator=None, trailStopCoeff=2, initialStopCoeff=1, activationCoeff=2): # Track state and indicators self.algo = algo self.symbol = symbol self.entryPrice = 0 self.ExitMessage = "" self.systemActivated = False self.volaIndicator = volaIndicator self.volaIndicator.Updated += self.OnATRIndicatorUpdated # Stop Loss States self.trailStopCoeff = trailStopCoeff self.initialStopCoeff = initialStopCoeff self.activationCoeff = activationCoeff self.ResetStopLosses() @property def lastPrice(self): if (self.symbol in self.algo.Securities \ and self.algo.Securities[self.symbol] is not None): return self.algo.Securities[self.symbol].Price return 0 ## ----------------------------------------------- def OnATRIndicatorUpdated(self, sender, updated): self.PlotCharts() # Trailing Stop Exit # This method updates the trailing stop # ============================================ def TrailingExitSignalFired(self): if( not self.volaIndicator.IsReady ): return False # If trailing stop is NOT set, get last price, and set it # -------------------------------------------------------- if( not self.stopsLossActivated ): self.highestPrice = self.lastPrice self.trailingStopLoss = self.lastPrice - (self.volaIndicator.Current.Value * self.trailStopCoeff) self.stopsLossActivated = True # Recursively call this function to check for stops # again, now that the trailing stop has been activated # and the stop loss value has been updated. # -------------------------------------------------- return self.TrailingExitSignalFired() # If trailing stop loss is activated, check if price closed below it. # If it did, then exit. If not, update the trailing stop loss. else: if self.PriceIsBelowStopLoss(): return True else: # If price has gone up if self.lastPrice > self.highestPrice: # If price is above the trail activation price, update trailing stop if self.lastPrice > self.activationPrice: self.highestPrice = self.lastPrice newTrailingStopLoss = self.highestPrice - (self.volaIndicator.Current.Value * self.trailStopCoeff) self.trailingStopLoss = max (self.trailingStopLoss, newTrailingStopLoss, self.activationPrice) # check again just in case price ends up below the new trailing stop level if self.PriceIsBelowStopLoss(): return True return False ## Check if price is below trailing stop loss or regular stop loss ## --------------------------------------------------------------- def PriceIsBelowStopLoss(self): if self.lastPrice > self.activationPrice: if( self.lastPrice < self.trailingStopLoss ): self.ExitMessage = "Trailing Stop Loss" return True else: if( self.lastPrice < self.initialStopLoss ): self.ExitMessage = "Initial Stop Loss" return True self.PlotCharts() return False ## Logic to run immediately after a new position is opened. ## We track entry price and set initial stop loss values ## --------------------------------------------------------- def Activate(self, entryPrice): self.entryPrice = entryPrice self.systemActivated = True self.SetInitialStops() return ## Set initial stop and activation level. Called after new position opened. ## ------------------------------------------------------------------------ def SetInitialStops(self): ## TODO: Use onOrderEvent to set this, because the actual price may be different self.entryPrice = self.lastPrice self.initialStopLoss = self.entryPrice - (self.volaIndicator.Current.Value * self.initialStopCoeff) self.activationPrice = self.entryPrice + (self.volaIndicator.Current.Value * self.activationCoeff) ## Logic to run immediately after a position is closed ## Reset exit message, stop loss values. ## --------------------------------------------------- def Deactivate(self): self.PlotCharts() self.ExitMessage = "No Exit Message" self.systemActivated = False self.ResetStopLosses() ## Reset stop losses ## ------------------------------------------------- def ResetStopLosses(self): self.stopsLossActivated = False self.initialStopLoss = 0 self.activationPrice = 0 self.trailingStopLoss = 0 ## Plot Price, Stop losses & activation levels ## ------------------------------------------- def PlotCharts(self): return self.algo.Plot(f"{self.symbol} Trailing stop", "Price", self.lastPrice) self.algo.Plot(f"{self.symbol} Trailing stop", "Initial Stop", self.initialStopLoss) self.algo.Plot(f"{self.symbol} Trailing stop", "Acivation Pt", self.activationPrice) self.algo.Plot(f"{self.symbol} Trailing stop", "TrailingStop", self.trailingStopLoss) return
from AlgorithmImports import * from ReportManager import * from TrailingStopHelper import * from io import StringIO import pandas as pd import pytz class CSVSignalImporter(QCAlgorithm): def Initialize(self): self.InitParams() self.InitBacktest() self.ImportSignals() self.InitIndicators() ## Initialize config parameters, state tracking,etc. ## ------------------------------------------------ def InitParams(self): # Data parameters self.timeCol = "date_time" self.timezone = pytz.timezone('UTC') self.newTimeCol = "NewTime" self.ticker = "BTCUSDT" # Trading System parameters self.slATRCoeff = int(self.get_parameter("slATRCoeff")) # 2 # ATR coefficient for stop loss self.rrRatio = float(self.get_parameter("rrRatio")) # 2 # risk-reward ratio for take profit self.sizeAcctPct = 1 # Pct of the cash available to use for trade self.threshold = float(self.get_parameter("thresh")) # Prediction threshold self.signal_exp = 30 # Signal expiry in seconds self.hold_dur = 4320 # Duration (in minutes) for which to hold position self.exitMethod = 4 # 0 exits after specified duration # 1 exits after specified duration + Stop loss # 2 exits after ATR TP/SL # 3 exits after ATR Trailing Stop # 4 exits after min holding period and ATR Trailing Stop self.useMLVola = (1 == int(self.get_parameter("useMLVola"))) # When meauring tp/sl, use vola from the ML features # State Tracking self.direction = None # Keep track of trade direction self.entryMsg = "" self.exitMsg = "" self.signalTime = "" self.mlVola = 0.0 ## Init backtest properties / configs / managers ## --------------------------------------------- def InitBacktest(self): # Dates self.SetStartDate(2022, 7, 17) # self.SetEndDate(2022, 8, 1) self.SetEndDate(2023, 11, 1) # Data self.SetAccountCurrency("USDT") self.SetBrokerageModel(BrokerageName.Kraken, AccountType.Margin) self.symbol = self.AddCrypto(self.ticker, Resolution.Minute, Market.KRAKEN).Symbol self.SetBenchmark(self.symbol) # Custom reporting self.ReportManager = ReportManager(self) ## Init indicators used for tracking volatility and trailing stops ## --------------------------------------------------------------- def InitIndicators(self): self.atr = self.ATR(self.symbol, 14, MovingAverageType.Simple, Resolution.Hour) trailStopCoeff = float(self.get_parameter("trailStopCoeff")) initialStopCoeff = float(self.get_parameter("initialStopCoeff")) activationCoeff = initialStopCoeff * self.rrRatio self.trailingStop = TrailingStopHelper(self,self.symbol,self.atr,trailStopCoeff,initialStopCoeff,) ## Logic for ingesting and parsing the CSV with model signals ## ---------------------------------------------------------- def ImportSignals(self): # tolu_apr_14_csv = self.Download("https://docs.google.com/spreadsheets/d/1c0yd2udMUEZiAPFQAJN6Kl683xHxWU928HNbv-WABXM/export?format=csv") # furkan_jun_14_csv = self.Download("https://docs.google.com/spreadsheets/d/1uho1Q1hgbHc4Av7KegT1Pz-ot_gPQ02XN6elkPSfpSY/export?format=csv") # furkan_jun_25_csv = self.Download("https://docs.google.com/spreadsheets/d/1H-FXAdPoMvvQocWOAPH-lvKWRfrbVfZSCzdAeT5Yku0/export?format=csv") # furkan_jun_26_csv = self.Download("https://docs.google.com/spreadsheets/d/1X7W7vamKfowN1wukF2VkX2-bCdmrnTc2Jzmz1rIpwyk/export?format=csv") furkan_jun_30_csv = self.Download("https://docs.google.com/spreadsheets/d/1ZcegNaSUgPNsxKkrBErG-YEkMzyHLfsZ3FKD7FIt86s/export?format=csv") signals_csv = furkan_jun_30_csv self.df = pd.read_csv(StringIO(signals_csv)) self.df[self.newTimeCol] = pd.to_datetime(self.df[self.timeCol], format='%Y-%m-%d %H:%M:%S') self.df[self.newTimeCol] = self.df[self.newTimeCol].dt.to_pydatetime() self.df[self.newTimeCol] = pd.to_datetime(self.df[self.newTimeCol]).dt.tz_localize('UTC') # Strip rows that dont meet our prediction threshold self.df = self.df[self.df["primary_prediction"].notna() & (self.df["primary_prediction"] >= self.threshold)] # Sort df by date -- shouldnt be necessary though self.df = self.df.sort_values(self.newTimeCol) # Custom handling for Tolu's signal CSV if 'Quantity' in self.df.columns: # Remove rows with negative quantity (sell signals) self.df = self.df[self.df['Quantity'] >= 0] ## System method, called every time a new bar comes in. ## Here we check for entry / exit signals, and clean up data ## ---------------------------------------------------------- def OnData(self, data: Slice): # First, make sure we have valid data coming in if (self.ticker in data ) and (data[self.ticker] is not None): currTimeStr = self.Time.strftime("%Y-%m-%d %H:%M:%S") current_time = self.Time.astimezone(self.timezone) # If not invested, check for entry signals if( not self.HasHoldings(self.symbol)): # If we have a signal, open position, set direction, set tops. if self.EntrySignalFired(): second_diff = (current_time - self.signalTime).total_seconds() self.SetHoldings(self.symbol, self.sizeAcctPct, tag=f"Signal fired {second_diff} seconds ago)") self.direction = PortfolioBias.LONG self.SetStopLossTakeProfit() # If invested, check for exit signals else: if self.ExitSignalFired(): self.LiquidateWithMsg(self.exitMsg) # self.Liquidate(tag=self.exitMsg) # remove expired signals (records older than x minutes ago). self.df = self.df[self.df[self.newTimeCol] >= (current_time - timedelta(seconds=self.signal_exp))] ## Returns true if we found a recent valid entry signal (that has not expired). ## ---------------------------------------------------------------------------- def EntrySignalFired(self): current_time = self.Time.astimezone(self.timezone) # Get most recent prediction record that hasnt expired (ie: within last x minutes) recent_df = self.df[self.df[self.newTimeCol] <= (current_time - timedelta(seconds=self.signal_exp))] if not recent_df.empty: for index, row in recent_df.iterrows(): rowTimeOne = (row[self.newTimeCol]) rowTime = pd.to_datetime(row[self.newTimeCol]).astimezone(self.timezone) # rowTimeStr = rowTime.strftime("%Y-%m-%d %H:%M:%S") # self.Debug(f"Entry Signal stamped {rowTimeStr} Fired On Day: {currTimeStr}") # Delete # rowTime = pd.to_datetime(row[self.newTimeCol]) # rowTime = rowTime.to_pydatetime() # rowTime = rowTime.astimezone(self.timezone) self.mlVola = row['vol'] self.signalTime = rowTime return True return False def SetStopLossTakeProfit(self): vola = self.mlVola self.EntryTime = self.Time closePrice = self.CurrentSlice[self.ticker].Close if (self.exitMethod == 1) or (self.exitMethod == 2) : if( self.useMLVola ): # Set take profit/stop loss - Using Volatiliy from ML features self.stopLoss = closePrice - (self.slATRCoeff * vola) self.takeProfit = closePrice + (self.slATRCoeff * vola * self.rrRatio) else: # Set take profit/stop loss - Using ATR-measured Volatiliy self.stopLoss = closePrice - (self.slATRCoeff * self.atr.Current.Value) self.takeProfit = closePrice + (self.slATRCoeff * self.atr.Current.Value * self.rrRatio) elif(self.exitMethod == 3): # Set Trailstop self.trailingStop.Activate(closePrice) # Check if exit signal has fired, due to # Take Profit, Stop Loss, or time duration def ExitSignalFired(self): # Exit based on time. ie: Held position for x minutes if self.exitMethod == 0: if self.HeldForMinimumDuration(): self.exitMsg = f"Exit After {self.hold_dur} minutes" return True # Exit based on time AND stop loss. elif self.exitMethod == 1: closePrice = self.CurrentSlice[self.ticker].Close if self.HeldForMinimumDuration(): self.exitMsg = f"Exit After {self.hold_dur} minutes" return True elif closePrice <= self.stopLoss: self.exitMsg = "Stop Loss Exit" return True # Take Profit / Stop loss exit elif self.exitMethod == 2: closePrice = self.CurrentSlice[self.ticker].Close if closePrice >= self.takeProfit: self.exitMsg = "Take Profit Exit" return True elif closePrice <= self.stopLoss: self.exitMsg = "Stop Loss Exit" return True # Trailing stop exit elif self.exitMethod == 3: if self.trailingStop.TrailingExitSignalFired(): self.exitMsg = self.trailingStop.ExitMessage return True # Trailing stop exit after Min Hold Duration elif self.exitMethod == 4: if (self.HeldForMinimumDuration()): if not self.trailingStop.systemActivated: self.trailingStop.Activate(self.securities[self.symbol].price) if self.trailingStop.TrailingExitSignalFired(): self.exitMsg = self.trailingStop.ExitMessage return True return False def HeldForMinimumDuration(self): return (self.Time - self.EntryTime) >= pd.Timedelta(minutes=self.hold_dur) # Convenience method to liquidate with a message def LiquidateWithMsg(self, exitReason): pnl = round(100* self.Portfolio[self.symbol].UnrealizedProfitPercent,2) biasText = 'Long' if (self.direction == PortfolioBias.LONG) else 'short' winlossText = 'win' if pnl > 0 else 'loss' self.LiquidateCrypto(symbol=self.symbol, tag=f"[{pnl}% {winlossText}] {exitReason} Exiting {biasText} position with ") # self.Liquidate(tag=f"{exitReason} Exiting {biasText} position with {pnl}% {winlossText}") self.trailingStop.Deactivate() def LiquidateCrypto(self, symbol, tag): crypto = self.securities[symbol] base_currency = crypto.base_currency # Avoid negative amount after liquidate quantity = min(crypto.holdings.quantity, base_currency.amount) # Round down to observe the lot size lot_size = crypto.symbol_properties.lot_size; quantity = (round(quantity / lot_size) - 1) * lot_size if self.is_valid_order_size(crypto, quantity): self.market_order(symbol, -quantity, tag=tag) self.debug("liquidated succesfully") else: self.debug(f"Invalid order size: {quantity}") # Brokerages have different order size rules # Binance considers the minimum volume (price x quantity): def is_valid_order_size(self, crypto, quantity): return abs(crypto.price * quantity) > crypto.symbol_properties.minimum_order_size def HasHoldings(self, symbol): ## TODO: Explore a better way to do this. Not clear how base_currency.amount should be used crypto = self.securities[symbol] base_currency = crypto.base_currency quantity = min(crypto.holdings.quantity, base_currency.amount) return (quantity > 0) # quantity = crypto.holdings.quantity # return abs(crypto.price * quantity) > crypto.symbol_properties.minimum_order_size