'''
Copyright 2019 David Mueller
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
from Correlated_Pairs_Alpha_Model import BasePairsTradingAlphaModel
from Execution.ImmediateExecutionModel import ImmediateExecutionModel
from Portfolio.EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel
from Risk.MaximumDrawdownPercentPerSecurity import MaximumDrawdownPercentPerSecurity
class ParticleVerticalContainmentField(QCAlgorithm):
    '''Framework algorithm wiring BasePairsTradingAlphaModel to a universe of
    liquid energy-sector equities, using ImmediateExecutionModel,
    EqualWeightingPortfolioConstructionModel and a
    MaximumDrawdownPercentPerSecurity(0.01) risk model.'''

    def Initialize(self):
        '''Set the start date, cash, framework models and universe selection.'''
        self.SetStartDate(2008, 7, 1)
        self.SetCash(100000)

        pairsAlpha = BasePairsTradingAlphaModel(lookback = 252,
                                                resolution = Resolution.Daily,
                                                threshold = 2.5)
        self.AddAlpha(pairsAlpha)
        self.SetExecution(ImmediateExecutionModel())
        self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
        self.SetRiskManagement(MaximumDrawdownPercentPerSecurity(0.01))

        # Size limits applied by the coarse and fine selection stages below
        self.__numberOfSymbols = 1000
        self.__numberOfSymbolsFine = 500

        universeModel = FineFundamentalUniverseSelectionModel(self.CoarseSelectionFunction, self.FineSelectionFunction, None, None)
        self.SetUniverseSelection(universeModel)
        self.SetSecurityInitializer(self.CustomSecurityInitializer)

    def CustomSecurityInitializer(self, security):
        '''Give the security raw (unadjusted) prices and a constant zero fee
        Args:
            security: The security being initialized'''
        security.SetDataNormalizationMode(DataNormalizationMode.Raw)
        security.SetFeeModel(ConstantFeeModel(0))

    def CoarseSelectionFunction(self, coarse):
        '''Coarse universe stage
        Args:
            coarse: The coarse fundamental entries to choose from
        Returns:
            Symbols of the entries that rank in the top 'numberOfSymbols'
            by dollar volume and also have fundamental data'''
        ranked = sorted(coarse, key=lambda entry: entry.DollarVolume, reverse=True)
        # the fundamental-data filter is applied after the cut, so fewer
        # than 'numberOfSymbols' symbols may be returned
        mostLiquid = ranked[:self.__numberOfSymbols]
        return [entry.Symbol for entry in mostLiquid if entry.HasFundamentalData]

    def FineSelectionFunction(self, fine):
        '''Fine universe stage
        Args:
            fine: The fine fundamental entries to choose from
        Returns:
            Symbols of at most 'numberOfSymbolsFine' entries whose Morningstar
            sector code is Energy, in the order they were received'''
        energyOnly = [entry for entry in fine
                      if entry.AssetClassification.MorningstarSectorCode == MorningstarSectorCode.Energy]
        # slicing already caps at the list length, no explicit size check needed
        return [entry.Symbol for entry in energyOnly[:self.__numberOfSymbolsFine]]
class StandardDeviationExecutionModel(ExecutionModel):
    '''Execution model that submits orders while the current market price is at least the configured number of standard
    deviations away from the mean in the favorable direction (below/above for buy/sell respectively)'''

    def __init__(self,
                 period = 60,
                 deviations = 2,
                 resolution = Resolution.Minute):
        '''Initializes a new instance of the StandardDeviationExecutionModel class
        Args:
            period: Period of the standard deviation indicator
            deviations: The number of deviations away from the mean before submitting an order
            resolution: The resolution of the STD and SMA indicators'''
        self.period = period
        self.deviations = deviations
        self.resolution = resolution
        self.targetsCollection = PortfolioTargetCollection()
        # per-symbol SymbolData (security, consolidator, SMA and STD indicators)
        self.symbolData = {}

        # Maximum value of a single order in units of the account currency.
        # For example, if purchasing a stock with a price of $100, the
        # maximum order size would be 5,000 shares.
        self.MaximumOrderValue = 500000

    def Execute(self, algorithm, targets):
        '''Executes market orders if the standard deviation of price is more
        than the configured number of deviations in the favorable direction.
        Args:
            algorithm: The algorithm instance
            targets: The portfolio targets'''
        self.targetsCollection.AddRange(targets)

        # for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
        if self.targetsCollection.Count == 0:
            return

        for target in self.targetsCollection.OrderByMarginImpact(algorithm):
            symbol = target.Symbol

            # calculate remaining quantity to be ordered
            unorderedQuantity = OrderSizing.GetUnorderedQuantity(algorithm, target)

            # fetch our symbol data containing our STD/SMA indicators
            data = self.symbolData.get(symbol)
            if data is None:
                # skip only this target; the remaining targets and the
                # ClearFulfilled call below must still be processed
                continue

            # check order entry conditions
            if not (data.STD.IsReady and self.PriceIsFavorable(data, unorderedQuantity)):
                continue

            lotSize = data.Security.SymbolProperties.LotSize

            # get the maximum order size based on total order value
            maxOrderSize = OrderSizing.Value(data.Security, self.MaximumOrderValue)
            orderSize = min(maxOrderSize, abs(unorderedQuantity))

            remainder = orderSize % lotSize
            missingForLotSize = lotSize - remainder
            # if the amount we are missing for +1 lot size is 1M part of a lot size
            # we suppose its due to floating point error and round up
            # Note: this is required to avoid a diff with C# equivalent
            if missingForLotSize < (lotSize / 1000000):
                remainder -= lotSize

            # round down to even lot size
            orderSize -= remainder

            # test the unsigned size so that sells (negative quantities)
            # are submitted as well as buys
            if orderSize >= 1:
                direction = 1 if unorderedQuantity > 0 else -1
                algorithm.MarketOrder(symbol, direction * orderSize)

        self.targetsCollection.ClearFulfilled(algorithm)

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Event fired each time the we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''
        for added in changes.AddedSecurities:
            if added.Symbol not in self.symbolData:
                self.symbolData[added.Symbol] = SymbolData(algorithm, added, self.period, self.resolution)

        for removed in changes.RemovedSecurities:
            # clean up data from removed securities
            symbol = removed.Symbol
            if symbol in self.symbolData and self.IsSafeToRemove(algorithm, symbol):
                data = self.symbolData.pop(symbol)
                algorithm.SubscriptionManager.RemoveConsolidator(symbol, data.Consolidator)

    def PriceIsFavorable(self, data, unorderedQuantity):
        '''Determines if the current price is more than the configured
        number of standard deviations away from the mean in the favorable direction.
        Args:
            data: The SymbolData holding the security and its SMA/STD indicators
            unorderedQuantity: Signed quantity still to be ordered
        Returns:
            For a positive quantity, True when the bid is below SMA - deviations * STD;
            otherwise True when the ask is above SMA + deviations * STD'''
        sma = data.SMA.Current.Value
        deviations = self.deviations * data.STD.Current.Value
        if unorderedQuantity > 0:
            return data.Security.BidPrice < sma - deviations
        return data.Security.AskPrice > sma + deviations

    def IsSafeToRemove(self, algorithm, symbol):
        '''Determines if it's safe to remove the associated symbol data
        Args:
            algorithm: The algorithm instance
            symbol: The symbol whose data is about to be removed
        Returns:
            True when no universe in algorithm.UniverseManager contains the symbol'''
        # confirm the security isn't currently a member of any universe
        return not any(kvp.Value.ContainsMember(symbol) for kvp in algorithm.UniverseManager)
class SymbolData:
    '''Holds the security, consolidator and SMA/STD indicators that
    StandardDeviationExecutionModel keeps for one symbol'''

    def __init__(self, algorithm, security, period, resolution):
        '''Registers the indicators and warms them up from history
        Args:
            algorithm: The algorithm instance used to resolve the consolidator,
                       register indicators and request history
            security: The security the indicators are built for
            period: Period of the SMA and STD indicators
            resolution: The resolution of the consolidator and history request'''
        symbol = security.Symbol
        self.Security = security
        self.Consolidator = algorithm.ResolveConsolidator(symbol, resolution)

        smaName = algorithm.CreateIndicatorName(symbol, f"SMA{period}", resolution)
        self.SMA = SimpleMovingAverage(smaName, period)
        algorithm.RegisterIndicator(symbol, self.SMA, self.Consolidator)

        stdName = algorithm.CreateIndicatorName(symbol, f"STD{period}", resolution)
        self.STD = StandardDeviation(stdName, period)
        algorithm.RegisterIndicator(symbol, self.STD, self.Consolidator)

        # warmup our indicators by pushing history through the indicators
        history = algorithm.History(symbol, period, resolution)
        if 'close' in history:
            # squeeze only the column axis so a single-row history still
            # yields a Series instead of collapsing to a scalar
            closes = history.close.unstack(0).squeeze(axis=1)
            # Series.iteritems() was removed in pandas 2.0; items() is the equivalent
            for time, value in closes.items():
                self.SMA.Update(time, value)
                self.STD.Update(time, value)
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")
from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
UTCMIN = datetime.min.replace(tzinfo=utc).time()
import datetime
class EqualWeightingPortfolioConstructionModel(PortfolioConstructionModel):
    '''Provides an implementation of IPortfolioConstructionModel that gives equal weighting to all securities.
    The target percent holdings of each security is 1/N where N is the number of securities.
    For insights of direction InsightDirection.Up, long targets are returned and
    for insights of direction InsightDirection.Down, short targets are returned.'''

    def __init__(self, resolution = Resolution.Daily):
        '''Initialize a new instance of EqualWeightingPortfolioConstructionModel
        Args:
            resolution: Rebalancing frequency'''
        self.insightCollection = InsightCollection()
        self.removedSymbols = []
        # UTCMIN is used purely as a "nothing scheduled yet" sentinel
        self.nextExpiryTime = UTCMIN
        self.rebalancingTime = UTCMIN
        self.rebalancingPeriod = Extensions.ToTimeSpan(resolution)

    def ShouldCreateTargetForInsight(self, insight):
        '''Method that will determine if the portfolio construction model should create a
        target for this insight
        Args:
            insight: The insight to create a target for
        Returns:
            Always True in this implementation'''
        return True

    def DetermineTargetPercent(self, activeInsights):
        '''Will determine the target percent for each insight
        Args:
            activeInsights: The active insights to generate a target for
        Returns:
            Dictionary mapping each insight to Direction * (1 / number of non-flat insights)'''
        # give equal weighting to each security
        count = sum(x.Direction != InsightDirection.Flat for x in activeInsights)
        percent = 0 if count == 0 else 1.0 / count
        return {insight: insight.Direction * percent for insight in activeInsights}

    def _NotYetDue(self, utcNow, deadline):
        '''True when deadline is a real timestamp that utcNow has not passed.
        The UTCMIN sentinel is never compared against a timestamp, because its
        type differs from the datetimes stored after the first pass.'''
        return deadline is not UTCMIN and utcNow <= deadline

    def CreateTargets(self, algorithm, insights):
        '''Create portfolio targets from the specified insights
        Args:
            algorithm: The algorithm instance
            insights: The insights to create portfolio targets from
        Returns:
            An enumerable of portfolio targets to be sent to the execution model'''
        targets = []
        utcNow = algorithm.UtcTime

        # Nothing to do while no expiry or rebalance is due, no new insights
        # arrived and no security was removed. Full timestamps are compared;
        # comparing only the time of day against the stored datetimes raises TypeError.
        if (self._NotYetDue(utcNow, self.nextExpiryTime) and
            self._NotYetDue(utcNow, self.rebalancingTime) and
            len(insights) == 0 and
            self.removedSymbols is None):
            return targets

        # Get insight that haven't expired of each symbol that is still in the universe
        activeInsights = self.insightCollection.GetActiveInsights(utcNow)

        for insight in insights:
            if self.ShouldCreateTargetForInsight(insight):
                # only accept new insights while fewer than two non-flat
                # insights are among the active insights fetched above
                nonFlatCount = sum(x.Direction != InsightDirection.Flat for x in activeInsights)
                if nonFlatCount < 2:
                    self.insightCollection.Add(insight)

        # Create flatten target for each security that was removed from the universe
        if self.removedSymbols is not None:
            targets.extend(PortfolioTarget(symbol, 0) for symbol in self.removedSymbols)
            self.removedSymbols = None

        # Get the last generated active insight for each symbol. A dict is used
        # instead of itertools.groupby so the result does not depend on the
        # insights arriving grouped by symbol.
        latestBySymbol = {}
        for insight in activeInsights:
            current = latestBySymbol.get(insight.Symbol)
            if current is None or insight.GeneratedTimeUtc >= current.GeneratedTimeUtc:
                latestBySymbol[insight.Symbol] = insight
        lastActiveInsights = list(latestBySymbol.values())

        # Determine target percent for the given insights
        percents = self.DetermineTargetPercent(lastActiveInsights)

        # symbols for which PortfolioTarget.Percent returned None
        errorSymbols = set()
        for insight in lastActiveInsights:
            target = PortfolioTarget.Percent(algorithm, insight.Symbol, percents[insight])
            if target is not None:
                targets.append(target)
            else:
                errorSymbols.add(insight.Symbol)

        # Get expired insights and create flatten targets for each symbol
        expiredInsights = self.insightCollection.RemoveExpiredInsights(utcNow)
        handledSymbols = set()
        for insight in expiredInsights:
            symbol = insight.Symbol
            if symbol in handledSymbols:
                continue
            handledSymbols.add(symbol)
            if not self.insightCollection.HasActiveInsights(symbol, utcNow) and symbol not in errorSymbols:
                targets.append(PortfolioTarget(symbol, 0))

        self.nextExpiryTime = self.insightCollection.GetNextExpiryTime()
        if self.nextExpiryTime is None:
            self.nextExpiryTime = UTCMIN

        self.rebalancingTime = utcNow + self.rebalancingPeriod

        return targets

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Event fired each time the we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''
        # Get removed symbol and invalidate them in the insight collection
        self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities]
        self.insightCollection.Clear(self.removedSymbols)
####
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
AddReference("QuantConnect.Indicators")
from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import *
from datetime import timedelta
from enum import Enum
class BasePairsTradingAlphaModel(AlphaModel):
    '''Alpha model that tracks every pair combination of the securities
    selected by the universe selection model.
    For each pair it emits grouped, opposite-direction insights when the
    price ratio leaves a percentage band around its moving average'''

    def __init__(self, lookback = 252,
                 resolution = Resolution.Daily,
                 threshold = 2.5):
        '''Initializes a new instance of the BasePairsTradingAlphaModel class
        Args:
            lookback: Number of resolution steps the emitted insights span
            resolution: Analysis resolution
            threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
        self.lookback = lookback
        self.resolution = resolution
        self.threshold = threshold

        # insight period = one resolution step multiplied by the lookback
        resolutionSpan = Extensions.ToTimeSpan(self.resolution)
        self.predictionInterval = Time.Multiply(resolutionSpan, self.lookback)

        self.pairs = {}
        self.Securities = []

        resolutionString = Extensions.GetEnumString(resolution, Resolution)
        self.Name = f'{self.__class__.__name__}({self.lookback},{resolutionString},{Extensions.NormalizeToStr(threshold)})'

    def Update(self, algorithm, data):
        '''Collects the insight groups of every tracked pair.
        Called each time the algorithm receives data for subscribed securities
        Args:
            algorithm: The algorithm instance
            data: The new data available
        Returns:
            The new insights generated'''
        return [insight
                for pair in self.pairs.values()
                for insight in pair.GetInsightGroup()]

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Event fired each time the we add/remove securities from the data feed.
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''
        self.Securities.extend(changes.AddedSecurities)

        removedSymbols = []
        for security in changes.RemovedSecurities:
            removedSymbols.append(security.Symbol)
            if security in self.Securities:
                self.Securities.remove(security)

        # build pairs for the remaining securities first ...
        self.UpdatePairs(algorithm)

        # ... then drop every pair that involves a removed symbol
        staleKeys = [key for key in self.pairs
                     if any(symbol in key for symbol in removedSymbols)]
        for key in staleKeys:
            self.pairs.pop(key)

    def UpdatePairs(self, algorithm):
        '''Creates a Pair for every combination of tracked symbols that is
        not tracked yet (in either order) and passes HasPassedTest
        Args:
            algorithm: The algorithm instance'''
        ordered = sorted((security.Symbol for security in self.Securities),
                         key=lambda symbol: str(symbol.ID))

        for position, first in enumerate(ordered):
            for second in ordered[position + 1:]:
                pairKey = (first, second)
                if pairKey in self.pairs or (second, first) in self.pairs:
                    continue
                if self.HasPassedTest(algorithm, first, second):
                    self.pairs[pairKey] = self.Pair(algorithm, first, second, self.predictionInterval, self.threshold)

    def HasPassedTest(self, algorithm, asset1, asset2):
        '''Check whether the assets pass a pairs trading test
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            asset1: The first asset's symbol in the pair
            asset2: The second asset's symbol in the pair
        Returns:
            True if the statistical test for the pair is successful;
            this base implementation accepts every pair'''
        return True

    class Pair:
        '''Ratio indicators and insight state for one pair of assets'''

        class State(Enum):
            ShortRatio = -1
            FlatRatio = 0
            LongRatio = 1

        def __init__(self, algorithm, asset1, asset2, predictionInterval, threshold):
            '''Create a new pair
            Args:
                algorithm: The algorithm instance that experienced the change in securities
                asset1: The first asset's symbol in the pair
                asset2: The second asset's symbol in the pair
                predictionInterval: Period over which this insight is expected to come to fruition
                threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
            self.state = self.State.FlatRatio
            self.predictionInterval = predictionInterval

            self.asset1 = asset1
            self.asset2 = asset2

            self.asset1Price = algorithm.Identity(asset1)
            self.asset2Price = algorithm.Identity(asset2)

            # ratio = asset1 / asset2, mean = 500-period EMA of that ratio
            self.ratio = IndicatorExtensions.Over(self.asset1Price, self.asset2Price)
            self.mean = IndicatorExtensions.Of(ExponentialMovingAverage(500), self.ratio)

            # band edges: mean scaled up and down by threshold percent
            fraction = threshold / 100
            upper = ConstantIndicator[IndicatorDataPoint]("ct", 1 + fraction)
            self.upperThreshold = IndicatorExtensions.Times(self.mean, upper)

            lower = ConstantIndicator[IndicatorDataPoint]("ct", 1 - fraction)
            self.lowerThreshold = IndicatorExtensions.Times(self.mean, lower)

        def GetInsightGroup(self):
            '''Gets the insights group for the pair
            Returns:
                Insights grouped by an unique group id, or an empty list when
                the mean is not ready or no new band crossing occurred'''
            if not self.mean.IsReady:
                return []

            # the state check prevents re-emitting the same direction
            crossedAbove = self.state is not self.State.LongRatio and self.ratio > self.upperThreshold
            if crossedAbove:
                self.state = self.State.LongRatio

                # asset1/asset2 is above the upper band: short asset1, long asset2
                shortAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Down)
                longAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Up)

                # creates a group id and set the GroupId property on each insight object
                return Insight.Group(shortAsset1, longAsset2)

            crossedBelow = self.state is not self.State.ShortRatio and self.ratio < self.lowerThreshold
            if crossedBelow:
                self.state = self.State.ShortRatio

                # asset1/asset2 is below the lower band: long asset1, short asset2
                longAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Up)
                shortAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Down)

                # creates a group id and set the GroupId property on each insight object
                return Insight.Group(longAsset1, shortAsset2)

            return []
###