Overall Statistics |
Total Trades 398 Average Win 1.03% Average Loss -0.77% Compounding Annual Return 9.891% Drawdown 18.800% Expectancy 0.232 Net Profit 40.867% Sharpe Ratio 0.568 Probabilistic Sharpe Ratio 17.130% Loss Rate 47% Win Rate 53% Profit-Loss Ratio 1.33 Alpha 0.102 Beta -0.035 Annual Standard Deviation 0.17 Annual Variance 0.029 Information Ratio -0.174 Tracking Error 0.263 Treynor Ratio -2.735 Total Fees $41411.49 Estimated Strategy Capacity $17000000.00 Lowest Capacity Asset AJG R735QTJ8XC9X |
import numpy as np
import pandas as pd
from scipy import stats


class CryingVioletKitten(QCAlgorithm):
    """Monthly momentum rotation over a large-cap equity universe.

    The universe is the ``market_cap_limit`` largest primary, non-ADR shares.
    Once a month ``Rebalance`` ranks them by an R^2-weighted annualized
    log-price slope, exits holdings that dropped out of the top
    ``rank_table_percentile`` and, when the SPY trend filter allows it,
    opens positions in the newly ranked names.
    """

    # Column names accepted by ConvertToQuantopianHistory.
    _PRICE_FIELDS = ("open", "high", "low", "close")

    def Initialize(self):
        """Configure dates, cash, strategy parameters, schedule and universe."""
        self.SetStartDate(2018, 1, 27)  # Set Start Date
        self.SetCash(10000000)  # Set Strategy Cash

        self.pickedUniverse = False
        self.lastMonth = -1  # month of the last universe refresh (see Coarse)
        self.UniverseSettings.Resolution = Resolution.Daily
        self.market = self.AddEquity("SPY", Resolution.Daily).Symbol

        self.market_window = 200
        self.atr_window = 20
        self.talib_window = self.atr_window + 5
        # 0.01 = less position, more % but more risk.
        # Default for strategy is .001 - 0.00075
        self.risk_factor = 0.002
        self.momentum_window_length = 90
        self.market_cap_limit = 700
        self.rank_table_percentile = .3
        self.significant_position_difference = 0.1
        self.min_momentum = 0.020
        self.leverage_factor = 1
        self.use_stock_trend_filter = False  # either False = Off, True = On
        self.sma_window_length = 100  # Used for the stock trend filter
        self.use_market_trend_filter = 1  # either 0 = Off, 1 = On. Filter on SPY
        # either 0 = Off, 1 = On. Manage risk with individual stock volatility
        self.use_average_true_range = 0
        self.average_true_rage_multipl_factor = 1  # Change the weight of the ATR.

        self.assets = []    # symbols currently eligible for ranking
        self.assetATR = {}  # str(symbol) -> warmed-up ATR indicator

        # Schedule my rebalance function
        self.Schedule.On(self.DateRules.MonthStart("SPY"),
                         self.TimeRules.BeforeMarketClose(self.market, 0),
                         self.Rebalance)
        # NOTE: CancelOpenOrders is defined below but is not scheduled.

        self.AddUniverse(self.Coarse, self.Fine)

    def ConvertToQuantopianHistory(self, OHCL, history, isMarket=False):
        """Pivot a history frame into one column of prices per symbol.

        Args:
            OHCL: price field to extract: "open", "high", "low" or "close".
            history: history DataFrame whose index starts with the symbol.
            isMarket: when False, every symbol that does not have exactly
                ``talib_window`` rows is dropped from the result AND
                deregistered from the algorithm (``assets``, ``assetATR``,
                ``RemoveSecurity``).

        Returns:
            A DataFrame with one column per symbol; an empty DataFrame if the
            conversion fails.

        Raises:
            ValueError: if ``OHCL`` is not a known price field.
        """
        if OHCL not in self._PRICE_FIELDS:
            # Previously an unknown field produced empty series, which then
            # caused every security to be deregistered below.
            raise ValueError("Unknown price field: %r" % (OHCL,))

        prices = {}
        try:
            for row in history.itertuples():
                symbol = row.Index[0]
                prices.setdefault(symbol, []).append(getattr(row, OHCL))

            if not isMarket:
                incomplete = [symbol for symbol, series in prices.items()
                              if len(series) != self.talib_window]
                for symbol in incomplete:
                    prices.pop(symbol, None)
                    if symbol in self.assets:
                        self.assets.remove(symbol)
                    # The ATR cache is keyed by str(symbol) everywhere.
                    self.assetATR.pop(str(symbol), None)
                    self.RemoveSecurity(symbol)

            return pd.DataFrame.from_dict(prices)
        except Exception as err:
            # Best effort: report and hand back an empty frame so that the
            # caller can skip this rebalance instead of crashing.
            self.Debug("Failed")
            self.Debug(repr(err))
            return pd.DataFrame()

    def SignificantChangeInPositionSize(self, new_position_size, old_position_size):
        """Return True when the relative size change exceeds the threshold.

        The threshold is ``significant_position_difference``. A change away
        from an old size of zero counts as significant whenever the new size
        is non-zero (avoids a division by zero).
        """
        if old_position_size == 0:
            return new_position_size != 0
        relative_change = np.abs(
            (new_position_size - old_position_size) / old_position_size)
        return relative_change > self.significant_position_difference

    def _get_or_create_atr(self, symbol):
        """Return the cached ATR for ``symbol``, creating and warming it up if needed."""
        key = str(symbol)
        atr = self.assetATR.get(key)
        if atr is None:
            atr = self.ATR(symbol, self.atr_window,
                           MovingAverageType.Simple, Resolution.Daily)
            history = self.History(symbol, self.atr_window, Resolution.Daily)
            # An empty frame has no rows for the symbol; .loc would raise.
            if not history.empty:
                for row in history.loc[symbol].itertuples():
                    bar = TradeBar(row.Index, symbol, row.open, row.high,
                                   row.low, row.close, row.volume)
                    atr.Update(bar)
            self.assetATR[key] = atr
        return atr

    def GetPositionSize(self, security):
        """Return the number of shares to hold for ``security``.

        The size is ``TotalPortfolioValue * risk_factor`` divided by the ATR
        times its weight. With ``use_average_true_range`` off the divisor is 1
        and ``average_true_rage_multipl_factor`` is reset to 1. Returns 0 when
        the ATR cannot be obtained or the divisor is zero.
        """
        if self.use_average_true_range:
            try:
                average_true_range = self._get_or_create_atr(security).Current.Value
            except Exception as err:
                self.Debug("ATR unavailable for %s: %r" % (security, err))
                return 0
        else:
            average_true_range = 1  # divide by 1 gives the same initial number
            self.average_true_rage_multipl_factor = 1

        divisor = average_true_range * self.average_true_rage_multipl_factor
        if divisor == 0:
            return 0
        return (self.Portfolio.TotalPortfolioValue * self.risk_factor) / divisor

    def Rebalance(self):
        """Monthly rebalance: rank, trim/resize holdings, then open new ones."""
        if not self.assets:
            return

        history = self.History(self.assets, self.talib_window, Resolution.Daily)
        # Side effect: deregisters symbols without a full talib_window of bars.
        closes = self.ConvertToQuantopianHistory("close", history)
        if closes.empty:
            self.Debug("Rebalance skipped: no usable price history")
            return

        ranking_table = self._build_ranking_table(closes)
        estimated_cash_balance = self._adjust_existing_positions(
            ranking_table, self.Portfolio.Cash)

        if self._market_trend_allows_entries():
            self._open_new_positions(ranking_table, estimated_cash_balance)

    def _build_ranking_table(self, closes):
        """Return the momentum scores of the top-percentile symbols."""
        try:
            closes = closes[self.assets]
        except KeyError:
            # Some assets returned no history at all; rank what is available.
            pass
        # NOTE(review): only talib_window bars were requested above, so
        # tail(momentum_window_length) cannot return more than that - confirm
        # whether a longer history request was intended.
        slopes = np.log(closes.tail(self.momentum_window_length)).apply(self._slope)
        slopes = slopes[slopes > self.min_momentum]
        cutoff = slopes.quantile(1 - self.rank_table_percentile)
        return slopes[slopes > cutoff]

    def _adjust_existing_positions(self, ranking_table, estimated_cash_balance):
        """Exit unranked holdings, resize ranked ones; return the cash estimate."""
        invested = [x.Key for x in self.Portfolio if x.Value.Invested]
        for security in invested:
            price = self.Securities[security].Price
            position_size = self.Portfolio[security].Quantity

            # close positions that are no longer in the top of the ranking table
            if security not in ranking_table.index:
                self.Liquidate(security)
                estimated_cash_balance += price * position_size
                continue

            new_position_size = self.GetPositionSize(security)
            if not self.SignificantChangeInPositionSize(new_position_size,
                                                        position_size):
                continue

            target_quantity = new_position_size * self.leverage_factor
            estimated_cost = price * (target_quantity - position_size)
            if target_quantity == 0:
                self.Liquidate(security)
            else:
                # Order only the difference to the target; ordering the full
                # target would stack it on top of the existing position.
                order_quantity = int(target_quantity - position_size)
                if order_quantity != 0:
                    self.MarketOrder(security, order_quantity)
            estimated_cash_balance -= estimated_cost
        return estimated_cash_balance

    def _market_trend_allows_entries(self):
        """Return True when new positions may be opened.

        With the market trend filter on, this requires the current SPY price
        to be above its ``market_window``-bar average close. With it off,
        entries are always allowed.
        """
        if not self.use_market_trend_filter:
            return True

        market_history = self.History(self.market, self.market_window,
                                      Resolution.Daily)
        market_closes = self.ConvertToQuantopianHistory(
            "close", market_history, True)
        if market_closes.empty:
            self.Debug("Market trend filter: no SPY history, skipping entries")
            return False

        current_market_price = self.Securities[self.market].Price
        average_market_price = market_closes.mean().iloc[0]
        return current_market_price > average_market_price

    def _open_new_positions(self, ranking_table, estimated_cash_balance):
        """Buy ranked symbols that are not held yet while estimated cash lasts."""
        invested = [x.Key for x in self.Portfolio if x.Value.Invested]
        for security in ranking_table.index:
            if security in invested:
                continue
            new_position_size = self.GetPositionSize(security)
            posSize = new_position_size * self.leverage_factor
            estimated_cost = self.Securities[security].Price * posSize
            if estimated_cash_balance > estimated_cost:
                order_quantity = int(posSize)
                # A zero-share order is invalid; nothing to liquidate either,
                # because the symbol is not held.
                if order_quantity != 0:
                    self.MarketOrder(security, order_quantity)
                estimated_cash_balance -= estimated_cost

    def CancelOpenOrders(self):
        """Cancel every open order."""
        for order in self.Transactions.GetOpenOrders():
            self.Transactions.CancelOrder(order.Id)

    def OnData(self, data):
        """Unused; all trading happens in the scheduled Rebalance."""
        pass

    def _slope(self, ts):
        """Return the annualized regression slope of ``ts`` weighted by R^2.

        ``ts`` is regressed on 0..len(ts)-1; the slope is annualized as
        ``(1 + slope) ** 250``. Returns NaN when fewer than two points are
        available, which excludes the symbol from the ranking.
        """
        if len(ts) < 2:
            return np.nan
        x = np.arange(len(ts))
        slope, intercept, r_value, p_value, std_err = stats.linregress(x, ts)
        annualized_slope = (1 + slope) ** 250
        return annualized_slope * (r_value ** 2)

    def OnSecuritiesChanged(self, changes):
        """Liquidate and forget removed securities; register added ones."""
        for security in changes.RemovedSecurities:
            symbol = security.Symbol
            self.Liquidate(symbol)
            if symbol in self.assets:
                self.assets.remove(symbol)
            self.assetATR.pop(str(symbol), None)

        for security in changes.AddedSecurities:
            symbol = security.Symbol
            self._get_or_create_atr(symbol)
            # The market benchmark gets an ATR but is never ranked or traded.
            if symbol == self.market:
                continue
            if symbol not in self.assets:
                self.assets.append(symbol)

    def Coarse(self, coarse):
        """Coarse selection, refreshed once per calendar month.

        Returns the symbols with fundamental data, positive dollar volume and
        positive price, or ``Universe.Unchanged`` within the same month.
        """
        if self.lastMonth == self.Time.month:
            return Universe.Unchanged
        self.lastMonth = self.Time.month

        return [x.Symbol for x in coarse
                if x.HasFundamentalData and x.DollarVolume > 0 and x.Price > 0]

    def Fine(self, fine):
        """Fine selection: largest market caps, primary shares, no ADRs.

        When ``use_stock_trend_filter`` is on, only symbols priced above their
        ``sma_window_length``-bar simple moving average are kept; symbols
        without history are excluded.
        """
        largest = sorted(fine, key=lambda x: x.MarketCap,
                         reverse=True)[:self.market_cap_limit]
        fineFiltered = [x for x in largest
                        if x.SecurityReference.IsPrimaryShare
                        and not x.SecurityReference.IsDepositaryReceipt]
        fineFilteredSymbols = [x.Symbol for x in fineFiltered]

        if self.use_stock_trend_filter:
            history = self.History(fineFilteredSymbols, self.sma_window_length,
                                   Resolution.Daily)
            sma = {}
            for symbol in fineFilteredSymbols:
                try:
                    rows = history.loc[symbol]
                except KeyError:
                    continue  # no history for this symbol
                curSma = SimpleMovingAverage(self.sma_window_length)
                for row in rows.itertuples():
                    curSma.Update(row.Index, row.close)
                sma[symbol] = curSma.Current.Value
            final = [x.Symbol for x in fineFiltered
                     if x.Symbol in sma and x.Price > sma[x.Symbol]]
        else:
            final = fineFilteredSymbols

        self.pickedUniverse = True
        return final