Overall Statistics |
Total Orders 776 Average Win 0.09% Average Loss -0.09% Compounding Annual Return 24.754% Drawdown 0.500% Expectancy 0.101 Start Equity 100000 End Equity 103305.47 Net Profit 3.305% Sharpe Ratio 4.296 Sortino Ratio 7.496 Probabilistic Sharpe Ratio 99.571% Loss Rate 45% Win Rate 55% Profit-Loss Ratio 0.99 Alpha 0.103 Beta 0.035 Annual Standard Deviation 0.024 Annual Variance 0.001 Information Ratio 0.884 Tracking Error 0.101 Treynor Ratio 2.956 Total Fees $1.05 Estimated Strategy Capacity $9300000.00 Lowest Capacity Asset DE R735QTJ8XC9X Portfolio Turnover 255.41% |
#region imports
from AlgorithmImports import *
#endregion
from scipy.stats import linregress
import math
import numpy as np


class Consolidator:
    """Trade one pair (s1 = child, s2 = parent) as a Bollinger-band spread.

    The spread is ``s1 - hedge_ratio * s2``.  A position is opened when the
    executable spread price leaves the outer bands and is closed when it
    crosses back over the middle band.

    Parameters
    ----------
    s1, s2 : symbols of the child and parent legs.
    algo   : owning algorithm; this class reads ``res``, ``runs``,
             ``fit_period``, ``fit_res``, ``_SWAP_XY`` and ``db_lvl`` from it
             and uses it to place orders, request history and log.
    smooth : Bollinger band look-back length.
    z      : Bollinger band width (number of standard deviations).
    ratio  : initial hedge ratio.  A value of exactly 1 is treated as
             "not fitted yet": ``OnData`` then tries to fit it and no entry
             is attempted while it stays at 1.
    """

    limits = False

    def __init__(self, s1, s2, algo, smooth, z, ratio):
        self.s1 = s1
        self.s2 = s2
        self.algo = algo
        self.hedge_ratio = ratio
        self.smooth = smooth
        self.z = z
        self.child = s1
        self.parent = s2

        # Latest top-of-book quotes and their bar end times.  The times are
        # initialised here so OnData can compare them before the first quote
        # bar of either leg has arrived.
        self.s1_ask = None
        self.s1_bid = None
        self.s2_bid = None
        self.s2_ask = None
        self.s1_time = None
        self.s2_time = None

        self.warmed_up = False
        self.S1 = Identity('S1')
        self.S2 = Identity('S2')
        self.sym_str = f'{self.s1}/{self.s2}'
        self.is_invested = None   # None, 'long' or 'short'
        self.order_tickets = []

        resolution = self.algo.res
        self.algo.RegisterIndicator(self.s1, self.S1, resolution)
        self.algo.RegisterIndicator(self.s2, self.S2, resolution)

        # NOTE(review): the ratio is captured here as a constant, so a later
        # change of self.hedge_ratio (see GetHedgeRatio) does not appear to
        # alter this composite indicator -- confirm before enabling re-fits.
        self.series = IndicatorExtensions.Minus(
            self.S1, IndicatorExtensions.Times(self.S2, self.hedge_ratio))
        self.bb = BollingerBands(self.smooth, self.z,
                                 MovingAverageType.Exponential)

        # Split 90% of the portfolio evenly across all configured pairs, per
        # leg (long one leg, short the other).
        per_symbol = .9 / len(self.algo.runs)
        self.per_symbol = per_symbol
        self.long_targets = [PortfolioTarget(self.s1, per_symbol),
                             PortfolioTarget(self.s2, -per_symbol)]
        self.short_targets = [PortfolioTarget(self.s1, -per_symbol),
                              PortfolioTarget(self.s2, per_symbol)]
        self.flat_targets = [PortfolioTarget(self.s1, 0.0),
                             PortfolioTarget(self.s2, 0.0)]

        # Warm-up is best effort, but report why it failed instead of hiding
        # the reason behind a bare except.
        try:
            self.WarmUp()
        except Exception as err:
            self.algo.Debug(f'Cannot Run Warmup ({err!r})')

    def WarmUp(self):
        """Feed historical spread values into the Bollinger bands."""
        n = max(1000, self.smooth + 10)
        history = self.algo.History([self.s1, self.s2], n, self.algo.res)
        closes = history.unstack(level=0).close.dropna()
        for tm, row in closes.iterrows():
            value = row[self.s1] - row[self.s2] * self.hedge_ratio
            self.bb.Update(tm, value)
        self.warmed_up = True

    def GetHedgeRatio(self, history=None):
        """Fit ``hedge_ratio`` as the slope of a linear regression on closes.

        Parameters
        ----------
        history : optional history frame indexed by symbol with a ``close``
                  column.  When omitted, ``fit_period`` bars at ``fit_res``
                  are requested from the algorithm.

        Returns
        -------
        bool : True when the ratio was updated, False when the history
               request failed.
        """
        if history is None:
            try:
                history = self.algo.History([self.s1, self.s2],
                                            self.algo.fit_period,
                                            self.algo.fit_res)
                closes = history.unstack(level=0).close.dropna()
                p1 = closes[self.s1]
                p2 = closes[self.s2]
            except Exception as err:
                self.algo.Debug(f' --------- CANNOT get History, therefore hedge ratio ------------- ')
                self.algo.Debug(f'{self.sym_str} history error: {err!r}')
                return False
        else:
            p1 = history.loc[self.s1].close
            p2 = history.loc[self.s2].close

        # s1 is the child (y), s2 is the parent (x); linregress takes (x, y).
        if self.algo._SWAP_XY:
            reg = linregress(p1, p2)
        else:
            reg = linregress(p2, p1)
        self.hedge_ratio = reg.slope
        self.algo.Debug(f'Ratio Fit --> {self.s1} - {self.hedge_ratio} * {self.s2}')
        return True

    def OnData(self, data):
        """Record the latest quotes, update the bands and run entry logic."""
        if (not data.Bars.ContainsKey(self.s1)) or (not data.Bars.ContainsKey(self.s2)):
            return

        for symbol, quote_bar in data.QuoteBars.items():
            # NOTE(review): a quote bar may lack one side of the book; such
            # bars are skipped rather than dereferencing a missing side.
            if quote_bar.Bid is None or quote_bar.Ask is None:
                continue
            if symbol == self.s1:
                self.s1_bid = quote_bar.Bid.Close
                self.s1_ask = quote_bar.Ask.Close
                self.s1_time = quote_bar.EndTime
            if symbol == self.s2:
                self.s2_bid = quote_bar.Bid.Close
                self.s2_ask = quote_bar.Ask.Close
                self.s2_time = quote_bar.EndTime

        # A ratio of exactly 1 is the "not fitted" placeholder.
        if self.hedge_ratio == 1:
            self.GetHedgeRatio()

        # Only act when both legs have quotes from the same event.
        if self.s1_time is None or self.s1_time != self.s2_time:
            return

        # Only act once the bands are ready (WarmUp fills them from history).
        if not self.bb.IsReady:
            return

        self.bb.Update(self.algo.Time, self.series.Current.Value)
        if self.bb.IsReady and self.hedge_ratio != 1.0:
            self.EntryLogic()

    @property
    def QuoteReady(self):
        """True once a bid and an ask have been seen for both legs."""
        quotes = (self.s1_ask, self.s1_bid, self.s2_ask, self.s2_bid)
        return all(q is not None for q in quotes)

    def _switch(self, targets, state, code, note, detail):
        """Apply ``targets``, record the new ``state`` and log the move."""
        self.algo.SetHoldings(targets, tag=f"{code} -- {self.sym_str}: {detail}")
        self.is_invested = state
        if self.algo.db_lvl >= 2:
            self.algo.Debug(f'{self.sym_str} {detail}')
        if self.algo.db_lvl >= 1:
            self.algo.Debug(note)

    def EntryLogic(self):
        """Open or close the spread position based on the Bollinger bands.

        Buying the spread pays the s1 ask; selling it receives the s1 bid.
        The s2 side used depends on the sign of the hedge ratio.
        """
        if not self.bb.IsReady:
            return
        if not self.series.IsReady:
            return
        if not self.QuoteReady:
            return

        ratio = self.hedge_ratio
        if ratio > 0:
            buy_price = self.s1_ask - ratio * self.s2_bid
            sell_price = self.s1_bid - ratio * self.s2_ask
        else:
            buy_price = self.s1_ask - ratio * self.s2_ask
            sell_price = self.s1_bid - ratio * self.s2_bid

        lower = self.bb.LowerBand.Current.Value
        middle = self.bb.MiddleBand.Current.Value
        upper = self.bb.UpperBand.Current.Value

        if not self.is_invested:
            # Flat: enter long below the lower band, short above the upper
            # band.  Only one entry can fire per pass.
            if buy_price < lower:
                self._switch(self.long_targets, 'long', 'LE',
                             f'LE : {self.sym_str}', f'{buy_price} < {lower}')
            elif sell_price > upper:
                self._switch(self.short_targets, 'short', 'SE',
                             f'SE: {self.sym_str}', f'{sell_price} > {upper}')
        elif self.is_invested == 'long':
            # Long: exit once the spread can be sold above the middle band.
            if sell_price > middle:
                self._switch(self.flat_targets, None, 'LX',
                             f'LX {self.sym_str}', f'{sell_price} > {middle}')
        elif self.is_invested == 'short':
            # Short: exit once the spread can be bought below the middle band.
            if buy_price < middle:
                self._switch(self.flat_targets, None, 'SX',
                             f'SX {self.sym_str}', f'{buy_price} < {middle}')

    def PlotSpread(self):
        """Plot the current spread and its three bands on "Strategy Equity"."""
        serie = self.series.Current.Value
        self.algo.Plot("Strategy Equity", f"{self.sym_str}", serie)
        self.algo.Plot("Strategy Equity", f"{self.sym_str}_Upper", self.bb.UpperBand.Current.Value)
        self.algo.Plot("Strategy Equity", f"{self.sym_str}_Middle", self.bb.MiddleBand.Current.Value)
        self.algo.Plot("Strategy Equity", f"{self.sym_str}_Lower", self.bb.LowerBand.Current.Value)
        self.algo.Debug(f'Spread Plotted...')

    # region QuoteLogic
    def SpreadLimitOrder(self, dir=1):
        """Place limit orders on both legs at the current quotes.

        ``dir == 1`` buys the spread (s1 at its ask); any other value sells
        it (s1 at its bid).  Both tickets are stored in ``order_tickets``.
        """
        if not self.QuoteReady:
            return

        s1 = self.s1
        s2 = self.s2
        ps = self.per_symbol
        positive = self.hedge_ratio > 0
        leg_sign = np.sign(self.hedge_ratio)

        if dir == 1:
            q1 = self.algo.CalculateOrderQuantity(s1, ps)
            q2 = self.algo.CalculateOrderQuantity(s2, -ps * leg_sign)
            p1 = self.s1_ask
            p2 = self.s2_bid if positive else self.s2_ask
        else:
            q1 = self.algo.CalculateOrderQuantity(s1, -ps)
            q2 = self.algo.CalculateOrderQuantity(s2, ps * leg_sign)
            p1 = self.s1_bid
            p2 = self.s2_ask if positive else self.s2_bid

        self.s1_ticket = self.algo.LimitOrder(s1, q1, p1)
        self.s2_ticket = self.algo.LimitOrder(s2, q2, p2)
        self.order_tickets.append(self.s1_ticket)
        self.order_tickets.append(self.s2_ticket)

    @property
    def OrderIds(self):
        """Ids of every order ticket created by ``SpreadLimitOrder``."""
        return [ticket.OrderId for ticket in self.order_tickets]

    def OnParentUpdate(self, order: OrderEvent):
        """Hook for order events on the parent leg (s2); currently a no-op."""

    def OnChildUpdate(self, order: OrderEvent):
        """Hook for order events on the child leg (s1); currently a no-op."""

    def OnOrderUpdate(self, order: OrderEvent):
        """Route an order event to the child or parent hook.

        Treated as a delegate method, called via Algo.main, if routed via id
        to this instance.
        https://www.quantconnect.com/docs/v2/writing-algorithms/trading-and-orders/order-events#01-Introduction
        """
        placed = self.algo.Transactions.GetOrderById(order.OrderId)
        if placed.Symbol == self.child:
            self.OnChildUpdate(order)
        if placed.Symbol == self.parent:
            self.OnParentUpdate(order)

    # Backward-compatible alias for the original (misspelled) method name.
    OnOrderUdate = OnOrderUpdate
    # endregion
