Overall Statistics |
Total Trades 2604 Average Win 0.50% Average Loss -0.28% Compounding Annual Return 22.301% Drawdown 30.700% Expectancy 0.477 Net Profit 1029.454% Sharpe Ratio 0.815 Loss Rate 47% Win Rate 53% Profit-Loss Ratio 1.78 Alpha 0.194 Beta -0.013 Annual Standard Deviation 0.236 Annual Variance 0.056 Information Ratio 0.365 Tracking Error 0.294 Treynor Ratio -14.602 Total Fees $3627.05 |
import numpy as np
import datetime
from scipy import stats


class StocksOnTheMove(QCAlgorithm):
    """Monthly momentum rotation over the most liquid US equities.

    Once a month the coarse universe is refreshed to the top symbols by
    dollar volume. Every symbol in the universe carries a ``RegressionSlope``
    score; at the monthly rebalance the best ``number_stocks`` scores become
    the target portfolio, each held at an equal ``1 / number_stocks`` weight.
    SPY versus its KAMA acts as a market-regime switch.
    """

    def Initialize(self):
        """Configure dates, cash, data subscriptions, state and schedules."""
        self.SetStartDate(2006, 1, 1)
        # No end date is set, so the run is not bounded by this file.
        self.SetCash(100000)

        # SPY is the benchmark, the schedule anchor and the regime filter.
        self.AddEquity("SPY", Resolution.Minute)
        self.SetBenchmark("SPY")

        # Resolution of the data added through universe selection.
        self.UniverseSettings.Resolution = Resolution.Daily

        # Size of the liquidity-filtered starting universe.
        self.__numberOfSymbols = 300
        # Number of positions held in the portfolio.
        self.number_stocks = 30
        # Look-back (in observations) of the momentum regression.
        self.momentum_period = 126
        # Risk factor for position sizing; not referenced elsewhere in this
        # file.
        self.risk_factor = 0.001

        # Symbols chosen by the most recent coarse selection.
        self.universe = []
        # Symbol -> RegressionSlope used for ranking.
        self.indicators_r2 = {}
        # Month numbers of the last coarse refresh / rebalance. Day == 1
        # cannot be relied on, so month changes are tracked instead.
        self.last_month_fired_coarse = None
        self.last_month_fired_rebalance = None

        self.AddUniverse(self.CoarseSelectionFunction)
        self.SetWarmUp(self.momentum_period)

        # Ranking and rebalance: first trading day of the month, 30 minutes
        # after the open.
        self.Schedule.On(self.DateRules.MonthStart("SPY"),
                         self.TimeRules.AfterMarketOpen("SPY", 30),
                         Action(self.rebalance))
        # Indicator refresh: every trading day at the close. The regression
        # window is a rolling series of daily prices, so feeding it a single
        # price per month (the previous MonthStart rule) left it stale.
        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                         self.TimeRules.BeforeMarketClose("SPY", 0),
                         Action(self.UpdateIndicators))

        self.kama = self.KAMA("SPY", 200, Resolution.Daily)

    def UpdateIndicators(self):
        """Push the latest price of every universe symbol into its indicator."""
        for symbol in self.universe:
            # Do we have a security object (and therefore a price) for it?
            if not self.Securities.ContainsKey(symbol):
                continue
            indicator = self.indicators_r2.get(symbol)
            if indicator is not None:
                indicator.update(self.Securities[symbol].Price)

    def CoarseSelectionFunction(self, coarse):
        """Return the universe, refreshing it when the calendar month changes.

        On the first call of a new month the universe becomes the
        ``__numberOfSymbols`` symbols with fundamental data that have the
        highest dollar volume; on every other call the cached list is
        returned unchanged.
        """
        month = self.Time.month
        if self.last_month_fired_coarse == month:
            return self.universe

        self.last_month_fired_coarse = month
        with_fundamentals = [c for c in coarse if c.HasFundamentalData]
        by_dollar_volume = sorted(with_fundamentals,
                                  key=lambda c: c.DollarVolume,
                                  reverse=True)
        self.universe = [
            c.Symbol for c in by_dollar_volume[:self.__numberOfSymbols]
        ]
        return self.universe

    def OnSecuritiesChanged(self, changes):
        """Keep ``indicators_r2`` in step with universe additions/removals."""
        # Drop indicators of removed securities to save memory, and close
        # any position still open in them.
        for security in changes.RemovedSecurities:
            if security.Symbol in self.indicators_r2:
                del self.indicators_r2[security.Symbol]
                self.Liquidate(security.Symbol)

        # Create a fresh indicator for every added security.
        for security in changes.AddedSecurities:
            self.indicators_r2[security.Symbol] = RegressionSlope(
                self, security.Symbol, self.momentum_period, Resolution.Daily)

    def rebalance(self):
        """Rank the universe by momentum score and trade towards the top names.

        Runs at most once per coarse refresh. Only the ``number_stocks``
        highest finite scores are targets. While SPY trades above its KAMA,
        held non-targets are sold and unheld targets are bought at equal
        weight; otherwise every universe symbol is liquidated.
        """
        # Ensure we only fire after a new coarse selection.
        if self.last_month_fired_rebalance == self.last_month_fired_coarse:
            return
        self.last_month_fired_rebalance = self.last_month_fired_coarse
        self.Log("Rebalance")

        # Keep only usable scores. NaN/inf would otherwise end up at the
        # *top* of the ranking once the ascending argsort is reversed.
        scored = [(symbol, indicator.value)
                  for symbol, indicator in self.indicators_r2.items()
                  if indicator.value is not None
                  and np.isfinite(indicator.value)]
        if not scored:
            # Nothing to rank (unpacking an empty zip would raise); leave
            # the portfolio untouched.
            self.Log("Rebalance skipped: no indicator values available")
            return

        symbols, slopes = zip(*scored)
        # Descending order, truncated to the portfolio size.
        top = np.argsort(slopes)[::-1][:self.number_stocks]
        self.target_portfolio = np.array(symbols)[top]
        self.Log(str(self.target_portfolio))

        targets = set(self.target_portfolio)
        weight = 1. / float(self.number_stocks)
        bullish = self.Securities["SPY"].Price > self.kama.Current.Value

        # NOTE(review): the original indentation was lost; the liquidate
        # branch is read here as the `else` of the SPY/KAMA check - confirm.
        for symbol in self.universe:
            if not bullish:
                self.Liquidate(symbol)
                continue

            is_target = symbol in targets
            if self.Portfolio[symbol].HoldStock:
                # Invested: exit if no longer a target, otherwise keep.
                if not is_target:
                    self.Liquidate(symbol)
            elif is_target:
                # Not invested and a target: enter at equal weight.
                self.SetHoldings(symbol, weight)


