# https://quantpedia.com/Screener/Details/12
# https://www.quantconnect.com/tutorials/strategy-library/pairs-trading-with-stocks
import numpy as np
import pandas as pd
from scipy import stats
from math import floor
from datetime import timedelta
from collections import deque
import itertools as it
from decimal import Decimal
# ! 1. Should we use the rolling window to constantly update our model and the deviations?
# ! 2. Should we trade more than 20 stocks at a time?
# ! 3. Should we replace Pearson's correlation with another measure to find more strongly related stock pairs?
# ! 4. Should we expand to the **whole** universe instead of only looking at the stocks in the S&P 500?
class PairsTradingAlgorithm(QCAlgorithm):
    """Distance-approach pairs trading on a fixed ticker list.

    A deque of the last `formation_period` daily closes is kept per ticker.
    Every sixth (monthly) rebalance selects the 4 pairs with the smallest
    `Pair.distance`. Each day, for every selected pair:

    * if the latest price spread is above mean + threshold*std, short leg A
      and buy leg B;
    * if it is below mean - threshold*std, buy leg A and short leg B;
    * otherwise, if both legs are held, liquidate them.

    Leg B is sized as floor(ratio * quantity_A), with ratio = price_A / price_B,
    so both legs carry roughly the same dollar amount.
    """

    def Initialize(self):
        self.SetStartDate(2018, 1, 5)
        self.SetEndDate(2021, 7, 1)
        self.SetCash(100000)
        self.SetUniverseSelection(QC500UniverseSelectionModel())
        tickers = [
            'XLK', 'QQQ', 'BANC', 'BBVA', 'BBD', 'BCH', 'BLX', 'BSBR', 'BSAC', 'SAN',
            'CIB', 'BXS', 'BAC', 'BOH', 'BMO', 'BK', 'BNS', 'BKU', 'BBT', 'NBHC', 'OFG',
            'BFR', 'CM', 'COF', 'C', 'VLY', 'WFC', 'WAL', 'WBK', 'RBS', 'SHG', 'STT', 'STL',
            'SCNB', 'SMFG', 'STI',
            # 'DKT', 'DB', 'EVER', 'KB', 'KEY', , 'MTB', 'BMA', 'MFCB', 'MSL', 'MTU', 'MFG',
            # 'PVTD', 'PB', 'PFS', 'RF', 'RY', 'RBS', 'SHG', 'STT', 'STL', 'SCNB', 'SMFG', 'STI',
            # 'SNV', 'TCB', 'TD', 'USB', 'UBS', 'VLY', 'WFC', 'WAL', 'WBK', 'WF', 'YDKN', 'ZBK'
        ]
        # Entry band width, in standard deviations of the formation-period spread.
        self.threshold = 2
        self.symbols = []
        self.symbols_has_his = []
        for ticker in tickers:
            self.symbols.append(self.AddEquity(ticker, Resolution.Daily).Symbol)
        self.pairs = {}
        self.formation_period = 252
        # str(symbol) -> deque of the most recent `formation_period` closes.
        self.history_price = {}
        # Only symbols with a full formation window of history are tradable.
        self.symbols_has_his = self.symbols.copy()
        for symbol in self.symbols:
            hist = self.History([symbol], self.formation_period + 1, Resolution.Daily)
            if hist.empty:
                self.symbols_has_his.remove(symbol)
                continue
            key = str(symbol)
            self.history_price[key] = deque(maxlen=self.formation_period)
            for bar in hist.loc[key].itertuples():
                self.history_price[key].append(float(bar.close))
            if len(self.history_price[key]) < self.formation_period:
                self.symbols_has_his.remove(symbol)
                self.history_price.pop(key)
        self.symbol_pairs = list(it.combinations(self.symbols_has_his, 2))
        # Add the benchmark
        self.AddEquity("SPY", Resolution.Daily)
        self.Schedule.On(self.DateRules.MonthStart("SPY"), self.TimeRules.AfterMarketOpen("SPY"), self.Rebalance)
        self.count = 0
        self.sorted_pairs = None

    def OnData(self, data):
        # Update the price series every day.
        for symbol in self.symbols_has_his:
            key = str(symbol)
            if data.Bars.ContainsKey(symbol) and key in self.history_price:
                self.history_price[key].append(float(data[symbol].Close))
        if self.sorted_pairs is None:
            return
        for sym_a, sym_b in self.sorted_pairs:
            price_a = self.Portfolio[sym_a].Price
            price_b = self.Portfolio[sym_b].Price
            # A leg without a price yet cannot be sized (and would divide by zero).
            if price_a == 0 or price_b == 0:
                continue
            # Spread of the two raw price series over the formation window.
            spread = np.array(self.history_price[str(sym_a)]) - np.array(self.history_price[str(sym_b)])
            mean = np.mean(spread)
            std = np.std(spread)
            # Shares of B per share of A that match A's dollar value.
            ratio = price_a / price_b
            both_flat = not self.Portfolio[sym_a].Invested and not self.Portfolio[sym_b].Invested
            # A long-short position is opened when the pair prices have diverged by `threshold` std devs.
            if spread[-1] > mean + self.threshold * std:
                if both_flat:
                    quantity = int(self.CalculateOrderQuantity(sym_a, 0.2))
                    self.Sell(sym_a, quantity)
                    self.Buy(sym_b, floor(ratio * quantity))
            elif spread[-1] < mean - self.threshold * std:
                if both_flat:
                    quantity = int(self.CalculateOrderQuantity(sym_a, 0.2))
                    # `quantity` is sized for A, so A gets `quantity` shares and B the
                    # dollar-matched floor(ratio * quantity) shares (previously swapped).
                    self.Sell(sym_b, floor(ratio * quantity))
                    self.Buy(sym_a, quantity)
            # The position is closed when the spread is back inside the band.
            elif self.Portfolio[sym_a].Invested and self.Portfolio[sym_b].Invested:
                self.Liquidate(sym_a)
                self.Liquidate(sym_b)

    def Rebalance(self):
        # Scheduled monthly. Pairs are re-selected on every sixth call, i.e. roughly
        # every half year, choosing the 4 pairs with the smallest historical distance.
        # NOTE(review): positions opened on pairs that drop out of `sorted_pairs` are
        # never closed by OnData afterwards -- confirm whether they should be liquidated here.
        if self.count % 6 == 0:
            distances = {
                pair: Pair(
                    pair[0], pair[1],
                    self.history_price[str(pair[0])], self.history_price[str(pair[1])]
                ).distance()
                for pair in self.symbol_pairs
            }
            self.sorted_pairs = sorted(distances, key=distances.get)[:4]
        self.count += 1
class Pair:
    """Two price series and the distance-approach metric between them."""

    def __init__(self, symbol_a, symbol_b, price_a, price_b):
        self.symbol_a = symbol_a
        self.symbol_b = symbol_b
        # Any indexable sequence of prices (e.g. a deque or list).
        self.price_a = price_a
        self.price_b = price_b

    def distance(self):
        """Return the sum of squared deviations between the two normalized series.

        Each series is divided by its first element so both start at 1.0.
        A first price of 0 makes the result inf/nan (NumPy division).
        """
        norm_a = np.array(self.price_a) / self.price_a[0]
        norm_b = np.array(self.price_b) / self.price_b[0]
        # Vectorised reduction instead of the builtin `sum` over an ndarray.
        return np.sum(np.square(norm_a - norm_b))
import numpy as np
import pandas as pd
class DistanceApproachHighVariancePair:
    """Distance and spread-dispersion statistics for one candidate pair.

    `price_a` / `price_b` are converted to NumPy arrays and are expected to have
    equal length (the caller normalises them beforehand).

    Attributes:
        distance: sum of squared differences between the two series.
        var: sample variance (ddof=1) of the spread price_a - price_b.
        hist_dev: sample standard deviation of that spread (sqrt of `var`).
    """
    threshold = None

    def __init__(
        self,
        symbol_a: str, symbol_b: str,
        price_a, price_b,
        threshold=1.5
    ):
        self.symbol_a = symbol_a
        self.symbol_b = symbol_b
        self.price_a = np.array(price_a)
        self.price_b = np.array(price_b)
        self.threshold = threshold
        self.distance = self._distance()
        self.var = self._variance()
        self.hist_dev = self._dev()

    def _spread_series(self):
        # Element-wise spread between the two price series.
        return self.price_a - self.price_b

    def _distance(self):
        # Sum of squared deviations between the two (normalized) price series.
        return np.square(self._spread_series()).sum()

    def _variance(self):
        # Sample variance of the spread (divides by n - 1).
        return np.var(self._spread_series(), ddof=1)

    def _dev(self):
        # Sample standard deviation derived from the variance instead of recomputing it.
        return np.sqrt(self._variance())
from AlgorithmImports import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
from itertools import groupby
from math import ceil
class QC500UniverseSelectionModel(FundamentalUniverseSelectionModel):
    '''QC500-style universe selection model for framework algorithms.

    Re-selects at most once per calendar month.
    Reference: https://github.com/QuantConnect/Lean/pull/1663'''

    def __init__(self, filterFineData = True, universeSettings = None, numOfUniverse = 500):
        '''Create the model; `numOfUniverse` caps the size of the fine selection.'''
        super().__init__(filterFineData, universeSettings)
        self.numberOfSymbolsCoarse = 1000
        self.numberOfSymbolsFine = numOfUniverse
        # Symbol -> dollar volume for the current coarse selection (read by SelectFine).
        self.dollarVolumeBySymbol = {}
        # Month of the last successful fine selection; -1 forces an initial selection.
        self.lastMonth = -1

    def SelectCoarse(self, algorithm, coarse):
        '''Keep the top `numberOfSymbolsCoarse` symbols by dollar volume that have
        fundamental data and a positive previous-day price and volume.'''
        if algorithm.Time.month == self.lastMonth:
            return Universe.Unchanged

        def eligible(c):
            return c.HasFundamentalData and c.Volume > 0 and c.Price > 0

        ranked = sorted(filter(eligible, coarse), key=lambda c: c.DollarVolume, reverse=True)
        leaders = ranked[:self.numberOfSymbolsCoarse]
        self.dollarVolumeBySymbol = dict((c.Symbol, c.DollarVolume) for c in leaders)

        # Nothing qualified: keep the universe. lastMonth is untouched, so the
        # next trading day tries again.
        if not self.dollarVolumeBySymbol:
            return Universe.Unchanged
        return list(self.dollarVolumeBySymbol)

    def SelectFine(self, algorithm, fine):
        '''Keep US-headquartered NYSE/NASDAQ stocks listed for more than 180 days
        with market cap above 500 million. Allocate slots to each industry in
        proportion to its size and fill them by dollar volume.'''
        def qualifies(f):
            return (f.CompanyReference.CountryId == "USA"
                    and f.CompanyReference.PrimaryExchangeID in ["NYS", "NAS"]
                    and (algorithm.Time - f.SecurityReference.IPODate).days > 180
                    and f.MarketCap > 5e8)

        def industry(f):
            return f.CompanyReference.IndustryTemplateCode

        def dollar_volume(f):
            return self.dollarVolumeBySymbol[f.Symbol]

        by_industry = sorted((f for f in fine if qualifies(f)), key=industry)
        total = len(by_industry)

        # Nothing qualified: keep the universe and retry on the next trading day.
        if total == 0:
            return Universe.Unchanged

        # All criteria passed, so this month's selection is now final.
        self.lastMonth = algorithm.Time.month

        share = self.numberOfSymbolsFine / total
        picked = []
        for _, members in groupby(by_industry, industry):
            leaders = sorted(members, key=dollar_volume, reverse=True)
            picked += leaders[:ceil(len(leaders) * share)]

        picked.sort(key=dollar_volume, reverse=True)
        return [f.Symbol for f in picked[:self.numberOfSymbolsFine]]
