Overall Statistics
Total Trades
1
Average Win
0%
Average Loss
0%
Compounding Annual Return
76.769%
Drawdown
31.500%
Expectancy
0
Net Profit
58.711%
Sharpe Ratio
1.703
Probabilistic Sharpe Ratio
60.393%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0.934
Beta
-0.219
Annual Standard Deviation
0.449
Annual Variance
0.201
Information Ratio
-0.02
Tracking Error
0.7
Treynor Ratio
-3.491
Total Fees
$1.00
import time
from functools import wraps

def time_method(func):
    """Decorator that reports slow calls through the algorithm's ``Debug`` log.

    Every invocation of the wrapped callable is timed. When a call takes
    longer than the threshold (1 second) a message is sent to the ``Debug``
    method of the first positional argument, i.e. the decorator is meant for
    methods whose ``self`` exposes ``Debug``. If there is no positional
    argument, or it has no callable ``Debug``, the call is simply not
    reported; the wrapped function's result is returned either way.

    :param func: callable to wrap
    :return: wrapper that forwards all arguments and returns ``func``'s result
    """
    @wraps(func)
    def timed(*args, **kw):
        time_thresh = 1  # Function time taken printed if greater than this number

        # perf_counter is monotonic, so the measured interval cannot be
        # distorted (or become negative) by system clock adjustments.
        ts = time.perf_counter()
        result = func(*args, **kw)
        elapsed = time.perf_counter() - ts

        if elapsed > time_thresh:
            # A slow call must not fail merely because there is nobody to
            # report to (no positional argument / no Debug method).
            debug = getattr(args[0], 'Debug', None) if args else None
            if callable(debug):
                debug("%r took %2.2f seconds to run." % (func.__name__, elapsed))

        return result

    return timed
import pandas as pd
import numpy as np
from sadf import get_sadf
from datetime import datetime, timedelta
from collections import deque


class TransdimensionalCalibratedChamber(QCAlgorithm):
    """Single-stock strategy driven by the SADF indicator.

    Buys the configured symbol while the SADF value is below
    ``confidence_threshold``; liquidates a long position once SADF is above
    the threshold and the indicator's current and previous returns are both
    negative.
    """

    def Initialize(self):
        """Configure symbol, capital, brokerage, benchmark, universe and warm-up."""

        # parameters for one stock
        self.symbol = Symbol.Create("AAPL", SecurityType.Equity, Market.USA)
        self.cash_invested = 10000
        self.confidence_threshold = 1.1

        # The indicator is created in OnSecuritiesChanged. Keep an explicit
        # None until then so OnData can skip instead of raising AttributeError.
        self.sadf = None

        # date, equity, brokerage and benchmark
        self.SetStartDate(2020, 1, 1)
        # self.SetEndDate(2020, 10, 10)
        self.SetCash(self.cash_invested)
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetBenchmark(self.symbol)

        # universe; used to get fundamental data
        self.SetUniverseSelection(FineFundamentalUniverseSelectionModel(self.CoarseSelectionFunction, self.FineSelectionFunction, None, None))
        self.UniverseSettings.Resolution = Resolution.Daily

        # old way
        # self.AddEquity(self.symbol, Resolution.Hour, fillDataForward=True).SetDataNormalizationMode(DataNormalizationMode.Adjusted)

        # warm-up period
        self.lookback = 150
        self.SetWarmUp(self.lookback)

        # # frequency between minute and 30 min
        # thirtyMinuteConsolidator = TradeBarConsolidator(timedelta(minutes=30))
        # self.SubscriptionManager.AddConsolidator("SPY", thirtyMinuteConsolidator)
        # self.RegisterIndicator("SPY", self.sadf, thirtyMinuteConsolidator)

    def OnData(self, data):
        '''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
            Arguments:
                data: Slice object keyed by symbol containing the stock data
        '''

        # skip slices without bar data (only stock splits, dividends etc.)
        if self.symbol not in data.Bars:
            return

        # continue only once warm-up has finished
        if self.IsWarmingUp:
            return

        # nothing to evaluate until the indicator has been created
        if self.sadf is None:
            return

        # Alpha
        if not self.Portfolio[self.symbol].Invested and self.sadf.Value < self.confidence_threshold:
            # order size is derived manually from the tracked cash, not from buying power
            size = int(self.cash_invested / data[self.symbol].Close)
            self.MarketOrder(self.symbol, size)

        elif self.Portfolio[self.symbol].IsLong and self.sadf.Value > self.confidence_threshold:
            # exit only after two consecutive negative returns
            if self.sadf.CurrentReturn < 0 and self.sadf.PreviousReturn < 0:
                # roll the open profit/loss into the cash used to size the next entry
                self.cash_invested = self.cash_invested + self.Portfolio[self.symbol].UnrealizedProfit
                self.Liquidate(self.symbol)

    def CoarseSelectionFunction(self, coarse):
        """Coarse universe filter: always selects only the configured symbol."""
        return [self.symbol]

    def FineSelectionFunction(self, fine):
        """Fine universe filter: always selects only the configured symbol."""
        return [self.symbol]

    def OnSecuritiesChanged(self, changes):
        """Create and register the SADF indicator for each added security.

        Arguments:
            changes: security additions and removals; only AddedSecurities is used
        """
        for security in changes.AddedSecurities:
            self.sadf = SadfIndicator('sadf', self.lookback, self, security.Symbol)
            self.RegisterIndicator(self.symbol, self.sadf, Resolution.Daily)
            self.Log(self.sadf.ValuePe)


