from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data.UniverseSelection import *
import decimal as d
import base64
from System.Collections.Generic import List
import numpy as np
from datetime import timedelta
from System import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from collections import deque
class GF2Dropbox(QCAlgorithm):
    """Universe-selection algorithm driven by ticker lists downloaded from Dropbox.

    For every security in the universe a ``SelectionData`` object is kept in
    ``self.stateData``; it is warmed up from history when the security is
    added and updated from each incoming slice.  Close, SMA and ATR are
    plotted for the tickers listed in ``PLOT_TICKERS``.
    """

    # Tickers that get a custom indicator chart (max 10 total symbols * series).
    PLOT_TICKERS = ("SPY", "AAPL")
    # Number of most recent hourly bars fed to each ATR during warm-up.
    ATR_WARMUP_BARS = 5

    def Initialize(self):
        """Configure dates, cash, the Dropbox universe and the custom charts."""
        self.SetStartDate(2019, 1, 1)
        self.SetEndDate(2019, 1, 10)
        self.SetCash(100000)
        self.smaperiod = 12
        # TODO: SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Cash)
        # TODO: SetTimeZone(TimeZones.Toronto)
        # TODO: SetSecurityInitializer(lambda x: x.SetDataNormalizationMode(DataNormalizationMode.Raw))
        self.backtestSymbolsPerDay = {}   # "YYYYMMDD" -> list of tickers (backtest cache)
        self.current_universe = []        # last ticker list returned by selector
        self.stateData = {}               # Symbol -> SelectionData
        self.addedSymbols = []
        self.indicatorchart = {}          # ticker -> Chart
        # Defined up front so the attribute exists before the first
        # OnSecuritiesChanged / OnData call.
        self.changes = None

        self.UniverseSettings.Resolution = Resolution.Hour
        self.AddUniverse("my-dropbox-universe", self.selector)

        # Create one custom chart per plotted ticker.
        for ticker in self.PLOT_TICKERS:
            chart = Chart(ticker + ' Indicators')
            chart.AddSeries(Series('Close', SeriesType.Line, 0))
            chart.AddSeries(Series('SMA12', SeriesType.Line, 0))
            chart.AddSeries(Series('ATR', SeriesType.Line, 1))
            self.AddChart(chart)
            self.indicatorchart[ticker] = chart

    @staticmethod
    def _split_tickers(text):
        """Split comma-separated text into a list of non-empty, stripped tickers."""
        return [item.strip() for item in (text or "").split(',') if item.strip()]

    def selector(self, date):
        """Return the list of tickers that make up the universe for ``date``.

        In live mode the live CSV is downloaded on every call; when it holds
        no tickers the previous universe is kept.  In backtest mode the whole
        backtest CSV is downloaded once and cached per ``YYYYMMDD`` key; dates
        missing from the file keep the previous universe.
        """
        # handle live mode file format
        if self.LiveMode:
            # fetch the file from dropbox
            content = self.Download("https://www.dropbox.com/s/2az14r5xbx4w5j6/daily-stock-picker-live.csv?dl=1")
            # if we have a file for today, return symbols, else leave universe unchanged
            tickers = self._split_tickers(content)
            if tickers:
                self.current_universe = tickers
            return self.current_universe

        # backtest - first cache the entire file
        if not self.backtestSymbolsPerDay:
            # No need for headers for authorization with dropbox, these two lines are for example purposes
            byteKey = base64.b64encode("UserName:Password".encode('ASCII'))
            # The headers must be passed to the Download method as dictionary
            # NOTE(review): RFC 7617 writes this as "Basic <credentials>" without
            # parentheses - confirm before reusing against a real endpoint.
            headers = { 'Authorization' : f'Basic ({byteKey.decode("ASCII")})' }
            content = self.Download("https://www.dropbox.com/s/lcq6sdxlbmfex3n/UniverseStocksBacktest.csv?dl=1", headers)
            for line in (content or "").splitlines():
                fields = [field.strip() for field in line.split(',')]
                if not fields[0]:
                    # blank line or missing date key: nothing to index
                    continue
                self.backtestSymbolsPerDay[fields[0]] = [t for t in fields[1:] if t]

        index = date.strftime("%Y%m%d")
        self.current_universe = self.backtestSymbolsPerDay.get(index, self.current_universe)
        return self.current_universe

    def OnData(self, slice):
        """Update the indicators of every active security and plot selected ones.

        Securities without tracked state, or without a trade bar in this
        slice, are skipped instead of raising on the missing key.
        """
        for security in self.ActiveSecurities.Values:
            symbol = security.Symbol
            state = self.stateData.get(symbol)
            if state is None or not slice.Bars.ContainsKey(symbol):
                continue
            bar = slice.Bars[symbol]

            # Updates the SymbolData object with the current price and bar
            state.update(self.Time, security.Price)
            state.update_atr(bar)

            if symbol.Value in self.PLOT_TICKERS:
                chart_name = symbol.Value + ' Indicators'
                # Plot Chart
                self.Plot(chart_name, 'Close', bar.Close)
                self.Plot(chart_name, 'SMA12', state.sma.Current.Value)
                self.Plot(chart_name, 'ATR', state.atr.Current.Value)
        # reset changes
        self.changes = None

    def OnSecuritiesChanged(self, changes):
        """Liquidate removed securities, allocate to added ones and warm up indicators."""
        self.changes = changes
        self.Log(f"self.changes: {self.changes}")

        # liquidate removed securities
        for security in changes.RemovedSecurities:
            if security.Invested:
                self.Liquidate(security.Symbol)

        # we want 10% allocation in each security in our universe
        for security in changes.AddedSecurities:
            if not self.Portfolio[security.Symbol].Invested:
                self.SetHoldings(security.Symbol, 0.1)  # move - won't work until price data avail

        # warm up the indicators with history prices for newly added securities
        self.addedSymbols = [x.Symbol for x in changes.AddedSecurities]
        if not self.addedSymbols:
            return

        # One history request serves both indicators: the SMA takes the last
        # `smaperiod` bars and the ATR the last ATR_WARMUP_BARS bars.
        lookback = max(self.smaperiod, self.ATR_WARMUP_BARS)
        history = self.History(self.addedSymbols, lookback, Resolution.Hour)

        for symbol in self.addedSymbols:
            if symbol not in self.stateData:
                self.stateData[symbol] = SelectionData(symbol, self.smaperiod)
            state = self.stateData[symbol]

            key = str(symbol)
            if key not in history.index:
                continue
            frame = history.loc[key]

            for bar in frame.tail(self.ATR_WARMUP_BARS).itertuples():
                self.Log(f"bar: {bar}")
                state.WarmUpIndicatorBar(bar)
            state.WarmUpIndicator(frame.tail(self.smaperiod))
