Overall Statistics |
Total Trades 45 Average Win 1.79% Average Loss -4.72% Compounding Annual Return -86.797% Drawdown 57.700% Expectancy -0.803 Net Profit -45.056% Sharpe Ratio -1.023 Probabilistic Sharpe Ratio 2.777% Loss Rate 86% Win Rate 14% Profit-Loss Ratio 0.38 Alpha -0.963 Beta -2.217 Annual Standard Deviation 0.675 Annual Variance 0.455 Information Ratio -0.723 Tracking Error 0.785 Treynor Ratio 0.311 Total Fees $45.00 Estimated Strategy Capacity $18000000.00 Lowest Capacity Asset TZA U7EC123NWZTX |
import clr clr.AddReference("System") clr.AddReference("QuantConnect.Algorithm") clr.AddReference("QuantConnect.Indicators") clr.AddReference("QuantConnect.Common") from System import * from QuantConnect import * from QuantConnect.Algorithm import * from QuantConnect.Indicators import * class QuantumCalibratedComputer(QCAlgorithm): def Initialize(self): self.SetTimeZone("America/New_York") self.SetBenchmark('SPY') self.SetStartDate(2022, 1, 1) #Set Start Date self.SetEndDate(2022, 4, 19) #Set End Date self.SetCash(2400) #Set Strategy Cash # https://github.com/QuantConnect/Lean/blob/master/Algorithm.Framework/Risk/TrailingStopRiskManagementModel.py self.SetRiskManagement(TrailingStopRiskManagementModel(0.25)) # atr_number = int(self.GetParameter('atr_number')) # pl_stop = float(self.GetParameter('pl_stop')) # breakout = int(self.GetParameter('breakout')) # slow, fast = int(self.GetParameter('slow_ma')), int(self.GetParameter('fast_ma')) self.symbols = { # 0symbol, 1per time, 2max_pos, 3atr_number, 4pl_stop, 5breakout # 6slow, 7fast, 8take_profit, 9 trailing_stop_price # 'psq': ['PSQ', 50, 100, 1.5, -0.03, 5, 120, 60, 0.3, None], # 'slv': ['SLV', 50, 100, 1.5, -0.03, 10, 120, 30, 0.3, None], # 'rwm': ['RWM', 50, 100, 1.5, -0.03, 5, 120, 60, 0.3, None], 'tza': ['TZA', 15, 45, 1.5, -0.1, 5, 120, 60, 0.16, None], 'sqqq': ['SQQQ', 15, 45, 1.5, -0.1, 5, 120, 60, 0.16, None], } # 1/4 self.fast, self.slow, self.atr = {}, {}, {} for key, symbol in self.symbols.items(): code = symbol[0] slow, fast = symbol[6], symbol[7] if key in ['psq', 'rwm', 'tza', 'sqqq', 'slv']: # 2/4 self.AddEquity(code, Resolution.Daily) else: raise RuntimeError(code + ' do not set market data ') self.fast[code] = self.SMA(code, fast, Resolution.Daily) self.slow[code] = self.SMA(code, slow, Resolution.Daily) self.atr[code] = self.ATR(code, 15, MovingAverageType.Simple, Resolution.Daily) self.PlotIndicator('atr' + code, self.atr[code]) self.PlotIndicator('fast' + code, self.fast[code]) self.PlotIndicator('slow' + code, self.slow[code]) self.SetWarmup(timedelta(300)) self.Log('for each iterate ' + str(self.Time)) if 'psq' in self.symbols: self.Schedule.On(self.DateRules.EveryDay(self.symbols['psq'][0]), self.TimeRules.AfterMarketOpen(self.symbols['psq'][0], 10), self.run_psq) if 'slv' in self.symbols: self.Schedule.On(self.DateRules.EveryDay(self.symbols['slv'][0]), self.TimeRules.AfterMarketOpen(self.symbols['slv'][0], 10), self.run_slv) if 'rwm' in self.symbols: self.Schedule.On(self.DateRules.EveryDay(self.symbols['rwm'][0]), self.TimeRules.AfterMarketOpen(self.symbols['rwm'][0], 10), self.run_rwm) if 'tza' in self.symbols: self.Schedule.On(self.DateRules.EveryDay(self.symbols['tza'][0]), self.TimeRules.AfterMarketOpen(self.symbols['tza'][0], 45), self.run_tza) if 'sqqq' in self.symbols: self.Schedule.On(self.DateRules.EveryDay(self.symbols['sqqq'][0]), self.TimeRules.AfterMarketOpen(self.symbols['sqqq'][0], 45), self.run_sqqq) # 3/4 self.Log(' Initialize schedule end ' + str(self.Time)) def run_psq(self): self.Log('run_psq ' + str(self.Time)) self.run(self.symbols['psq']) def run_slv(self): self.Log('run_slv ' + str(self.Time)) self.run(self.symbols['slv']) def run_rwm(self): self.Log('run_rwm ' + str(self.Time)) self.run(self.symbols['rwm']) def run_tza(self): self.Log('run_tza ' + str(self.Time)) self.run(self.symbols['tza']) def run_sqqq(self): self.Log('run_sqqq ' + str(self.Time)) self.run(self.symbols['sqqq']) def run_govt(self): self.Log('run_govt ' + str(self.Time)) self.run(self.symbols['govt']) # 4/4 # future trading ondata def OnData(self, data): for code in self.symbols: a = 1 # just do nothing # self.Log("data contains key " + self.symbols[code][0] + ' ' # + str(data.ContainsKey(self.symbols[code][0])) + ' ' + str(self.Time)) # stock trading logic def run(self, symbol): self.Log('run ' + str(symbol)+ ' ' + str(self.Time)) code = symbol[0] pos_per_time = symbol[1] max_pos = symbol[2] atr_number = symbol[3] pl_stop = symbol[4] breakout = symbol[5] take_profit_rate = symbol[8] opens = self.History(self.Symbol(code), breakout, Resolution.Daily)['open'] uptrend, downtrend = False, False if len(opens) >= 4: uptrend = opens[-1] > opens[-2] and opens[-2] > opens[-3] and opens[-3] > opens[-4] downtrend = opens[-1] < opens[-2] and opens[-2] < opens[-3] and opens[-3] < opens[-4] if not self.slow[code].IsReady or not self.fast[code].IsReady or not self.atr[code].IsReady: s = 'indicators is not ready ' + ' ' + str(self.Time) self.Log(s) return p = self.Securities[code].Price ma_slow = round(self.slow[code].Current.Value, 5) ma_fast = round(self.fast[code].Current.Value, 5) atr = round(self.atr[code].Current.Value, 5) highest = max(opens) lowest = min(opens) pos = self.Portfolio[code].Quantity avg_open = self.Portfolio[code].AveragePrice pl = 0 if avg_open > 0: if pos > 0: pl = (p - avg_open) / avg_open if pos < 0: pl = (avg_open - p) / avg_open s = (code + ' price ' + self.f(p) + ' position ' + self.f(pos) + ' avg_open ' + self.f(avg_open) + ' atr ' + self.f(atr) + ' ma_slow ' + self.f(ma_slow) + ' ma_fast ' + self.f(ma_fast) + ' highest ' + self.f(highest) + ' lowest ' + self.f(lowest) + ' pl ' + self.f(pl) + ' trailing_stop ' + self.f(symbol[9]) + ' uptrend ' + str(uptrend) + ' downtrend ' + str(downtrend) + ' ' + str(self.Time)) self.Log(s) is_long, is_short = pos > 0, pos < 0 if not is_long and not is_short: symbol[9] = None if symbol[9] is None and is_long: symbol[9] = avg_open * (1 + pl_stop) symbol[9] = max(symbol[9], avg_open - atr * atr_number) if symbol[9] is None and is_short: symbol[9] = avg_open * (1 - pl_stop) symbol[9] = min(symbol[9], avg_open + atr * atr_number) if is_long and pl > take_profit_rate and downtrend: self.Liquidate(code) self.Log(code + ' liquidate long by take profit when retreat') is_long = False pl = -1 symbol[9] = None if is_short and pl > take_profit_rate and uptrend: self.Liquidate(code) self.Log(code + ' liquidate short by take profit when retreat') is_short = False pl = -1 symbol[9] = None if is_long and pl > take_profit_rate and (highest - p) > 2 * atr: self.Liquidate(code) self.Log(code + ' liquidate long by take profit when retreat atr') is_long = False pl = -1 symbol[9] = None if is_short and pl > take_profit_rate and (p - lowest) > 2 * atr: self.Liquidate(code) self.Log(code + ' liquidate short by take profit when retreat atr') is_short = False pl = -1 symbol[9] = None if is_long: stop_loss_p = avg_open - atr * atr_number symbol[9]= max(symbol[9], stop_loss_p) if p - avg_open > atr * atr_number: symbol[9] = max(symbol[9], avg_open) if p - avg_open > atr * atr_number * 2: symbol[9] = max(symbol[9], avg_open + atr * atr_number) if p - avg_open > atr * atr_number * 4: symbol[9] = max(symbol[9], avg_open + atr * atr_number * 2) if p - avg_open > atr * 2: symbol[9] = max(symbol[9], p - atr * 2) if p <= symbol[9]: self.Liquidate(code) self.Log(code + ' liquidate long by stop loss price ' + self.f(symbol[9])) is_long = False pl = -1 symbol[9] = None if pl < pl_stop: self.Liquidate(code) self.Log(code + ' liquidate long by pl stop') is_long = False pl = -1 symbol[9] = None if is_short: stop_loss_p = avg_open + atr * atr_number symbol[9] = min(symbol[9], stop_loss_p) if avg_open - p > atr * atr_number: symbol[9] = min(symbol[9], avg_open) if avg_open - p > atr * atr_number * 2: symbol[9] = min(symbol[9], avg_open - atr*atr_number) if avg_open - p > atr * atr_number * 4: symbol[9] = min(symbol[9], avg_open - atr*atr_number*2) if avg_open - p > atr * 2: symbol[9] = min(symbol[9], p + atr * 2) if p >= symbol[9]: self.Liquidate(code) self.Log(code + ' liquidate short by stop loss price ' + self.f(symbol[9])) is_short = False pl = -1 symbol[9] = None if pl < pl_stop: self.Liquidate(code) self.Log(code + ' liquidate short by pl stop') is_short = False pl = -1 symbol[9] = None if is_long and (p < ma_fast or p < ma_slow): self.Liquidate(code) self.Log(code + ' liquidate long by ma fast or slow') is_long = False pl = -1 symbol[9] = None if is_short and (p > ma_fast or p > ma_slow): self.Liquidate(code) self.Log(code + ' liquidate short by ma fast or slow') is_short = False pl = -1 symbol[9] = None if (p >= ma_slow and p >= ma_fast and p >= highest and abs(pos) < max_pos and pl >= 0): self.MarketOrder(code, pos_per_time) s = code + ' open long' self.Log(s) if (p <= ma_slow and p <= ma_fast and p <= lowest and abs(pos) < max_pos and pl >= 0): self.MarketOrder(code, pos_per_time * -1) s = code + ' open short' self.Log(s) def OnEndOfDay(self, data): pass def OnOrderEvent(self, orderEvent): order = self.Transactions.GetOrderById(orderEvent.OrderId) if orderEvent.Status == OrderStatus.Filled: self.Log("{0}: {1}: {2}".format(self.Time, order.Type, orderEvent)) def f(self, f): if f is None: return 'None' if type(f) != float: f = float(f) s = str('{:.6f}'.format(f)) if s.endswith('.000000'): s = s[:-7] return s