Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 0 Tracking Error 0 Treynor Ratio 0 Total Fees $0.00 |
from scipy.optimize import leastsq from statsmodels.tsa import stattools, ar_model import pandas as pd from datetime import timedelta from enum import Enum import logging log = logging.getLogger() class Weights: def __init__(self, names): self.values = {n: 1/len(names) for n in names} def set(self, **kwargs): for n, v in kwargs.items(): if n in self.values: self.values[n] = v # scale weights to one s = np.sum(self.values.values()) self.values = {n: v/s for n, v in self.values.items()} def get(self, n): if n in self.values: return self.values[n] def estimate(self, df): pass def calculate_cointegration_weights(arr, p_thres=0.05): """calculate and tests cointegration of two symbols Args: arr (np.ndarray): of size (T, 2) Returns: weights (float): normed weights for cointegration portfolio """ if arr.shape[1] != 2: raise Exception("Second dimension must be of size 2") loss = lambda b: arr[:, 0] - b*arr[:, 1] beta, _ = leastsq(loss, 0) adres = stattools.adfuller(resi, regresults=True, maxlag=1) if adres[1] > p_thres: raise ValueError("p-value %s ad fuller is above threshold of (%s)"%(adres[1], p_thres)) return np.array([1, -beta])/(1+beta) class CointegrationAlphaModel(AlphaModel): """use cointegration estimated from historical prices""" class State(Enum): ShortRatio = -1 FlatRatio = 0 LongRatio = 1 def __init__(self, asset1, asset2, lookback, weights, threshold=1): """Initializes a new default instance of the CointegrationAlphaModel class. Args: asset1: The first asset's symbol in the pair asset2: The second asset's symbol in the pair lookback(int): Historical return lookback period """ self.asset1 = asset1 self.asset2 = asset2 self.asset1Price = RollingWindow[IndicatorDataPoint](lookback) self.asset2Price = RollingWindow[IndicatorDataPoint](lookback) self.weights = weights self.state = self.State.FlatRatio self.ratio = None self.mean = None self.threshold = threshold self.upperThreshold = None; self.lowerThreshold = None; def Update(self, algorithm, data): ''' Updates this alpha model with the latest data from the algorithm. This is called each time the algorithm receives data for subscribed securities Args: algorithm: The algorithm instance data: The new data available Returns: The new insights generated''' if self.mean is None or not self.mean.IsReady: return [] # don't re-emit the same direction if self.state is not self.State.LongRatio and self.ratio > self.upperThreshold: self.state = self.State.LongRatio # asset1/asset2 is more than 2 std away from mean, short asset1, long asset2 shortAsset1 = Insight.Price(self.asset1, timedelta(minutes = 15), InsightDirection.Down) longAsset2 = Insight.Price(self.asset2, timedelta(minutes = 15), InsightDirection.Up) # creates a group id and set the GroupId property on each insight object return Insight.Group(shortAsset1, longAsset2) # don't re-emit the same direction if self.state is not self.State.ShortRatio and self.ratio < self.lowerThreshold: self.state = self.State.ShortRatio # asset1/asset2 is less than 2 std away from mean, long asset1, short asset2 longAsset1 = Insight.Price(self.asset1, timedelta(minutes = 15), InsightDirection.Up) shortAsset2 = Insight.Price(self.asset2, timedelta(minutes = 15), InsightDirection.Down) # creates a group id and set the GroupId property on each insight object return Insight.Group(longAsset1, shortAsset2) return [] def OnSecuritiesChanged(self, algorithm, changes): '''Event fired each time the we add/remove securities from the data feed. Args: algorithm: The algorithm instance that experienced the change in securities changes: The security additions and removals from the algorithm''' for added in changes.AddedSecurities: # this model is limitted to looking at a single pair of assets if added.Symbol != self.asset1 and added.Symbol != self.asset2: continue if added.Symbol == self.asset1: self.asset1Price.Add(algorithm.Identity(added.Symbol)) else: self.asset2Price.Add(algorithm.Identity(added.Symbol)) if self.ratio is None: # initialize indicators dependent on both assets if self.asset1Price.IsReady() and self.asset2Price.IsReady(): self.ratio = IndicatorExtensions.Over(self.asset1Price, self.asset2Price) self.mean = IndicatorExtensions.Of(ExponentialMovingAverage(500), self.ratio) upper = ConstantIndicator[IndicatorDataPoint]("ct", 1 + self.threshold / 100) self.upperThreshold = IndicatorExtensions.Times(self.mean, upper) lower = ConstantIndicator[IndicatorDataPoint]("ct", 1 - self.threshold / 100) self.lowerThreshold = IndicatorExtensions.Times(self.mean, lower)
import numpy as np class WeightedPortfolioConstructionModel(PortfolioConstructionModel): '''Provides an implementation of IPortfolioConstructionModel that gives equal weighting to all securities. The target percent holdings of each security is 1/N where N is the number of securities. For insights of direction InsightDirection.Up, long targets are returned and for insights of direction InsightDirection.Down, short targets are returned.''' def __init__(self, weights): self.weights = weights self.insightCollection = InsightCollection() self.removedSymbols = [] def CreateTargets(self, algorithm, insights): '''Create portfolio targets from the specified insights Args: algorithm: The algorithm instance insights: The insights to create portoflio targets from Returns: An enumerable of portoflio targets to be sent to the execution model''' self.insightCollection.AddRange(insights) targets = [] if self.removedSymbols is not None: # zero out securities removes from the universe for symbol in self.removedSymbols: targets.append(PortfolioTarget(symbol, 0)) self.removedSymbols = None if len(insights) == 0: return targets # Get symbols that have emit insights and still in the universe symbols = list(set([x.Symbol for x in self.insightCollection if x.CloseTimeUtc > algorithm.UtcTime])) for symbol in symbols: activeInsights = [ x for x in self.insightCollection if x.Symbol == symbol ] w = self.weights.get(symbol) if w is None: return [] direction = activeInsights[-1].Direction targets.append(PortfolioTarget.Percent(algorithm, symbol, direction * w)) return targets def OnSecuritiesChanged(self, algorithm, changes): '''Event fired each time the we add/remove securities from the data feed Args: algorithm: The algorithm instance that experienced the change in securities changes: The security additions and removals from the algorithm''' # save securities removed so we can zero out our holdings self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities] # remove the insights of the removed symbol from the collection for removedSymbol in self.removedSymbols: if self.insightCollection.ContainsKey(removedSymbol): for insight in self.insightCollection[removedSymbol]: self.insightCollection.Remove(insight)
import numpy as np import pandas as pd def calculate_cointegration_weights(arr, p_thres=0.05): """calculate and tests cointegration of two symbols Args: arr (np.ndarray): of size (T, 2) Returns: weights (float): normed weights for cointegration portfolio """ if arr.shape[1] != 2: raise Exception("Second dimension must be of size 2") loss = lambda b: arr[:, 0] - b*arr[:, 1] beta, _ = leastsq(loss, 0) adres = stattools.adfuller(resi, regresults=True, maxlag=1) if adres[1] > p_thres: raise ValueError("p-value %s ad fuller is above threshold of (%s)"%(adres[1], p_thres)) return np.array([1, -beta])/(1+beta) class LongShortCointegrationPairs(QCAlgorithm): def Initialize(self): self.asset1 = 'SPY' self.asset2 = 'IWM' self.periods = 10 self.SetStartDate(2002, 1, 1) #Set Start Date self.SetEndDate(2005, 12, 31) #Set End Date self.SetCash(10000) #Set Strategy Cash for sym in [self.asset1, self.asset2]: self.AddEquity(sym, Resolution.Daily) self.p1 = RollingWindow[float](self.periods) self.p2 = RollingWindow[float](self.periods) self.weights = None def onData(self, data): # check if prices for both asset are present if self.asset1 in data and self.asset2 in data: self.p1.Add(data[self.asset1].Close) self.p2.Add(data[self.asset2].Close) else: return None if self.p1.IsReady() and self.p2.IsReady() and self.weights is None: arr = np.matrix([self.p1[:], self.p2[:]]).T self.weights = calculate_cointegration_weights(arr) self.SetHoldings("SPY", 1)