Overall Statistics

| Metric | Value |
| --- | --- |
| Total Trades | 0 |
| Average Win | 0% |
| Average Loss | 0% |
| Compounding Annual Return | 0% |
| Drawdown | 0% |
| Expectancy | 0 |
| Net Profit | 0% |
| Sharpe Ratio | 0 |
| Loss Rate | 0% |
| Win Rate | 0% |
| Profit-Loss Ratio | 0 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 0 |
| Annual Variance | 0 |
| Information Ratio | 0 |
| Tracking Error | 0 |
| Treynor Ratio | 0 |
| Total Fees | $0.00 |
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")

from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data.UniverseSelection import *
import decimal as d
import base64
from System.Collections.Generic import List
import numpy as np
from datetime import timedelta
from System import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from collections import deque
from QuantConnect.Data.Market import TradeBar

# Tickers that get a custom "<ticker> Indicators" chart.
CHARTED_TICKERS = ("SPY", "AAPL")


class GF2Dropbox(QCAlgorithm):
    """Universe-selection algorithm driven by CSV files downloaded from Dropbox.

    Each security in the universe is tracked with an SMA and an ATR
    (see ``SelectionData``) and is given a 10% target allocation once it has
    price data. Securities leaving the universe are liquidated.
    """

    def Initialize(self):
        """Configure the backtest window, cash, universe and custom charts."""
        self.SetStartDate(2019, 1, 1)
        self.SetEndDate(2019, 1, 10)
        self.SetCash(100000)
        self.smaperiod = 12
        # TODO: SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Cash)
        # TODO: SetTimeZone(TimeZones.Toronto)
        # TODO: SetSecurityInitializer(lambda x: x.SetDataNormalizationMode(DataNormalizationMode.Raw))

        self.backtestSymbolsPerDay = {}   # "YYYYMMDD" -> list of tickers
        self.current_universe = []        # last ticker list returned by selector
        self.stateData = {}               # Symbol -> SelectionData
        self.addedSymbols = []            # symbols added by the latest change
        self.indicatorchart = {}          # ticker -> Chart
        self.changes = None               # latest SecurityChanges, reset in OnData
        # Symbols added to the universe that still wait for their first bar
        # before the 10% allocation can be placed.
        self.pendingEntries = set()

        self.UniverseSettings.Resolution = Resolution.Hour
        self.AddUniverse("my-dropbox-universe", self.selector)

        # Create custom chart (max 10 total symbols * series)
        for ticker in CHARTED_TICKERS:
            chart = Chart(ticker + ' Indicators')
            chart.AddSeries(Series('Close', SeriesType.Line, 0))
            chart.AddSeries(Series('SMA12', SeriesType.Line, 0))
            chart.AddSeries(Series('ATR', SeriesType.Line, 1))
            self.indicatorchart[ticker] = chart
            self.AddChart(chart)

    @staticmethod
    def _split_fields(text):
        """Split a comma-separated string into stripped, non-empty fields."""
        if not text:
            return []
        return [field.strip() for field in text.split(',') if field.strip()]

    def selector(self, date):
        """Return the list of tickers that form the universe for ``date``.

        Live mode: downloads a single comma-separated ticker list; when the
        download yields no tickers the previous universe is kept.
        Backtest: downloads the whole file once (one ``date,ticker,...`` line
        per day), caches it, and looks up ``date`` formatted as ``%Y%m%d``;
        days missing from the file keep the previous universe.
        """
        # handle live mode file format
        if self.LiveMode:
            # fetch the file from dropbox
            content = self.Download("https://www.dropbox.com/s/2az14r5xbx4w5j6/daily-stock-picker-live.csv?dl=1")
            # if we have a file for today, return symbols, else leave universe unchanged
            tickers = self._split_fields(content)
            if tickers:
                self.current_universe = tickers
            return self.current_universe

        # backtest - first cache the entire file
        if not self.backtestSymbolsPerDay:
            # No need for headers for authorization with dropbox, these two
            # lines are for example purposes
            # NOTE(review): standard Basic auth is "Basic <base64>" without
            # parentheses - left as-is because the header is only illustrative.
            byteKey = base64.b64encode("UserName:Password".encode('ASCII'))
            # The headers must be passed to the Download method as dictionary
            headers = {'Authorization': f'Basic ({byteKey.decode("ASCII")})'}

            content = self.Download("https://www.dropbox.com/s/lcq6sdxlbmfex3n/UniverseStocksBacktest.csv?dl=1", headers)
            for line in (content or "").splitlines():
                fields = self._split_fields(line)
                if fields:  # skip blank lines instead of caching an empty key
                    self.backtestSymbolsPerDay[fields[0]] = fields[1:]

        index = date.strftime("%Y%m%d")
        self.current_universe = self.backtestSymbolsPerDay.get(index, self.current_universe)
        return self.current_universe

    def OnData(self, slice):
        """Update indicators, place deferred entries and plot charted tickers.

        Securities without a bar in ``slice`` or without tracked state are
        skipped rather than raising on a missing key.
        """
        for security in self.ActiveSecurities.Values:
            symbol = security.Symbol
            if not slice.Bars.ContainsKey(symbol):
                continue
            bar = slice.Bars[symbol]

            # we want 10% allocation in each security in our universe; the
            # order is placed here because no price is available yet when
            # OnSecuritiesChanged fires.
            if symbol in self.pendingEntries:
                self.pendingEntries.discard(symbol)
                if not self.Portfolio[symbol].Invested:
                    self.SetHoldings(symbol, 0.1)

            state = self.stateData.get(symbol)
            if state is None:
                continue

            # Updates the SelectionData object with the current price and bar
            state.update(self.Time, security.Price)
            state.update_atr(bar)

            if symbol.Value in CHARTED_TICKERS:
                chart_name = symbol.Value + ' Indicators'
                self.Plot(chart_name, 'Close', bar.Close)
                self.Plot(chart_name, 'SMA12', state.sma.Current.Value)
                self.Plot(chart_name, 'ATR', state.atr.Current.Value)

        # reset changes
        self.changes = None

    def OnSecuritiesChanged(self, changes):
        """Liquidate removed securities and set up state for added ones.

        Added securities get a fresh ``SelectionData`` warmed up from one
        batched history request and are queued in ``pendingEntries`` so that
        ``OnData`` can allocate to them once a bar is available.
        """
        self.changes = changes
        self.Log(f"self.changes: {self.changes}")

        # liquidate removed securities
        for security in changes.RemovedSecurities:
            self.pendingEntries.discard(security.Symbol)
            # Drop the indicator state so a later re-add starts from a clean,
            # freshly warmed-up state instead of stale values.
            self.stateData.pop(security.Symbol, None)
            if security.Invested:
                self.Liquidate(security.Symbol)

        self.addedSymbols = [x.Symbol for x in changes.AddedSecurities]
        if not self.addedSymbols:
            return  # nothing to warm up; avoid a history request for no symbols

        self.pendingEntries.update(self.addedSymbols)

        # warm up the indicators with history price for newly added securities
        history = self.History(self.addedSymbols, self.smaperiod, Resolution.Hour)
        for symbol in self.addedSymbols:
            state = SelectionData(symbol, self.smaperiod)
            self.stateData[symbol] = state

            key = str(symbol)
            if key in history.index:
                symbol_history = history.loc[key]
                state.WarmUpIndicator(symbol_history)
                state.WarmUpIndicatorBar(symbol_history)


