Overall Statistics
Total Trades
398
Average Win
1.03%
Average Loss
-0.77%
Compounding Annual Return
9.891%
Drawdown
18.800%
Expectancy
0.232
Net Profit
40.867%
Sharpe Ratio
0.568
Probabilistic Sharpe Ratio
17.130%
Loss Rate
47%
Win Rate
53%
Profit-Loss Ratio
1.33
Alpha
0.102
Beta
-0.035
Annual Standard Deviation
0.17
Annual Variance
0.029
Information Ratio
-0.174
Tracking Error
0.263
Treynor Ratio
-2.735
Total Fees
$41411.49
Estimated Strategy Capacity
$17000000.00
Lowest Capacity Asset
AJG R735QTJ8XC9X
import numpy as np
import pandas as pd
from scipy import stats

class CryingVioletKitten(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2018, 1, 27)  # Set Start Date
        self.SetCash(10000000)  # Set Strategy Cash
        # self.AddEquity("SPY", Resolution.Minute)
        self.pickedUniverse = False
        self.lastMonth = -1
        self.UniverseSettings.Resolution = Resolution.Daily
        self.market = self.AddEquity("SPY", Resolution.Daily).Symbol #sid(8554)
        self.market_window = 200
        self.atr_window = 20
        self.talib_window = self.atr_window + 5
        self.risk_factor = 0.002 #0.01 = less position, more % but more risk Default for strategy is .001 - 0.00075
        self.momentum_window_length = 90
        self.market_cap_limit = 700
        self.rank_table_percentile = .3
        self.significant_position_difference = 0.1
        self.min_momentum = 0.020
        self.leverage_factor = 1  #1=2154%. Guy's version is 1.4=3226%
        self.use_stock_trend_filter = False #either False = Off, True = On
        self.sma_window_length = 100 #Used for the stock trend filter
        self.use_market_trend_filter = 1 #either 0 = Off, 1 = On. Filter on SPY
        self.use_average_true_range = 0 #either 0 = Off, 1 = On. Manage risk with individual stock volatility
        self.average_true_rage_multipl_factor = 1 #Change the weight of the ATR. 1327%
        self.assets = []
        self.assetATR = {}

        # Schedule my rebalance function
        self.Schedule.On(self.DateRules.MonthStart("SPY"),
                 self.TimeRules.BeforeMarketClose(self.market, 0),       
                 self.Rebalance)
        # Cancel all open orders at the end of each day.
        # self.Schedule.On(self.DateRules.EveryDay("SPY"),
        #          self.TimeRules.BeforeMarketClose(self.market, 0),       
        #          self.CancelOpenOrders)
        
        self.lastMonth = -1
        
        self.AddUniverse(self.Coarse, self.Fine)

    def ConvertToQuantopianHistory(self, OHCL, history, isMarket = False):
        try:
            prices = {}
            for tuple in history.itertuples():
                symbol = tuple.Index[0]
                if symbol not in prices:
                    prices[symbol] = []
                
                if OHCL == "high":
                    prices[symbol].append(tuple.high)
                elif OHCL == "low":
                    prices[symbol].append(tuple.low)
                elif OHCL == "close":
                    prices[symbol].append(tuple.close)
                elif OHCL == "open":
                    prices[symbol].append(tuple.open)
            remove = []
            if isMarket == False:
                for symbol, price in prices.items():
                    if len(price) != self.talib_window:
                        remove.append(symbol)
                for symbol in remove:
                    prices.pop(symbol, None)
                    if symbol in self.assets:
                        self.assets.remove(symbol)
                    if symbol in self.assetATR:
                        self.assetATR.pop(symbol, None)
                    self.RemoveSecurity(symbol)
            prices = pd.DataFrame.from_dict(prices)
        except:
            self.Debug("Failed")
        return prices
        
    def SignificantChangeInPositionSize(self, new_position_size, old_position_size):
        return np.abs((new_position_size - old_position_size)  / old_position_size) > self.significant_position_difference
        
    def GetPositionSize(self, security):
        try:
            averageTrueRange = self.assetATR[security].Current.Value
            if not self.use_average_true_range: #average_true_range
                average_true_range = 1 #divide by 1 gives... same initial number
                self.average_true_rage_multipl_factor = 1
            return (self.Portfolio.TotalPortfolioValue * self.risk_factor)  / (average_true_range * self.average_true_rage_multipl_factor) 
        except:
            # return 0
            atr = self.ATR(security, self.atr_window, MovingAverageType.Simple, Resolution.Daily)
            history = self.History(security, self.atr_window, Resolution.Daily)
            for tuple in history.loc[security].itertuples():
                bar = TradeBar(tuple.Index, security, tuple.open, tuple.high, tuple.low, tuple.close, tuple.volume)
                atr.Update(bar)
            self.assetATR[security] = atr
            try:
                averageTrueRange = self.assetATR[security].Current.Value
                if not self.use_average_true_range: #average_true_range
                    average_true_range = 1 #divide by 1 gives... same initial number
                    self.average_true_rage_multipl_factor = 1
                return (self.Portfolio.TotalPortfolioValue * self.risk_factor)  / (average_true_range * self.average_true_rage_multipl_factor) 
            except:
                return 0

    def Rebalance(self):
        history = self.History(self.assets, self.talib_window, Resolution.Daily)
        highs = self.ConvertToQuantopianHistory("high", history)
        lows = self.ConvertToQuantopianHistory("low", history)
        closes = self.ConvertToQuantopianHistory("close", history)
    
        estimated_cash_balance = self.Portfolio.Cash
        slopes = np.log(closes[self.assets].tail(self.momentum_window_length)).apply(self._slope)
        # self.Debug(slopes.order(ascending=False).head(10))
        slopes = slopes[slopes > self.min_momentum]
        ranking_table = slopes[slopes > slopes.quantile(1 - self.rank_table_percentile)]
        # close positions that are no longer in the top of the ranking table
        positions = [x.Key for x in self.Portfolio if x.Value.Invested]
        for security in positions:
            price = self.Securities[security].Price
            position_size = self.Portfolio[security].Quantity
            if security not in ranking_table.index: # and data.can_trade:
                self.Liquidate(security)
                #order_target(security, 0, style=LimitOrder(price))
                estimated_cash_balance += price * position_size
            else: #if data.can_trade(security):
                new_position_size = self.GetPositionSize(security)
                if self.SignificantChangeInPositionSize(new_position_size, position_size):
                    estimated_cost = price * (new_position_size * self.leverage_factor - position_size)
                    if new_position_size * self.leverage_factor == 0:
                        self.Liquidate(security)
                    else:
                        self.MarketOrder(security, int(new_position_size * self.leverage_factor))
                        # self.SetHoldings(security, new_position_size * self.leverage_factor)
                    
                    #order_target(security, new_position_size * context.leverage_factor, style=LimitOrder(price))
                    estimated_cash_balance -= estimated_cost
    
        # Market history is not used with the trend filter disabled
        # Removed for efficiency
        if self.use_market_trend_filter:
            market_history = self.History(self.market, self.market_window, Resolution.Daily)
            market_history = self.ConvertToQuantopianHistory("close", market_history, True)
            current_market_price = self.Securities[self.market].Price
            average_market_price = market_history.mean()
        else:
            average_market_price = 0
    
        if (current_market_price > average_market_price[0]) :  #if average is 0 then jump in
            for security in ranking_table.index:
                if security not in [x.Key for x in self.Portfolio if x.Value.Invested]: #and data.can_trade(security):
                    new_position_size = self.GetPositionSize(security)
                    estimated_cost = self.Securities[security].Price * new_position_size * self.leverage_factor
                    if estimated_cash_balance > estimated_cost:
                        posSize = new_position_size * self.leverage_factor
                        if posSize == 0:
                            self.Liquidate(security)
                        else:
                            self.MarketOrder(security, int(posSize))
                            # self.SetHoldings(security, posSize)
                        #order_target(security, new_position_size * context.leverage_factor, style=LimitOrder(data.current(security, "price")))
                        estimated_cash_balance -= estimated_cost
    
    def CancelOpenOrders(self):
        openOrders = self.Transactions.GetOpenOrders()
        if len(openOrders)> 0:
            for x in openOrders:
                 self.Transactions.CancelOrder(x.Id)
                 
        #record(exposure=context.account.leverage)

    def OnData(self, data):
        pass
    
    def _slope(self, ts):
        x = np.arange(len(ts))
        slope, intercept, r_value, p_value, std_err = stats.linregress(x, ts)
        annualized_slope = (1 + slope)**250
        return annualized_slope * (r_value ** 2)
        
    def OnSecuritiesChanged(self, changes):
        for security in changes.RemovedSecurities:
            symbol = security.Symbol
            self.Liquidate(symbol)
            if symbol in self.assets:
                self.assets.remove(symbol)
            if str(symbol) in self.assetATR:
                self.assetATR.pop(str(symbol), None)
        
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            if str(symbol) not in self.assetATR:
                atr = self.ATR(symbol, self.atr_window, MovingAverageType.Simple, Resolution.Daily)
                history = self.History(symbol, self.atr_window, Resolution.Daily)
                for tuple in history.loc[symbol].itertuples():
                    bar = TradeBar(tuple.Index, symbol, tuple.open, tuple.high, tuple.low, tuple.close, tuple.volume)
                    atr.Update(bar)
                self.assetATR[str(symbol)] = atr
            
            if symbol == self.market: continue
            if symbol not in self.assets:
                self.assets.append(symbol)
        
    def Coarse(self, coarse):
        if self.lastMonth == self.Time.month:
            return Universe.Unchanged
        self.lastMonth = self.Time.month
        
        # if self.pickedUniverse == True:
        #     return Universe.Unchanged
        filteredCoarse = [x.Symbol for x in coarse if x.HasFundamentalData and x.DollarVolume > 0 and x.Price > 0]
        return filteredCoarse
    
    def Fine(self, fine):
        marketCapFilter = [x for x in fine]
        marketCapFilter = sorted(marketCapFilter, key = lambda x: x.MarketCap, reverse = True)[:self.market_cap_limit]
        fineFiltered = [x for x in marketCapFilter if x.SecurityReference.IsPrimaryShare == True
                                                        and x.SecurityReference.IsDepositaryReceipt == False]
                                                        
        fineFilteredSymbols = [x.Symbol for x in fineFiltered]
        final = []
        
        if self.use_stock_trend_filter == True:
        
            history = self.History(fineFilteredSymbols, self.sma_window_length, Resolution.Daily)
            sma = {}
            for symbol in fineFillteredSymbols:
                curSma = SimpleMovingAverage(self.sma_window_length)
                for tuple in history.loc[symbol].itertuples():
                    curSma.Update(tuple.Index, tuple.close)
                sma[symbol] = curSma.Current.Value
            smaFiltered = [x.Symbol for x in fineFiltered if x.Price > sma[x.Symbol]]
            final = smaFiltered
        else:
            final = fineFilteredSymbols
        
        self.pickedUniverse = True
        return final