from AlgorithmImports import *
import itertools as it
import numpy as np
import pandas as pd
from enum import Enum
from sklearn import linear_model
class PairStatus(Enum):
    '''Defines the state of a pair position. This is used to prevent signal spamming and aid in bounce detection.

    NOTE(review): not referenced by the correlation alpha model defined alongside it.'''
    Short = -1 # Status to keep shorting
    Flat = 0 # Status to close position
    Long = 1 # Status to keep Long
class DistanceApproachPearsonCorrelationPair:
    """Pearson correlation of daily log returns for two price series.

    Despite its name, the `distance` attribute holds the correlation
    coefficient (higher means more correlated). Prices must be strictly
    positive; otherwise `np.log` produces -inf/nan and the result is nan.
    """
    threshold = None

    def __init__(
        self,
        symbol_a:str, symbol_b:str,
        price_a, price_b,
        threshold=1.5
    ):
        self.symbol_a, self.symbol_b = symbol_a, symbol_b
        self.price_a, self.price_b = np.array(price_a), np.array(price_b)
        self.threshold = threshold
        self.distance = self._pearson_coef()

    def _pearson_coef(self):
        def log_returns(prices):
            return np.diff(np.log(prices), axis=0).flatten()

        # Off-diagonal entry of the 2x2 correlation matrix.
        matrix = np.corrcoef(log_returns(self.price_a), log_returns(self.price_b))
        return matrix[0][1]
