Overall Statistics
Total Trades
94
Average Win
0.02%
Average Loss
-0.02%
Compounding Annual Return
0.166%
Drawdown
0.200%
Expectancy
0.015
Net Profit
0.014%
Sharpe Ratio
0.394
Probabilistic Sharpe Ratio
43.229%
Loss Rate
53%
Win Rate
47%
Profit-Loss Ratio
1.17
Alpha
-0.01
Beta
0.046
Annual Standard Deviation
0.008
Annual Variance
0
Information Ratio
-2.629
Tracking Error
0.109
Treynor Ratio
0.067
Total Fees
$94.00
Estimated Strategy Capacity
$10000000.00
from SymbolData import SymbolData

class PSARStrategy(QCAlgorithm):

    def Initialize(self):
        '''Configures the backtest: date range, cash, benchmark, data
        subscriptions, the five minute signal clock and the end-of-day
        liquidation event'''
        # Backtest window and starting capital
        self.SetStartDate(2020, 11, 16)
        self.SetEndDate(2020, 12, 16)
        self.SetCash(100000)

        # The benchmark is subscribed to for statistics and as a clock;
        # it is not meant to be traded
        self.benchmark = "SPY"
        self.AddEquity(self.benchmark, Resolution.Minute)
        self.SetBenchmark(self.benchmark)

        # Allowed trading window as (hour, minute) pairs: 9:30 AM to 11:30 AM
        self.trading_start_time, self.trading_end_time = (9, 30), (11, 30)

        # Five minute consolidator on the benchmark. Its bars are not used
        # themselves; each one triggers EveryFiveMinutes so signals are
        # evaluated on a regular interval
        self.five_minute_consolidator = QuoteBarConsolidator(timedelta(minutes=5))
        self.SubscriptionManager.AddConsolidator(self.benchmark, self.five_minute_consolidator)
        self.five_minute_consolidator.DataConsolidated += self.EveryFiveMinutes

        # Minute resolution subscriptions for the chosen tickers
        for ticker in ("JNJ", "AAPL", "WFC"):
            self.AddEquity(ticker, Resolution.Minute)

        # symbol -> SymbolData container, filled in by OnSecuritiesChanged
        self.symbols = dict()

        # Liquidate all holdings shortly before the benchmark's market close
        # (the second argument of BeforeMarketClose is 1)
        every_day = self.DateRules.EveryDay(self.benchmark)
        before_close = self.TimeRules.BeforeMarketClose(self.benchmark, 1)
        self.Schedule.On(every_day, before_close, self.EndOfDayLiquidate)
    
    
    def EveryFiveMinutes(self, sender, bar):
        '''Fires on every consolidated five minute bar of the benchmark

        Handles portfolio management logic

        1. Loops through universe and collects ready, uninvested securities
           with a valid entry signal
        2. Liquidates all invested securities which meet the exit signal
        3. Buys about 3000 worth (at the current price) of each security
           collected in step 1, as a whole number of shares

        sender -- the consolidator raising the event (unused)
        bar -- the consolidated benchmark bar (unused)'''

        # If not trading hours, do nothing
        if not self.DesignatedTradingHours:
            return

        # Notional value targeted by every new position
        position_value = 3000

        # ENTRY SELECTION
        # Orders are not submitted here; candidates are only collected so
        # that the exits below are processed first
        selected_entry_symbols = []
        for symbol, symbol_data in self.symbols.items():
            if not symbol_data.IsReady:
                continue

            # An invested symbol can never be selected, so its entry signal
            # does not need to be computed
            if self.Portfolio[symbol].Invested:
                continue

            if self.CalculateEntrySignal(symbol_data):
                selected_entry_symbols.append(symbol)

        # EXIT LOGIC
        # Looks for invested symbols which meet the exit criteria
        for symbol, symbol_data in self.symbols.items():
            if not self.Portfolio[symbol].Invested or not symbol_data.IsReady:
                continue

            if self.CalculateExitSignal(symbol_data):
                self.Liquidate(symbol)
                self.Debug(f"Liquidating {symbol}....")

        # ENTER NEW POSITIONS
        for symbol in selected_entry_symbols:
            price = self.Securities[symbol].Price

            # Without a positive price the share count cannot be computed
            # (the floor division would raise ZeroDivisionError)
            if price <= 0:
                continue

            quantity = int(position_value // price)

            # A price above the per-position budget yields zero shares;
            # skip instead of submitting an empty order
            if quantity < 1:
                continue

            self.MarketOrder(symbol, quantity)
            
        
            
    def OnSecuritiesChanged(self, changes):
        '''Fires each time securities are added to or removed from our universe

        Creates a SymbolData container for every added security except the
        benchmark and tears down the container of every removed security

        changes -- object exposing AddedSecurities and RemovedSecurities'''
        for security in changes.AddedSecurities:
            symbol = security.Symbol

            # self.benchmark is a plain ticker string while symbol is a
            # Symbol object, so the tickers are compared explicitly
            # (symbol.Value is what this file already logs as the ticker)
            # rather than relying on Symbol-to-str equality
            is_benchmark = symbol.Value == self.benchmark

            if not is_benchmark and symbol not in self.symbols:
                self.symbols[symbol] = SymbolData(self, symbol)

        for security in changes.RemovedSecurities:
            # pop with a default covers both the membership test and the removal
            symbol_data = self.symbols.pop(security.Symbol, None)
            if symbol_data is not None:
                symbol_data.KillConsolidators()
                
                
    def CalculateEntrySignal(self, symbol_data):
        '''Combines the four entry signals of one symbol

        symbol_data -- the SymbolData container of the symbol to evaluate

        The result is truthy when Signal One is met together with at least
        one of Signals Two, Three and Four. A qualifying symbol that is not
        yet invested is written to the debug log.'''
        # All four signals are always evaluated so the log line can report each
        signal_one, signal_two, signal_three, signal_four = (
            self.SignalOne(symbol_data),
            self.SignalTwo(symbol_data),
            self.SignalThree(symbol_data),
            self.SignalFour(symbol_data),
        )

        # Signal One is mandatory, the remaining three are alternatives
        any_secondary = signal_two or signal_three or signal_four
        valid_entry_signal = any_secondary if signal_one else signal_one

        # For logging purposes
        if valid_entry_signal:
            if not self.Portfolio[symbol_data.symbol].Invested:
                self.Debug(f"{symbol_data.symbol} valid entry...signal_one:{signal_one}, signal_two:{signal_two}, signal_three:{signal_three} "
                           f", signal_four: {signal_four}")

        return valid_entry_signal
        
        
        
    def SignalOne(self, symbol_data):
        '''True when the current PSAR values are strictly ordered
        slow < moderate < fast (the strategy's "all 3 trending up" check)'''
        slow, moderate, fast = (
            indicator.Current.Value
            for indicator in (symbol_data.psar_slow,
                              symbol_data.psar_moderate,
                              symbol_data.psar_fast)
        )

        return slow < moderate < fast
        
    
    def SignalTwo(self, symbol_data):
        '''True when at least one of the following holds:
            a. All 3 PSAR values were equal on the previous bar
            b. All 3 PSAR values were pairwise less than 0.02 apart on the previous bar
            c. The slow PSAR value from 2 bars ago is above the open of the newest bar'''
        tolerance = 0.02

        # Index 1 of each rolling window is read as the previous bar
        prev_slow = symbol_data.psar_slow_window[1].Value
        prev_moderate = symbol_data.psar_moderate_window[1].Value
        prev_fast = symbol_data.psar_fast_window[1].Value

        # Index 2 is two bars back; index 0 of bar_window is the newest bar
        older_slow = symbol_data.psar_slow_window[2].Value
        newest_open = symbol_data.bar_window[0].Open

        # Condition A: slow == moderate and slow == fast
        all_equal = prev_moderate == prev_slow == prev_fast

        # Condition B: every pair closer than the tolerance
        pairs = (
            (prev_slow, prev_moderate),
            (prev_slow, prev_fast),
            (prev_fast, prev_moderate),
        )
        all_close = all(abs(left - right) < tolerance for left, right in pairs)

        # Condition C
        slow_above_open = older_slow > newest_open

        return all_equal or all_close or slow_above_open


    def SignalThree(self, symbol_data):
        '''True when the current slow, moderate and fast PSAR values are all
        below the open of the newest stored five minute bar'''
        current_values = (
            symbol_data.psar_slow.Current.Value,
            symbol_data.psar_moderate.Current.Value,
            symbol_data.psar_fast.Current.Value,
        )
        newest_open = symbol_data.bar_window[0].Open

        return all(value < newest_open for value in current_values)
    
    def SignalFour(self, symbol_data):
        '''True when exactly two of the last three bars (current, previous,
        two bars ago) have all 3 PSAR values pairwise less than 0.02 apart.

        Three such bars in a row are not valid, and neither is fewer than two
        (example: equal | equal | trending == valid. Equal | equal |
        equal | trending == false)'''
        tolerance = 0.02

        windows = (
            symbol_data.psar_slow_window,
            symbol_data.psar_moderate_window,
            symbol_data.psar_fast_window,
        )

        def values_are_close(bars_ago):
            # Index 0 is the current bar, 1 the previous one, 2 two bars ago
            slow, moderate, fast = (window[bars_ago].Value for window in windows)
            return (abs(slow - moderate) < tolerance
                    and abs(slow - fast) < tolerance
                    and abs(fast - moderate) < tolerance)

        close_flags = [values_are_close(bars_ago) for bars_ago in range(3)]

        # "at least two, but not all three" is the same as "exactly two"
        return sum(close_flags) == 2
        
    def CalculateExitSignal(self, symbol_data):
        '''Exit condition for one symbol

        True when the current slow PSAR value is above the open of the
        newest stored five minute bar
        '''
        newest_open = symbol_data.bar_window[0].Open

        return symbol_data.psar_slow.Current.Value > newest_open
    
    @property
    def DesignatedTradingHours(self):
        '''Determines whether we are within the
        allowed trading time interval

        Returns True when the algorithm time, reduced to (hour, minute),
        lies between self.trading_start_time and self.trading_end_time,
        both ends inclusive'''
        window_start = (self.trading_start_time[0], self.trading_start_time[1])
        window_end = (self.trading_end_time[0], self.trading_end_time[1])

        now = (self.Time.hour, self.Time.minute)

        # Tuples compare hour first and minute second, so one comparison
        # covers every case, including a window that starts and ends
        # within the same hour (where both minute bounds must apply)
        return window_start <= now <= window_end
   
    
    def EndOfDayLiquidate(self):
        '''Scheduled end of day handler

        Logs the tickers of all invested holdings and liquidates the
        whole portfolio; does nothing when nothing is held'''
        if not self.Portfolio.Invested:
            return

        invested_tickers = []
        for symbol in self.Portfolio.Keys:
            if self.Portfolio[symbol].Invested:
                invested_tickers.append(symbol.Value)

        self.Debug(f"Invested in... {invested_tickers}...EOD Liquidate")
        self.Liquidate()
class SymbolData:
    '''Per-symbol container: a five minute consolidator, three PSAR
    indicators fed by it and rolling windows holding their recent values'''

    def __init__(self, algorithm, symbol):
        '''algorithm -- the owning algorithm instance
        symbol -- the security this container tracks'''
        self.algorithm = algorithm
        self.symbol = symbol

        # Five minute bars drive both the stored bar history and the indicators
        self.consolidator = QuoteBarConsolidator(timedelta(minutes=5))
        algorithm.SubscriptionManager.AddConsolidator(symbol, self.consolidator)
        self.consolidator.DataConsolidated += self.OnFiveMinuteBar

        # Three PSAR variants; only the second constructor argument differs
        self.psar_slow = ParabolicStopAndReverse(0.01, 0.01, 0.20)
        self.psar_moderate = ParabolicStopAndReverse(0.01, 0.02, 0.20)
        self.psar_fast = ParabolicStopAndReverse(0.01, 0.03, 0.20)

        # Feed every indicator from the five minute consolidator
        for indicator in (self.psar_slow, self.psar_moderate, self.psar_fast):
            algorithm.RegisterIndicator(symbol, indicator, self.consolidator)

        # Each indicator update is pushed into the matching window below
        self.psar_slow.Updated += self.OnPSARSlow
        self.psar_moderate.Updated += self.OnPSARModerate
        self.psar_fast.Updated += self.OnPSARFast

        # History of the three most recent values per indicator
        self.psar_slow_window = RollingWindow[IndicatorDataPoint](3)
        self.psar_moderate_window = RollingWindow[IndicatorDataPoint](3)
        self.psar_fast_window = RollingWindow[IndicatorDataPoint](3)

        # History of the five most recent five minute bars
        self.bar_window = RollingWindow[QuoteBar](5)

    def OnFiveMinuteBar(self, sender, bar):
        '''Handler for new five minute bars
        Appends each arriving bar to bar_window'''
        self.bar_window.Add(bar)

    def OnPSARSlow(self, sender, updated):
        '''Handler for psar_slow updates; stores the current value once
        the indicator is ready'''
        indicator = self.psar_slow
        if not indicator.IsReady:
            return
        self.psar_slow_window.Add(indicator.Current)

    def OnPSARModerate(self, sender, updated):
        '''Handler for psar_moderate updates; stores the current value once
        the indicator is ready'''
        indicator = self.psar_moderate
        if not indicator.IsReady:
            return
        self.psar_moderate_window.Add(indicator.Current)

    def OnPSARFast(self, sender, updated):
        '''Handler for psar_fast updates; stores the current value once
        the indicator is ready'''
        indicator = self.psar_fast
        if not indicator.IsReady:
            return
        self.psar_fast_window.Add(indicator.Current)

    @property
    def IsReady(self):
        '''True once all three PSAR windows and the bar window report ready,
        i.e. there is enough data to calculate signals'''
        windows = (
            self.psar_slow_window,
            self.psar_fast_window,
            self.psar_moderate_window,
            self.bar_window,
        )
        return all(window.IsReady for window in windows)

    def KillConsolidators(self):
        '''Removes this symbol's consolidator from the subscription manager'''
        manager = self.algorithm.SubscriptionManager
        manager.RemoveConsolidator(self.symbol, self.consolidator)