class SadfIndicator(PythonIndicator):
    """Custom indicator exposing the latest SADF statistic of a rolling window.

    Keeps the most recent ``period`` prices, end times and normalized P/E
    ratios. Once the window is full, every update recomputes:

    * ``Value``           - last SADF value of the price window
    * ``ValuePe``         - last SADF value of the normalized P/E window
    * ``CurrentReturn``   - most recent one-period price return
    * ``PreviousReturn``  - the one-period price return before that

    All four stay at 0 until the window has been filled.
    """

    def __init__(self, name, period, algorithm, symbol):
        """
        :param name: indicator name
        :param period: number of observations kept in the rolling window
        :param algorithm: algorithm instance, used for logging, plotting and security lookup
        :param symbol: symbol whose security supplies the fundamental data
        """
        self.Name = name
        self.period = period
        self.Time = datetime.min
        self.Value = 0
        self.ValuePe = 0
        self.queue = deque(maxlen=period)
        self.queueTime = deque(maxlen=period)
        self.queuePe = deque(maxlen=period)
        self.CurrentReturn = 0
        # Initialised alongside CurrentReturn so consumers can read it before
        # the first full-window computation.
        self.PreviousReturn = 0
        self.algorithm = algorithm
        self.security = algorithm.Securities[symbol]

    def sadf_last(self, value):
        """Return the most recent SADF value for ``value``.

        :param value: (pd.Series) time-ordered series to test
        :return: (float) last SADF value, or 0 when no usable value is available
        """
        sadf_linear = get_sadf(
            value,
            min_length=50,
            add_const=True,
            model='linear',
            lags=4)
        if len(sadf_linear) == 0:
            return 0
        last_value = sadf_linear.values[-1].item()
        # get_sadf yields -inf/NaN when no regression could be estimated;
        # fall back to the same neutral 0 used for an empty result.
        if not np.isfinite(last_value):
            return 0
        return last_value

    def Update(self, input):
        """Push a new data point and refresh the indicator values.

        :param input: data point providing ``Price`` and ``EndTime``
        :return: (bool) True once the rolling window is full
        """
        self.algorithm.Log(f'Input type: {type(input)}')

        pe_ratio = self.security.Fundamentals.ValuationRatios.NormalizedPERatio
        self.algorithm.Plot('Normalized PE', 'Ratio', pe_ratio)

        self.queue.appendleft(input.Price)
        self.queueTime.appendleft(input.EndTime)
        self.queuePe.appendleft(pe_ratio)
        self.Time = input.EndTime

        # The deques are bounded by maxlen, so their length can never exceed
        # the period; "full" is the only meaningful readiness test.
        is_ready = len(self.queue) == self.queue.maxlen
        if is_ready:
            times = list(self.queueTime)
            # Newest items sit on the left; sort_index restores time order.
            close_ = pd.Series(list(self.queue), index=times).rename('close').sort_index()
            pe_ = pd.Series(list(self.queuePe), index=times).rename('pe').sort_index()

            returns = close_.pct_change(periods=1)
            # Explicit positional access; integer keys on a datetime index
            # are otherwise ambiguous.
            self.CurrentReturn = returns.iloc[-1]
            self.PreviousReturn = returns.iloc[-2]

            self.Value = self.sadf_last(close_)
            self.ValuePe = self.sadf_last(pe_)
        # self.IsReady = count == self.queue.maxlen
        return is_ready

## IN CASE HISTORICAL DATA IS NEEDED LATER
# df = self.History(self.tickers, 10, Resolution.Daily).close
# prices = pd.concat([df.loc[x] for x in self.tickers], axis = 1)
# prices.columns = self.tickers
# Copyright 2019, Hudson and Thames Quantitative Research
# All rights reserved
# Read more: https://github.com/hudson-and-thames/mlfinlab/blob/master/LICENSE.txt

