Overall Statistics
Total Trades
76
Average Win
0.53%
Average Loss
-0.40%
Compounding Annual Return
0.758%
Drawdown
2.100%
Expectancy
0.104
Net Profit
1.521%
Sharpe Ratio
0.241
Probabilistic Sharpe Ratio
9.724%
Loss Rate
53%
Win Rate
47%
Profit-Loss Ratio
1.33
Alpha
0.004
Beta
0.013
Annual Standard Deviation
0.023
Annual Variance
0.001
Information Ratio
-0.672
Tracking Error
0.125
Treynor Ratio
0.416
Total Fees
$1978.81
Estimated Strategy Capacity
$5800000.00
Lowest Capacity Asset
V U12VRGLO8PR9
# region imports
from AlgorithmImports import *

from QuantConnect.Data.UniverseSelection import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel

import itertools

import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
# endregion

class DeterminedTanKitten(QCAlgorithm):

    def Initialize(self):
        """Set up the backtest window, cash, universe and strategy state."""
        self.SetStartDate(2018, 1, 1)  # Set Start Date
        self.SetEndDate(2020, 1, 1)  # Set End Date
        self.SetCash(1000000)  # Set Strategy Cash

        self.UniverseSettings.Resolution = Resolution.Hour
        self.AddUniverseSelection(FSTopMarketCapUniverseSelectionModel(sector = MorningstarSectorCode.FinancialServices,
                                                                    number = 3,
                                                                    universe_settings = self.UniverseSettings))

        # Number of daily bars requested from History() when computing the statistics.
        self.lookback = 20
        # |zscore| above which a position is opened / level used to close it in OnData.
        self.entry_th = 2
        self.exit_th = 0
    
        # Pair that was traded last; used to detect a change of the selected pair.
        self.last_p = None
        self.last_q = None

        # Declared up front so the attributes exist before any handler reads them:
        # direction label of the open position and the latest compute_stats() results.
        self.spread = None
        self.mx = None
        self.mn = None

        # Symbols currently in the universe, maintained by OnSecuritiesChanged.
        self.securityTracker = set()

    def OnSecuritiesChanged(self, changes: SecurityChanges) -> None:
        """Keep ``securityTracker`` in sync with the universe and refresh the stats.

        Args:
            changes: securities added to / removed from the universe.
        """
        for security in changes.AddedSecurities:
            self.securityTracker.add(security.Symbol)
            self.Debug(f"Added {security.Symbol}")

        for security in changes.RemovedSecurities:
            # discard() instead of remove(): a removal notice for a symbol that
            # was never tracked must not raise KeyError and stop the algorithm.
            self.securityTracker.discard(security.Symbol)
            self.Debug(f"Removed {security.Symbol}")
        
        # Recompute (and log) the pair statistics for the new set of symbols.
        self.mx, self.mn = self.compute_stats(verbose=True)