class DistanceApproachPearsonCorrelationAlphaModel(AlphaModel):
    """Monthly long/short alpha based on each stock's divergence from its correlated peers.

    Every update appends the latest daily close of every tracked symbol to a
    RollingWindow of `formation_period` closes. Once per calendar month it:

    1. computes the Pearson correlation of daily log returns (raw prices) for
       every pair of tracked symbols;
    2. builds, for each symbol, an equal-weighted "pairs portfolio" from its
       (up to) `max_peers` most correlated partners;
    3. regresses the symbol's log returns on the portfolio's and scores
       RetDiff = beta * Cret - Lret on the most recent day;
    4. emits Up insights for the lowest scores and Down insights for the
       highest (at most `capacity` per side, never overlapping), plus Flat
       insights for symbols that left both lists.

    NOTE(review): in the RetDiff pairs-trading literature the stocks with the
    *highest* RetDiff (laggards vs. their peers) are bought. This model buys
    the lowest; confirm the intended direction.
    """

    def __init__(
        self,
        capacity = 20,
        resolution = Resolution.Daily
    ):
        self.resolution = resolution
        self.symbol_pairs = []
        # Roughly one year of trading days (12 months x 22 days).
        self.formation_period = 22 * 12
        self.threshold = 1.5
        # Symbol -> RollingWindow[float] of daily closes (index 0 = most recent).
        self.historical_data = dict()
        self.invested_long_list = list()
        self.invested_short_list = list()
        self._changes = None
        # Maximum number of symbols per side (long / short).
        self.capacity = capacity
        # Month in which signals were last produced.
        self.lastChange = None
        # Maximum number of most-correlated partners forming a stock's pairs portfolio.
        self.max_peers = 50

    def _create_pairs(self, symbols):
        # All unordered pairs of the given symbols.
        return list(it.combinations(symbols, 2))

    def Update(self, algorithm, data):
        algorithm.Debug(f'Time is {algorithm.Time}')
        insights = []

        # Consume pending universe changes exactly once. Previously they were only
        # cleared on the monthly signal day, so symbols added mid-month were skipped
        # by the daily refresh for the rest of the month.
        changes, self._changes = self._changes, None
        if changes is not None:
            self._process_changes(algorithm, changes)
        self._append_latest_closes(algorithm, changes)

        # Data is refreshed daily, but signals are produced monthly.
        if self.lastChange == algorithm.Time.month:
            return insights
        if len(self.historical_data) < 2:
            # Nothing to pair yet; leave lastChange unset so the next update retries.
            return insights
        self.lastChange = algorithm.Time.month

        symbols = list(self.historical_data.keys())
        self.symbol_pairs = self._create_pairs(symbols)
        # Chronologically ordered closes, one column per symbol.
        self.price_dataframe = pd.concat(
            [self._dataframelize(self.historical_data[symbol], symbol) for symbol in symbols],
            axis=1
        )
        peer_table = self._build_peer_table(symbols)
        divergence = self._compute_divergence(peer_table)
        return self._emit_insights(algorithm, divergence)

    def _process_changes(self, algorithm, changes):
        """Create windows for added symbols; drop windows of removed, non-held symbols."""
        for added in changes.AddedSecurities:
            if added.Symbol in self.historical_data:
                continue
            hist = algorithm.History([added.Symbol], self.formation_period, Resolution.Daily)
            if hist.empty:
                algorithm.Debug('ERROR: Hist is empty')
                continue
            window = RollingWindow[float](self.formation_period)
            for bar in hist.loc[added.Symbol, :].itertuples():
                window.Add(bar.close)
            # Only track symbols with a complete formation window.
            if window.IsReady:
                self.historical_data[added.Symbol] = window

        # Removed securities that are still held keep their window so they keep updating.
        invested = {kvp.Key for kvp in algorithm.Portfolio if kvp.Value.Invested}
        for removed in changes.RemovedSecurities:
            if removed.Symbol not in invested:
                self.historical_data.pop(removed.Symbol, None)

    def _append_latest_closes(self, algorithm, changes):
        """Append the latest daily close to every tracked window.

        History is queried instead of using `data`, because held symbols that
        left the universe still need updates. Windows created in this update
        already end at the latest bar and are skipped.
        """
        just_added = set() if changes is None else {sec.Symbol for sec in changes.AddedSecurities}
        for symbol, window in self.historical_data.items():
            if symbol in just_added:
                continue
            hist = algorithm.History([symbol], 5, Resolution.Daily)
            # History occasionally comes back without a 'close' column; repeat the
            # last close in that case so the window keeps advancing.
            if 'close' in hist.columns:
                window.Add(float(hist['close'].iloc[-1]))
            else:
                window.Add(window[0])

    def _build_peer_table(self, symbols):
        """Map each symbol to its (up to) `max_peers` most correlated partners."""
        closes = {symbol: self.price_dataframe[symbol].to_numpy() for symbol in symbols}
        pairs = []
        for sym_a, sym_b in self.symbol_pairs:
            # Raw prices: correlation of log returns is scale-invariant, whereas
            # min-max normalisation creates zeros that turn log() into -inf.
            pair = DistanceApproachPearsonCorrelationPair(sym_a, sym_b, closes[sym_a], closes[sym_b])
            if not np.isnan(pair.distance):
                pairs.append(pair)
        # Most correlated first (`distance` holds the correlation coefficient).
        pairs.sort(key=lambda pair: pair.distance, reverse=True)

        table = dict()
        for pair in pairs:
            for own, other in ((pair.symbol_a, pair.symbol_b), (pair.symbol_b, pair.symbol_a)):
                peers = table.setdefault(own, [])
                if len(peers) < self.max_peers:
                    peers.append(other)
        return table

    def _compute_divergence(self, peer_table):
        """Return {symbol: RetDiff} for the most recent day.

        RetDiff = beta * Cret - Lret, where Lret is the stock's log return,
        Cret is the equal-weighted log return of its pairs portfolio, and beta
        comes from regressing Lret on Cret over the formation window.
        """
        log_returns = np.log(self.price_dataframe.pct_change() + 1)
        divergence = dict()
        for stock, peers in peer_table.items():
            stock_rtn = log_returns[stock]
            benchmark_rtn = log_returns[peers].sum(axis=1) / len(peers)
            # Drop the first row: pct_change leaves it NaN.
            cret = benchmark_rtn.to_numpy()[1:]
            lret = stock_rtn.to_numpy()[1:].reshape((-1, 1))
            regressors = np.column_stack([cret, np.ones_like(cret)])
            regr = linear_model.LinearRegression()
            regr.fit(regressors, lret)
            beta = regr.coef_[0][0]
            divergence[stock] = beta * benchmark_rtn.iloc[-1] - stock_rtn.iloc[-1]
        return divergence

    def _emit_insights(self, algorithm, divergence):
        """Rank by divergence, update the held lists and build the insights."""
        scores = {symbol: score for symbol, score in divergence.items() if not np.isnan(score)}
        ranked = sorted(scores, key=scores.get)
        # Cap each side at half the ranked names so a symbol is never both long and short.
        per_side = min(self.capacity, len(ranked) // 2)
        new_long_list = ranked[:per_side]
        # Explicit start index: ranked[-0:] would return the whole list.
        new_short_list = ranked[len(ranked) - per_side:]

        # Flatten only names that leave both lists; a long->short switch gets just Down.
        still_held = set(new_long_list) | set(new_short_list)
        previously_held = dict.fromkeys(self.invested_long_list + self.invested_short_list)
        flat_list = [symbol for symbol in previously_held if symbol not in still_held]
        self.invested_long_list = new_long_list
        self.invested_short_list = new_short_list

        insight_expiry = Expiry.EndOfDay(algorithm.Time)
        insights = [self._insight(s, insight_expiry, InsightDirection.Flat) for s in flat_list]
        insights += [self._insight(s, insight_expiry, InsightDirection.Down) for s in self.invested_short_list]
        insights += [self._insight(s, insight_expiry, InsightDirection.Up) for s in self.invested_long_list]
        return insights

    @staticmethod
    def _insight(symbol, expiry, direction):
        # Price insight with weight 0.05 and no magnitude/confidence.
        return Insight.Price(str(symbol), expiry, direction, None, None, None, 0.05)

    def OnSecuritiesChanged(self, algorithm, changes):
        # Stash the changes; they are processed on the next Update.
        self._changes = changes

    def _normalize_data(self, series):
        # Min-max normalise using days [1:]; [0] (the latest value) is only needed
        # when calculating the current spread.
        arr = np.array([x for x in series.GetEnumerator()])
        hi = arr[1:].max()
        lo = arr[1:].min()
        return (arr - lo) / (hi - lo)

    def _dataframelize(self, price_rolling_window, symbol):
        # RollingWindow iterates newest-first; reverse to chronological order.
        values = [n for n in price_rolling_window]
        values.reverse()
        return pd.DataFrame(values, columns=[symbol])
from AlgorithmImports import *
import itertools as it
import numpy as np
from enum import Enum
# * 1. Variants of how to rank the distance + high variance pair
# * 1.1. Should we use IR factor score
# * 1.2. or use linear regression?
class PairStatus(Enum):
    '''Defines the state of a pair position. This is used to prevent signal spamming and aid in bounce detection.'''
    Short = -1 # Status to keep shorting: leg A was sent Down, leg B Up
    Flat = 0 # Status to close position (set transiently just before the pair is forgotten)
    Long = 1 # Status to keep Long: leg A was sent Up, leg B Down
class DistanceApproachHighVariancePair:
    """Distance, spread-dispersion and current-spread statistics for one pair.

    `price_a` / `price_b` are equal-length series. Index 0 is read as the
    current value (see `current_spread`).

    Attributes:
        distance: sum of squared differences between the two series.
        var: sample variance (ddof=1) of the spread price_a - price_b.
        hist_dev: sample standard deviation of that spread (sqrt of `var`).
        current_spread: price_a[0] - price_b[0].

    NOTE(review): `distance`, `var` and `hist_dev` include index 0 (the current
    value), unlike `DistanceApproachPair`, which uses [1:] only -- confirm intent.
    """
    threshold = None

    def __init__(
        self,
        symbol_a: str, symbol_b: str,
        price_a, price_b,
        threshold=1.5
    ):
        self.symbol_a = symbol_a
        self.symbol_b = symbol_b
        self.price_a = np.array(price_a)
        self.price_b = np.array(price_b)
        self.threshold = threshold
        self.distance = self._distance()
        self.var = self._variance()
        self.hist_dev = self._dev()
        self.current_spread = self._spread()

    def _spread_series(self):
        # Element-wise spread between the two price series.
        return self.price_a - self.price_b

    def _distance(self):
        # Sum of squared deviations between the two (normalized) price series.
        return np.square(self._spread_series()).sum()

    def _variance(self):
        # Sample variance of the spread (divides by n - 1).
        return np.var(self._spread_series(), ddof=1)

    def _dev(self):
        # Sample standard deviation derived from the variance instead of recomputing it.
        return np.sqrt(self._variance())

    def _spread(self):
        # Spread at index 0.
        return self.price_a[0] - self.price_b[0]
class DistanceApproachHighVarianceAlphaModel(AlphaModel):
    """Daily distance-approach pairs alpha that prioritises high-variance pairs.

    On every update the model:

    1. refreshes a RollingWindow of `formation_period` daily closes per symbol;
    2. min-max normalises each window once and scores every symbol pair;
    3. keeps the `max_pair_percentage` fraction of pairs with the smallest
       distance and visits them in order of decreasing spread variance;
    4. opens a pair when neither leg is held and fewer than `capacity` pairs
       are open. A current spread >= threshold * hist_dev sends Down/Up; a
       spread <= -threshold * hist_dev sends Up/Down;
    5. closes an open pair (Flat/Flat) once its spread is no longer on the
       entry side of zero, or when either leg is being delisted. Closed pairs'
       price windows are dropped.

    Positions are never flipped directly from long to short or short to long.
    """

    def __init__(
        self,
        capacity = 20,
        resolution = Resolution.Daily
    ):
        self.resolution = resolution
        self.symbol_pairs = []
        # Roughly one year of trading days (12 months x 22 days).
        self.formation_period = 22 * 12
        # Entry band width, in historical standard deviations of the spread.
        self.threshold = 1.5
        # Fraction of the closest (smallest-distance) pairs considered each update.
        self.max_pair_percentage = 0.3
        # Symbol -> RollingWindow[float] of daily closes (index 0 = most recent).
        self.historical_data = dict()
        # (symbol_a, symbol_b) -> PairStatus of currently open pairs.
        self.invested_pairs = dict()
        self._changes = None
        # Maximum number of concurrently open pairs.
        self.capacity = capacity

    def _create_pairs(self, symbols):
        # All unordered pairs of the given symbols.
        return list(it.combinations(symbols, 2))

    def Update(self, algorithm, data):
        algorithm.Debug(f'Time is {algorithm.Time}')
        insights = []

        # Consume pending universe changes exactly once.
        changes, self._changes = self._changes, None
        if changes is not None:
            self._process_changes(algorithm, changes)
        self._append_latest_closes(algorithm, changes)

        self.symbol_pairs = self._create_pairs(self.historical_data.keys())
        # Normalise each window once rather than once per pair it appears in.
        normalized = {symbol: self._normalize_data(window) for symbol, window in self.historical_data.items()}
        distances = {}
        for pair in self.symbol_pairs:
            distances[pair] = DistanceApproachHighVariancePair(
                pair[0],
                pair[1],
                normalized[pair[0]],
                normalized[pair[1]]
            )

        # Closest pairs by distance, then visited highest spread variance first.
        keep_count = int(len(distances) * self.max_pair_percentage)
        closest = sorted(distances.items(), key=lambda item: item[1].distance)[:keep_count]
        sorted_pairs = dict(sorted(closest, key=lambda item: item[1].var, reverse=True))

        insight_expiry = Expiry.EndOfDay(algorithm.Time)
        for k, v in sorted_pairs.items():
            # Recomputed each iteration because pairs open/close inside this loop.
            invested_stock_symbols = {symbol for pair in self.invested_pairs for symbol in pair}
            # Taking care of the delisting scenario.
            if data.Delistings.ContainsKey(str(k[0])) or data.Delistings.ContainsKey(str(k[1])):
                insights.extend(self._pair_insights(k, InsightDirection.Flat, InsightDirection.Flat, insight_expiry))
                self._drop_pair(k)
                continue
            if k[0] not in invested_stock_symbols and k[1] not in invested_stock_symbols:
                insights.extend(self._try_open(k, v, insight_expiry))
            else:
                insights.extend(self._maybe_close(k, v, insight_expiry))

        return insights

    def _process_changes(self, algorithm, changes):
        """Create windows for added symbols; drop windows of removed, non-held symbols."""
        for added in changes.AddedSecurities:
            if added.Symbol in self.historical_data:
                continue
            hist = algorithm.History([added.Symbol], self.formation_period, Resolution.Daily)
            if hist.empty:
                algorithm.Debug('ERROR: Hist is empty')
                continue
            window = RollingWindow[float](self.formation_period)
            for bar in hist.loc[added.Symbol, :].itertuples():
                window.Add(bar.close)
            # Only track symbols with a complete formation window.
            if window.IsReady:
                self.historical_data[added.Symbol] = window

        # Removed securities that are still held keep their window so they keep updating.
        invested = {kvp.Key for kvp in algorithm.Portfolio if kvp.Value.Invested}
        for removed in changes.RemovedSecurities:
            if removed.Symbol not in invested:
                self.historical_data.pop(removed.Symbol, None)

    def _append_latest_closes(self, algorithm, changes):
        """Append the latest daily close to every tracked window.

        History is queried instead of using `data`, because held symbols that
        left the universe still need updates. Windows created in this update
        already end at the latest bar and are skipped.
        """
        just_added = set() if changes is None else {sec.Symbol for sec in changes.AddedSecurities}
        for symbol, window in self.historical_data.items():
            if symbol in just_added:
                continue
            hist = algorithm.History([symbol], 5, Resolution.Daily)
            # History occasionally comes back without a 'close' column; repeat the
            # last close in that case so the window keeps advancing.
            if 'close' in hist.columns:
                window.Add(float(hist['close'].iloc[-1]))
            else:
                window.Add(window[0])

    def _try_open(self, k, v, expiry):
        """Open pair `k` if its current spread is outside the entry band and capacity allows."""
        if len(self.invested_pairs) >= self.capacity:
            return []
        if v.current_spread >= self.threshold * v.hist_dev:
            self.invested_pairs[k] = PairStatus.Short
            return self._pair_insights(k, InsightDirection.Down, InsightDirection.Up, expiry)
        if v.current_spread <= -(self.threshold * v.hist_dev):
            self.invested_pairs[k] = PairStatus.Long
            return self._pair_insights(k, InsightDirection.Up, InsightDirection.Down, expiry)
        return []

    def _maybe_close(self, k, v, expiry):
        """Close open pair `k` once its spread is no longer on the entry side of zero."""
        status = self.invested_pairs.get(k)
        if status == PairStatus.Long:
            reverted = not (v.current_spread < 0)
        elif status == PairStatus.Short:
            reverted = not (v.current_spread > 0)
        else:
            # A leg is held through a different pair: nothing to do for k.
            return []
        if not reverted:
            return []
        self._drop_pair(k)
        return self._pair_insights(k, InsightDirection.Flat, InsightDirection.Flat, expiry)

    def _drop_pair(self, k):
        """Forget pair `k` and both legs' price windows.

        Uses pop() so it is idempotent: one delisted symbol can appear in several
        selected pairs within the same update, and `del` would raise KeyError.
        """
        self.invested_pairs.pop(k, None)
        self.historical_data.pop(k[0], None)
        self.historical_data.pop(k[1], None)

    @staticmethod
    def _pair_insights(k, direction_a, direction_b, expiry):
        # One price insight per leg, weight 0.05, no magnitude/confidence.
        return [
            Insight.Price(str(k[0]), expiry, direction_a, None, None, None, 0.05),
            Insight.Price(str(k[1]), expiry, direction_b, None, None, None, 0.05),
        ]

    def OnSecuritiesChanged(self, algorithm, changes):
        # Stash the changes; they are processed on the next Update.
        self._changes = changes

    def _normalize_data(self, series):
        # Min-max normalise using days [1:]; [0] (the latest value) is only needed
        # when calculating the current spread.
        arr = np.array([x for x in series.GetEnumerator()])
        hi = arr[1:].max()
        lo = arr[1:].min()
        return (arr - lo) / (hi - lo)
from AlgorithmImports import *
import itertools as it
import numpy as np
from enum import Enum
# * We don't do Long->Short and Short->Long
# * For these actions, we will close the existing positions and will open the new positions in the next day
class PairStatus(Enum):
    '''Defines the state of a pair position. This is used to prevent signal spamming and aid in bounce detection.

    NOTE(review): presumably consumed by the basic distance alpha model that follows; verify.'''
    Short = -1 # Status to keep shorting
    Flat = 0 # Status to close position
    Long = 1 # Status to keep Long
class DistanceApproachPair:
    """Distance-approach statistics for a pair of equal-length price series.

    Index 0 of each series is the current observation. Indices 1.. form the
    history used for the distance and the historical deviation.

    Attributes:
        distance: sum of squared historical differences (indices 1..).
        hist_dev: sample standard deviation (n - 1) of the historical spread.
        current_spread: price_a[0] - price_b[0].
    """

    def __init__(self,
                 symbol_a, symbol_b,
                 price_a, price_b,
                 threshold=1.5
                 ):
        self.threshold = threshold
        self.symbol_a = symbol_a
        self.symbol_b = symbol_b
        self.price_a = np.array(price_a)
        self.price_b = np.array(price_b)
        self.distance = self._distance()
        self.hist_dev = self._dev()
        self.current_spread = self._spread()

    def _history_spread(self):
        # Spread over the history, excluding the current observation.
        return self.price_a[1:] - self.price_b[1:]

    def _distance(self):
        return np.square(self._history_spread()).sum()

    def _dev(self):
        past = self._history_spread()
        centred = past - np.mean(past)
        return np.sqrt(np.sum(np.square(centred)) / (len(past) - 1))

    def _spread(self):
        head_a, head_b = self.price_a[0], self.price_b[0]
        return head_a - head_b
class BasicDistanceApproachAlphaModel(AlphaModel):
    '''Distance-approach pairs-trading alpha model.

    On every Update the model:
      1. applies pending universe changes to its per-symbol RollingWindow of daily closes,
      2. appends the latest daily close to every other window,
      3. scores every symbol pair with DistanceApproachPair on min-max normalized prices and
         walks the pairs in ascending distance order,
      4. opens a pair (one Up and one Down insight) when its current spread is at least
         ``threshold`` historical deviations away from zero, up to ``capacity`` open pairs,
      5. closes an open pair (two Flat insights) once its spread reaches or crosses zero,
         or when one of its legs is being delisted.

    Pairs are never flipped directly from Long to Short or vice versa.
    '''

    # Weight attached to every insight emitted by this model.
    _INSIGHT_WEIGHT = 0.05

    def __init__(
        self,
        capacity = 10,
        resolution = Resolution.Daily
    ):
        '''
        Args:
            capacity: maximum number of pairs held open at the same time.
            resolution: stored on the instance; price history is always requested at
                Resolution.Daily.
        '''
        self.resolution = resolution
        self.symbol_pairs = []
        self.formation_period = 22 * 12
        self.threshold = 1.5
        # Symbol -> RollingWindow[float] of daily closes ([0] is the most recently added).
        self.historical_data = dict()
        # (symbol_a, symbol_b) -> PairStatus for every currently open pair.
        self.invested_pairs = dict()
        self._changes = None
        self.capacity = capacity

    def _create_pairs(self, symbols):
        '''Return every unordered 2-combination of `symbols`.'''
        return list(it.combinations(symbols, 2))

    def Update(self, algorithm, data):
        '''Refresh the price windows and return the insights for this bar.

        Args:
            algorithm: the running algorithm (History, Portfolio, Debug and Time are used).
            data: the current Slice; only ``data.Delistings`` is inspected.
        Returns:
            A list of Insight objects, two per pair opened or closed on this bar.
        '''
        algorithm.Debug(f'Time is {algorithm.Time}')
        freshly_loaded = self._apply_universe_changes(algorithm)
        self._append_latest_closes(algorithm, freshly_loaded)
        ranked_pairs = self._rank_pairs()
        insights = self._create_insights(algorithm, data, ranked_pairs)
        # Reset the changes
        self._changes = None
        return insights

    def _apply_universe_changes(self, algorithm):
        '''Load windows for added symbols, drop windows of removed, non-invested symbols.

        Returns:
            The set of symbols reported as added (their windows are not rolled this bar).
        '''
        if self._changes is None:
            return set()

        for added in self._changes.AddedSecurities:
            symbol = added.Symbol
            if symbol in self.historical_data:
                continue
            hist = algorithm.History([symbol], self.formation_period, Resolution.Daily)
            if hist.empty:
                algorithm.Debug('ERROR: Hist is empty')
                continue
            window = RollingWindow[float](self.formation_period)
            for bar in hist.loc[symbol, :].itertuples():
                window.Add(bar.close)
            # Only keep symbols that have a full formation period of history.
            if window.IsReady:
                self.historical_data[symbol] = window

        # Symbols still held keep their window so their open pair can still be evaluated.
        invested = {kvp.Key for kvp in algorithm.Portfolio if kvp.Value.Invested}
        for removed in self._changes.RemovedSecurities:
            if removed.Symbol not in invested:
                self.historical_data.pop(removed.Symbol, None)

        return {sec.Symbol for sec in self._changes.AddedSecurities}

    def _append_latest_closes(self, algorithm, skip):
        '''Append the latest daily close to every window whose symbol is not in `skip`.'''
        for symbol, window in self.historical_data.items():
            if symbol in skip:
                continue
            # Use History rather than `data`: invested symbols may have left the universe
            # and then no longer appear in the slice.
            hist = algorithm.History([symbol], 5, Resolution.Daily)
            # History occasionally comes back without a 'close' column; repeat the
            # previous close instead of crashing.
            if 'close' in hist.columns:
                window.Add(float(hist['close'].iloc[-1]))
            else:
                window.Add(window[0])

    def _rank_pairs(self):
        '''Score every pair of tracked symbols; return {pair: stats} sorted by ascending distance.'''
        self.symbol_pairs = self._create_pairs(self.historical_data.keys())
        # Normalize each window once instead of once per pair it belongs to.
        normalized = {
            symbol: self._normalize_data(window)
            for symbol, window in self.historical_data.items()
        }
        scored = {
            (a, b): DistanceApproachPair(a, b, normalized[a], normalized[b])
            for a, b in self.symbol_pairs
        }
        return dict(sorted(scored.items(), key=lambda item: item[1].distance))

    def _create_insights(self, algorithm, data, ranked_pairs):
        '''Walk the ranked pairs and emit open/close insights.'''
        insights = []
        expiry = Expiry.EndOfDay(algorithm.Time)
        # Symbols whose pair was closed, or which are being delisted, on this bar. Every other
        # pair touching them is skipped: their windows are dropped, so a pair reopened on the
        # same bar could never be scored -- and therefore never closed -- again.
        retired = set()

        # Delisting: close any open pair with a delisted leg, and forget the delisted symbol.
        # Pairs that are not open are left alone, so unrelated positions are untouched.
        candidates = {symbol for pair in ranked_pairs for symbol in pair}
        delisted = {s for s in candidates if data.Delistings.ContainsKey(str(s))}
        for pair in [p for p in self.invested_pairs if delisted.intersection(p)]:
            insights.extend(self._close_pair(pair, expiry))
            retired.update(pair)
        for symbol in delisted:
            self.historical_data.pop(symbol, None)
        retired |= delisted

        for pair, stats in ranked_pairs.items():
            if pair[0] in retired or pair[1] in retired:
                continue
            status = self.invested_pairs.get(pair)
            if status is None:
                insights.extend(self._try_open_pair(pair, stats, expiry))
            elif status == PairStatus.Long and not stats.current_spread < 0:
                # Long spread has reverted to (or through) zero.
                insights.extend(self._close_pair(pair, expiry))
                retired.update(pair)
            elif status == PairStatus.Short and not stats.current_spread > 0:
                # Short spread has reverted to (or through) zero.
                insights.extend(self._close_pair(pair, expiry))
                retired.update(pair)
        return insights

    def _try_open_pair(self, pair, stats, expiry):
        '''Open `pair` if both legs are free, capacity allows and the spread is wide enough.'''
        invested_symbols = {symbol for open_pair in self.invested_pairs for symbol in open_pair}
        if pair[0] in invested_symbols or pair[1] in invested_symbols:
            return []
        if len(self.invested_pairs) >= self.capacity:
            return []
        band = self.threshold * stats.hist_dev
        if stats.current_spread >= band:
            # Spread too positive: short the first leg, long the second.
            self.invested_pairs[pair] = PairStatus.Short
            return [
                self._insight(pair[0], expiry, InsightDirection.Down),
                self._insight(pair[1], expiry, InsightDirection.Up),
            ]
        if stats.current_spread <= -band:
            # Spread too negative: long the first leg, short the second.
            self.invested_pairs[pair] = PairStatus.Long
            return [
                self._insight(pair[0], expiry, InsightDirection.Up),
                self._insight(pair[1], expiry, InsightDirection.Down),
            ]
        return []

    def _close_pair(self, pair, expiry):
        '''Forget an open pair, drop both legs' windows and return Flat insights for both legs.'''
        self.invested_pairs.pop(pair, None)
        # NOTE(review): dropping the windows means both symbols are not re-scored until the
        # universe adds them again -- confirm this is intended.
        for symbol in pair:
            self.historical_data.pop(symbol, None)
        return [self._insight(symbol, expiry, InsightDirection.Flat) for symbol in pair]

    def _insight(self, symbol, expiry, direction):
        '''Build a price insight for `symbol` with this model's fixed weight.'''
        return Insight.Price(str(symbol), expiry, direction, None, None, None, self._INSIGHT_WEIGHT)

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Remember the latest universe changes; they are applied on the next Update.'''
        self._changes = changes

    def _normalize_data(self, series):
        '''Min-max scale a RollingWindow using the range of elements [1:].

        Element [0] is only needed for the current spread, so it is scaled with the
        formation-window range and may fall outside [0, 1].
        '''
        arr = np.array([x for x in series.GetEnumerator()])
        low = arr[1:].min()
        high = arr[1:].max()
        return (arr - low) / (high - low)
# from clr import AddReference
# AddReference("QuantConnect.Common")
# AddReference("QuantConnect.Algorithm.Framework")
from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
# Earliest representable timezone-aware (UTC) datetime; used as a "never" sentinel.
UTCMIN = datetime(1, 1, 1, tzinfo=utc)
class CustomEqualWeightingPortfolioConstructionModel1(PortfolioConstructionModel):
    '''
    Description:
        Custom IPortfolioConstructionModel that gives equal weighting to all active securities.
        Positions are changed only through insights: universe deselection and insight
        expiry do NOT create flattening targets in this variant.
    Details:
        - The target percent holdings of each security is 1/N where N is the number of securities with active Up/Down insights
        - For InsightDirection.Up, long targets are returned
        - For InsightDirection.Down, short targets are returned
        - For InsightDirection.Flat, closing position targets are returned
    '''

    def __init__(self, rebalancingParam = False):
        '''
        Description:
            Initialize a new instance of CustomEqualWeightingPortfolioConstructionModel1
        Args:
            rebalancingParam: Integer indicating the number of days for rebalancing (default set to False, no rebalance)
                - Independent of this parameter, the portfolio will be rebalanced when a security is added/removed/changed direction
        '''
        self.insightCollection = InsightCollection()
        self.removedSymbols = []
        self.nextExpiryTime = UTCMIN
        self.rebalancingTime = UTCMIN
        # if the rebalancing parameter is not False but a positive integer
        # convert rebalancingParam to timedelta and create rebalancingFunc
        if rebalancingParam > 0:
            self.rebalancing = True
            rebalancingParam = timedelta(days = rebalancingParam)
            self.rebalancingFunc = lambda dt: dt + rebalancingParam
        else:
            self.rebalancing = rebalancingParam

    def CreateTargets(self, algorithm, insights):
        '''
        Description:
            Create portfolio targets from the specified insights
        Args:
            algorithm: The algorithm instance
            insights: The insights to create portfolio targets from
        Returns:
            An enumerable of portfolio targets to be sent to the execution model
        '''
        targets = []
        # NOTE(review): in this variant `removedSymbols` is always a list (the code that reset
        # it to None is disabled), so this early exit never fires and targets are recomputed
        # on every call. Kept as-is to preserve behaviour.
        if (len(insights) == 0 and algorithm.UtcTime <= self.nextExpiryTime and self.removedSymbols is None):
            return targets

        for insight in insights:
            self.insightCollection.Add(insight)

        # Universe-deselection and expiry flattening are intentionally disabled here:
        # positions are only closed by Flat insights from the alpha model.
        activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)
        lastActiveInsights = self._latest_insight_per_symbol(activeInsights)

        percents = self.DetermineTargetPercent(lastActiveInsights)
        if self.ShouldCreateTargets(algorithm, lastActiveInsights):
            for insight in lastActiveInsights:
                target = PortfolioTarget.Percent(algorithm, insight.Symbol, percents[insight])
                if target is not None:
                    targets.append(target)
            # update rebalancing time
            if self.rebalancing:
                self.rebalancingTime = self.rebalancingFunc(algorithm.UtcTime)

        # here we update the next expiry date in the insight collection
        self.nextExpiryTime = self.insightCollection.GetNextExpiryTime()
        if self.nextExpiryTime is None:
            self.nextExpiryTime = UTCMIN
        return targets

    def _latest_insight_per_symbol(self, insights):
        '''Return the most recently generated insight of each symbol, in first-seen symbol order.

        Unlike itertools.groupby this does not require a symbol's insights to be contiguous.
        Ties on GeneratedTimeUtc resolve to the later insight, matching sorted(...)[-1].
        '''
        latest = {}
        for insight in insights:
            current = latest.get(insight.Symbol)
            # "not newer < current" == ">=" but only relies on "<", as sorting did.
            if current is None or not insight.GeneratedTimeUtc < current.GeneratedTimeUtc:
                latest[insight.Symbol] = insight
        return list(latest.values())

    def DetermineTargetPercent(self, lastActiveInsights):
        '''
        Description:
            Determine the target percent from each insight: Direction * 1/N, where N counts
            the non-Flat insights (Flat insights therefore map to 0)
        Args:
            lastActiveInsights: The active insights to generate a target from
        '''
        result = {}
        count = sum(x.Direction != InsightDirection.Flat for x in lastActiveInsights)
        percent = 0 if count == 0 else 1.0 / count
        for insight in lastActiveInsights:
            result[insight] = insight.Direction * percent
        return result

    def ShouldCreateTargets(self, algorithm, lastActiveInsights):
        '''
        Description:
            Determine whether we should rebalance the portfolio to keep equal weighting when:
                - It is time to rebalance regardless
                - We want to include some new security in the portfolio
                - We want to modify the direction of some existing security
        Args:
            lastActiveInsights: The last active insights to check
        '''
        # it is time to rebalance
        if self.rebalancing and algorithm.UtcTime >= self.rebalancingTime:
            return True
        for insight in lastActiveInsights:
            holding = algorithm.Portfolio[insight.Symbol]
            # new, not-yet-invested security with a directional insight
            if not holding.Invested and insight.Direction != InsightDirection.Flat:
                return True
            # long position whose insight is no longer Up
            elif holding.IsLong and insight.Direction != InsightDirection.Up:
                return True
            # short position whose insight is no longer Down
            elif holding.IsShort and insight.Direction != InsightDirection.Down:
                return True
        return False

    def OnSecuritiesChanged(self, algorithm, changes):
        '''
        Description:
            Event fired each time we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm
        '''
        # invalidate insights of removed symbols in the insight collection
        self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities]
        self.insightCollection.Clear(self.removedSymbols)
from SP500Universe import QC500UniverseSelectionModel
from BasicDistanceApproachAlphaModel import BasicDistanceApproachAlphaModel
from DistanceApproachZeroCrossingAlphaModel import DistanceApproachZeroCrossingAlphaModel
from DistanceApproachHighVarianceAlphaModel import DistanceApproachHighVarianceAlphaModel
from DistanceApproachPearsonCorrelationAlphaModel import DistanceApproachPearsonCorrelationAlphaModel
from DistanceApproachPearsonCorrelationAlphaModel2 import DistanceApproachPearsonCorrelationAlphaModel2
from CustomEqualWeightingPortfolioConstructionModel1 import CustomEqualWeightingPortfolioConstructionModel1
from CustomEqualWeightingPortfolioConstructionModel2 import CustomEqualWeightingPortfolioConstructionModel2, ConstructionMethod
class PairsTradingAlgorithm(QCAlgorithm):
    '''Backtest driver: QC500 universe, a distance-approach pairs alpha and a custom
    equal-weighting portfolio construction model.'''

    def Initialize(self):
        # Official test window
        self.SetStartDate(2019, 1, 5)
        self.SetEndDate(2021, 9, 16)
        # Debug test window
        # self.SetStartDate(2021, 1, 6)
        # self.SetEndDate(2021, 4, 23)
        self.SetCash(200000)
        self.capacity = 20
        # Date of the last end-of-day plot; OnEndOfDay plots at most once per day.
        self._last_plot_date = None

        # Configure universe settings before registering the universe model so the
        # daily resolution is in place whenever LEAN reads them.
        self.UniverseSettings.Resolution = Resolution.Daily
        self.SetUniverseSelection(QC500UniverseSelectionModel(numOfUniverse=500))

        # Alpha model under test (alternatives kept for experimentation):
        # self.AddAlpha(BasicDistanceApproachAlphaModel(capacity=self.capacity))
        # self.AddAlpha(DistanceApproachHighVarianceAlphaModel(capacity=self.capacity))
        # self.AddAlpha(DistanceApproachPearsonCorrelationAlphaModel(capacity=self.capacity))
        # self.AddAlpha(DistanceApproachPearsonCorrelationAlphaModel2(capacity=self.capacity))  # TODO Working on it
        self.AddAlpha(DistanceApproachZeroCrossingAlphaModel(capacity=self.capacity))

        # Portfolio construction (alternatives: EqualWeightingPortfolioConstructionModel(),
        # CustomEqualWeightingPortfolioConstructionModel1()).
        self.SetPortfolioConstruction(
            CustomEqualWeightingPortfolioConstructionModel2(
                capacity=self.capacity,
                method=ConstructionMethod.MARKET_NEUTRAL,
                # method=ConstructionMethod.LONG_ONLY,
            )
        )
        # * Not implemented: custom execution model / data normalization mode.

    def OnData(self, data):
        # All trading is driven by the framework models.
        pass

    ###################################################################
    # Log the end of day prices and plot the diagram
    def OnEndOfDay(self, symbol):
        # LEAN calls OnEndOfDay(symbol) once per subscribed symbol; plot only the first
        # call of each day so the charts get one point per day instead of one per symbol.
        today = self.Time.date()
        if self._last_plot_date == today:
            return
        self._last_plot_date = today
        self.Plot('AvailCash', 'Cash', self.Portfolio.Cash)
        self.Plot('AvailCash', 'Portfolio', self.Portfolio.TotalPortfolioValue)
        self.Plot('HeldPositions', "Positions", sum(1 for kvp in self.Portfolio if kvp.Value.Invested))
from AlgorithmImports import *
import itertools as it
import numpy as np
import pandas as pd
from enum import Enum
from sklearn import linear_model
class PairStatus(Enum):
    '''State of an open pair, kept to avoid re-sending the same signal every bar (and to help detect bounces).

    Values: -1 short spread, 0 flat/closing, +1 long spread.
    '''
    Short = -1  # pair is held short; keep shorting
    Flat = 0    # pair is to be closed
    Long = 1    # pair is held long; keep long
class DistanceApproachPearsonCorrelationPair:
    '''Pair of price series scored by the Pearson correlation of their log returns.

    The score is stored in ``distance`` (a higher value means the two series co-move
    more strongly). Prices must be strictly positive for the log returns to be finite.
    '''
    threshold = None

    def __init__(
        self,
        symbol_a:str, symbol_b:str,
        price_a, price_b,
        threshold=1.5
    ):
        self.symbol_a, self.symbol_b = symbol_a, symbol_b
        self.price_a = np.array(price_a)
        self.price_b = np.array(price_b)
        self.threshold = threshold
        self.distance = self._pearson_coef()

    def _pearson_coef(self):
        # Off-diagonal element of the 2x2 correlation matrix of the two log-return series.
        log_returns = [
            np.diff(np.log(series), axis=0).flatten()
            for series in (self.price_a, self.price_b)
        ]
        return np.corrcoef(*log_returns)[0, 1]
class DistanceApproachPearsonCorrelationAlphaModel2(AlphaModel):
    '''Pairs-portfolio divergence model (work in progress).

    Price windows are refreshed on every bar. Once per calendar month the model:
      1. ranks all symbol pairs by the Pearson correlation of their log returns,
      2. gives each stock an equal-weight portfolio of its (up to 50) most correlated peers,
      3. regresses the stock's log returns on the peer portfolio's to get beta,
      4. ranks stocks by the 22-span EWM of beta * peer_return - stock_return, and
      5. emits Up insights for the lowest ``capacity`` names, Down insights for the highest
         ``capacity`` names and Flat insights for names that left either list.
    '''

    # Maximum number of peers forming each stock's benchmark portfolio.
    _PEERS_PER_STOCK = 50
    # Weight attached to every insight emitted by this model.
    _INSIGHT_WEIGHT = 0.05

    def __init__(
        self,
        capacity = 20,
        resolution = Resolution.Daily
    ):
        '''
        Args:
            capacity: maximum number of names on each of the long and the short side.
            resolution: stored on the instance; history is always requested at Resolution.Daily.
        '''
        self.resolution = resolution
        self.symbol_pairs = []
        self.formation_period = 22 * 12
        self.threshold = 1.5
        # Symbol -> RollingWindow[float] of daily closes ([0] is the most recently added).
        self.historical_data = dict()
        self.invested_long_list = list()
        self.invested_short_list = list()
        self._changes = None
        self.capacity = capacity
        # Month of the last signal computation.
        self.lastChange = None

    def _create_pairs(self, symbols):
        '''Return every unordered 2-combination of `symbols`.'''
        return list(it.combinations(symbols, 2))

    def Update(self, algorithm, data):
        '''Refresh the windows every bar and recompute signals on the first bar of each month.

        Returns:
            A list of insights (empty on bars where the signal is not recomputed).
        '''
        algorithm.Debug(f'Time is {algorithm.Time}')
        insights = []

        # Keep the windows current on every bar. Rolling them only on the monthly signal
        # bar mixed daily and monthly closes, and universe changes delivered mid-month were
        # overwritten before being applied.
        freshly_loaded = self._apply_universe_changes(algorithm)
        self._append_latest_closes(algorithm, freshly_loaded)
        # Reset the changes
        self._changes = None

        if self.lastChange == algorithm.Time.month:
            return insights
        if len(self.historical_data) < 2:
            # Nothing to pair yet; retry on the next bar instead of waiting a month.
            return insights
        self.lastChange = algorithm.Time.month

        self.symbol_pairs = self._create_pairs(self.historical_data.keys())
        # Raw closes: log returns of min-max normalized prices hit log(0) and made every
        # correlation NaN.
        prices = {
            symbol: self._window_to_array(window)
            for symbol, window in self.historical_data.items()
        }
        scored = [
            DistanceApproachPearsonCorrelationPair(a, b, prices[a], prices[b])
            for a, b in self.symbol_pairs
        ]
        # Most correlated first; non-finite correlations (flat or non-positive prices) dropped.
        ranked_pairs = sorted(
            (pair for pair in scored if np.isfinite(pair.distance)),
            key=lambda pair: pair.distance,
            reverse=True,
        )
        pearson_pair_table = self._build_peer_table(ranked_pairs)

        self.price_dataframe = pd.concat(
            [self._dataframelize(self.historical_data[symbol], symbol) for symbol in self.historical_data.keys()],
            axis=1
        )
        divergence = {}
        for stock, peers in pearson_pair_table.items():
            value = self._smoothed_divergence(stock, peers)
            if np.isfinite(value):
                divergence[stock] = value
        ordered = [stock for stock, _ in sorted(divergence.items(), key=lambda item: item[1])]

        # NOTE(review): the pairs-portfolio literature goes long the HIGHEST divergence
        # (stock lagging its peers) and short the lowest; this model does the opposite.
        # Confirm the intended direction.
        # Never let the long and short books overlap on small universes.
        count = min(self.capacity, len(ordered) // 2)
        new_long_list = ordered[:count]
        new_short_list = ordered[len(ordered) - count:]

        insightExpiry = Expiry.EndOfDay(algorithm.Time)
        flat_list = [s for s in self.invested_long_list if s not in new_long_list] + \
                    [s for s in self.invested_short_list if s not in new_short_list]
        self.invested_long_list = new_long_list
        self.invested_short_list = new_short_list
        for symbol in flat_list:
            insights.append(self._insight(symbol, insightExpiry, InsightDirection.Flat))
        for symbol in self.invested_short_list:
            insights.append(self._insight(symbol, insightExpiry, InsightDirection.Down))
        for symbol in self.invested_long_list:
            insights.append(self._insight(symbol, insightExpiry, InsightDirection.Up))
        return insights

    def _apply_universe_changes(self, algorithm):
        '''Load windows for added symbols, drop windows of removed, non-invested symbols.

        Returns:
            The set of symbols reported as added (their windows are not rolled this bar).
        '''
        if self._changes is None:
            return set()
        for added in self._changes.AddedSecurities:
            symbol = added.Symbol
            if symbol in self.historical_data:
                continue
            hist = algorithm.History([symbol], self.formation_period, Resolution.Daily)
            if hist.empty:
                algorithm.Debug('ERROR: Hist is empty')
                continue
            window = RollingWindow[float](self.formation_period)
            for bar in hist.loc[symbol, :].itertuples():
                window.Add(bar.close)
            # Only keep symbols that have a full formation period of history.
            if window.IsReady:
                self.historical_data[symbol] = window
        # Symbols still held keep their window.
        invested = {kvp.Key for kvp in algorithm.Portfolio if kvp.Value.Invested}
        for removed in self._changes.RemovedSecurities:
            if removed.Symbol not in invested:
                self.historical_data.pop(removed.Symbol, None)
        return {sec.Symbol for sec in self._changes.AddedSecurities}

    def _append_latest_closes(self, algorithm, skip):
        '''Append the latest daily close to every window whose symbol is not in `skip`.'''
        for symbol, window in self.historical_data.items():
            if symbol in skip:
                continue
            # Use History rather than `data`: held symbols may have left the universe.
            hist = algorithm.History([symbol], 5, Resolution.Daily)
            # History occasionally lacks a 'close' column; repeat the previous close then.
            if 'close' in hist.columns:
                window.Add(float(hist['close'].iloc[-1]))
            else:
                window.Add(window[0])

    def _build_peer_table(self, ranked_pairs):
        '''Map each symbol to at most _PEERS_PER_STOCK partners, in `ranked_pairs` order.'''
        peers = {}
        for pair in ranked_pairs:
            for stock, partner in ((pair.symbol_a, pair.symbol_b), (pair.symbol_b, pair.symbol_a)):
                group = peers.setdefault(stock, [])
                if len(group) < self._PEERS_PER_STOCK:
                    group.append(partner)
        return peers

    def _smoothed_divergence(self, stock, peers):
        '''Return the latest 22-span EWM of beta * peer_return - stock_return for `stock`.

        RetDiff = beta * Cret - Lret, where Lret is the stock's log return and Cret the
        equal-weight mean log return of its peers; beta comes from regressing Lret on Cret.
        '''
        stock_rtn = np.log(self.price_dataframe[stock].pct_change() + 1)
        benchmark_rtn = (
            self.price_dataframe[peers]
            .apply(lambda x: np.log(x.pct_change() + 1), axis=0)
            .sum(axis=1) / len(peers)
        )
        # Drop the first row, whose returns are undefined.
        cret = np.array(benchmark_rtn)[1:]
        lret = np.array(stock_rtn).reshape((-1, 1))[1:]
        cret_constant = np.column_stack([cret, np.ones(len(cret))])
        regr = linear_model.LinearRegression()
        regr.fit(cret_constant, lret)
        beta = regr.coef_[0][0]
        return (beta * benchmark_rtn - stock_rtn).ewm(span=22, adjust=False).mean().iloc[-1]

    def _insight(self, symbol, expiry, direction):
        '''Build a price insight for `symbol` with this model's fixed weight.'''
        return Insight.Price(str(symbol), expiry, direction, None, None, None, self._INSIGHT_WEIGHT)

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Remember the latest universe changes; they are applied on the next Update.'''
        self._changes = changes

    def _window_to_array(self, window):
        '''Return the window's values as a numpy array, most recent first.'''
        return np.array([price for price in window])

    def _normalize_data(self, series):
        '''Min-max scale a RollingWindow using the range of elements [1:].'''
        arr = np.array([x for x in series.GetEnumerator()])
        low = arr[1:].min()
        high = arr[1:].max()
        return (arr - low) / (high - low)

    def _dataframelize(self, price_rolling_window, symbol):
        '''Return the window as a one-column DataFrame ordered oldest to newest.'''
        l = [n for n in price_rolling_window]
        l.reverse()
        return pd.DataFrame(l, columns=[symbol])
# from clr import AddReference
# AddReference("QuantConnect.Common")
# AddReference("QuantConnect.Algorithm.Framework")
from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
# Earliest representable timezone-aware (UTC) datetime; used as a "never" sentinel.
UTCMIN = datetime(1, 1, 1, tzinfo=utc)
class CustomEqualWeightingPortfolioConstructionModel(PortfolioConstructionModel):
    '''
    Description:
        Provide a custom implementation of IPortfolioConstructionModel that gives equal weighting to all active securities
    Details:
        - The target percent holdings of each security is 1/N where N is the number of securities with active Up/Down insights
        - For InsightDirection.Up, long targets are returned
        - For InsightDirection.Down, short targets are returned
        - For InsightDirection.Flat, closing position targets are returned
        - Securities removed from the universe, and symbols whose insights all expired, are flattened
    '''

    def __init__(self, rebalancingParam = False):
        '''
        Description:
            Initialize a new instance of CustomEqualWeightingPortfolioConstructionModel
        Args:
            rebalancingParam: Integer indicating the number of days for rebalancing (default set to False, no rebalance)
                - Independent of this parameter, the portfolio will be rebalanced when a security is added/removed/changed direction
        '''
        self.insightCollection = InsightCollection()
        self.removedSymbols = []
        self.nextExpiryTime = UTCMIN
        self.rebalancingTime = UTCMIN
        # if the rebalancing parameter is not False but a positive integer
        # convert rebalancingParam to timedelta and create rebalancingFunc
        if rebalancingParam > 0:
            self.rebalancing = True
            rebalancingParam = timedelta(days = rebalancingParam)
            self.rebalancingFunc = lambda dt: dt + rebalancingParam
        else:
            self.rebalancing = rebalancingParam

    def CreateTargets(self, algorithm, insights):
        '''
        Description:
            Create portfolio targets from the specified insights
        Args:
            algorithm: The algorithm instance
            insights: The insights to create portfolio targets from
        Returns:
            An enumerable of portfolio targets to be sent to the execution model
        '''
        targets = []
        # Skip work when nothing changed: no new insights, nothing expired, no removals
        # pending and no scheduled rebalance due (otherwise a due rebalance could be
        # postponed until the next insight or expiry).
        rebalance_due = self.rebalancing and algorithm.UtcTime >= self.rebalancingTime
        if (len(insights) == 0 and algorithm.UtcTime <= self.nextExpiryTime
                and self.removedSymbols is None and not rebalance_due):
            return targets

        for insight in insights:
            self.insightCollection.Add(insight)

        # create flatten target for each security that was removed from the universe
        if self.removedSymbols is not None:
            targets.extend(PortfolioTarget(symbol, 0) for symbol in self.removedSymbols)
            self.removedSymbols = None

        activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)
        lastActiveInsights = self._latest_insight_per_symbol(activeInsights)

        percents = self.DetermineTargetPercent(lastActiveInsights)
        errorSymbols = set()
        if self.ShouldCreateTargets(algorithm, lastActiveInsights):
            for insight in lastActiveInsights:
                target = PortfolioTarget.Percent(algorithm, insight.Symbol, percents[insight])
                if target is not None:
                    targets.append(target)
                else:
                    errorSymbols.add(insight.Symbol)
            # update rebalancing time
            if self.rebalancing:
                self.rebalancingTime = self.rebalancingFunc(algorithm.UtcTime)

        # Flatten symbols whose insights have all expired. Unique symbols in first-seen
        # order; itertools.groupby would repeat a symbol whose insights are not contiguous.
        expiredInsights = self.insightCollection.RemoveExpiredInsights(algorithm.UtcTime)
        expiredSymbols = dict.fromkeys(insight.Symbol for insight in expiredInsights)
        targets.extend(
            PortfolioTarget(symbol, 0)
            for symbol in expiredSymbols
            if not self.insightCollection.HasActiveInsights(symbol, algorithm.UtcTime)
            and symbol not in errorSymbols
        )

        # here we update the next expiry date in the insight collection
        self.nextExpiryTime = self.insightCollection.GetNextExpiryTime()
        if self.nextExpiryTime is None:
            self.nextExpiryTime = UTCMIN
        return targets

    def _latest_insight_per_symbol(self, insights):
        '''Return the most recently generated insight of each symbol, in first-seen symbol order.

        Unlike itertools.groupby this does not require a symbol's insights to be contiguous.
        Ties on GeneratedTimeUtc resolve to the later insight, matching sorted(...)[-1].
        '''
        latest = {}
        for insight in insights:
            current = latest.get(insight.Symbol)
            # "not newer < current" == ">=" but only relies on "<", as sorting did.
            if current is None or not insight.GeneratedTimeUtc < current.GeneratedTimeUtc:
                latest[insight.Symbol] = insight
        return list(latest.values())

    def DetermineTargetPercent(self, lastActiveInsights):
        '''
        Description:
            Determine the target percent from each insight: Direction * 1/N, where N counts
            the non-Flat insights (Flat insights therefore map to 0)
        Args:
            lastActiveInsights: The active insights to generate a target from
        '''
        result = {}
        count = sum(x.Direction != InsightDirection.Flat for x in lastActiveInsights)
        percent = 0 if count == 0 else 1.0 / count
        for insight in lastActiveInsights:
            result[insight] = insight.Direction * percent
        return result

    def ShouldCreateTargets(self, algorithm, lastActiveInsights):
        '''
        Description:
            Determine whether we should rebalance the portfolio to keep equal weighting when:
                - It is time to rebalance regardless
                - We want to include some new security in the portfolio
                - We want to modify the direction of some existing security
        Args:
            lastActiveInsights: The last active insights to check
        '''
        # it is time to rebalance
        if self.rebalancing and algorithm.UtcTime >= self.rebalancingTime:
            return True
        for insight in lastActiveInsights:
            holding = algorithm.Portfolio[insight.Symbol]
            # new, not-yet-invested security with a directional insight
            if not holding.Invested and insight.Direction != InsightDirection.Flat:
                return True
            # long position whose insight is no longer Up
            elif holding.IsLong and insight.Direction != InsightDirection.Up:
                return True
            # short position whose insight is no longer Down
            elif holding.IsShort and insight.Direction != InsightDirection.Down:
                return True
        return False

    def OnSecuritiesChanged(self, algorithm, changes):
        '''
        Description:
            Event fired each time we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm
        '''
        # remember removed symbols (flattened on the next CreateTargets) and invalidate
        # their insights in the insight collection
        self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities]
        self.insightCollection.Clear(self.removedSymbols)