"""
Explosiveness tests: SADF
"""

from typing import Union, Tuple
import pandas as pd
import numpy as np


# pylint: disable=invalid-name

def _get_sadf_at_t(X: pd.DataFrame, y: pd.DataFrame, min_length: int, model: str, phi: float) -> float:
    """
    Advances in Financial Machine Learning, Snippet 17.2, page 258.

    SADF's Inner Loop (get SADF value at t)

    Fits the regression on every sub-sample ``[start:]`` that still contains at
    least ``min_length`` rows and returns the supremum of the resulting test
    statistics. For sub-/super-martingale models (``model`` starting with 'sm')
    the absolute statistic is divided by ``sample_length ** phi``, where
    ``sample_length`` is the number of rows of the sub-sample being tested.

    Note: `_sadf_outer_loop` passes NumPy arrays here; the ``[0, 0]`` indexing of
    the estimates relies on `get_betas` returning arrays.

    :param X: (pd.DataFrame) Lagged values, constants, trend coefficients
    :param y: (pd.DataFrame) Y values (either y or y.diff())
    :param min_length: (int) Minimum number of samples needed for estimation
    :param model: (str) Either 'linear', 'quadratic', 'sm_poly_1', 'sm_poly_2', 'sm_exp', 'sm_power'
    :param phi: (float) Coefficient to penalize large sample lengths when computing SMT, in [0, 1]
    :return: (float) SADF statistics for y.index[-1]; -inf if no sub-sample could be estimated
    """
    bsadf = -np.inf
    penalize = model[:2] == 'sm'
    for start in range(0, y.shape[0] - min_length + 1):
        y_, X_ = y[start:], X[start:]
        b_mean_, b_std_ = get_betas(X_, y_)
        if np.isnan(b_mean_[0]):
            # Regression could not be estimated for this start point.
            continue
        b_mean_, b_std_ = b_mean_[0, 0], b_std_[0, 0] ** 0.5
        # TODO: Rewrite logic of this module to avoid division by zero
        with np.errstate(invalid='ignore'):
            all_adf = b_mean_ / b_std_
        if penalize:
            # Penalize by the length of the sub-sample actually tested. The
            # full window length is the same for every start point and would
            # therefore not favor any sub-sample inside the supremum.
            all_adf = np.abs(all_adf) / (y_.shape[0] ** phi)
        if all_adf > bsadf:
            bsadf = all_adf
    return bsadf