## Assignment 5.2.2 - 5.2.5 
# Assignment 5.2.2    
    def compute_stats(self, verbose=False):
        """Evaluate every ordered pair of tracked symbols with ``self.stats``.

        Args:
            verbose: when true, log the ADF test statistic and p-value of each pair.

        Returns:
            ``(stats_mx, stats_mn)``. Among the pairs whose p-value is below
            0.05, ``stats_mx`` belongs to the highest ADF test statistic and
            ``stats_mn`` to the lowest one; each is
            ``[p1, p2, zscore, slope, adf]``. Either is ``None`` when no pair
            qualifies.
        """
        highest = None
        lowest = None
        highest_stat = -1e6
        lowest_stat = 1e6

        # Ordered pairs: (a, b) and (b, a) are both passed to self.stats.
        for p1, p2 in itertools.permutations(self.securityTracker, 2):
            zscore, slope, adf = self.stats([p1, p2])
            test_stat = adf['Test Statistic']
            p_value = adf['p-value']

            if verbose:
                self.Log('ADF Test for pairs: {} and {}'.format(p1,p2))
                self.Log('\tTest Statistic = {}'.format(test_stat))
                self.Log('\tP-value = {}'.format(p_value))

            significant = p_value < 0.05
            if not significant:
                continue

            if test_stat < lowest_stat:
                lowest = [p1, p2, zscore, slope, adf]
                lowest_stat = test_stat

            if test_stat > highest_stat:
                highest = [p1, p2, zscore, slope, adf]
                highest_stat = test_stat

        return highest, lowest
    
    def stats(self, symbols):
        """Regress the log open prices of one symbol on the other and test the residual.

        Args:
            symbols: pair of QC Symbol objects; the first is the dependent
                variable of the regression, the second the explanatory one.

        Returns:
            ``[zscore, slope, adf]`` where ``zscore`` is the latest residual
            divided by the residual standard deviation, ``slope`` is the OLS
            coefficient of the second symbol and ``adf`` is a pandas Series
            with the first four outputs of ``adfuller`` on the residuals.
        """
        self.df = self.History(symbols, self.lookback, Resolution.Daily)
        # One column per symbol. Rows where either open is missing are dropped
        # so that NaN never reaches the regression or the ADF test.
        self.dg = self.df["open"].unstack(level=0).dropna()
        ticker1 = str(symbols[0])
        ticker2 = str(symbols[1])
        # Vectorised log instead of a per-element Python lambda.
        Y = np.log(self.dg[ticker1])
        X = sm.add_constant(np.log(self.dg[ticker2]))
        results = sm.OLS(Y, X).fit()
        # standard deviation of the residual
        sigma = np.sqrt(results.mse_resid)
        # add_constant puts the intercept first, so the slope sits at position 1;
        # .iloc makes the positional access explicit on the label-indexed params.
        slope = results.params.iloc[1]
        # regression residual has mean = 0 by definition
        res = results.resid
        zscore = res / sigma
        adf = adfuller(res)
        adf = pd.Series(adf[0:4], index=['Test Statistic','p-value','#Lags Used','Number of Observations Used'])
        return [zscore.values[-1], slope, adf]

    def get_quantity_target(self, p, q, zscore, slope):
        """Size both legs of the pair from the current buying power.

        The buying power of each symbol is weighted by ``1 / (1 + slope)`` for
        ``p`` and ``slope / (1 + slope)`` for ``q`` and divided by the symbol's
        current portfolio price.

        Args:
            p: symbol of the first leg.
            q: symbol of the second leg.
            zscore: not used by the computation.
            slope: regression slope used to split the weights.

        Returns:
            ``(quantity_p, quantity_q)`` as unsigned, unrounded quantities.
        """
        portfolio = self.Portfolio
        weight_p = 1/(1+slope)
        quantity_p = weight_p*portfolio.GetBuyingPower(p)/portfolio[p].Price
        weight_q = slope/(1+slope)
        quantity_q = weight_q*portfolio.GetBuyingPower(q)/portfolio[q].Price
        return quantity_p, quantity_q