#region imports
from AlgorithmImports import *
from scipy.stats import linregress
import math
from Consolidator import Consolidator
from datetime import timedelta
from enum import Enum
#endregion


class Run:
    """Configuration of one traded pair.

    Attributes
    ----------
    child, parent : tickers of the two legs.
    smooth        : Bollinger band look-back length.
    z             : Bollinger band width.
    ratio         : hedge ratio handed to the Consolidator.
    """

    def __init__(self, child: str, parent: str, smooth: int, z: float, ratio: float):
        self.child = child
        self.parent = parent
        self.smooth = smooth
        self.z = z
        self.ratio = ratio

    @classmethod
    def base(cls, child: str, parent: str):
        """Build a run with default band settings.

        The ratio is 1.0, which the Consolidator treats as "not fitted yet"
        and replaces with a regression fit.  (The original call omitted the
        ratio argument and raised TypeError.)
        """
        return cls(child, parent, 150, 1.5, 1.0)

    def __repr__(self):
        # Makes the 'Cannot Add ...' debug message readable.
        return (f'Run({self.child!r}, {self.parent!r}, smooth={self.smooth}, '
                f'z={self.z}, ratio={self.ratio})')


# How often do we re-fit the hedge ratio.
class FitFrequency(Enum):
    DAILY = 0
    WEEKLY = 1
    MONTHLY = 2