class RegressionSlope():
    """Rolling momentum score: annualised log-price slope scaled by R^2.

    ``value`` stays ``None`` until ``window`` prices have been collected.
    """

    def __init__(self, algo, symbol, window, resolution):
        """Seed the window from ``algo.History`` and compute a first value.

        Args:
            algo: object exposing ``History(symbols, periods, resolution)``.
            symbol: asset this score belongs to.
            window: number of prices used in the regression.
            resolution: resolution passed through to ``algo.History``.
        """
        self.symbol = symbol
        self.window = window
        self.resolution = resolution

        # The value callers read; None until properly calculated.
        self.value = None
        # Rolling price window, kept at most ``window`` long by update().
        self.history = np.array([], dtype=float)

        # Pulling history here ties the indicator to the algorithm object;
        # it could instead be fetched by the caller and passed in.
        hist_df = algo.History([symbol], window, self.resolution)

        # No data returned for this asset (new asset?): start empty and
        # let update() fill the window over time.
        if 'close' not in hist_df.columns:
            return

        self.history = np.asarray(hist_df.close.values, dtype=float)
        self.compute()

    def update(self, value):
        """Append one price, trim to the window length and recompute.

        Prices that are non-finite or not strictly positive are ignored
        because their logarithm would poison the regression.
        """
        price = float(value)
        if not np.isfinite(price) or price <= 0:
            return

        # Keep the newest ``window`` prices. Dropping the oldest element
        # unconditionally would stop a short history from ever filling.
        self.history = np.append(self.history, price)[-self.window:]
        self.compute()

    def compute(self):
        """Recompute ``value`` once the window holds enough prices."""
        # History failed to return a full window: wait for it to accrue.
        if len(self.history) < self.window:
            return

        x = np.arange(len(self.history))
        log_ts = np.log(self.history)
        slope, _intercept, r_value, _p_value, _std_err = stats.linregress(
            x, log_ts)

        # Per-step log slope -> percentage return compounded over 250
        # steps, then weighted by goodness of fit.
        annualized_slope = (np.power(np.exp(slope), 250) - 1) * 100
        self.value = annualized_slope * (r_value ** 2)