from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Indicators")
AddReference("QuantConnect.Algorithm.Framework")
from QuantConnect.Data.UniverseSelection import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
import numpy as np
import pandas as pd
class CustomUniverseSelectionModel(FundamentalUniverseSelectionModel):
'''
Description:
This Universe model selects high P/E, EPS growth
Details:
The important thing to understand here is the internal flow of the Universe module:
1) SelectCoarse filters stocks with price above and dollar
2) SelectFine further filters those stocks by fundamental data. In this case, we use Market Cap and P/E Ratio
'''
def __init__(self,
filterFineData = True,
universeSettings = None,
securityInitializer = None):
super().__init__(filterFineData, universeSettings, securityInitializer)
self.periodCheck = -1 # initialize a variable to check when the period changes
self.fineNumber = 20
self.averages = { }
def SelectCoarse(self, algorithm, coarse):
filterCoarse = []
''' Coarse selection based on price and volume '''
# this ensures the universe selection only runs once a year
if algorithm.Time.isocalendar()[1] == self.periodCheck:
return Universe.Unchanged
self.periodCheck = algorithm.Time.isocalendar()[1]
## securities must have fundamental data (to avoid ETFs)
CoarseUniverse = [x for x in coarse if x.HasFundamentalData]
for coarse in CoarseUniverse:
symbol = coarse.Symbol
if symbol not in self.averages:
history = algorithm.History(symbol, 200, Resolution.Daily)
self.averages[symbol] = SelectionData(history)
self.averages[symbol].update(algorithm.Time, coarse.AdjustedPrice)
if self.averages[symbol].is_ready() and self.averages[symbol].fast > self.averages[symbol].slow and self.averages[symbol].fastten > self.averages[symbol].fastfive and self.averages[symbol].fastone > self.averages[symbol].fasttwo:
filterCoarse.append(symbol)
algorithm.Log('stocks with fundamental data: ' + str(len(filterCoarse)))
return filterCoarse
def SelectFine(self, algorithm, fine):
''' Fine selection based on fundamental data '''
# select small caps only (market cap more than $300 million, )
filtered_fine = [x for x in fine if 3e8 < x.MarketCap and x.ValuationRatios.PERatio > 0 and x.ValuationRatios.PriceChange1M > 0]
algorithm.Log('total number of stocks in fine selection: ' + str(len(filtered_fine)))
# rank stocks by three factor.
sortedByfactor1 = sorted(filtered_fine, key=lambda x: x.ValuationRatios.PERatio, reverse=True)
sortedByfactor2 = sorted(filtered_fine, key=lambda x: x.OperationRatios.ROE.Value, reverse=True)
sortedByfactor3 = sorted(filtered_fine, key=lambda x: x.OperationRatios.OperationRevenueGrowth3MonthAvg.Value, reverse=True)
sortedByfactor4 = sorted(filtered_fine, key=lambda x: x.ValuationRatios.PriceChange1M, reverse=True)
sortedByfactor5 = sorted(filtered_fine, key=lambda x: x.ValuationRatios.PERatio1YearGrowth, reverse=True)
stock_dict = {}
# assign a score to each stock, you can also change the rule of scoring here.
for i,ele in enumerate(sortedByfactor1):
rank1 = i
rank2 = sortedByfactor2.index(ele)
rank3 = sortedByfactor3.index(ele)
rank4 = sortedByfactor4.index(ele)
rank5 = sortedByfactor5.index(ele)
score = sum([rank1*(1/5),rank2*(1/5),rank3*(1/5),rank4*(1/5), rank5*(1/5)])
stock_dict[ele] = score
sorted_stock = sorted(stock_dict.items(), key=lambda d:d[1],reverse=False)[:self.fineNumber] #d[1] means 2nd dictionary meaning value(price) in this case, https://docs.python.org/3/howto/sorting.html
fineSelection = [x[0].Symbol for x in sorted_stock]
fine_series = pd.Series(sorted_stock)
algorithm.Log('Best stocks selection: ' + str(fine_series))
# return fineSelection ready for Alpha module
return fineSelection
class SelectionData():
#3. Update the constructor to accept a history array
def __init__(self, history):
self.slow = ExponentialMovingAverage(200)
self.fast = ExponentialMovingAverage(30)
self.fastfive = ExponentialMovingAverage(5)
self.fastten = ExponentialMovingAverage(10)
self.fastthree = ExponentialMovingAverage(3)
self.fasttwo = ExponentialMovingAverage(2)
self.fastone = ExponentialMovingAverage(1)
#4. Loop over the history data and update the indicators
for bar in history.itertuples():
self.fast.Update(bar.Index[1], bar.close)
self.slow.Update(bar.Index[1], bar.close)
self.fastfive.Update(bar.Index[1], bar.close)
self.fastten.Update(bar.Index[1], bar.close)
self.fastthree.Update(bar.Index[1], bar.close)
self.fasttwo.Update(bar.Index[1], bar.close)
self.fastone.Update(bar.Index[1], bar.close)
def is_ready(self):
return self.slow.IsReady and self.fast.IsReady and self.fastfive.IsReady and self.fastten.IsReady and self.fastthree.IsReady and self.fasttwo.IsReady and self.fastone.IsReady
def update(self, time, price):
self.fast.Update(time, price)
self.slow.Update(time, price)
self.fastfive.Update(time, price)
self.fastten.Update(time, price)
self.fastthree.Update(time, price)
self.fasttwo.Update(time, price)
self.fastone.Update(time, price)
### PRODUCT INFORMATION --------------------------------------------------------------------------------
# Copyright InnoQuantivity.com, granted to the public domain.
# Use entirely at your own risk.
# This algorithm contains open source code from other sources and no claim is being made to such code.
# Do not remove this copyright notice.
### ----------------------------------------------------------------------------------------------------
from CustomUniverseSelection import CustomUniverseSelectionModel #"From" (file name) "import" class name
from LongOnlyConstantAlphaCreation import LongOnlyConstantAlphaCreationModel
from CustomEqualWeightingPortfolioConstruction import CustomEqualWeightingPortfolioConstructionModel
class LongOnlySmallCapsLowPERatioFrameworkAlgorithm(QCAlgorithmFramework):
'''
Trading Logic:
This algorithm buys at the start of every year Small Caps with low P/E Ratio
Universe: Dynamically selects stocks at the start of each year based on:
- Price above $5
- Small Caps (Market Cap between $300 million and $2 billion)
- Then select stocks in the 1st percentile of Price To Earnings Ratio (PE Ratio)
Alpha: Constant creation of Up Insights every trading bar
Portfolio: Equal Weighting (allocate equal amounts of portfolio % to each security)
- To rebalance the portfolio periodically to ensure equal weighting, change the rebalancingParam below
Execution: Immediate Execution with Market Orders
Risk: Null
'''
def Initialize(self):
### user-defined inputs --------------------------------------------------------------
self.SetStartDate(2019, 4, 1) # set start date
#self.SetEndDate(2019, 1, 4) # set end date
self.SetCash(10000) # set strategy cash
# True/False to enable/disable filtering by fundamental data
filterFineData = True
# rebalancing period (to enable rebalancing enter an integer for number of days, e.g. 1, 7, 30, 365)
rebalancingParam = 7
### -----------------------------------------------------------------------------------
# set the brokerage model for slippage and fees
# self.SetSecurityInitializer(self.CustomSecurityInitializer)
# self.SetBrokerageModel(AlphaStreamsBrokerageModel())
# set requested data resolution and disable fill forward data
self.UniverseSettings.Resolution = Resolution.Daily
self.UniverseSettings.FillForward = False
# select modules
self.SetUniverseSelection(CustomUniverseSelectionModel(filterFineData = filterFineData))
self.SetAlpha(LongOnlyConstantAlphaCreationModel())
self.SetPortfolioConstruction(CustomEqualWeightingPortfolioConstructionModel(rebalancingParam = rebalancingParam))
self.SetExecution(ImmediateExecutionModel())
self.SetRiskManagement(CompositeRiskManagementModel(
MaximumUnrealizedProfitPercentPerSecurity(0.3),
MaximumDrawdownPercentPerSecurity(0.15)
))
# def CustomSecurityInitializer(self, security):
# '''
# Description:
# Initialize the security with adjusted prices
# Args:
# security: Security which characteristics we want to change
# '''
# security.SetDataNormalizationMode(DataNormalizationMode.Adjusted)
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import AlphaModel, Insight, InsightType, InsightDirection
class LongOnlyConstantAlphaCreationModel(AlphaModel):
'''
Description:
This Alpha model creates InsightDirection.Up (to go Long) for a duration of 1 day, every day for all active securities in our Universe
Details:
The important thing to understand here is the concept of Insight:
- A prediction about the future of the security, indicating an expected Up, Down or Flat move
- This prediction has an expiration time/date, meaning we think the insight holds for some amount of time
- In the case of a constant long-only strategy, we are just updating every day the Up prediction for another extra day
- In other words, every day we are making the conscious decision of staying invested in the security one more day
'''
def __init__(self, resolution = Resolution.Daily):
self.insightExpiry = Time.Multiply(Extensions.ToTimeSpan(resolution), 0.25) # insight duration
self.insightDirection = InsightDirection.Up # insight direction
self.securities = [] # list to store securities to consider
def Update(self, algorithm, data):
insights = [] # list to store the new insights to be created
# loop through securities and generate insights
for security in self.securities:
# check if there's new data for the security or we're already invested
# if there's no new data but we're invested, we keep updating the insight since we don't really need to place orders
if data.ContainsKey(security.Symbol) or algorithm.Portfolio[security.Symbol].Invested:
# append the insights list with the prediction for each symbol
insights.append(Insight.Price(security.Symbol, self.insightExpiry, self.insightDirection))
else:
algorithm.Log('excluding this security due to missing data: ' + str(security.Symbol.Value))
return insights
def OnSecuritiesChanged(self, algorithm, changes):
'''
Description:
Event fired each time the we add/remove securities from the data feed
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm
'''
# add new securities
for added in changes.AddedSecurities:
self.securities.append(added)
# remove securities
for removed in changes.RemovedSecurities:
if removed in self.securities:
self.securities.remove(removed)
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")
from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
UTCMIN = datetime.min.replace(tzinfo=utc)
class CustomEqualWeightingPortfolioConstructionModel(PortfolioConstructionModel):
'''
Description:
Provide a custom implementation of IPortfolioConstructionModel that gives equal weighting to all active securities
Details:
- The target percent holdings of each security is 1/N where N is the number of securities with active Up/Down insights
- For InsightDirection.Up, long targets are returned
- For InsightDirection.Down, short targets are returned
- For InsightDirection.Flat, closing position targets are returned
'''
def __init__(self, rebalancingParam = False):
'''
Description:
Initialize a new instance of CustomEqualWeightingPortfolioConstructionModel
Args:
rebalancingParam: Integer indicating the number of days for rebalancing (default set to False, no rebalance)
- Independent of this parameter, the portfolio will be rebalanced when a security is added/removed/changed direction
'''
self.insightCollection = InsightCollection()
self.removedSymbols = []
self.nextExpiryTime = UTCMIN
self.rebalancingTime = UTCMIN
# if the rebalancing parameter is not False but a positive integer
# convert rebalancingParam to timedelta and create rebalancingFunc
if rebalancingParam > 0:
self.rebalancing = True
rebalancingParam = timedelta(days = rebalancingParam)
self.rebalancingFunc = lambda dt: dt + rebalancingParam
else:
self.rebalancing = rebalancingParam
def CreateTargets(self, algorithm, insights):
'''
Description:
Create portfolio targets from the specified insights
Args:
algorithm: The algorithm instance
insights: The insights to create portfolio targets from
Returns:
An enumerable of portfolio targets to be sent to the execution model
'''
targets = []
# check if we have new insights coming from the alpha model or if some existing insights have expired
# or if we have removed symbols from the universe
if (len(insights) == 0 and algorithm.UtcTime <= self.nextExpiryTime and self.removedSymbols is None):
return targets
# here we get the new insights and add them to our insight collection
for insight in insights:
self.insightCollection.Add(insight)
# create flatten target for each security that was removed from the universe
if self.removedSymbols is not None:
universeDeselectionTargets = [ PortfolioTarget(symbol, 0) for symbol in self.removedSymbols ]
targets.extend(universeDeselectionTargets)
self.removedSymbols = None
# get insight that haven't expired of each symbol that is still in the universe
activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)
# get the last generated active insight for each symbol
lastActiveInsights = []
for symbol, g in groupby(activeInsights, lambda x: x.Symbol):
lastActiveInsights.append(sorted(g, key = lambda x: x.GeneratedTimeUtc)[-1])
errorSymbols = {}
# check if we actually want to create new targets for the securities (check function ShouldCreateTargets for details)
if self.ShouldCreateTargets(algorithm, lastActiveInsights):
# determine target percent for the given insights (check function DetermineTargetPercent for details)
percents = self.DetermineTargetPercent(lastActiveInsights)
for insight in lastActiveInsights:
target = PortfolioTarget.Percent(algorithm, insight.Symbol, percents[insight])
if not target is None:
targets.append(target)
else:
errorSymbols[insight.Symbol] = insight.Symbol
# update rebalancing time
if self.rebalancing:
self.rebalancingTime = self.rebalancingFunc(algorithm.UtcTime)
# get expired insights and create flatten targets for each symbol
expiredInsights = self.insightCollection.RemoveExpiredInsights(algorithm.UtcTime)
expiredTargets = []
for symbol, f in groupby(expiredInsights, lambda x: x.Symbol):
if not self.insightCollection.HasActiveInsights(symbol, algorithm.UtcTime) and not symbol in errorSymbols:
expiredTargets.append(PortfolioTarget(symbol, 0))
continue
targets.extend(expiredTargets)
# here we update the next expiry date in the insight collection
self.nextExpiryTime = self.insightCollection.GetNextExpiryTime()
if self.nextExpiryTime is None:
self.nextExpiryTime = UTCMIN
return targets
def ShouldCreateTargets(self, algorithm, lastActiveInsights):
'''
Description:
Determine whether we should rebalance the portfolio to keep equal weighting when:
- It is time to rebalance regardless
- We want to include some new security in the portfolio
- We want to modify the direction of some existing security
Args:
lastActiveInsights: The last active insights to check
'''
# it is time to rebalance
if self.rebalancing and algorithm.UtcTime >= self.rebalancingTime:
return True
for insight in lastActiveInsights:
# if there is an insight for a new security that's not invested, then rebalance
if not algorithm.Portfolio[insight.Symbol].Invested and insight.Direction != InsightDirection.Flat:
return True
# if there is an insight to close a long position, then rebalance
elif algorithm.Portfolio[insight.Symbol].IsLong and insight.Direction != InsightDirection.Up:
return True
# if there is an insight to close a short position, then rebalance
elif algorithm.Portfolio[insight.Symbol].IsShort and insight.Direction != InsightDirection.Down:
return True
else:
continue
return False
def DetermineTargetPercent(self, lastActiveInsights):
'''
Description:
Determine the target percent from each insight
Args:
lastActiveInsights: The active insights to generate a target from
'''
result = {}
# give equal weighting to each security
count = sum(x.Direction != InsightDirection.Flat for x in lastActiveInsights)
percent = 0 if count == 0 else 1.0 / count
for insight in lastActiveInsights:
result[insight] = insight.Direction * percent
return result
def OnSecuritiesChanged(self, algorithm, changes):
'''
Description:
Event fired each time the we add/remove securities from the data feed
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm
'''
# get removed symbol and invalidate them in the insight collection
self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities]
self.insightCollection.Clear(self.removedSymbols)