class YXTaker(QCAlgorithm):
    """Multi-pair spread algorithm; each pair is driven by a Consolidator."""

    # Fit parameters.
    fit_period = 500
    fit_res = Resolution.Hour
    fit_freq = FitFrequency.WEEKLY  # Only used by the disabled schedule below.
    _SWAP_XY = False  # Swaps the regression axes when fitting the ratio.

    # Model Parameters. (defaults)
    res = Resolution.Minute  # Could do Tick/ Second...
    bb_len = 120
    n_sig = .8

    # Params overridden by run in Runs -- below.
    runs = [
        Run('EWC', 'EWA', 500, 1.5, 1.5),
        Run('XOM', 'CVX', 150, 3.5, .75),
        Run('QQQ', 'XLK', 150, 2, 2.1),
        # Run('AMAT','XLC', 500, 3, 3)
        Run("CAT", 'DE', 500, 3, 1.5),
        # Run("WFC",'MS', 1000, 2, .63),  # Works more of LATE! earlier, eh.
        Run("V", 'MA', 350, 2, .58),
        # Run("BLK", "C", 350, 2, 13),
        # Run("WMT","TGT", 250, 1, .33),  # Horrible pair -- so interesting.
    ]

    # Debug parameters.
    db_lvl = 2
    plot = False

    def Initialize(self):
        """Subscribe to every leg, build one Consolidator per run, set models."""
        self.SetStartDate(2024, 3, 15)
        self.SetCash(100000)
        self.AddEquity('SPY')
        self._universe_bullshit = [[i.child, i.parent] for i in self.runs]
        self.SafeOpt()
        self.Pairs = {}
        # self.SetWarmup(timedelta(days=5))

        added = {}  # ticker -> Symbol, so a shared ticker is subscribed once

        def symbol_for(ticker):
            if ticker not in added:
                added[ticker] = self.AddEquity(ticker, self.res).Symbol
            return added[ticker]

        for run in self.runs:
            try:
                s1 = symbol_for(run.child)
                s2 = symbol_for(run.parent)
                self.Pairs[(run.child, run.parent)] = Consolidator(
                    s1, s2, self, run.smooth, run.z, run.ratio)
            except Exception as err:
                # Skip the pair, but say why it could not be added.
                self.Debug(f'Cannot Add {run}: {err!r}')

        for _, security in self.Securities.items():
            security.SetSlippageModel(ConstantSlippageModel(0))
            security.SetFeeModel(MakerTakerModel())

        # region Re-Fit Callbacks
        # if self.fit_freq == FitFrequency.DAILY:
        #     self.Schedule.On(self.DateRules.EveryDay("SPY"),
        #                      self.TimeRules.AfterMarketOpen("SPY", -10),
        #                      self.ReFit)
        # elif self.fit_freq == FitFrequency.WEEKLY:
        #     self.Schedule.On(self.DateRules.WeekStart("SPY"),
        #                      self.TimeRules.AfterMarketOpen("SPY", -10),
        #                      self.ReFit)
        # elif self.fit_freq == FitFrequency.MONTHLY:
        #     self.Schedule.On(self.DateRules.MonthStart("SPY"),
        #                      self.TimeRules.AfterMarketOpen("SPY", -10),
        #                      self.ReFit)
        # endregion

    def ReFit(self):
        """Re-fit the hedge ratio of every pair."""
        for inst in self.Pairs.values():
            inst.GetHedgeRatio()

    def OnData(self, data):
        """Forward each data slice to every pair."""
        for inst in self.Pairs.values():
            inst.OnData(data)

    def SafeOpt(self):
        """Override bb_len / n_sig from the 'bb-len' / 'n-sig' parameters."""
        tst = self.GetParameter('bb-len')
        if tst:
            self.bb_len = int(tst)
        tst = self.GetParameter('n-sig')
        if tst:
            self.n_sig = float(tst)

    def OnOrderEvent(self, orderEvent: OrderEvent) -> None:
        """Route an order event to the pair(s) that own the order id."""
        for inst in self.Pairs.values():
            if orderEvent.OrderId in inst.OrderIds:
                inst.OnOrderUpdate(orderEvent)
        # Can do any bookkeeping, order tracking, logging here.


