from Correlated_Pairs_Alpha_Model import BasePairsTradingAlphaModel
from Execution.ImmediateExecutionModel import ImmediateExecutionModel
from Portfolio.EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel
from Risk.MaximumDrawdownPercentPerSecurity import MaximumDrawdownPercentPerSecurity
class ParticleVerticalContainmentField(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2002, 7, 1) # Set Start Date
self.SetCash(100000) # Set Strategy Cash
self.AddAlpha(BasePairsTradingAlphaModel(lookback = 252,
resolution = Resolution.Daily,
threshold = 2.5))
self.SetExecution(ImmediateExecutionModel())
self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
self.SetRiskManagement(MaximumDrawdownPercentPerSecurity(0.01))
self.__numberOfSymbols = 1000
self.__numberOfSymbolsFine = 500
self.SetUniverseSelection(FineFundamentalUniverseSelectionModel(self.CoarseSelectionFunction, self.FineSelectionFunction, None, None))
# sort the data by daily dollar volume and take the top 'NumberOfSymbols'
def CoarseSelectionFunction(self, coarse):
# sort descending by daily dollar volume
sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
# return the symbol objects of the top entries from our sorted collection
return [ x.Symbol for x in sortedByDollarVolume[:self.__numberOfSymbols] ]
# sort the data by P/E ratio and take the top 'NumberOfSymbolsFine'
def FineSelectionFunction(self, fine):
# sort descending by P/E ratio
filteredFine = [x for x in fine if x.AssetClassification.MorningstarSectorCode == MorningstarSectorCode.Energy]
if len(filteredFine) > self.__numberOfSymbolsFine:
number_of_fine = self.__numberOfSymbolsFine
else:
number_of_fine = len(filteredFine)
# take the top entries from our sorted collection
return [ x.Symbol for x in filteredFine[:number_of_fine] ]
'''
class CustomFeeModel(FeeModel):
def __init__(self, algorithm):
self.algorithm = algorithm
def GetOrderFee(self, parameters):
# custom fee math
fee = 0
#fee = max(1, parameters.Security.Price
#* parameters.Order.AbsoluteQuantity
#* 0.00001)
#self.algorithm.Log("CustomFeeModel: " + str(fee))
return OrderFee(CashAmount(fee, "USD"))
class CustomSlippageModel:
def __init__(self, algorithm):
self.algorithm = algorithm
def GetSlippageApproximation(self, asset, order):
# custom slippage math
slippage = asset.Price * 0.0001 * np.log10(2*float(order.AbsoluteQuantity))
self.algorithm.Log("CustomSlippageModel: " + str(slippage))
return slippage
'''
class StandardDeviationExecutionModel(ExecutionModel):
'''Execution model that submits orders while the current market prices is at least the configured number of standard
deviations away from the mean in the favorable direction (below/above for buy/sell respectively)'''
def __init__(self,
period = 60,
deviations = 2,
resolution = Resolution.Minute):
'''Initializes a new instance of the StandardDeviationExecutionModel class
Args:
period: Period of the standard deviation indicator
deviations: The number of deviations away from the mean before submitting an order
resolution: The resolution of the STD and SMA indicators'''
self.period = period
self.deviations = deviations
self.resolution = resolution
self.targetsCollection = PortfolioTargetCollection()
self.symbolData = {}
# Gets or sets the maximum order value in units of the account currency.
# This defaults to $20,000. For example, if purchasing a stock with a price
# of $100, then the maximum order size would be 200 shares.
self.MaximumOrderValue = 500000
def Execute(self, algorithm, targets):
'''Executes market orders if the standard deviation of price is more
than the configured number of deviations in the favorable direction.
Args:
algorithm: The algorithm instance
targets: The portfolio targets'''
self.targetsCollection.AddRange(targets)
# for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
if self.targetsCollection.Count > 0:
for target in self.targetsCollection.OrderByMarginImpact(algorithm):
symbol = target.Symbol
# calculate remaining quantity to be ordered
unorderedQuantity = OrderSizing.GetUnorderedQuantity(algorithm, target)
# fetch our symbol data containing our STD/SMA indicators
data = self.symbolData.get(symbol, None)
if data is None: return
# check order entry conditions
if data.STD.IsReady and self.PriceIsFavorable(data, unorderedQuantity):
# get the maximum order size based on total order value
maxOrderSize = OrderSizing.Value(data.Security, self.MaximumOrderValue)
orderSize = np.min([maxOrderSize, np.abs(unorderedQuantity)])
remainder = orderSize % data.Security.SymbolProperties.LotSize
missingForLotSize = data.Security.SymbolProperties.LotSize - remainder
# if the amount we are missing for +1 lot size is 1M part of a lot size
# we suppose its due to floating point error and round up
# Note: this is required to avoid a diff with C# equivalent
if missingForLotSize < (data.Security.SymbolProperties.LotSize / 1000000):
remainder -= data.Security.SymbolProperties.LotSize
# round down to even lot size
orderSize -= remainder
if ((np.sign(unorderedQuantity) * orderSize) >= 1):
algorithm.MarketOrder(symbol, np.sign(unorderedQuantity) * orderSize)
self.targetsCollection.ClearFulfilled(algorithm)
def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
for added in changes.AddedSecurities:
if added.Symbol not in self.symbolData:
self.symbolData[added.Symbol] = SymbolData(algorithm, added, self.period, self.resolution)
for removed in changes.RemovedSecurities:
# clean up data from removed securities
symbol = removed.Symbol
if symbol in self.symbolData:
if self.IsSafeToRemove(algorithm, symbol):
data = self.symbolData.pop(symbol)
algorithm.SubscriptionManager.RemoveConsolidator(symbol, data.Consolidator)
def PriceIsFavorable(self, data, unorderedQuantity):
'''Determines if the current price is more than the configured
number of standard deviations away from the mean in the favorable direction.'''
sma = data.SMA.Current.Value
deviations = self.deviations * data.STD.Current.Value
if unorderedQuantity > 0:
return data.Security.BidPrice < sma - deviations
else:
return data.Security.AskPrice > sma + deviations
def IsSafeToRemove(self, algorithm, symbol):
'''Determines if it's safe to remove the associated symbol data'''
# confirm the security isn't currently a member of any universe
return not any([kvp.Value.ContainsMember(symbol) for kvp in algorithm.UniverseManager])
class SymbolData:
def __init__(self, algorithm, security, period, resolution):
symbol = security.Symbol
self.Security = security
self.Consolidator = algorithm.ResolveConsolidator(symbol, resolution)
smaName = algorithm.CreateIndicatorName(symbol, f"SMA{period}", resolution)
self.SMA = SimpleMovingAverage(smaName, period)
algorithm.RegisterIndicator(symbol, self.SMA, self.Consolidator)
stdName = algorithm.CreateIndicatorName(symbol, f"STD{period}", resolution)
self.STD = StandardDeviation(stdName, period)
algorithm.RegisterIndicator(symbol, self.STD, self.Consolidator)
# warmup our indicators by pushing history through the indicators
history = algorithm.History(symbol, period, resolution)
if 'close' in history:
history = history.close.unstack(0).squeeze()
for time, value in history.iteritems():
self.SMA.Update(time, value)
self.STD.Update(time, value)
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")
from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
UTCMIN = datetime.min.replace(tzinfo=utc).time()
import datetime
class EqualWeightingPortfolioConstructionModel(PortfolioConstructionModel):
'''Provides an implementation of IPortfolioConstructionModel that gives equal weighting to all securities.
The target percent holdings of each security is 1/N where N is the number of securities.
For insights of direction InsightDirection.Up, long targets are returned and
for insights of direction InsightDirection.Down, short targets are returned.'''
def __init__(self, resolution = Resolution.Daily):
'''Initialize a new instance of EqualWeightingPortfolioConstructionModel
Args:
resolution: Rebalancing frequency'''
self.insightCollection = InsightCollection()
self.removedSymbols = []
self.nextExpiryTime = UTCMIN
self.rebalancingTime = UTCMIN
self.rebalancingPeriod = Extensions.ToTimeSpan(resolution)
def ShouldCreateTargetForInsight(self, insight):
'''Method that will determine if the portfolio construction model should create a
target for this insight
Args:
insight: The insight to create a target for'''
return True
def DetermineTargetPercent(self, activeInsights):
'''Will determine the target percent for each insight
Args:
activeInsights: The active insights to generate a target for'''
result = {}
# give equal weighting to each security
count = sum(x.Direction != InsightDirection.Flat for x in activeInsights)
percent = 0 if count == 0 else 1.0 / count
for insight in activeInsights:
result[insight] = insight.Direction * percent
return result
def CreateTargets(self, algorithm, insights):
'''Create portfolio targets from the specified insights
Args:
algorithm: The algorithm instance
insights: The insights to create portfolio targets from
Returns:
An enumerable of portfolio targets to be sent to the execution model'''
targets = []
time = algorithm.UtcTime.time()
if (time <= self.nextExpiryTime and
time <= self.rebalancingTime and
len(insights) == 0 and
self.removedSymbols is None):
return targets
# Get insight that haven't expired of each symbol that is still in the universe
activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)
for insight in insights:
if self.ShouldCreateTargetForInsight(insight):
buy_count = sum(x.Direction != InsightDirection.Flat for x in activeInsights)
#invested_securities = 0
if (buy_count) < 2:
self.insightCollection.Add(insight)
# Create flatten target for each security that was removed from the universe
if self.removedSymbols is not None:
universeDeselectionTargets = [ PortfolioTarget(symbol, 0) for symbol in self.removedSymbols ]
targets.extend(universeDeselectionTargets)
self.removedSymbols = None
# Get the last generated active insight for each symbol
lastActiveInsights = []
for symbol, g in groupby(activeInsights, lambda x: x.Symbol):
lastActiveInsights.append(sorted(g, key = lambda x: x.GeneratedTimeUtc)[-1])
# Determine target percent for the given insights
percents = self.DetermineTargetPercent(lastActiveInsights)
errorSymbols = {}
for insight in lastActiveInsights:
target = PortfolioTarget.Percent(algorithm, insight.Symbol, percents[insight])
if not target is None:
targets.append(target)
else:
errorSymbols[insight.Symbol] = insight.Symbol
# Get expired insights and create flatten targets for each symbol
expiredInsights = self.insightCollection.RemoveExpiredInsights(algorithm.UtcTime)
expiredTargets = []
for symbol, f in groupby(expiredInsights, lambda x: x.Symbol):
if not self.insightCollection.HasActiveInsights(symbol, algorithm.UtcTime) and not symbol in errorSymbols:
expiredTargets.append(PortfolioTarget(symbol, 0))
continue
targets.extend(expiredTargets)
self.nextExpiryTime = self.insightCollection.GetNextExpiryTime()
if self.nextExpiryTime is None:
self.nextExpiryTime = UTCMIN
self.rebalancingTime = algorithm.UtcTime + self.rebalancingPeriod
return targets
def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
# Get removed symbol and invalidate them in the insight collection
self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities]
self.insightCollection.Clear(self.removedSymbols)
####
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
AddReference("QuantConnect.Indicators")
from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import *
from datetime import timedelta
from enum import Enum
class BasePairsTradingAlphaModel(AlphaModel):
'''This alpha model is designed to accept every possible pair combination
from securities selected by the universe selection model
This model generates alternating long ratio/short ratio insights emitted as a group'''
def __init__(self, lookback = 252,
resolution = Resolution.Daily,
threshold = 2.5):
''' Initializes a new instance of the PairsTradingAlphaModel class
Args:
lookback: Lookback period of the analysis
resolution: Analysis resolution
threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
self.lookback = lookback
self.resolution = resolution
self.threshold = threshold
self.predictionInterval = Time.Multiply(Extensions.ToTimeSpan(self.resolution), self.lookback)
self.pairs = dict()
self.Securities = list()
resolutionString = Extensions.GetEnumString(resolution, Resolution)
self.Name = f'{self.__class__.__name__}({self.lookback},{resolutionString},{Extensions.NormalizeToStr(threshold)})'
def Update(self, algorithm, data):
''' Updates this alpha model with the latest data from the algorithm.
This is called each time the algorithm receives data for subscribed securities
Args:
algorithm: The algorithm instance
data: The new data available
Returns:
The new insights generated'''
insights = []
for key, pair in self.pairs.items():
insights.extend(pair.GetInsightGroup())
return insights
def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed.
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
for security in changes.AddedSecurities:
self.Securities.append(security)
#algorithm.Debug(security.Symbol.Value) just ticker
security.FeeModel = ConstantFeeModel(0)
for security in changes.RemovedSecurities:
if security in self.Securities:
self.Securities.remove(security)
self.UpdatePairs(algorithm)
for security in changes.RemovedSecurities:
keys = [k for k in self.pairs.keys() if security.Symbol in k]
for key in keys:
self.pairs.pop(key)
def UpdatePairs(self, algorithm):
symbols = sorted([x.Symbol for x in self.Securities], key=lambda x: str(x.ID))
for i in range(0, len(symbols)):
asset_i = symbols[i]
for j in range(1 + i, len(symbols)):
asset_j = symbols[j]
pair_symbol = (asset_i, asset_j)
invert = (asset_j, asset_i)
if pair_symbol in self.pairs or invert in self.pairs:
continue
if not self.HasPassedTest(algorithm, asset_i, asset_j):
continue
pair = self.Pair(algorithm, asset_i, asset_j, self.predictionInterval, self.threshold)
self.pairs[pair_symbol] = pair
def HasPassedTest(self, algorithm, asset1, asset2):
'''Check whether the assets pass a pairs trading test
Args:
algorithm: The algorithm instance that experienced the change in securities
asset1: The first asset's symbol in the pair
asset2: The second asset's symbol in the pair
Returns:
True if the statistical test for the pair is successful'''
return True
class Pair:
class State(Enum):
ShortRatio = -1
FlatRatio = 0
LongRatio = 1
def __init__(self, algorithm, asset1, asset2, predictionInterval, threshold):
'''Create a new pair
Args:
algorithm: The algorithm instance that experienced the change in securities
asset1: The first asset's symbol in the pair
asset2: The second asset's symbol in the pair
predictionInterval: Period over which this insight is expected to come to fruition
threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
self.state = self.State.FlatRatio
self.asset1 = asset1
self.asset2 = asset2
self.asset1Price = algorithm.Identity(asset1)
self.asset2Price = algorithm.Identity(asset2)
self.ratio = IndicatorExtensions.Over(self.asset1Price, self.asset2Price)
self.mean = IndicatorExtensions.Of(ExponentialMovingAverage(500), self.ratio)
upper = ConstantIndicator[IndicatorDataPoint]("ct", 1 + threshold / 100)
self.upperThreshold = IndicatorExtensions.Times(self.mean, upper)
lower = ConstantIndicator[IndicatorDataPoint]("ct", 1 - threshold / 100)
self.lowerThreshold = IndicatorExtensions.Times(self.mean, lower)
self.predictionInterval = predictionInterval
def GetInsightGroup(self):
'''Gets the insights group for the pair
Returns:
Insights grouped by an unique group id'''
if not self.mean.IsReady:
return []
# don't re-emit the same direction
if self.state is not self.State.LongRatio and self.ratio > self.upperThreshold:
self.state = self.State.LongRatio
# asset1/asset2 is more than 2 std away from mean, short asset1, long asset2
shortAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Down)
longAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Up)
# creates a group id and set the GroupId property on each insight object
return Insight.Group(shortAsset1, longAsset2)
# don't re-emit the same direction
if self.state is not self.State.ShortRatio and self.ratio < self.lowerThreshold:
self.state = self.State.ShortRatio
# asset1/asset2 is less than 2 std away from mean, long asset1, short asset2
longAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Up)
shortAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Down)
# creates a group id and set the GroupId property on each insight object
return Insight.Group(longAsset1, shortAsset2)
return []
###