Zipline
Using Data
Pipeline in Algorithms
As covered in the Initialization section, Quantopian Pipelines can be replicated using Universe Selection in QuantConnect, albeit with some additional intermediate steps and a performance cost.
In this section, we will construct an equivalent of a pipeline using universe selection in QuantConnect. We will filter our data set in the Coarse and Fine selection stages, and apply an additional filter to historical data over a 30-day lookback window.
We first create a skeleton algorithm definition to begin setting up our Pipeline. Note that this algorithm will allow all equities through, which will have a substantial performance impact on our algorithm.
```python
from AlgorithmImports import *


class MyPipelineAlgorithm(QCAlgorithm):
    def initialize(self) -> None:
        self.set_start_date(2020, 1, 1)
        self.set_end_date(2020, 10, 20)
        self.add_universe(self.coarse_selection_function, self.fine_selection_function)

    def coarse_selection_function(self, coarse: List[CoarseFundamental]) -> List[Symbol]:
        # Allows all Symbols through, no filtering applied
        return [coarse_data.symbol for coarse_data in coarse]

    def fine_selection_function(self, fine: List[FineFundamental]) -> List[Symbol]:
        # Allows all Symbols through, no filtering applied
        return [fine_data.symbol for fine_data in fine]
```
The skeleton algorithm is the equivalent of the Pipeline call below.
```python
from quantopian.pipeline import Pipeline
from quantopian.pipeline.domain import US_EQUITIES
from quantopian.research import run_pipeline

pipe = Pipeline(columns={}, domain=US_EQUITIES)
run_pipeline(pipe, '2020-01-01', '2020-10-20')
```
The equivalent of Pipeline(screen=...) is the filter applied at the Coarse and Fine stages of universe selection.
Let's define a filter for stocks with a dollar volume greater than $50,000,000 and a trailing 30-day return greater than 2%. Once we've filtered the Symbols in Coarse Universe Selection, we apply a final filter in Fine Universe Selection that only allows stocks with EPS greater than 0.
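Before wiring this into LEAN, the three filters and the order in which they run can be sketched in plain Python. This is a minimal sketch under stated assumptions: CoarseRecord and FineRecord are hypothetical stand-ins for QuantConnect's CoarseFundamental and FineFundamental objects, the tickers and numbers are invented, and the 30-day return is assumed to be precomputed rather than derived from a History() call.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class CoarseRecord:
    # Hypothetical stand-in for CoarseFundamental
    ticker: str
    dollar_volume: float
    return_30d: float  # assumed precomputed 30-day return


@dataclass
class FineRecord:
    # Hypothetical stand-in for FineFundamental
    ticker: str
    eps: float


def coarse_stage(records: List[CoarseRecord]) -> List[str]:
    # Dollar volume above $50,000,000 and 30-day return above 2%
    return [r.ticker for r in records
            if r.dollar_volume > 50_000_000 and r.return_30d > 0.02]


def fine_stage(survivors: List[str], fundamentals: Dict[str, FineRecord]) -> List[str]:
    # Only tickers that survived the coarse stage are checked for EPS > 0
    return [t for t in survivors if t in fundamentals and fundamentals[t].eps > 0]


coarse = [
    CoarseRecord("AAA", 80_000_000, 0.05),
    CoarseRecord("BBB", 10_000_000, 0.10),  # fails the dollar volume filter
    CoarseRecord("CCC", 90_000_000, 0.01),  # fails the 30-day return filter
    CoarseRecord("DDD", 60_000_000, 0.03),
]
fine = {
    "AAA": FineRecord("AAA", 1.25),
    "DDD": FineRecord("DDD", -0.40),  # fails the EPS filter
}

survivors = coarse_stage(coarse)
selected = fine_stage(survivors, fine)
print(survivors)
print(selected)
```

Only AAA passes all three filters; DDD clears the coarse stage but is removed in the fine stage, which mirrors how Fine Universe Selection only ever sees the Symbols returned by Coarse.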
Beware of making History() calls with many Symbols. Such calls can exhaust your algorithm's system resources (e.g. RAM) and slow down universe selection.
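One way to limit how much raw history is held at once, sketched here under the assumption that you only need a small per-Symbol result (such as a single return) from each request, is to split the Symbol list into fixed-size batches and reduce each batch before requesting the next. fetch_closes is a hypothetical stand-in for a History() request, the prices are invented, and the batch size of 2 is only for illustration.

```python
from typing import Callable, Dict, Iterator, List, Sequence


def batched(items: Sequence[str], size: int) -> Iterator[List[str]]:
    # Yield consecutive slices of at most `size` items
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


def returns_in_batches(symbols: Sequence[str],
                       fetch_closes: Callable[[List[str]], Dict[str, List[float]]],
                       size: int) -> Dict[str, float]:
    results: Dict[str, float] = {}
    for batch in batched(symbols, size):
        closes = fetch_closes(batch)  # only this batch's prices are held at once
        for symbol, series in closes.items():
            if len(series) >= 2:
                results[symbol] = (series[-1] - series[0]) / series[0]
        # Only the reduced floats are kept in `results`; raw prices do not accumulate
    return results


# Hypothetical price source used in place of a real History() request
FAKE_PRICES = {"AAA": [100.0, 110.0], "BBB": [50.0, 45.0], "CCC": [20.0, 21.0]}
calls: List[List[str]] = []


def fake_fetch(batch: List[str]) -> Dict[str, List[float]]:
    calls.append(batch)
    return {s: FAKE_PRICES[s] for s in batch}


print(returns_in_batches(["AAA", "BBB", "CCC"], fake_fetch, size=2))
print(calls)
```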
```python
from AlgorithmImports import *
from datetime import timedelta


class MyPipelineAlgorithm(QCAlgorithm):
    def initialize(self) -> None:
        self.set_start_date(2020, 1, 1)
        self.set_end_date(2020, 10, 20)
        self.add_universe(self.coarse_selection_function, self.fine_selection_function)

    def coarse_selection_function(self, coarse: List[CoarseFundamental]) -> List[Symbol]:
        # Keep only Symbols with a daily dollar volume above $50,000,000
        dollar_volume_filter_symbols = [c.symbol for c in coarse if c.dollar_volume > 50000000]
        if not dollar_volume_filter_symbols:
            return []

        # Make a history call to calculate the 30 day returns
        df = self.history(dollar_volume_filter_symbols, self.time - timedelta(days=60), self.time, Resolution.DAILY)
        if df.empty:
            return []

        # Compute the 30 day return per Symbol, skipping Symbols with fewer than 30 bars
        returns = (df['close']
                   .groupby(level=0)
                   .filter(lambda x: len(x) >= 30)
                   .groupby(level=0)
                   .apply(lambda x: (x.iloc[-1] - x.iloc[-30]) / x.iloc[-30]))

        # Finally, apply our filter
        dataframe_screen = returns[returns > 0.02]

        # Filters out any Symbol that is not in the filtered Series
        return [s for s in dollar_volume_filter_symbols if s in dataframe_screen.index]

    def fine_selection_function(self, fine: List[FineFundamental]) -> List[Symbol]:
        # We receive the Symbols that passed the coarse filter, and keep those with EPS > 0
        return [f.symbol for f in fine if f.earning_reports.basic_eps.three_months > 0]
```
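This class definition is now roughly equivalent to the following CustomFactor and Pipeline call in Quantopian, excluding the EPS filter (the Quantopian version also restricts the universe to QTradableStocksUS and uses a 5-day average dollar volume).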
```python
from quantopian.pipeline import CustomFactor, Pipeline
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import AverageDollarVolume
from quantopian.pipeline.filters import QTradableStocksUS


class PercentageChange(CustomFactor):
    def compute(self, today, assets, out, values):
        # values has one row per day in the window and one column per asset
        out[:] = (values[-1] - values[0]) / values[0]


dollar_volume = AverageDollarVolume(window_length=5)
dollar_volume_filter = (dollar_volume > 50000000)
percent_change = PercentageChange(inputs=[USEquityPricing.close], window_length=30)

pipe = Pipeline(
    columns={
        "percent_change": percent_change
    },
    screen=(QTradableStocksUS() & dollar_volume_filter & (percent_change > 0.02))
)
```
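To see what PercentageChange.compute does, note that Pipeline passes values as a 2-D array with one row per day in the window and one column per asset. The sketch below reproduces that arithmetic with plain NumPy on an invented 3-day, 2-asset window; it does not use Quantopian itself.

```python
import numpy as np

# Rows are days (oldest first), columns are assets; prices are made up
values = np.array([
    [100.0, 50.0],
    [102.0, 49.0],
    [105.0, 52.0],
])
out = np.empty(values.shape[1])

# Same arithmetic as PercentageChange.compute: last row vs. first row, per column
out[:] = (values[-1] - values[0]) / values[0]
print(out)
```

Because the whole window is compared end to end, a 30-day window_length yields one 30-day percentage change per asset, which is what the 0.02 screen is applied to.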
The DataFrame returned from History has a MultiIndex, with level=0 being the Symbol and level=1 being the Time of that data point. You can index the Symbol (level=0) index using either the SecurityIdentifier string (e.g. df.loc["AAPL R735QTJ8XC9X"]) or the ticker of the Symbol (e.g. df.loc["AAPL"]).
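A small hand-built DataFrame with the same two-level index makes the groupby logic in the coarse selection function easier to follow. This is a sketch only: the tickers, dates, and prices are invented, plain strings stand in for LEAN Symbol objects, and a 3-bar window replaces the 30-bar window so the example stays short.

```python
import pandas as pd

# Level 0 is the symbol, level 1 is the bar time, mirroring a History() result
index = pd.MultiIndex.from_tuples(
    [
        ("AAA", pd.Timestamp("2020-01-02")),
        ("AAA", pd.Timestamp("2020-01-03")),
        ("AAA", pd.Timestamp("2020-01-06")),
        ("BBB", pd.Timestamp("2020-01-02")),
        ("BBB", pd.Timestamp("2020-01-03")),
        ("BBB", pd.Timestamp("2020-01-06")),
        ("CCC", pd.Timestamp("2020-01-06")),  # too little history
    ],
    names=["symbol", "time"],
)
df = pd.DataFrame({"close": [10.0, 10.5, 11.0, 20.0, 19.8, 20.1, 5.0]}, index=index)

window = 3  # stands in for the 30-bar window

# Drop symbols with fewer than `window` bars, then compute each symbol's return
# from the bar `window` rows back to the latest bar
returns = (
    df["close"]
    .groupby(level=0)
    .filter(lambda x: len(x) >= window)
    .groupby(level=0)
    .apply(lambda x: (x.iloc[-1] - x.iloc[-window]) / x.iloc[-window])
)

print(df.loc["AAA"])            # all bars for one symbol, indexed by time
print(returns[returns > 0.02])  # AAA passes, BBB does not, CCC was dropped
```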
BarData Lookup
The Quantopian BarData object and the QuantConnect Slice object are similar but not identical: both provide point-in-time data to the user's algorithm.
In Quantopian, data is handled via the handle_data(context, data) function.
In QuantConnect, data is handled via the OnData(self, slice) method (on_data in the snake_case Python API).
Both are called whenever new data exists for a given point in time.
Although the two functions have similar signatures, the data they receive is handled differently.
BarData vs. Slice
In Quantopian, BarData is the primary mechanism both for retrieving point-in-time data and for requesting history for any given security.
The following code retrieves the past 30 days of daily historical data for AAPL, as well as the most recent AAPL data at the current point in time.
Quantopian
```python
def initialize(context):
    context.aapl = sid(24)


def handle_data(context, data):
    # Gets a DataFrame of AAPL history going back 30 days
    aapl_history = data.history(
        context.aapl,
        fields=["open", "high", "low", "close", "volume"],
        bar_count=30,
        frequency="1d",
    )
    # Gets a pandas Series of the most recent AAPL data
    aapl_current = data.current(context.aapl, fields=["open", "high", "low", "close", "volume"])
```
QuantConnect
```python
from AlgorithmImports import *
from datetime import timedelta


class MyHistoryAlgorithm(QCAlgorithm):
    def initialize(self) -> None:
        self.aapl = self.add_equity("AAPL", Resolution.DAILY)

    def on_data(self, slice: Slice) -> None:
        # Gets a DataFrame of AAPL history going back 30 days
        aapl_history = self.history([self.aapl.symbol], timedelta(days=30), Resolution.DAILY)

        # Gets the most recent AAPL trade data (OHLCV), if this Slice contains any
        if self.aapl.symbol in slice.bars:
            aapl_current = slice.bars[self.aapl.symbol]
```
Slice represents a single point in time and does not provide access to historical data itself. To access historical data in an algorithm, use the algorithm's self.History() method to request a pandas DataFrame of historical data.
In a Slice, the data you access is not a pandas DataFrame but a single object holding data for a single point in time.
The TradeBar class, for example, contains only scalar OHLCV values rather than a DataFrame of OHLCV values.
Because the data a Slice contains is point-in-time, there is at most one trade bar and one quote bar per Symbol each time OnData(self, slice) is called.
QuantConnect also provides Quote (NBBO) data for use in your algorithm, delivered as QuoteBar objects.
Quote data is only accessible when an algorithm is set to consume Tick, Second, or Minute data.
You can access Trade (OHLCV) data through the Bars attribute of Slice.
You can access Quote data (Bid OHLC and Ask OHLC, plus the last bid and ask sizes) through the QuoteBars attribute of Slice.
The Bars and QuoteBars attributes behave like Python dictionaries keyed by Symbol, and can be used as such. To check whether new data exists for a given security, use Python's in operator on Bars and/or QuoteBars.
You can also iterate over all data received by accessing the Values attribute of Bars or QuoteBars, which returns the TradeBar or QuoteBar objects, respectively.
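The dictionary-style access pattern can be sketched without LEAN. In the sketch below an ordinary dict keyed by ticker stands in for Slice.Bars, and Bar is a hypothetical minimal stand-in for TradeBar with invented values; only the membership check and the iteration over values carry over to the real API.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class Bar:
    # Hypothetical minimal stand-in for a TradeBar
    symbol: str
    close: float
    volume: int


# Stand-in for the bars at one point in time: AAA and BBB traded, ILLIQ did not
bars: Dict[str, Bar] = {
    "AAA": Bar("AAA", 100.0, 1_000),
    "BBB": Bar("BBB", 50.0, 5_000),
}

# Membership check before indexing, as with `symbol in slice.bars`
for ticker in ["AAA", "ILLIQ"]:
    if ticker in bars:
        print(f"{ticker} close: {bars[ticker].close}")
    else:
        print(f"{ticker}: no new bar at this time step")

# Iterate over every bar received at this time step
total_volume = sum(bar.volume for bar in bars.values())
print(total_volume)
```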
The TradeBar object contains the Open, High, Low, Close, Volume, Time, EndTime, and Symbol attributes (open, high, low, close, volume, time, end_time, and symbol in the snake_case Python API).
The QuoteBar object contains the following attributes:
- Bid.Open, Bid.High, Bid.Low, Bid.Close, LastBidSize
- Ask.Open, Ask.High, Ask.Low, Ask.Close, LastAskSize
- Time, EndTime, and Symbol
In the snake_case Python API these become bid.open, last_bid_size, ask.close, end_time, and so on.
Note that the Bid and Ask attributes can be None if no bid or ask data exists at a given point in time.
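A None-safe formatter for one side of a quote can be sketched in plain Python. Side and Quote are hypothetical stand-ins for the bid/ask bar and QuoteBar types (only the attributes used here are modeled), the values are invented, and format_side is an illustrative helper, not part of the LEAN API.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Side:
    # Hypothetical stand-in for the OHLC bar on one side of a QuoteBar
    open: float
    high: float
    low: float
    close: float


@dataclass
class Quote:
    # Hypothetical stand-in for a QuoteBar; either side may be missing
    bid: Optional[Side]
    ask: Optional[Side]
    last_bid_size: float
    last_ask_size: float


def format_side(label: str, side: Optional[Side], size: float) -> str:
    # Guard against a missing side before touching its attributes
    if side is None:
        return f"{label}: no data"
    return (f"{label} O: {side.open} H: {side.high} "
            f"L: {side.low} C: {side.close} size: {size}")


quote = Quote(bid=Side(99.9, 100.1, 99.8, 100.0), ask=None,
              last_bid_size=200.0, last_ask_size=0.0)
print(format_side("Bid", quote.bid, quote.last_bid_size))
print(format_side("Ask", quote.ask, quote.last_ask_size))
```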
The example below shows the different ways to access TradeBar and QuoteBar data, as well as how to request 30 days of AAPL historical data.
```python
from AlgorithmImports import *
from datetime import timedelta


class MyDataAlgorithm(QCAlgorithm):
    def initialize(self) -> None:
        # Quote data is only available at Tick, Second, or Minute resolution,
        # so subscribe at Minute resolution to receive QuoteBars
        self.aapl_security = self.add_equity("AAPL", Resolution.MINUTE)
        self.aapl_symbol = self.aapl_security.symbol

    def on_data(self, slice: Slice) -> None:
        # Gets 30 days of daily AAPL history (requested on every call here
        # only for demonstration purposes)
        aapl_history = self.history([self.aapl_symbol], timedelta(days=30), Resolution.DAILY)

        # We must first check that we have AAPL trade data, since there might not
        # have been any AAPL trades at this point in time (for a low liquidity
        # asset, data can be missing for a point in time).
        if self.aapl_symbol in slice.bars:
            aapl_current_trade = slice.bars[self.aapl_symbol]
            self.log(f"{self.time} :: TRADE :: {self.aapl_symbol} - O: {aapl_current_trade.open} "
                     f"H: {aapl_current_trade.high} L: {aapl_current_trade.low} "
                     f"C: {aapl_current_trade.close} V: {aapl_current_trade.volume}")

        # Likewise, check for AAPL quote data first, since there might not have been
        # any quote updates for AAPL since the previous time step.
        if self.aapl_symbol in slice.quote_bars:
            aapl_current_quote = slice.quote_bars[self.aapl_symbol]
            if aapl_current_quote.bid is not None:
                bid = aapl_current_quote.bid
                self.log(f"{self.time} :: QUOTE :: {self.aapl_symbol} - Bid O: {bid.open} "
                         f"Bid H: {bid.high} Bid L: {bid.low} Bid C: {bid.close} "
                         f"Bid size: {aapl_current_quote.last_bid_size}")
            if aapl_current_quote.ask is not None:
                ask = aapl_current_quote.ask
                self.log(f"{self.time} :: QUOTE :: {self.aapl_symbol} - Ask O: {ask.open} "
                         f"Ask H: {ask.high} Ask L: {ask.low} Ask C: {ask.close} "
                         f"Ask size: {aapl_current_quote.last_ask_size}")
```