class SelectionData(object):
    """Indicator state tracked for a single symbol.

    Attributes:
        symbol: the symbol this state belongs to.
        atr: 5-period AverageTrueRange indicator.
        sma: SimpleMovingAverage over ``period`` samples.
        is_above_sma: True when the last price passed to ``update`` was above
            the SMA (only refreshed when ``sma.Update`` returns a truthy value).
        atr_ready: True once a ready ATR reported a value greater than zero.
        qatr: most-recent-first window of ATR values recorded by ``update_atr``.
    """

    def __init__(self, symbol, period):
        """Create the indicators for ``symbol``; ``period`` is the SMA length."""
        self.symbol = symbol
        self.atr = AverageTrueRange(5)
        self.sma = SimpleMovingAverage(period)
        self.is_above_sma = False
        self.atr_ready = False
        # update_atr() appends to this deque, so it must exist from the start.
        # It is bounded (reusing `period` as the window length) so it cannot
        # grow without limit over a long run.
        self.qatr = deque(maxlen=period)

    def update(self, time, price):
        """Feed ``price`` at ``time`` to the SMA and refresh ``is_above_sma``."""
        if self.sma.Update(time, price):
            self.is_above_sma = price > self.sma.Current.Value

    def update_atr(self, bar):
        """Feed ``bar`` to the ATR; when it reports success, record its value."""
        if self.atr.Update(bar):
            value = self.atr.Current.Value
            # Store the value, not the indicator object: the indicator mutates
            # in place, so storing it would make every entry alias one object.
            self.qatr.appendleft(value)
            self.atr_ready = value > 0

    def WarmUpIndicator(self, history):
        """Warm up the SMA from a history frame.

        ``history`` must provide ``itertuples()`` rows exposing ``Index`` (the
        bar time) and ``close``.
        """
        for row in history.itertuples():
            point = IndicatorDataPoint(self.symbol, row.Index, float(row.close))
            self.sma.Update(point)

    def WarmUpIndicatorBar(self, bar):
        """Warm up the ATR with one historical bar.

        ``bar`` may be a bar object, which is passed through unchanged, or a
        tuple row produced by ``DataFrame.itertuples()`` with ``Index``,
        ``open``, ``high``, ``low``, ``close`` and optionally ``volume``
        fields.  Tuple rows are converted to a ``TradeBar`` first, because the
        ATR is updated with bar objects rather than plain tuples.
        """
        if isinstance(bar, tuple):
            # Imported here so the conversion does not depend on the file's
            # wildcard imports exposing TradeBar.
            from QuantConnect.Data.Market import TradeBar
            bar = TradeBar(
                bar.Index,
                self.symbol,
                float(bar.open),
                float(bar.high),
                float(bar.low),
                float(bar.close),
                float(getattr(bar, "volume", 0)),
            )
        self.atr.Update(bar)