Overall Statistics |
Total Orders 182 Average Win 1.19% Average Loss -2.28% Compounding Annual Return -9.391% Drawdown 32.400% Expectancy 0.003 Start Equity 1000.0 End Equity 905.84 Net Profit -9.416% Sharpe Ratio -0.584 Sortino Ratio -0.61 Probabilistic Sharpe Ratio 7.483% Loss Rate 34% Win Rate 66% Profit-Loss Ratio 0.52 Alpha -0.115 Beta -0.028 Annual Standard Deviation 0.19 Annual Variance 0.036 Information Ratio 0.075 Tracking Error 0.656 Treynor Ratio 4.023 Total Fees â‚®516.25 Estimated Strategy Capacity â‚®150000000000.00 Lowest Capacity Asset BTCUSDT 10B Portfolio Turnover 49.76% |
#region imports from AlgorithmImports import * import pandas as pd import copy import pickle import calendar import pytz # import jsonpickle #endregion class ReportManager(): # ========================================== # Constructor. Accepts algo Object # ========================================== def __init__(self, algo): self.algo = algo self.algo.OnEndOfAlgorithm = self.OnEndOfAlgorithm self.projectID = "17556876" # ================================================== def OnEndOfAlgorithm(self): closedTrades = self.algo.TradeBuilder.ClosedTrades # create an empty dataframe df = pd.DataFrame() # loop through the list of Trade objects and add one row for each trade object to the dataframe for trade in closedTrades: # Monday is 0 and Sunday is 6 pctReturn = round( ((trade.ExitPrice - trade.EntryPrice) / trade.EntryPrice),2) # Define the Eastern timezone eastern = pytz.timezone('US/Eastern') # Convert the original datetime to Eastern time tradeEntryTime = trade.EntryTime.astimezone(eastern) # dayOfWeek = tradeEntryTime.weekday() dayOfWeek = tradeEntryTime.strftime('%A') hourOfDay = tradeEntryTime.hour epochExit = calendar.timegm(trade.ExitTime.timetuple()) genericObject = { 'symbol' : str(trade.Symbol), 'entryPrice': str(trade.EntryPrice), 'quantity' : str(trade.Quantity), 'entryPrice': str(trade.EntryPrice), 'exitPrice': str(trade.ExitPrice), 'direction' : str(trade.Direction), 'duration' : str(trade.Duration.total_seconds()), 'pctReturn' : str(pctReturn), 'mae' : str(trade.MAE), 'mfe' : str(trade.MFE), 'exitTime' : str(trade.ExitTime), 'epochExit' : str(epochExit), 'hourOfDay' : str(hourOfDay), 'dayOfWeek' : str(dayOfWeek)} # # use the deepcopy function to clone the Trade object into a generic object # generic_object = copy.deepcopy(trade) # # convert the generic object to a dictionary # dictObject = dict(generic_object) # # create a TradeWithDict object using the properties of the Trade object # trade_with_dict = TradeWithDict(**trade.__dict__) # df = df.append(genericObject, ignore_index=True)'' # df = pd.concat([df, genericObject], ignore_index=True) genericObject_df = pd.DataFrame.from_records([genericObject]) df = pd.concat([df, genericObject_df], ignore_index=True) # JsonPickle Approach # --------------- # serialized = jsonpickle.encode(closedTrades) # self.algo.ObjectStore.SaveBytes(f"{self.projectID}/CompletedTrades", serialized) # storedObject = self.algo.ObjectStore.ReadBytes(f"{self.projectID}/CompletedTrades") # deserialized = jsonpickle.decode(bytearray(storedObject)) # Pickle Approach # --------------- # serialized = pickle.dumps(closedTrades) # self.algo.ObjectStore.SaveBytes(f"{self.projectID}/CompletedTrades", serialized) # storedObject = self.algo.ObjectStore.ReadBytes(f"{self.projectID}/CompletedTrades") # deserialized = pickle.loads(bytearray(storedObject)) # JSON Approach # ------------ # serialized = json.dumps(df) self.algo.ObjectStore.Save(f"{self.projectID}/CompletedTrades", df.to_json(date_unit='ns')) retrieved = self.algo.ObjectStore.Read(f"{self.projectID}/CompletedTrades") restored = pd.read_json(retrieved) pass ''' # Read back data from object store restored_history = pd.read_json(qb.ObjectStore.Read("data")) # Restore the indices restored_history['time'] = pd.to_datetime(restored_history['time']) restored_history['symbol'] = restored_history['symbol'].apply(lambda x: qb.Symbol(x)) restored_history.set_index(['symbol', 'time'], inplace=True) restored_history ''' # deserialized = json.loads(retrieved) # pass # self.algo.ObjectStore.SaveBytes('OS_signal_dens', pickle.dumps(self.signal_dens)) # https://www.quantconnect.com/forum/discussion/14176/object-store-upgrades-and-behavior-change/p1 # Add logic: only do this if we are NOT optimizing, and we are NOT live. # self.algo.ObjectStore.SaveBytes(f"{self.projectID}/CompletedTrades", closedTrades) # else: # if self.algo.ObjectStore.ContainsKey(str(self.spy)): # date = self.algo.ObjectStore.Read(str(self.spy)) # date = parser.parse(date) # that = self.algo.ObjectStore.ReadBytes(f"{self.projectID}/CompletedTrades") # so = 0 # class TradeWithDict(Trade): # @property # def __dict__(self): # # get a dictionary that contains the properties and their values # props = vars(self) # return props
# region imports from AlgorithmImports import * # endregion # Your New Python File class TradeUtils(): def __init__(self, algo): self.algo = algo # Convenience method to liquidate with a message def LiquidateWithMsg(self, symbol, exitReason): pnl = round(100 * self.algo.Portfolio[symbol].UnrealizedProfitPercent,2) biasText = 'Long' if (self.algo.Portfolio[symbol].IsLong) else 'Short' winlossText = 'win' if pnl > 0 else 'loss' orderNote = f"[{pnl}% {winlossText}] {exitReason} | Exiting {biasText} position" # If Crypto, call LiquidateMarketOrder if self.algo.Securities[symbol].Type == SecurityType.Crypto: self.LiquidateMarketOrder(symbol=symbol, tag=orderNote) else: self.algo.liquidate(symbol, tag=orderNote) ## Liquidate via market order. Necessary for crypto def LiquidateMarketOrder(self, symbol, tag): crypto = self.algo.securities[symbol] base_currency = crypto.base_currency # Avoid negative amount after liquidate quantity = max(crypto.holdings.quantity, base_currency.amount) # Round down to observe the lot size lot_size = crypto.symbol_properties.lot_size; quantity = (round(quantity / lot_size) - 1) * lot_size if self.is_valid_order_size(crypto, quantity): # self.algo.debug(f"------------ [START] Market Order: liquidation start") self.algo.debug(f" Liquidating: {quantity} units of {symbol.Value}") self.algo.market_order(symbol, -quantity, tag=tag) self.algo.debug(f"Market Order liquidation was Successful") self.algo.debug(f" Leftover: {crypto.holdings.quantity} units of {symbol.Value}") self.algo.debug(f"------------ [END] Market Order liquidation") if( abs(crypto.holdings.quantity) > lot_size): self.LiquidateMarketOrder(symbol, tag="reomving trailing coins") else: self.algo.debug(f"ERROR ERRROR ---- ") self.algo.debug(f"ERROR ERRROR Invalid order size: {quantity}") # Brokerages have different order size rules # Binance considers the minimum volume (price x quantity): def is_valid_order_size(self, crypto, quantity): return abs(crypto.price * quantity) > crypto.symbol_properties.minimum_order_size def HasHoldings(self, symbol): if self.algo.Securities[symbol].Type == SecurityType.Crypto: ## TODO: Explore a better way to do this. Not clear how base_currency.amount should be used min_lot_size = self.algo.securities[symbol].symbol_properties.lot_size asset = self.algo.securities[symbol] base_currency = asset.base_currency # quantity = min(asset.holdings.quantity, base_currency.amount) quantity = abs(asset.holdings.quantity) # abs(self.securities[self.symbol].symbol_properties.lot_size - self.securities[self.symbol].holdings.quantity) > 0.000000001 return abs(quantity) >= self.algo.securities[symbol].symbol_properties.minimum_order_size # return abs(quantity - min_lot_size) > min_lot_size else: return self.algo.Portfolio[symbol].Invested
#region imports from AlgorithmImports import * #endregion class TrailingStopHelper(): def __init__(self, algo, symbol, volaIndicator=None, trailStopCoeff=2, initialStopCoeff=1, activationCoeff=2): # Track state and indicators self.algo = algo self.symbol = symbol self.entryPrice = 0 self.ExitMessage = "" self.systemActivated = False self.volaIndicator = volaIndicator self.volaIndicator.Updated += self.OnATRIndicatorUpdated # Stop Loss States self.trailStopCoeff = trailStopCoeff self.initialStopCoeff = initialStopCoeff self.activationCoeff = activationCoeff self.ResetStopLosses() @property def lastPrice(self): if (self.symbol in self.algo.Securities \ and self.algo.Securities[self.symbol] is not None): return self.algo.Securities[self.symbol].Price return 0 ## ----------------------------------------------- def OnATRIndicatorUpdated(self, sender, updated): self.PlotCharts() # Trailing Stop Exit # This method updates the trailing stop # ============================================ def TrailingExitSignalFired(self): if( not self.volaIndicator.IsReady ): return False # If trailing stop is NOT set, get last price, and set it # -------------------------------------------------------- if( not self.stopsLossActivated ): self.highestPrice = self.lastPrice self.trailingStopLoss = self.lastPrice - (self.volaIndicator.Current.Value * self.trailStopCoeff) self.stopsLossActivated = True # Recursively call this function to check for stops # again, now that the trailing stop has been activated # and the stop loss value has been updated. # -------------------------------------------------- return self.TrailingExitSignalFired() # If trailing stop loss is activated, check if price closed below it. # If it did, then exit. If not, update the trailing stop loss. else: if self.PriceIsBelowStopLoss(): return True else: # If price has gone up if self.lastPrice > self.highestPrice: # If price is above the trail activation price, update trailing stop if self.lastPrice > self.activationPrice: self.highestPrice = self.lastPrice newTrailingStopLoss = self.highestPrice - (self.volaIndicator.Current.Value * self.trailStopCoeff) self.trailingStopLoss = max (self.trailingStopLoss, newTrailingStopLoss, self.activationPrice) # check again just in case price ends up below the new trailing stop level if self.PriceIsBelowStopLoss(): return True return False ## Check if price is below trailing stop loss or regular stop loss ## --------------------------------------------------------------- def PriceIsBelowStopLoss(self): if self.lastPrice > self.activationPrice: if( self.lastPrice < self.trailingStopLoss ): self.ExitMessage = "Trailing Stop Loss" return True else: if( self.lastPrice < self.initialStopLoss ): self.ExitMessage = "Initial Stop Loss" return True self.PlotCharts() return False ## Logic to run immediately after a new position is opened. ## We track entry price and set initial stop loss values ## --------------------------------------------------------- def Activate(self, entryPrice,initialStopValue=0, activationValue=0 ): self.entryPrice = entryPrice self.systemActivated = True self.SetInitialStops(initialStopValue, activationValue) return ## Set initial stop and activation level. Called after new position opened. ## ------------------------------------------------------------------------ def SetInitialStops(self, initialStopValue=0, activationValue=0): ## TODO: Use onOrderEvent to set this, because the actual price may be different self.entryPrice = self.lastPrice if initialStopValue == 0: self.initialStopLoss = self.entryPrice - (self.volaIndicator.Current.Value * self.initialStopCoeff) else: self.initialStopLoss = initialStopValue if activationValue == 0: self.activationPrice = self.entryPrice + (self.volaIndicator.Current.Value * self.activationCoeff) else: self.activationPrice = activationValue ## Logic to run immediately after a position is closed ## Reset exit message, stop loss values. ## --------------------------------------------------- def Deactivate(self): self.PlotCharts() self.ExitMessage = "No Exit Message" self.systemActivated = False self.ResetStopLosses() ## Reset stop losses ## ------------------------------------------------- def ResetStopLosses(self): self.stopsLossActivated = False self.initialStopLoss = 0 self.activationPrice = 0 self.trailingStopLoss = 0 ## Plot Price, Stop losses & activation levels ## ------------------------------------------- def PlotCharts(self): return self.algo.Plot(f"{self.symbol} Trailing stop", "Price", self.lastPrice) self.algo.Plot(f"{self.symbol} Trailing stop", "Initial Stop", self.initialStopLoss) self.algo.Plot(f"{self.symbol} Trailing stop", "Acivation Pt", self.activationPrice) self.algo.Plot(f"{self.symbol} Trailing stop", "TrailingStop", self.trailingStopLoss) return
from AlgorithmImports import * from ReportManager import * from TradeUtils import * from TrailingStopHelper import * from io import StringIO import pandas as pd import pytz class CSVSignalImporter(QCAlgorithm): def Initialize(self): self.InitUtils() self.InitParams() self.InitBacktest() self.ImportSignals() self.InitIndicators() ## Initialize Utils ## ------------------------------------------------ def InitUtils(self): self.TradeUtils = TradeUtils(self) ## Initialize config parameters, state tracking,etc. ## ------------------------------------------------ def InitParams(self): # Data parameters self.timeCol = "date_time" self.timezone = pytz.timezone('UTC') self.newTimeCol = "NewTime" self.ticker = "BTCUSDT" # Trading System parameters self.slATRCoeff = int(self.get_parameter("slATRCoeff")) # 2 # ATR coefficient for stop loss self.rrRatio = float(self.get_parameter("rrRatio")) # 2 # risk-reward ratio for take profit self.sizeAcctPct = 1 # Pct of the cash available to use for trade self.threshold = float(self.get_parameter("thresh")) # Prediction threshold self.signal_exp = 30 # Signal expiry in seconds self.hold_dur = 4320 # Duration (in minutes) for which to hold position self.exitMethod = 2 # 0 exits after specified duration # 1 exits after specified duration + Stop loss # 2 exits after ATR TP/SL # 3 exits after ATR Trailing Stop # 4 exits after min holding period and ATR Trailing Stop # 5 exits after ATR Trailing Stop BUT with ML vola initial levels # Check trailing stop breach only when ATR updates (eg hourly) versus every minute. self.checkTrailingStopOnATRUpdateOnly = False self.useMLVola = (1 == int(self.get_parameter("useMLVola"))) # When meauring tp/sl, use vola from the ML features # State Tracking self.direction = None # Keep track of trade direction self.entryMsg = "" self.exitMsg = "" self.signalTime = "" self.mlVola = 0.0 ## Init backtest properties / configs / managers ## --------------------------------------------- def InitBacktest(self): # Dates self.SetStartDate(2022, 5, 1) self.SetEndDate(2023, 5, 1) # Data self.SetAccountCurrency("USDT") self.SetCash(1000) self.SetBrokerageModel(BrokerageName.KRAKEN, AccountType.Margin) self.symbol = self.AddCrypto(self.ticker, Resolution.Minute, Market.KRAKEN).Symbol self.SetBenchmark(self.symbol) # Custom reporting self.ReportManager = ReportManager(self) ## Init indicators used for tracking volatility and trailing stops ## --------------------------------------------------------------- def InitIndicators(self): self.atr = self.ATR(self.symbol, 14, MovingAverageType.Simple, Resolution.Hour) # Uncomment this if you want trailing stop exists to if(self.checkTrailingStopOnATRUpdateOnly): self.atr.updated += self.onTrailATRUpdateCheckForExits trailStopCoeff = float(self.get_parameter("trailStopCoeff")) initialStopCoeff = float(self.get_parameter("initialStopCoeff")) activationCoeff = initialStopCoeff * self.rrRatio self.trailingStop = TrailingStopHelper(self,self.symbol,self.atr, trailStopCoeff,initialStopCoeff,activationCoeff) ## Logic for ingesting and parsing the CSV with model signals ## ---------------------------------------------------------- def ImportSignals(self): furkan_jul_30_csv = self.Download("https://docs.google.com/spreadsheets/d/1nO1iHe9Eff4uLoUmwGj7SICph-rwS6tmEWNUz9Fj4lk/export?format=csv") signals_csv = furkan_jul_30_csv self.df = pd.read_csv(StringIO(signals_csv)) self.df[self.newTimeCol] = pd.to_datetime(self.df[self.timeCol], format='%Y-%m-%d %H:%M:%S') self.df[self.newTimeCol] = self.df[self.newTimeCol].dt.to_pydatetime() self.df[self.newTimeCol] = pd.to_datetime(self.df[self.newTimeCol]).dt.tz_localize('UTC') # Strip rows that dont meet our prediction threshold self.df = self.df[self.df["predicted_probabilities"].notna() & (self.df["predicted_probabilities"] >= self.threshold)] # Sort df by date -- shouldnt be necessary though self.df = self.df.sort_values(self.newTimeCol) # Custom handling for Tolu's signal CSV if 'Quantity' in self.df.columns: # Remove rows with negative quantity (sell signals) self.df = self.df[self.df['Quantity'] >= 0] ## System method, called every time a new bar comes in. ## Here we check for entry / exit signals, and clean up data ## ---------------------------------------------------------- def OnData(self, data: Slice): # First, make sure we have valid data coming in if (self.ticker in data ) and (data[self.ticker] is not None): currTimeStr = self.Time.strftime("%Y-%m-%d %H:%M:%S") current_time = self.Time.astimezone(self.timezone) # If not invested, check for entry signals if( not self.TradeUtils.HasHoldings(self.symbol)): # If we have a signal, open position, set direction, set tops. if self.EntrySignalFired(): second_diff = (current_time - self.signalTime).total_seconds() signal_vola = round(self.mlVola * 100,2) self.SetHoldings(self.symbol, self.sizeAcctPct, tag=f"Signal fired {second_diff} seconds ago). Signal vola: {signal_vola}% ") self.direction = PortfolioBias.LONG self.SetStopLossTakeProfit() # If invested, check for exit signals else: if self.exitMethod == 3 or self.exitMethod == 4: if not (self.checkTrailingStopOnATRUpdateOnly): ## If we're using trail stops, we might check here for exits (every minuite) ## otherwise, we only check when the trail stop updates (hourly) self.ExecuteExitSignals() else: # If not using trailing stops, always check here for exits self.ExecuteExitSignals() # remove expired signals (records older than x minutes ago). self.df = self.df[self.df[self.newTimeCol] >= (current_time - timedelta(seconds=self.signal_exp))] # When trail ATR updates, check for exit def onTrailATRUpdateCheckForExits(self,indicator,indicator_data): if(self.atr.IsReady): if( self.TradeUtils.HasHoldings(self.symbol)): self.ExecuteExitSignals() def ExecuteExitSignals(self): if self.ExitSignalFired(): self.TradeUtils.LiquidateWithMsg(self.symbol, self.exitMsg) # self.Liquidate(tag=self.exitMsg) ## Returns true if we found a recent valid entry signal (that has not expired). ## ---------------------------------------------------------------------------- def EntrySignalFired(self): current_time = self.Time.astimezone(self.timezone) # Get most recent prediction record that hasnt expired (ie: within last x minutes) recent_df = self.df[self.df[self.newTimeCol] <= (current_time - timedelta(seconds=self.signal_exp))] if not recent_df.empty: for index, row in recent_df.iterrows(): rowTimeOne = (row[self.newTimeCol]) rowTime = pd.to_datetime(row[self.newTimeCol]).astimezone(self.timezone) # rowTimeStr = rowTime.strftime("%Y-%m-%d %H:%M:%S") # self.Debug(f"Entry Signal stamped {rowTimeStr} Fired On Day: {currTimeStr}") # Delete # rowTime = pd.to_datetime(row[self.newTimeCol]) # rowTime = rowTime.to_pydatetime() # rowTime = rowTime.astimezone(self.timezone) try: self.mlVola = row['trgt'] except: self.mlVola = 0 self.Debug(f"Volatility value not found for timestamp: {rowTime}") self.Quit() self.signalTime = rowTime return True return False def SetStopLossTakeProfit(self): vola = self.mlVola self.EntryTime = self.Time closePrice = self.CurrentSlice[self.ticker].Close if (self.exitMethod == 1) or (self.exitMethod == 2) : if( self.useMLVola ): # Set take profit/stop loss - Using Volatiliy from ML features self.takeProfit = closePrice + (closePrice * vola) self.stopLoss = closePrice - (closePrice * vola / self.rrRatio) else: # Set take profit/stop loss - Using ATR-measured Volatiliy self.stopLoss = closePrice - (self.slATRCoeff * self.atr.Current.Value) self.takeProfit = closePrice + (self.slATRCoeff * self.atr.Current.Value * self.rrRatio) elif(self.exitMethod == 3): # Set Trailstop self.trailingStop.Activate(closePrice) elif(self.exitMethod == 5): # Set Trailstop with vola initialStopLoss = closePrice - (closePrice * vola / self.rrRatio) activationLevel = closePrice + (closePrice * vola) self.trailingStop.Activate(closePrice, initialStopLoss, activationLevel) # Check if exit signal has fired, due to # Take Profit, Stop Loss, or time duration def ExitSignalFired(self): # Exit based on time. ie: Held position for x minutes if self.exitMethod == 0: if self.HeldForMinimumDuration(): self.exitMsg = f"Exit After {self.hold_dur} minutes" return True # Exit based on time AND stop loss. elif self.exitMethod == 1: closePrice = self.CurrentSlice[self.ticker].Close if self.HeldForMinimumDuration(): self.exitMsg = f"Exit After {self.hold_dur} minutes" return True elif closePrice <= self.stopLoss: self.exitMsg = "Stop Loss Exit" return True # Take Profit / Stop loss exit elif self.exitMethod == 2: closePrice = self.CurrentSlice[self.ticker].Close if closePrice >= self.takeProfit: self.exitMsg = "Take Profit Exit" return True elif closePrice <= self.stopLoss: self.exitMsg = "Stop Loss Exit" return True # Trailing stop exit elif self.exitMethod == 3 or self.exitMethod == 5: if self.trailingStop.TrailingExitSignalFired(): self.exitMsg = self.trailingStop.ExitMessage return True # Trailing stop exit after Min Hold Duration elif self.exitMethod == 4: if (self.HeldForMinimumDuration()): if not self.trailingStop.systemActivated: self.trailingStop.Activate(self.securities[self.symbol].price) if self.trailingStop.TrailingExitSignalFired(): self.exitMsg = self.trailingStop.ExitMessage return True return False def HeldForMinimumDuration(self): return (self.Time - self.EntryTime) >= pd.Timedelta(minutes=self.hold_dur) # quantity <self.algo.securities[symbol].symbol_properties.lot_size # def HasHoldings(self, symbol): # ## TODO: Explore a better way to do this. Not clear how base_currency.amount should be used # crypto = self.securities[symbol] # base_currency = crypto.base_currency # quantity = min(crypto.holdings.quantity, base_currency.amount) # return (quantity > 0) # quantity = crypto.holdings.quantity # return abs(crypto.price * quantity) > crypto.symbol_properties.minimum_order_size