from AlgorithmImports import *
import itertools as it
import numpy as np
from enum import Enum
# * 1. Variants of how to rank the distance + high variance pair
# * 1.1. Should we use IR factor score
# * 1.2. or use linear regression?
class PairStatus(Enum):
    '''
    Trading state of a pair. It is stored for every open pair so that entry signals are
    not re-sent on each update and so the exit rule matching the pair's side can be applied.
    '''
    Short = -1  # first leg short, second leg long; held while the spread stays above zero
    Flat = 0  # pair is being closed
    Long = 1  # first leg long, second leg short; held while the spread stays below zero
class DistanceApproachZeroCrossingPair:
    '''
    Description:
        Distance-approach statistics for one candidate pair of price series
    Details:
        - distance: sum of squared differences between the two series
        - num_of_crossing: number of strict sign changes of the spread (price_a - price_b)
        - hist_dev: sample standard deviation (ddof=1) of the spread
        - current_spread: spread at index 0, which is treated as the current observation
        All statistics are computed over the full series, index 0 included.
        NOTE(review): the alpha model's _normalize_data scales with the range of [1:] and treats [0]
        as the current value; confirm whether distance / crossings / deviation should exclude [0] too.
    '''
    threshold = None

    def __init__(
        self,
        symbol_a, symbol_b,
        price_a, price_b,
        threshold=1.5
    ):
        '''
        Args:
            symbol_a, symbol_b: Identifiers of the two legs (stored as given)
            price_a, price_b: Equal-length price sequences; index 0 is the current observation
            threshold: Stored on the instance; not used by the calculations in this class
        Raises:
            ValueError: if the two price series do not have the same shape
        '''
        self.symbol_a = symbol_a
        self.symbol_b = symbol_b
        self.price_a = np.array(price_a)
        self.price_b = np.array(price_b)
        # Without this check a length-1 series would silently broadcast against the other one
        if self.price_a.shape != self.price_b.shape:
            raise ValueError(
                f'price_a and price_b must have the same shape, got {self.price_a.shape} and {self.price_b.shape}'
            )
        self.threshold = threshold
        self.distance = self._distance()
        self.num_of_crossing = self._zeroCrossing()
        self.hist_dev = self._dev()
        self.current_spread = self._spread()

    def _spread_series(self):
        '''Element-wise spread price_a - price_b.'''
        return self.price_a - self.price_b

    def _distance(self):
        '''Sum of squared deviations between the two (normalised) price series.'''
        return np.square(self._spread_series()).sum()

    def _zeroCrossing(self):
        '''Number of times the spread changes sign between neighbouring observations.'''
        spread = self._spread_series()
        # A strictly negative product of neighbours marks a sign change; touching zero does not count
        return int(np.count_nonzero(spread[:-1] * spread[1:] < 0))

    def _dev(self):
        '''Sample standard deviation (ddof=1) of the spread.'''
        return np.std(self._spread_series(), ddof=1)

    def _spread(self):
        '''Spread of the current (index 0) observation.'''
        return self.price_a[0] - self.price_b[0]