class MakerTakerModel(FeeModel):
    """Per-share maker/taker fee model.

    Parameters
    ----------
    maker : per-share rate for all other order types (negative = rebate).
    taker : per-share rate for Market and StopMarket orders.
    """

    def __init__(self, maker=-.0016, taker=.003):
        self.maker = maker
        self.taker = taker

    def GetOrderFee(self, parameters: OrderFeeParameters) -> OrderFee:
        """Return the USD fee (or rebate) for the order in ``parameters``."""
        # Order.Quantity is signed (negative for sells).  Use its magnitude so
        # a sell is charged like a buy instead of flipping the fee's sign.
        shares = abs(parameters.Order.Quantity)
        ord_type = parameters.Order.Type
        if ord_type in [OrderType.Market, OrderType.StopMarket]:
            fee_usd = self.taker * shares
        else:
            fee_usd = self.maker * shares
        return OrderFee(CashAmount(fee_usd, 'USD'))
#region imports
'''
Ratio taken from Research (06 - 12)
Tested out of sample from 12 +
'''
from AlgorithmImports import *
#endregion
from scipy.stats import linregress
import math


class EMAMomentumUniverse(QCAlgorithm):
    """Single-pair Bollinger-band spread strategy on s1 versus s2.

    The traded spread is ``s1 - series_hedge * s2``.  The ``ewc_*`` /
    ``ewa_*`` attribute names refer to s1 / s2 respectively.
    """

    res = Resolution.Hour
    s1 = 'GOOG'
    s2 = 'GOOGL'
    # DISC, DISCA, NWS, NWSA, UA, UAA, HPE, HPQ ? Others?
    bb_len = 120
    n_sig = .8
    # Fixed hedge ratio used to build the spread (was an inline constant).
    series_hedge = 1.3188

    def Initialize(self):
        """Set the backtest window, subscriptions, indicators and models."""
        # Define backtest window and portfolio cash
        self.SetStartDate(2012, 6, 10)
        self.SetEndDate(2012, 6, 20)
        self.SetCash(100000)

        # Add the assets and keep their symbol objects for later reference.
        self.ewc_symbol = self.AddEquity(self.s1, self.res).Symbol
        self.ewa_symbol = self.AddEquity(self.s2, self.res).Symbol

        # Identity indicators repeat the incoming value without processing.
        self.ewc_identity = Identity("My_EWC")
        self.ewa_identity = Identity("My_EWA")
        self.RegisterIndicator(self.ewc_symbol, self.ewc_identity, self.res)
        self.RegisterIndicator(self.ewa_symbol, self.ewa_identity, self.res)

        # Fitted later by OnEndOfDay.
        # NOTE(review): the fitted value is only stored and logged; the
        # spread below keeps using series_hedge.
        self.hedge = None

        # The combined "portfolio" is itself an indicator, so it updates as
        # data arrives.  It is called series because QCAlgorithm already has
        # a Portfolio attribute.
        self.series = IndicatorExtensions.Minus(
            self.ewc_identity,
            IndicatorExtensions.Times(self.ewa_identity, self.series_hedge))

        # Bollinger bands over the spread (updated manually in OnData).
        self.bb = BollingerBands(self.bb_len, self.n_sig,
                                 MovingAverageType.Exponential)

        # long = buy s1 and sell s2; short = sell s1 and buy s2.
        self.long_targets = [PortfolioTarget(self.ewc_symbol, 0.9),
                             PortfolioTarget(self.ewa_symbol, -0.9)]
        self.short_targets = [PortfolioTarget(self.ewc_symbol, -0.9),
                              PortfolioTarget(self.ewa_symbol, 0.9)]
        self.is_invested = None   # None, 'long' or 'short'
        self.first_time = True

        for _, security in self.Securities.items():
            security.SetSlippageModel(ConstantSlippageModel(0))
            security.SetFeeModel(MakerTakerModel())

    def GetHedgeRatio(self):
        """Fit ``self.hedge`` as the regression slope of s1 closes on s2 closes."""
        p1 = self.History(self.ewa_symbol, 500, self.res).loc[self.ewa_symbol].close
        p2 = self.History(self.ewc_symbol, 500, self.res).loc[self.ewc_symbol].close
        reg = linregress(p1, p2)
        self.hedge = reg.slope

        # long = buy s1 and sell s2; short = sell s1 and buy s2.
        self.long_targets = [PortfolioTarget(self.ewc_symbol, 0.9),
                             PortfolioTarget(self.ewa_symbol, -0.9)]
        self.short_targets = [PortfolioTarget(self.ewc_symbol, -0.9),
                              PortfolioTarget(self.ewa_symbol, 0.9)]

    def OnEndOfDay(self):
        """Try to fit the hedge ratio until one attempt succeeds."""
        # 'is None' rather than truthiness: a fitted slope of 0.0 is a result.
        if self.hedge is not None:
            return
        try:
            self.GetHedgeRatio()
        except Exception as err:
            # Best effort: retry on a later day, but report the reason.
            self.Debug(f'Hedge ratio fit failed: {err!r}')
            return
        self.first_time = False
        self.Debug(f' ------------------------------------- HEDGE QTY SET {self.hedge} --------------------------------------------------- ')

    def OnData(self, data):
        """Update the bands with the spread and trade band crossings."""
        if (not data.Bars.ContainsKey(self.ewc_symbol)) or (not data.Bars.ContainsKey(self.ewa_symbol)):
            return

        # Update the Bollinger bands, then wait until they are ready.
        self.bb.Update(self.Time, self.series.Current.Value)
        if not self.bb.IsReady:
            return

        serie = self.series.Current.Value
        lower = self.bb.LowerBand.Current.Value
        middle = self.bb.MiddleBand.Current.Value
        upper = self.bb.UpperBand.Current.Value

        if not self.is_invested:
            # Flat: enter long below the lower band, short above the upper.
            if serie < lower:
                self.SetHoldings(self.long_targets)
                self.is_invested = 'long'
            elif serie > upper:
                self.SetHoldings(self.short_targets)
                self.is_invested = 'short'
        elif self.is_invested == 'long':
            # Exit when the spread crosses back above the mean.
            if serie > middle:
                self.Liquidate()
                self.is_invested = None
        elif self.is_invested == 'short':
            # Exit when the spread crosses back below the mean.
            if serie < middle:
                self.Liquidate()
                self.is_invested = None


class MakerTakerModel(FeeModel):
    """Per-share maker/taker fee model with fixed rates."""

    make_ps = -.0016  # Per-share rate for all other order types (rebate).
    take_ps = .003    # Per-share rate for Market and StopMarket orders.

    def GetOrderFee(self, parameters: OrderFeeParameters) -> OrderFee:
        """Return the USD fee (or rebate) for the order in ``parameters``."""
        # Order.Quantity is signed (negative for sells).  Use its magnitude so
        # a sell is charged like a buy instead of flipping the fee's sign.
        shares = abs(parameters.Order.Quantity)
        ord_type = parameters.Order.Type
        if ord_type in [OrderType.Market, OrderType.StopMarket]:
            fee_usd = self.take_ps * shares
        else:
            fee_usd = self.make_ps * shares
        return OrderFee(CashAmount(fee_usd, 'USD'))