Overall Statistics |
Total Trades 11 Average Win 0% Average Loss 0% Compounding Annual Return 388.152% Drawdown 5.300% Expectancy 0 Net Profit 30.339% Sharpe Ratio 5.488 Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 1.594 Beta -15.26 Annual Standard Deviation 0.244 Annual Variance 0.06 Information Ratio 5.42 Tracking Error 0.244 Treynor Ratio -0.088 Total Fees $45.58 |
import numpy as np import datetime from scipy import stats ### <summary> ### Basic template algorithm simply initializes the date range and cash. This is a skeleton ### framework you can use for designing an algorithm. ### </summary> class StocksOnTheMove(QCAlgorithm): '''Basic template algorithm simply initializes the date range and cash''' def Initialize(self): '''Initialise the data and resolution required, as well as the cash and start-end dates for your algorithm. All algorithms must initialized.''' self.SetStartDate(2012,1,1) #Set Start Date self.SetEndDate(2012,3,1) #Set End Date self.SetCash(100000) #Set Strategy Cash # Find more symbols here: http://quantconnect.com/data self.AddEquity("SPY", Resolution.Minute) # what resolution should the data *added* to the universe be? self.UniverseSettings.Resolution = Resolution.Minute # How many stocks in the starting universe? self.__numberOfSymbols = 20 # How many stocks in the portfolio? self.number_stocks = 5 # this add universe method accepts two parameters: self.AddUniverse(self.CoarseSelectionFunction) # How far back are we looking for momentum? self.momentum_period = 20 # Schedule Indicator Update, Ranking + Rebal self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", 30), Action(self.rebalance)) self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", 0), Action(self.UpdateIndicators)) # Set Risk Factor for position sizing self.risk_factor = 0.001 # Set empty list for universe self.universe = [] # Set empty dictionary for managing & ranking the slope self.indicators_r2 = {} self.last_month_fired_coarse = None #we cannot rely on Day==1 like before self.last_month_fired_rebalance = None #we cannot rely on Day==1 like before def UpdateIndicators(self): # This updates the indicators at each data step for symbol in self.universe: # is symbol iin Slice object? (do we even have data on this step for this asset) if self.Securities.ContainsKey(symbol): # Update the dictionary for the indicator if symbol in self.indicators_r2: self.indicators_r2[symbol].update(self.Securities[symbol].Price) # Run a coarse selection filter for starting universe def CoarseSelectionFunction(self, coarse): today = self.Time #self.Log("Day = {} Month = {}".format(today.day,today.month)) # Set the Universe to rebalance on the 1st day of each quarter (can play around with this as required) if self.last_month_fired_coarse != today.month and (today.month == 1 or today.month == 4 or today.month == 7 or today.month == 10): self.last_month_fired_coarse = today.month self.Log("Day = {} Month = {}".format(today.day,today.month)) CoarseWithFundamental = [x for x in coarse if x.HasFundamentalData] sortedByDollarVolume = sorted(CoarseWithFundamental, key=lambda x: x.DollarVolume, reverse=True) result = [ x.Symbol for x in sortedByDollarVolume[:self.__numberOfSymbols] ] self.universe = result return self.universe else: return self.universe def OnSecuritiesChanged(self, changes): # Delete indicator from the dict to save Ram for security in changes.RemovedSecurities: if security.Symbol in self.indicators_r2: del self.indicators_r2[security.Symbol] self.Liquidate(security.Symbol) # Init a new custom indicator for security in changes.AddedSecurities: self.indicators_r2[security.Symbol] = RegressionSlope(self, security.Symbol, self.momentum_period, Resolution.Daily) def rebalance(self): today = self.Time if self.last_month_fired_rebalance != self.last_month_fired_coarse: # ensure we are fireing after coarse self.last_month_fired_rebalance = self.last_month_fired_coarse self.Log("Rebalance") # get values from dict symbols, slopes = zip(*[(symbol, self.indicators_r2[symbol].value) \ for symbol in self.indicators_r2 \ if self.indicators_r2[symbol].value is not None]) # sort idx_sorted = np.argsort(slopes)[::-1] # [::-1] slices backwards i.e. flips to reverse the sort order symbols =np.array(symbols)[idx_sorted] slopes = np.array(slopes)[idx_sorted] # Sort the Dictionary from highest to lowest and take the top values self.target_portfolio = symbols self.Log(str(self.target_portfolio)) # Enter or exit positions for symbol in self.universe: # Case: invested in the current symbol if self.Portfolio[symbol].HoldStock: # Exit if not a target aset if symbol not in self.target_portfolio: self.Liquidate(symbol) elif symbol in self.target_portfolio: continue # Case: not invested in the current symbol else: # symbol is a target, enter position if symbol in self.target_portfolio: # Update ATR for the stock in the new dictionary self.Log("{} {} {}".format(symbol, self.Securities[symbol].Price, self.indicators_r2[symbol].value)) # Send Orders self.SetHoldings(symbol, 1./float(self.number_stocks)) class RegressionSlope(): def __init__(self, algo, symbol, window, resolution): # set up params of per-asset rolling metric calculation self.symbol = symbol self.window = window self.resolution = resolution # the value we access, None until properly calulated self.value = None # We will store the historical window here, and keep it a fixed length in update self.history = [] # download the window. Prob not great to drag algo scope in here. Could get outside and pass in. hist_df = algo.History([symbol], window, self.resolution) # Case where no data to return for this asset. New asset? if 'close' not in hist_df.columns: return # store the target time series self.history = hist_df.close.values # calulate the metrics for the current window self.compute() def update(self, value): # update history, retain length self.history = np.append(self.history, float(value))[1:] # calulate the metrics for the current window self.compute() def compute(self): # Case where History faiiled to return window, waiting to acrew # prevent calc until window is statisfied if len(self.history) < self.window: return # copied from previous x = np.arange(len(self.history)) log_ts = np.log(self.history) slope, intercept, r_value, p_value, std_err = stats.linregress(x, log_ts) annualized_slope = (np.power(np.exp(slope), 250) - 1) * 100 annualized_slope = annualized_slope * (r_value ** 2) # update value self.value = annualized_slope