# Assignment 5.2.3 - 5.2.4 - 5.2.5
    # 5.2.3 Case
    # def OnData(self, slice):
    #     if self.Time.hour != 12:
    #         return
    #     if len(self.securityTracker)<3:
    #         return
    #     self.mx, self.mn = self.compute_stats()
    #     if self.mn is None: # In case of no statistical significance reached
    #         self.Liquidate()
    #         return
    #     p, q, zscore, slope = self.mn[0], self.mn[1], self.mn[2], self.mn[3]
    #     if (self.last_p is not None and self.last_q is not None):
    #         if p != self.last_p or q != self.last_q: # if the better combination changes
    #             self.Liquidate()
    #     if not self.Portfolio.Invested:
    #         if zscore > self.entry_th:
    #             self.spread = 'long'
    #             self.last_p = p
    #             self.last_q = q
    #             quantity_p, quantity_q = self.get_quantity_target(p, q, zscore, slope)
    #             self.MarketOrder(p, -quantity_p)
    #             self.MarketOrder(q, quantity_q)
    #         elif zscore < -1*self.entry_th:
    #             self.spread = 'short'
    #             self.last_p = p
    #             self.last_q = q
    #             quantity_p, quantity_q = self.get_quantity_target(p, q, zscore, slope)
    #             self.MarketOrder(p, quantity_p)
    #             self.MarketOrder(q, -quantity_q)
    #     else:
    #         if zscore > -1*self.exit_th and self.spread == 'long':
    #             self.Liquidate()
    #         elif zscore < -1*self.exit_th and self.spread == 'short':
    #             self.Liquidate()

    # # 5.2.4 Case
    def OnData(self, slice):
        """Trade the selected pair once a day, on the bar whose hour is 12.

        The pair comes from ``compute_stats`` (the ``mx`` result). With no open
        position, the spread is sold when its z-score is above ``entry_th``
        (sell ``p``, buy ``q``) and bought when it is below ``-entry_th``
        (buy ``p``, sell ``q``). An open position is closed once the z-score
        has come back to the exit level, or when the selected pair changes or
        no pair is significant any more.

        Args:
            slice: current data slice; not read by this handler.
        """
        if self.Time.hour != 12:
            return
        if len(self.securityTracker) < 3:
            return

        self.mx, self.mn = self.compute_stats()
        if self.mx is None:  # no pair reached statistical significance
            self.Liquidate()
            return

        p, q, zscore, slope = self.mx[:4]

        # Close everything when the selected pair differs from the one traded last.
        if self.last_p is not None and self.last_q is not None:
            if p != self.last_p or q != self.last_q:
                self.Liquidate()

        if not self.Portfolio.Invested:
            if zscore > self.entry_th:
                direction = 'short'  # spread is rich: sell p, buy q
            elif zscore < -self.entry_th:
                direction = 'long'  # spread is cheap: buy p, sell q
            else:
                return

            self.spread = direction
            self.last_p = p
            self.last_q = q
            quantity_p, quantity_q = self.get_quantity_target(p, q, zscore, slope)
            sign = 1 if direction == 'long' else -1
            self.MarketOrder(p, sign * quantity_p)
            self.MarketOrder(q, -sign * quantity_q)
            return

        # Exit only after the z-score has reverted. The previous conditions
        # were inverted: a position opened above +entry_th was closed while the
        # z-score was still positive and kept once it had crossed zero.
        # getattr guards against the label not having been assigned yet.
        spread = getattr(self, 'spread', None)
        if spread == 'short' and zscore < self.exit_th:
            self.Liquidate()
        elif spread == 'long' and zscore > -self.exit_th:
            self.Liquidate()

    # # 5.2.5 Case
    # def OnData(self, slice):
    #     if self.Time.hour != 12:
    #         return
    #     if len(self.securityTracker)<3:
    #         return
    #     self.mx, self.mn = self.compute_stats()
    #     if self.mn is None: # In case of no statistical significance reached
    #         self.Liquidate()
    #         return
    #     p, q, zscore, slope = self.mn[0], self.mn[1], self.mn[2], self.mn[3]
    #     if (self.last_p is not None and self.last_q is not None):
    #         if p != self.last_p or q != self.last_q: # if the better combination changes
    #             self.Liquidate()
    #     if not self.Portfolio.Invested:
    #         if zscore > self.entry_th:
    #             self.spread = 'long'
    #             self.last_p = p
    #             self.last_q = q
    #             quantity_p, quantity_q = self.get_quantity_target(p, q, zscore, slope)
    #             self.MarketOrder(p, -quantity_p)
    #             self.MarketOrder(q, quantity_q)
    #         elif zscore < -1*self.entry_th:
    #             self.spread = 'short'
    #             self.last_p = p
    #             self.last_q = q
    #             quantity_p, quantity_q = self.get_quantity_target(p, q, zscore, slope)
    #             self.MarketOrder(p, quantity_p)
    #             self.MarketOrder(q, -quantity_q)
    #     else:
    #         if abs(zscore) > 3:
    #             self.Liquidate()
    #         if zscore > -1*self.exit_th and self.spread == 'long':
    #             self.Liquidate()
    #         elif zscore < -1*self.exit_th and self.spread == 'short':
    #             self.Liquidate()

## Assignment 5.2.1
class FSTopMarketCapUniverseSelectionModel(FineFundamentalUniverseSelectionModel):
    """Universe made of the largest companies, by market cap, of one sector.

    At most one symbol is kept per company, and at most ``number`` symbols
    are returned.
    """

    def __init__(self, sector: MorningstarSectorCode, number: int,
                universe_settings: UniverseSettings = None) -> None:
        """Store the filter settings and register the two selection callbacks.

        Args:
            sector: Morningstar sector code a security must have to be kept.
            number: maximum number of symbols to select.
            universe_settings: settings forwarded to the base selection model.
        """
        super().__init__(self.SelectCoarse, self.SelectFine, universe_settings)
        self.sector = sector
        self.number = number

    def SelectCoarse(self, coarse: List[CoarseFundamental]) -> List[Symbol]:
        """Keep only the securities that have fundamental data."""
        return [c.Symbol for c in coarse if c.HasFundamentalData]

    def SelectFine(self, fine: List[FineFundamental]) -> List[Symbol]:
        """Return the top ``number`` symbols of the sector, one per company.

        Args:
            fine: fine fundamental objects that passed the coarse filter.

        Returns:
            Symbols ordered by descending market cap; empty when ``number``
            is not positive.
        """
        # Nothing to select; without this guard the loop below would still
        # return one symbol for number <= 0.
        if self.number <= 0:
            return []

        # Securities of the requested sector, largest market cap first.
        in_sector = [x for x in fine if x.AssetClassification.MorningstarSectorCode == self.sector]
        ranked = sorted(in_sector, key=lambda x: x.MarketCap, reverse=True)

        # First (largest) symbol seen for each company id.
        selected = {}
        for security in ranked:
            company_id = security.CompanyReference.CompanyId
            # Explicit membership test rather than relying on the truthiness
            # of the stored Symbol.
            if company_id in selected:
                continue
            selected[company_id] = security.Symbol
            if len(selected) >= self.number:
                break
        return list(selected.values())