class SelectionData(object):
    """Per-symbol indicator state: a simple moving average and an ATR."""

    def __init__(self, symbol, period, atr_period=5):
        """Create the indicators.

        symbol: the security this state belongs to.
        period: look-back of the simple moving average.
        atr_period: look-back of the average true range (defaults to 5).
        """
        self.symbol = symbol
        self.atr = AverageTrueRange(atr_period)
        self.sma = SimpleMovingAverage(period)
        self.is_above_sma = False
        self.atr_ready = False

    def update(self, time, price):
        """Feed ``price`` at ``time`` to the SMA and refresh ``is_above_sma``.

        The flag is only refreshed when ``sma.Update`` returns a truthy value.
        """
        if self.sma.Update(time, price):
            self.is_above_sma = price > self.sma.Current.Value

    def update_atr(self, bar):
        """Feed ``bar`` to the ATR and refresh ``atr_ready``.

        The flag is only refreshed when ``atr.Update`` returns a truthy value.
        """
        if self.atr.Update(bar):
            self.atr_ready = self.atr.Current.Value > 0

    def WarmUpIndicator(self, history):
        """Warm up the SMA from a single-symbol history DataFrame.

        Each row's index is used as the timestamp and its ``close`` column as
        the value.
        """
        for row in history.itertuples():
            point = IndicatorDataPoint(self.symbol, row.Index, float(row.close))
            self.sma.Update(point)

    def WarmUpIndicatorBar(self, bars):
        """Warm up the ATR from history.

        ``bars`` may be an iterable of bar objects, or a single-symbol history
        DataFrame (detected by its ``itertuples`` method) with ``open``,
        ``high``, ``low``, ``close`` and ``volume`` columns. Iterating a
        DataFrame directly yields column names rather than bars, so rows are
        converted to ``TradeBar`` objects first.
        """
        if hasattr(bars, "itertuples"):
            bars = (
                TradeBar(
                    row.Index,
                    self.symbol,
                    float(row.open),
                    float(row.high),
                    float(row.low),
                    float(row.close),
                    float(row.volume),
                )
                for row in bars.itertuples()
            )
        for bar in bars:
            self.atr.Update(bar)