Overall Statistics |
Total Trades 1 Average Win 0% Average Loss 0% Compounding Annual Return 6.826% Drawdown 57.300% Expectancy 0 Net Profit 218.554% Sharpe Ratio 0.378 Probabilistic Sharpe Ratio 0.073% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0.064 Beta 0.462 Annual Standard Deviation 0.159 Annual Variance 0.025 Information Ratio 0.429 Tracking Error 0.16 Treynor Ratio 0.13 Total Fees $0.00 Estimated Strategy Capacity $12000.00 Lowest Capacity Asset SPX500USD 8I |
from AlgorithmImports import * #endregion from datetime import date, timedelta, datetime import numpy as np import math import statistics class LogicalApricotCobra(QCAlgorithm): def Initialize(self): # set up self.SetStartDate(2005, 1, 1) # SPY cumulative return cca 410% for the period from 2005-01-01 to end # self.SetEndDate(2022, 8, 23) self.SetCash(1000000) self.SetBrokerageModel(BrokerageName.OandaBrokerage, AccountType.Margin) # parameters self.budget = 50000 self.thresold = 2.9 self.sma_width = 8 self.close_window_length = 7 self.buy_bond = False # add data self.spx = self.AddCfd("SPX500USD", Resolution.Hour).Symbol # self.spx = self.AddEquity("SPY", Resolution.Hour).Symbol def OnData(self, data): # check if data is available # if not data.Quotes.ContainsKey(self.spx): # return # extract vars c = data[self.spx].Close # plots self.Plot("Cfd plot", "Cfd Close", c) # trading rule if not self.Portfolio[self.spx].Invested: self.SetHoldings(self.spx, 1) """ class LogicalApricotCobra(QCAlgorithm): def Initialize(self): # set up self.SetStartDate(2005, 1, 1) # SPY cumulative return cca 410% for the period from 2005-01-01 to end # self.SetEndDate(2022, 8, 23) self.SetCash(50000) self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin) # parameters self.budget = 50000 self.thresold = 2.9 self.sma_width = 8 self.close_window_length = 7 # self.size = 0 self.buy_bond = False # optimization parameters # self.thresold = float(self.GetParameter("radf-threshold")) # self.sma_width = int(self.GetParameter("radf-sma-width")) if self.LiveMode: self.spy = self.AddEquity("SPY", Resolution.Minute).Symbol self.symbol = self.AddData(RadfAgg, "EXUBER", Resolution.Minute, TimeZones.Utc).Symbol else: # SPY data equity = self.AddCfd("SPX500USD", Resolution.Hour) # equity.SetDataNormalizationMode(DataNormalizationMode.Raw) self.spy = equity.Symbol # self.SetBenchmark(lambda dt: self.Securities[self.spy].Price) # self.SetBenchmark("SPY") # exuber data self.symbol = self.AddData(RadfAgg, "EXUBER", Resolution.Hour).Symbol self.radf_sma = self.SMA("EXUBER", self.sma_width, Resolution.Hour) if self.buy_bond: bond = self.AddEquity("TLT", Resolution.Hour) bond.SetDataNormalizationMode(DataNormalizationMode.Raw) self.bond = bond.Symbol gold = self.AddEquity("GDX", Resolution.Hour) gold.SetDataNormalizationMode(DataNormalizationMode.Raw) self.gold = gold.Symbol # init self.bond_close_window = RollingWindow[float](7 * 5) self.gold_close_window = RollingWindow[float](7 * 5) # set and warmup rolling close self.close_radf = RollingWindow[float](self.close_window_length) history = self.History([self.spy], self.close_window_length, Resolution.Hour) for index, ohlcv in history.loc[self.spy].iterrows(): self.close_radf.Add(ohlcv["close"]) # warm up safe assets if self.buy_bond: history = self.History([self.bond, self.gold], 7 * 5, Resolution.Hour) # , self.gold for index, ohlcv in history.loc[self.bond].iterrows(): self.bond_close_window.Add(ohlcv["close"]) for index, ohlcv in history.loc[self.gold].iterrows(): self.gold_close_window.Add(ohlcv["close"]) def OnData(self, data): # check market hours exchange = self.Securities[self.spy].Exchange if not exchange.ExchangeOpen: self.Debug("Exchange is closed.") return # extract radf if self.LiveMode: # check if SPY data is ready # if not data.ContainsKey(self.spy): # self.Log(f'SPY is not ready') # return # get SPY data spy = self.Securities[self.spy] o, h, l, c = spy.Open, spy.High, spy.Low, spy.Close if c == 0: self.Log("SPY close price is 0.") return # check if custom data is ready if not data.ContainsKey(self.symbol): self.Log(f'Radg agg is not ready') return # define radf value and spy close radf = data[self.symbol].Value self.close_radf.Add(c) else: if not self.radf_sma.IsReady: self.Log(f'Radf agg SMA is not ready') return if not data.Bars.ContainsKey("SPY"): self.Log(f'SPY is not ready') return if not data.ContainsKey(self.symbol): self.Log(f'Radg agg is not ready') return radf = self.radf_sma.Current.Value radf_current = data[self.symbol].Value c = data[self.spy].Close self.close_radf.Add(c) # safe assets if self.buy_bond: if not data.ContainsKey(self.bond) or not data.ContainsKey(self.gold): self.Log(f'Safe assets are not ready') return # safe assets data if self.buy_bond: self.bond_close_window.Add(data[self.bond].Close) self.Plot(f"Alternative assets", "Bond", self.bond_close_window[0]) self.gold_close_window.Add(data[self.gold].Close) self.Plot(f"Alternative assets", "Gold", self.gold_close_window[0]) # current return close_diff = self.close_radf[0] - self.close_radf[self.close_window_length - 1] self.Log(close_diff) # plot indicator self.Plot("Exuber Indicators", "Radf Sum SD", radf) self.Plot("Exuber Indicators", "Exuber Agg Threshold", self.thresold) # mannually set sizes # if self.size == 0 and radf < self.thresold: # self.size = math.floor(self.budget / c) # self.buy_order = self.MarketOrder(self.spy, self.size) # self.Log(f"Buy Exuber. Exuber size after buy {self.size}") # elif self.size > 0 and radf >= self.thresold and close_diff < 0: # self.sell_order = self.MarketOrder(self.spy, -self.size) # self.last_pl = self.buy_order.AverageFillPrice - self.sell_order.AverageFillPrice # self.budget = self.budget + math.floor(self.last_pl) # self.size = 0 # self.Log("Sell Exuber") ## buy or sell all if not self.Portfolio[self.spy].Invested and (radf < self.thresold): if self.buy_bond: if self.Portfolio[self.bond].Invested: self.Liquidate(self.bond, "Sell bonds.") if self.Portfolio[self.gold].Invested and self.buy_bond: self.Liquidate(self.gold, "Sell gold.") self.Log(f'Buy at radf {radf}') self.SetHoldings("SPY", 1) elif self.Portfolio["SPY"].Invested and (radf > self.thresold) and close_diff < 0: self.Log(f'Sell at radf {radf}') self.Liquidate("SPY", "radf greater than threshold") if self.buy_bond: # calculate momentum for alternative assets bond_prices = list(self.bond_close_window) bond_momentum = bond_prices[0] / bond_prices[-1] - 1 bond_prices.reverse() bond_sd = statistics.stdev(bond_prices) bond_momentum = bond_momentum / bond_sd gold_prices = list(self.gold_close_window) gold_prices.reverse() gold_momentum = gold_prices[0] / gold_prices[-1] - 1 gold_sd = statistics.stdev(gold_prices) gold_momentum = gold_momentum / gold_sd # trade alternative assets if bond_momentum > gold_momentum and bond_momentum > 0: self.SetHoldings(self.bond, 1) elif gold_momentum > bond_momentum and gold_momentum > 0: self.SetHoldings(self.gold, 1) class RadfAgg(PythonData): def GetSource(self, config, date, isLive): # "https://www.dropbox.com/s/4b1vrn1v7i8nppj/1-200-2.csv?dl=1" # "https://www.dropbox.com/s/qmn7eqri0jlloy1/1-100-5.csv?dl=1" if isLive: return SubscriptionDataSource("https://contentiobatch.blob.core.windows.net/qc-live/exuber.csv", SubscriptionTransportMedium.RemoteFile) else: return SubscriptionDataSource("https://www.dropbox.com/s/7pp26sqvnd2qg69/1-600-1.csv?dl=1", SubscriptionTransportMedium.RemoteFile) def Reader(self, config, line, date, isLive): # If first character is not digit, pass if not (line.strip() and line[0].isdigit()): return None if isLive: data = line.split(',') index = RadfAgg() index.Symbol = config.Symbol index.EndTime = datetime.utcnow() index.Time = datetime.utcnow() index.Value = float(data[7]) # 7 sd_radf_sum_sma_8 index["adf"] = float(data[1]) index["sadf"] = float(data[2]) index["gsadf"] = float(data[3]) index["bsadf"] = float(data[5]) index["radf_sum"] = float(data[6]) else: data = line.split(',') index = RadfAgg() index.Symbol = config.Symbol index.Time = datetime.strptime(data[0], '%Y-%m-%d %H:%M:%S') + timedelta(hours=1) # Make sure we only get this data AFTER trading day - don't want forward bias. index.Value = float(data[6]) # 5 or 6 index["adf"] = float(data[1]) # sd_adf index["sadf"] = float(data[2]) # sd_sadf index["gsadf"] = float(data[3]) # sd_gsadf index["badf"] = float(data[4]) # sd_badf index["bsadf"] = float(data[5]) # sd_bsadf index["radf_sum"] = float(data[6]) # sd_radf_sum return index """