from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Execution import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from QuantConnect.Algorithm.Framework.Risk import *
from QuantConnect.Algorithm.Framework.Selection import *
from QuantConnect.Data.UniverseSelection import *
from QuantConnect.Indicators import Variance
from QuantConnect.Orders import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
from datetime import timedelta
import math as m
import numpy as np
import pandas as pd
import ev_ebit
from pricehist import PriceHistoryManager
class DownThenFlat(QCAlgorithm):
def __init__(self):
# Number of stocks to select from the alpha model
self._alpha_selection_max = 20
def Initialize(self):
# Short backtest
# self.SetStartDate(2018, 1, 1)
# Long backtest - errors out
self.SetStartDate(2005, 1, 1)
# If not specified, the backtesting EndDate would be today
self.UniverseSettings.Resolution = Resolution.Daily
# self.SetEndDate(2015, 7, 1)
self.AddUniverseSelection(ValueUniverseSelection(0.4, 0.7))
self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel(timedelta(int(BARS_PER_YEAR/24) + 1)))
# self.SetExecution(StandardDeviationExecutionModel(deviations=1, period=5, resolution=Resolution.Daily))
def has_all_fundamentals(fine_data):
return ev_ebit.has_ebit_ev_fundamentals(fine_data) and fine_data.MarketCap
def select_from(algorithm, pool, sort_value, reverse=False, proportion=None, value_name='Value'):
if proportion is not None:
select_count = int(len(pool) * proportion)
raise Exception("Proportion not provided")
selection = sorted(pool, key=sort_value, reverse=reverse)[:select_count]
algorithm.Log(f'{value_name} between {sort_value(selection[0])} and {sort_value(selection[-1])}')
return selection
class ValueUniverseSelection(FundamentalUniverseSelectionModel):
def __init__(self, _percent_volume, _percent_price):
super().__init__(True, None, None)
self._last_month = None
# Number of stocks to pass CoarseSelection process
self._p_coarse_vol = _percent_volume
self._p_coarse_price = _percent_price
# One quarter of trading year
self._num_bars = int(BARS_PER_YEAR / 4)
self._price_history_manager = PriceHistoryManager(self._num_bars)
def close_histories(self):
return self._price_history_manager.close_histories
def select_by_variance(self, algorithm):
close_variances = {}
for sym, closes in self.close_histories.items():
variance = closes.var()
if m.isfinite(variance):
close_variances[sym] = variance
algorithm.Log(f'Dropping {sym} which has variance {variance}')
sorted_by_var = sorted(close_variances.items(), key=lambda kv: kv[1])
head_tail_size = int(len(sorted_by_var) / 4)
medium_var = sorted_by_var[head_tail_size:-head_tail_size]
algorithm.Log('Dropping highest and lowest {} by variance. Min/max: {}/{}'
.format(head_tail_size, medium_var[0][1], medium_var[-1][1]))
return [s[0] for s in medium_var]
def SelectCoarse(self, algorithm, coarse):
if self._last_month == algorithm.Time.month:
return Universe.Unchanged
# Setting last_month happens in fine selection
algorithm.Log('Running coarse selection')
with_fundamental = [x for x in coarse if x.HasFundamentalData]
by_vol = select_from(algorithm, with_fundamental, lambda s: s.DollarVolume,
reverse=True, proportion=self._p_coarse_vol, value_name='Volume')
by_price = select_from(algorithm, with_fundamental, lambda s: s.Price,
reverse=True, proportion=self._p_coarse_price, value_name='Price')
by_price_and_vol = set(by_vol).intersection(by_price)
symbols_by_pv = [s.Symbol for s in by_price_and_vol]
self._price_history_manager.update_close_histories(algorithm, symbols_by_pv)
by_var = self.select_by_variance(algorithm)
return by_var
def SelectFine(self, algorithm, fine):
if self._last_month == algorithm.Time.month:
return Universe.Unchanged
self._last_month = algorithm.Time.month
algorithm.Log('Running fine selection')
with_fundamentals = [f for f in fine if has_all_fundamentals(f)]
min_cap = np.percentile([f.MarketCap for f in fine], 60)
with_cap = [f for f in with_fundamentals if f.MarketCap >= min_cap]
algorithm.Log(f'Market cap limited at {min_cap} yields {len(with_cap)} candidates.')
return [f.Symbol for f in with_cap]
def compute_returns(history):
return history[-1]/history[0] - 1
class LongShortAllValueAlphaModel(AlphaModel):
def __init__(self, _alpha_max_count):
self._last_month = None
self._max_long_count = _alpha_max_count
self._max_short_count = _alpha_max_count
self._candidate_factor = 3
# For the purposes of the algorithm, the number of days we want to assume
# for each month. This is not exact, but it should be a good enough measure
self._month_days = timedelta(28)
# Longs and shorts will be set by OnSecuritiesChanged, and will be used in
# Update. The latter will not do any data retrieval.
self._longs = []
self._shorts = []
self._price_history_manager = PriceHistoryManager(BARS_PER_YEAR)
self._universe = set([])
def split_long_short_candidates(self):
# TODO Include debt for sorting
ebit_evs = [ev_ebit.ebit_to_ev(f.Fundamentals) for f in self._universe]
ebit_ev_syms = [(e.Symbol, ee) for e, ee in zip(self._universe, ebit_evs) if m.isfinite(ee) and ee != 0]
ebit_ev_sorted = [kv[0] for kv in sorted(ebit_ev_syms, key=lambda kv: kv[1], reverse=True)]
long_candidates_count = self._max_long_count * self._candidate_factor
short_candidates_count = self._max_short_count * self._candidate_factor
if long_candidates_count + short_candidates_count > len(ebit_ev_sorted):
long_candidates_count = int(len(ebit_ev_sorted) / 2)
short_candidates_count = len(ebit_ev_sorted) - long_candidates_count
return ebit_ev_sorted[:long_candidates_count], ebit_ev_sorted[-short_candidates_count:]
def order_candidates(self, candidates):
down_phase, flat_phase = self._price_history_manager.split_at_ratio(0.75, candidates)
down_ranks = (pd.Series({s: compute_returns(h) for s, h in down_phase.items()})
flat_ranks = (pd.Series({s1: compute_returns(h1) for s1, h1 in flat_phase.items()})
aggregate_rank = down_ranks + flat_ranks
return aggregate_rank.sort_values().index
def OnSecuritiesChanged(self, algorithm, changes):
added = set(changes.AddedSecurities)
removed = set(changes.RemovedSecurities)
self._universe = self._universe.union(added).difference(removed)
algorithm.Log(f'Updating securities in alpha model.'
f' Added: {len(added)}. Removed: {len(removed)}. Universe: {len(self._universe)}')
algorithm.Log(f'Active securities: {algorithm.ActiveSecurities.Count}')
long_candidates, short_candidates = self.split_long_short_candidates()
self._price_history_manager.update_close_histories(algorithm, long_candidates + short_candidates)
self._longs = self.order_candidates(long_candidates)[:self._max_long_count]
self._shorts = list(reversed(self.order_candidates(short_candidates)))[:self._max_short_count]
def Update(self, algorithm, data):
# Emit signals once a month
if self._last_month == algorithm.Time.month:
return []
self._last_month = algorithm.Time.month
insights = []
for kvp in algorithm.Portfolio:
holding = kvp.Value
symbol = holding.Symbol
if holding.Invested and symbol not in self._longs and symbol not in self._shorts:
insights.append(Insight.Price(symbol, self._month_days, InsightDirection.Flat))
for symbol in self._longs:
insights.append(Insight.Price(symbol, self._month_days, InsightDirection.Up))
# for symbol in self._shorts:
# insights.append(Insight.Price(symbol, self._month_days, InsightDirection.Down))
return insights