class DistanceApproachZeroCrossingAlphaModel(AlphaModel):
    '''
    Description:
        Pairs-trading alpha model using the distance approach with a zero-crossing ranking
    Details:
        - Keeps a RollingWindow of daily closes (`formation_period` long) for every tracked symbol
        - Every update scores all pairs of tracked symbols with DistanceApproachZeroCrossingPair, keeps the
          closest `max_pair_percentage` share by distance and orders those by number of spread zero crossings
        - Opens a pair when its normalised spread is at least `threshold` historical deviations from zero:
          above the band -> first leg Down / second leg Up (PairStatus.Short), below -> the reverse (PairStatus.Long)
        - Closes a pair (Flat insights on both legs) once its spread reaches or crosses zero, or a leg is
          delisted; every open pair is checked, whether or not it is still among the ranked candidates
        - At most `capacity` pairs are open at once and a symbol belongs to at most one open pair
    '''
    def __init__(
        self,
        capacity = 20,
        resolution = Resolution.Daily
    ):
        '''
        Args:
            capacity: Maximum number of pairs that may be open at the same time
            resolution: Stored on the instance; the history requests in this class always use Resolution.Daily
        '''
        self.resolution = resolution
        self.symbol_pairs = []
        # 12 months of roughly 22 trading days
        self.formation_period = 22 * 12
        self.threshold = 1.5
        self.max_pair_percentage = 0.3
        # Symbol -> RollingWindow[float] of closes, index 0 being the most recent close
        self.historical_data = dict()
        # (symbol_a, symbol_b) -> PairStatus of every open pair
        self.invested_pairs = dict()
        # Pending SecurityChanges, consumed by the next Update
        self._changes = None
        self.capacity = capacity
        # Weight attached to every emitted insight
        self.insight_weight = 0.05

    def _create_pairs(self, symbols):
        '''Return every unordered pair of the given symbols, in itertools.combinations order.'''
        return list(it.combinations(symbols, 2))

    def Update(self, algorithm, data):
        '''
        Description:
            Refresh the price windows, re-score the pairs and emit exit and entry insights
        Args:
            algorithm: The algorithm instance
            data: The current Slice; only its Delistings are read
        Returns:
            A list of insights, two per pair that is opened or closed
        '''
        algorithm.Debug(f'Time is {algorithm.Time}')
        seeded = self._apply_security_changes(algorithm)
        # The changes have been applied; do not process them again
        self._changes = None
        self._append_latest_closes(algorithm, seeded)

        scored = self._score_pairs()
        ranked = self._rank_pairs(scored)
        insight_expiry = Expiry.EndOfDay(algorithm.Time)

        delisted = {
            symbol for symbol in set(self.historical_data) | self._invested_symbols()
            if data.Delistings.ContainsKey(str(symbol))
        }

        insights = []
        # Symbols that must not be opened again during this update
        unavailable = set(delisted)

        # Exits first, so the capacity and symbols they free are visible to the entries below
        for pair, status in list(self.invested_pairs.items()):
            leg_delisted = pair[0] in delisted or pair[1] in delisted
            if leg_delisted or self._should_exit(status, scored.get(pair)):
                insights.extend(
                    self._pair_insights(pair, InsightDirection.Flat, InsightDirection.Flat, insight_expiry)
                )
                self._close_pair(pair)
                unavailable.update(pair)

        insights.extend(self._open_pairs(ranked, unavailable, insight_expiry))

        # A delisted symbol can no longer be traded, so stop tracking it
        for symbol in delisted:
            self.historical_data.pop(symbol, None)
        return insights

    def _apply_security_changes(self, algorithm):
        '''
        Seed price windows for newly added symbols and drop the windows of removed symbols that are not held.
        Returns the set of symbols seeded in this call; their windows already contain the latest close.
        '''
        seeded = set()
        if self._changes is None:
            return seeded
        for added in self._changes.AddedSecurities:
            symbol = added.Symbol
            if symbol in self.historical_data:
                continue
            hist = algorithm.History([symbol], self.formation_period, Resolution.Daily)
            if hist.empty:
                algorithm.Debug('ERROR: Hist is empty')
                continue
            window = RollingWindow[float](self.formation_period)
            for bar in hist.loc[symbol, :].itertuples():
                window.Add(bar.close)
            # Only symbols with a complete formation window take part in pairing
            if window.IsReady:
                self.historical_data[symbol] = window
                seeded.add(symbol)
        # Keep windows of symbols that are still held or belong to an open pair (e.g. an entry
        # whose order has not filled yet), so those positions can still be managed
        protected = {kvp.Key for kvp in algorithm.Portfolio if kvp.Value.Invested} | self._invested_symbols()
        for removed in self._changes.RemovedSecurities:
            if removed.Symbol not in protected:
                self.historical_data.pop(removed.Symbol, None)
        return seeded

    def _append_latest_closes(self, algorithm, skip):
        '''Append the latest daily close to every tracked window except the symbols in `skip`.'''
        for symbol, window in self.historical_data.items():
            if symbol in skip:
                continue
            # History is used instead of `data`: symbols outside the universe may still need processing
            hist = algorithm.History([symbol], 5, Resolution.Daily)
            if 'close' in hist.columns and not hist.empty:
                window.Add(float(hist['close'].iloc[-1]))
            else:
                # History occasionally lacks 'close'; repeat the last close so every window advances by one
                window.Add(window[0])

    def _score_pairs(self):
        '''Build DistanceApproachZeroCrossingPair statistics for every pair of tracked symbols.'''
        self.symbol_pairs = self._create_pairs(self.historical_data.keys())
        # Normalise each symbol once instead of once per pair it belongs to
        normalized = {symbol: self._normalize_data(window) for symbol, window in self.historical_data.items()}
        return {
            pair: DistanceApproachZeroCrossingPair(pair[0], pair[1], normalized[pair[0]], normalized[pair[1]])
            for pair in self.symbol_pairs
        }

    def _rank_pairs(self, scored):
        '''Keep the closest `max_pair_percentage` share of pairs, then order them by most zero crossings.'''
        keep = int(len(scored) * self.max_pair_percentage)
        closest = sorted(scored.items(), key=lambda item: item[1].distance)[:keep]
        return sorted(closest, key=lambda item: item[1].num_of_crossing, reverse=True)

    def _invested_symbols(self):
        '''Set of every symbol that is a leg of an open pair.'''
        return {symbol for pair in self.invested_pairs for symbol in pair}

    def _should_exit(self, status, stats):
        '''
        Return True when an open pair must be closed: its spread has reached or crossed zero,
        or the pair could not be scored in this update.
        '''
        if stats is None:
            return True
        # `not (x < 0)` also closes on a NaN spread
        if status == PairStatus.Long:
            return not stats.current_spread < 0
        if status == PairStatus.Short:
            return not stats.current_spread > 0
        return False

    def _open_pairs(self, ranked, unavailable, expiry):
        '''Open ranked pairs whose spread is outside the threshold band, respecting capacity and symbol reuse.'''
        insights = []
        busy = self._invested_symbols() | unavailable
        for pair, stats in ranked:
            if len(self.invested_pairs) >= self.capacity:
                break
            if pair[0] in busy or pair[1] in busy:
                continue
            # A zero (or NaN) deviation gives no band; `0 >= 0` would otherwise open a pair with no divergence
            if not stats.hist_dev > 0:
                continue
            band = self.threshold * stats.hist_dev
            if stats.current_spread >= band:
                # First leg is high relative to the second: sell it, buy the second
                status, direction_a, direction_b = PairStatus.Short, InsightDirection.Down, InsightDirection.Up
            elif stats.current_spread <= -band:
                status, direction_a, direction_b = PairStatus.Long, InsightDirection.Up, InsightDirection.Down
            else:
                continue
            self.invested_pairs[pair] = status
            busy.update(pair)
            insights.extend(self._pair_insights(pair, direction_a, direction_b, expiry))
        return insights

    def _pair_insights(self, pair, direction_a, direction_b, expiry):
        '''Build the two leg insights of a pair with a shared expiry and weight.'''
        return [
            Insight.Price(str(pair[0]), expiry, direction_a, None, None, None, self.insight_weight),
            Insight.Price(str(pair[1]), expiry, direction_b, None, None, None, self.insight_weight),
        ]

    def _close_pair(self, pair):
        '''Forget an open pair and stop tracking both legs; they are only tracked again if re-added.'''
        # NOTE(review): dropping both legs' windows after a close keeps the original behaviour;
        # confirm the legs are meant to stay out of pairing until the universe re-adds them.
        self.invested_pairs.pop(pair, None)
        for symbol in pair:
            self.historical_data.pop(symbol, None)

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Store the changes; they are applied at the start of the next Update.'''
        # NOTE(review): a second call before Update replaces the first set of changes
        self._changes = changes

    def _normalize_data(self, series):
        '''
        Min-max scale a RollingWindow using the range of its historical part.
        Index 0 (the current value) is excluded from the min/max and is only needed for the spread,
        so it may fall outside [0, 1]. A flat history has no range and maps to all zeros.
        '''
        # assumes the enumeration yields index 0 first, as the spread calculation expects
        arr = np.array([x for x in series.GetEnumerator()], dtype=float)
        history = arr[1:]
        low = history.min()
        span = history.max() - low
        if span == 0:
            return np.zeros_like(arr)
        return (arr - low) / span
# from clr import AddReference
# AddReference("QuantConnect.Common")
# AddReference("QuantConnect.Algorithm.Framework")
from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
# Earliest representable UTC-aware timestamp, used as the "no expiry scheduled" sentinel
UTCMIN = datetime(1, 1, 1, tzinfo=utc)
from enum import Enum
class ConstructionMethod(Enum):
    '''Position-sizing mode used by CustomEqualWeightingPortfolioConstructionModel2.'''
    MARKET_NEUTRAL = 1  # long and short legs; weights are spread over 2 * capacity slots
    LONG_ONLY = 2  # long legs only; weights are spread over capacity slots and no new shorts are opened
class CustomEqualWeightingPortfolioConstructionModel2(PortfolioConstructionModel):
    '''
    Description:
        Custom IPortfolioConstructionModel that gives a fixed, equal weight to every security with an active insight
    Details:
        - The target percent of each security is 1/N with a fixed N derived from `capacity`
          (2 * capacity for MARKET_NEUTRAL, capacity for LONG_ONLY), not the number of active insights
        - For InsightDirection.Up, long targets are returned
        - For InsightDirection.Down, short targets are returned in MARKET_NEUTRAL mode; LONG_ONLY never goes short
        - For InsightDirection.Flat, closing position targets are returned
        - A target is only emitted when the insight would open, close or flip a position (see ShouldCreateTargets)
    '''
    def __init__(
        self,
        capacity = 10,
        method = ConstructionMethod.MARKET_NEUTRAL
    ):
        '''
        Description:
            Initialize a new instance of CustomEqualWeightingPortfolioConstructionModel2
        Args:
            capacity: Slot count; MARKET_NEUTRAL sizes for 2 * capacity positions (presumably capacity pairs)
            method: ConstructionMethod selecting market-neutral or long-only sizing
        '''
        self.insightCollection = InsightCollection()
        self.removedSymbols = []
        self.nextExpiryTime = UTCMIN
        self.construction_method = method
        self.capacity = capacity

    def CreateTargets(self, algorithm, insights):
        '''
        Description:
            Create portfolio targets from the specified insights
        Args:
            algorithm: The algorithm instance
            insights: The insights to create portfolio targets from
        Returns:
            A list of portfolio targets to be sent to the execution model
        '''
        targets = []
        # Nothing to do without new insights, expired insights or pending removals.
        # removedSymbols starts as an empty list, so test for emptiness rather than None.
        if len(insights) == 0 and algorithm.UtcTime <= self.nextExpiryTime and not self.removedSymbols:
            return targets

        for insight in insights:
            self.insightCollection.Add(insight)

        # insights that have not expired yet
        activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)
        lastActiveInsights = self._latest_insight_per_symbol(activeInsights)
        percents = self.DetermineTargetPercent(lastActiveInsights, self.capacity)

        for insight in lastActiveInsights:
            if not self.ShouldCreateTargets(algorithm, insight):
                continue
            target = PortfolioTarget.Percent(algorithm, insight.Symbol, percents[insight])
            if target is not None:
                targets.append(target)

        self.nextExpiryTime = self.insightCollection.GetNextExpiryTime()
        if self.nextExpiryTime is None:
            self.nextExpiryTime = UTCMIN
        return targets

    def _latest_insight_per_symbol(self, insights):
        '''
        Return the most recently generated insight of each symbol, in first-seen symbol order.
        Unlike itertools.groupby this does not require the input to be sorted by symbol.
        '''
        latest = {}
        for insight in insights:
            current = latest.get(insight.Symbol)
            # ">=" keeps the later-listed insight on ties, like taking the last element of a stable sort
            if current is None or insight.GeneratedTimeUtc >= current.GeneratedTimeUtc:
                latest[insight.Symbol] = insight
        return list(latest.values())

    def DetermineTargetPercent(self, lastActiveInsights, capacity):
        '''
        Description:
            Determine the target percent from each insight
        Args:
            lastActiveInsights: The active insights to generate a target from
            capacity: Slot count the weights are sized for
        Returns:
            Dict mapping each insight to its signed target percent
        Raises:
            ValueError: if the construction method is not a known ConstructionMethod
        '''
        # A fixed slot count (not the number of active insights) keeps each position's size
        # independent of how many other pairs happen to be open
        if self.construction_method == ConstructionMethod.MARKET_NEUTRAL:
            count = capacity * 2
        elif self.construction_method == ConstructionMethod.LONG_ONLY:
            count = capacity
        else:
            raise ValueError(f'Unsupported construction method: {self.construction_method}')
        percent = 0 if count == 0 else 1.0 / count

        long_only = self.construction_method == ConstructionMethod.LONG_ONLY
        result = {}
        for insight in lastActiveInsights:
            if long_only and insight.Direction == InsightDirection.Down:
                # LONG_ONLY must never go short: a Down signal only closes an existing long
                result[insight] = 0.0
            else:
                result[insight] = insight.Direction * percent
        return result

    def ShouldCreateTargets(self, algorithm, insight):
        '''
        Description:
            Decide whether an insight requires a new target
        Args:
            algorithm: The algorithm instance (Portfolio is read)
            insight: The latest active insight for one symbol
        Returns:
            True when the insight would open, close or flip a position
        '''
        holding = algorithm.Portfolio[insight.Symbol]
        # new position requested for a security we do not hold
        if not holding.Invested and insight.Direction != InsightDirection.Flat:
            # LONG_ONLY never opens a short position
            if self.construction_method == ConstructionMethod.LONG_ONLY and insight.Direction == InsightDirection.Down:
                return False
            return True
        # a long position that is no longer backed by an Up insight
        if holding.IsLong and insight.Direction != InsightDirection.Up:
            return True
        # a short position that is no longer backed by a Down insight
        if holding.IsShort and insight.Direction != InsightDirection.Down:
            return True
        return False

    def OnSecuritiesChanged(self, algorithm, changes):
        '''
        Description:
            Does nothing: removed securities are neither recorded nor invalidated in the insight collection
        '''
        # NOTE(review): presumably deliberate so that positions in symbols that left the universe keep
        # being driven by the alpha model's insights; confirm before restoring the standard invalidation.
        pass