def _get_y_x(series: pd.Series, model: str, lags: Union[int, list],
             add_const: bool) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Advances in Financial Machine Learning, Snippet 17.2, page 258-259.

    Build the regressors and the target for the requested test specification.

    For 'linear' and 'quadratic' the target is the differenced series and the
    regressors are its lags, the lagged level and the trend term(s). For the
    'sm_*' models the target is the level (or log level) and the regressors are
    a constant plus the trend term(s) of that model.

    :param series: (pd.Series) Series to prepare for test statistics generation (for example log prices)
    :param model: (str) Either 'linear', 'quadratic', 'sm_poly_1', 'sm_poly_2', 'sm_exp', 'sm_power'
    :param lags: (int or list) Either number of lags to use or array of specified lags
    :param add_const: (bool) Flag to add constant
    :return: (pd.DataFrame, pd.DataFrame) Regressors X (tested column first) and target y
    :raises ValueError: if ``model`` is not one of the supported names
    """
    levels = pd.DataFrame(series)
    changes = levels.diff().dropna()

    # Lagged differences plus the lagged level y_(t-1)
    x = _lag_df(changes, lags).dropna()
    x['y_lagged'] = levels.shift(1).loc[x.index]
    y = changes.loc[x.index]

    if add_const is True:
        x['const'] = 1

    if model in ('linear', 'quadratic'):
        # t = 0, 1, 2, ... and, for the quadratic model, t^2 as well
        x['trend'] = np.arange(x.shape[0])
        if model == 'quadratic':
            x['quad_trend'] = np.arange(x.shape[0]) ** 2
        beta_column = 'y_lagged'
    elif model in ('sm_poly_1', 'sm_poly_2', 'sm_exp', 'sm_power'):
        # Sub-/super-martingale models regress the (log) level on trend terms only
        aligned_levels = levels.loc[y.index]
        y = aligned_levels if model == 'sm_poly_1' else np.log(aligned_levels)
        x = pd.DataFrame(index=y.index)
        x['const'] = 1
        if model == 'sm_power':
            # TODO: Rewrite logic of this module to avoid division by zero
            with np.errstate(divide='ignore'):
                x['log_trend'] = np.log(np.arange(x.shape[0]))
            beta_column = 'log_trend'
        else:
            x['trend'] = np.arange(x.shape[0])
            if model == 'sm_exp':
                beta_column = 'trend'
            else:
                x['quad_trend'] = np.arange(x.shape[0]) ** 2
                beta_column = 'quad_trend'
    else:
        raise ValueError('Unknown model')

    # Put the column whose coefficient is tested first for later extraction
    ordered = list(x.columns)
    ordered.remove(beta_column)
    x = x[[beta_column] + ordered]
    return x, y


def _lag_df(df: pd.DataFrame, lags: Union[int, list]) -> pd.DataFrame:
    """
    Advances in Financial Machine Learning, Snipet 17.3, page 259.

    Apply Lags to DataFrame

    :param df: (int or list) Either number of lags to use or array of specified lags
    :param lags: (int or list) Lag(s) to use
    :return: (pd.DataFrame) Dataframe with lags
    """
    df_lagged = pd.DataFrame()
    if isinstance(lags, int):
        lags = range(1, lags + 1)
    else:
        lags = [int(lag) for lag in lags]

    for lag in lags:
        temp_df = df.shift(lag).copy(deep=True)
        temp_df.columns = [str(i) + '_' + str(lag) for i in temp_df.columns]
        df_lagged = df_lagged.join(temp_df, how='outer')
    return df_lagged


def get_betas(X: pd.DataFrame, y: pd.DataFrame) -> Tuple[np.array, np.array]:
    """
    Advances in Financial Machine Learning, Snippet 17.4, page 259.

    Least-squares fit of y on X: coefficient estimates and their covariance.

    :param X: (pd.DataFrame) Features(factors)
    :param y: (pd.DataFrame) Outcomes
    :return: (np.array, np.array) Betas and variances of estimates. When X'X cannot
        be inverted, NaN-filled lists ``[nan]`` and ``[[nan, nan]]`` are returned instead.
    """
    gram = np.dot(X.T, X)
    cross = np.dot(X.T, y)

    try:
        gram_inv = np.linalg.inv(gram)
    except np.linalg.LinAlgError:
        # Singular design: signal "no estimate" with NaN placeholders
        return [np.nan], [[np.nan, np.nan]]

    coefficients = np.dot(gram_inv, cross)
    residuals = y - np.dot(X, coefficients)
    # Residual variance uses (observations - regressors) degrees of freedom
    dof = X.shape[0] - X.shape[1]
    residual_variance = np.dot(residuals.T, residuals) / dof
    covariance = residual_variance * gram_inv

    return coefficients, covariance



def _sadf_outer_loop(X: pd.DataFrame, y: pd.DataFrame, min_length: int, model: str, phi: float,
                     molecule: list) -> pd.Series:
    """
    Compute the SADF statistic at every end point listed in molecule.

    For each end point the rows of X and y up to and including that label are
    handed, as NumPy arrays, to `_get_sadf_at_t`.

    :param X: (pd.DataFrame) Features(factors)
    :param y: (pd.DataFrame) Outcomes
    :param min_length: (int) Minimum number of observations
    :param model: (str) Either 'linear', 'quadratic', 'sm_poly_1', 'sm_poly_2', 'sm_exp', 'sm_power'
    :param phi: (float) Coefficient to penalize large sample lengths when computing SMT, in [0, 1]
    :param molecule: (list) Indices to get SADF
    :return: (pd.Series) SADF statistics
    """
    results = pd.Series(index=molecule, dtype='float64')
    for end_label in molecule:
        # Label-based slice, so the end point itself is part of the sample
        features = X.loc[:end_label].values
        outcomes = y.loc[:end_label].values.reshape(-1, 1)
        results[end_label] = _get_sadf_at_t(features, outcomes, min_length, model, phi)
    return results


def get_sadf(series: pd.Series, model: str, lags: Union[int, list], min_length: int, add_const: bool = False,
             phi: float = 0, num_threads: int = 8, verbose: bool = True) -> pd.Series:
    """
    Advances in Financial Machine Learning, p. 258-259.

    SADF statistics for a series (computed sequentially in this implementation;
    ``num_threads`` and ``verbose`` are accepted but not used by the body).

    SADF fits the ADF regression at each end point t with backwards expanding start points. For the estimation
    of SADF(t), the right side of the window is fixed at t. SADF recursively expands the beginning of the sample
    up to t - min_length, and returns the sup of this set.

    With the sub- or super-martingale tests, the variance of beta of a weak long-run bubble may be smaller than
    that of a strong short-run bubble, biasing the method towards long-run bubbles. To correct for this bias,
    the ADF statistic in samples with large lengths can be penalized with the coefficient phi in [0, 1] such that:

    ADF_penalized = ADF / (sample_length ^ phi)

    :param series: (pd.Series) Series for which SADF statistics are generated
    :param model: (str) Either 'linear', 'quadratic', 'sm_poly_1', 'sm_poly_2', 'sm_exp', 'sm_power'
    :param lags: (int or list) Either number of lags to use or array of specified lags
    :param min_length: (int) Minimum number of observations needed for estimation
    :param add_const: (bool) Flag to add constant
    :param phi: (float) Coefficient to penalize large sample lengths when computing SMT, in [0, 1]
    :param num_threads: (int) Number of cores to use (unused here)
    :param verbose: (bool) Flag to report progress on asynch jobs (unused here)
    :return: (pd.Series) SADF statistics, indexed by the end points after the first min_length rows
    """
    features, outcomes = _get_y_x(series, model, lags, add_const)

    # End points start after the first min_length prepared observations
    end_points = outcomes.index[min_length:]

    return _sadf_outer_loop(X=features, y=outcomes, min_length=min_length, model=model, phi=phi,
                            molecule=end_points)
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
AddReference("QuantConnect.Indicators")

from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import *


class EmaCrossAlphaModel(AlphaModel):
    '''Alpha model emitting price insights when a fast and a slow EMA cross'''

    def __init__(self,
                 fastPeriod = 12,
                 slowPeriod = 26,
                 resolution = Resolution.Daily):
        '''Creates the model and derives its prediction interval and name
        Args:
            fastPeriod: The fast EMA period
            slowPeriod: The slow EMA period
            resolution: The resolution of the EMAs; the prediction interval is
                        this resolution's time span multiplied by fastPeriod'''
        self.symbolDataBySymbol = {}
        self.fastPeriod = fastPeriod
        self.slowPeriod = slowPeriod
        self.resolution = resolution

        resolutionSpan = Extensions.ToTimeSpan(resolution)
        self.predictionInterval = Time.Multiply(resolutionSpan, fastPeriod)

        resolutionString = Extensions.GetEnumString(resolution, Resolution)
        nameParts = (self.__class__.__name__, fastPeriod, slowPeriod, resolutionString)
        self.Name = '{}({},{},{})'.format(*nameParts)


    def Update(self, algorithm, data):
        '''Checks every tracked symbol for a fresh EMA cross.
        Called each time the algorithm receives data for subscribed securities
        Args:
            algorithm: The algorithm instance
            data: The new data available
        Returns:
            The insights generated for symbols whose EMAs crossed since the last update'''
        insights = []
        for symbolData in self.symbolDataBySymbol.values():
            fast = symbolData.Fast
            slow = symbolData.Slow

            if fast.IsReady and slow.IsReady:
                if symbolData.FastIsOverSlow:
                    # fast was above slow and has now dropped below it
                    if slow > fast:
                        insights.append(Insight.Price(symbolData.Symbol, self.predictionInterval, InsightDirection.Down))
                elif symbolData.SlowIsOverFast:
                    # slow was above fast and fast has now risen above it
                    if fast > slow:
                        insights.append(Insight.Price(symbolData.Symbol, self.predictionInterval, InsightDirection.Up))

            # remember the current ordering for the next call, ready or not
            symbolData.FastIsOverSlow = fast > slow

        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Sets up (or resets) the EMAs of every security added to the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''
        for added in changes.AddedSecurities:
            known = self.symbolDataBySymbol.get(added.Symbol)
            if known is not None:
                # the security was seen before and re-added: start its EMAs over
                known.Fast.Reset()
                known.Slow.Reset()
                continue

            # first time this security is seen: create its fast/slow EMAs
            created = SymbolData(added)
            created.Fast = algorithm.EMA(added.Symbol, self.fastPeriod, self.resolution)
            created.Slow = algorithm.EMA(added.Symbol, self.slowPeriod, self.resolution)
            self.symbolDataBySymbol[added.Symbol] = created


class SymbolData:
    '''Per-symbol state kept by the EMA cross model'''
    def __init__(self, security):
        # Last observed ordering of the two EMAs (True: fast above slow).
        # Kept so the same crossover is not signalled repeatedly.
        self.FastIsOverSlow = False

        # The indicators are attached by the model after construction
        self.Fast = None
        self.Slow = None

        self.Symbol = security.Symbol
        self.Security = security

    @property
    def SlowIsOverFast(self):
        '''Negation of FastIsOverSlow'''
        fastOnTop = self.FastIsOverSlow
        return not fastOnTop