Overall Statistics
Total Orders
123
Average Win
1.84%
Average Loss
-2.15%
Compounding Annual Return
-2.041%
Drawdown
19.600%
Expectancy
-0.056
Start Equity
10000.00
End Equity
9398.55
Net Profit
-6.015%
Sharpe Ratio
-0.131
Sortino Ratio
-0.153
Probabilistic Sharpe Ratio
1.321%
Loss Rate
49%
Win Rate
51%
Profit-Loss Ratio
0.86
Alpha
-0.027
Beta
0.163
Annual Standard Deviation
0.117
Annual Variance
0.014
Information Ratio
-0.6
Tracking Error
0.148
Treynor Ratio
-0.094
Total Fees
$0.00
Estimated Strategy Capacity
$48000.00
Lowest Capacity Asset
USDJPY 8G
Portfolio Turnover
11.16%
#region imports
from AlgorithmImports import *

import statsmodels.formula.api as sm
from scipy import stats
#endregion


class MeanReversionAndMomentumForexAlgorithm(QCAlgorithm):

    def _get_history(self,symbol, num):
        data = {}
        dates = []
        history = self.history([symbol], num, Resolution.DAILY).loc[symbol]['close'] #request the historical data for a single symbol
        for time in history.index:
            t = time.to_pydatetime().date()
            dates.append(t)
        dates = pd.to_datetime(dates) 
        df = pd.DataFrame(history)
        df.reset_index(drop=True)
        df.index = dates
        df.columns = ['price']

        return df

    def _calculate_return(self,df):
        #calculate the mean for further use
        mean = np.mean(df.price)
        # cauculate the standard deviation
        sd = np.std(df.price)
        # pandas method to take the last datapoint of each month.
        df = df.resample('BM').last()
        # the following three lines are for further experiment purpose
        # df['j1'] = df.price.shift(1) - df.price.shift(2)
        # df['j2'] = df.price.shift(2) - df.price.shift(3)
        # df['j3'] = df.price.shift(3) - df.price.shift(4)
        # take the return as depend variable
        df['log_return'] = df.price - df.price.shift(1)
        # calculate the reversal factor
        df['reversal'] = (df.price.shift(1) - mean)/sd
        # calculate the momentum factor
        df['mom'] = df.price.shift(1) - df.price.shift(4)
        df = df.dropna() #remove nan value
        return (df,mean,sd)

    def _calculate_input(self,df,mean,sd):
        # df['j1'] = df.price - df.price.shift(1)
        # df['j2'] = df.price.shift(1) - df.price.shift(2)
        # df['j3'] = df.price.shift(2) - df.price.shift(3)
        df['reversal'] = (df.price - mean)/sd
        df['mom'] = df.price - df.price.shift(3)
        df = df.dropna()
        return df

    def _OLS(self,df):
        res = sm.ols(formula = 'log_return ~ reversal + mom',data = df).fit()
        return res

    def _concat(self):
        # we requested as many daily tradebars as we can
        his = self._get_history(self._quoted[0].value,20*365)
        # get the clean DataFrame for linear regression
        his = self._calculate_return(his)
        # add property to the symbol object for further use.
        self._quoted[0].mean = his[1]
        self._quoted[0].sd = his[2]
        df = his[0]
        # repeat the above procedure for each symbols, and concat the dataframes
        for i in range(1,len(self._quoted)):
            his = self._get_history(self._quoted[i].value,20*365)
            his = self._calculate_return(his)
            self._quoted[i].mean = his[1]
            self._quoted[i].sd = his[2]
            df = pd.concat([df,his[0]])
        df = df.sort_index()
        # remove outliers that outside the 99.9% confidence interval
        df = df[df.apply(lambda x: np.abs(x - x.mean()) / x.std() < 3).all(axis=1)]
        return df

    def _predict(self,symbol):
        # get current month in string
        month = str(self.time).split(' ')[0][5:7]
        # request the data in the last three months
        res = self._get_history(symbol.value,33*3)
        # pandas method to take the last datapoint of each month
        res = res.resample('BM').last()
        # remove the data points in the current month
        res = res[res.index.month != int(month)]
        # calculate the variables
        res = self._calculate_input(res,symbol.mean,symbol.sd)
        res = res.iloc[0]
        # take the coefficient. The first one will not be used for sum-product because it's the intercept
        params = self._formula.params[1:]
        # calculate the expected return
        re = sum([a*b for a,b in zip(res[1:],params)]) + self._formula.params[0]
        return re

    def initialize(self):
        self.set_start_date(2013,6,1)
        self.set_end_date(2016,6,1)
        self.set_cash(10000)
        syls = ['EURUSD','GBPUSD','USDCAD','USDJPY']
        self._quoted = []
        for i in range(len(syls)):
            self._quoted.append(self.add_forex(syls[i],Resolution.DAILY,Market.OANDA).symbol)
        
        df = self._concat()
        self.log(str(df))
        self._formula = self._OLS(df)
        self.log(str(self._formula.summary()))
        self.log(str(df))
        self.log(str(df.describe()))
        for i in self._quoted:
            self.log(str(i.mean) + '   ' + str(i.sd))
    
        self.schedule.on(self.date_rules.month_start(), self.time_rules.at(9,31), self._action)

    def on_data(self,data):
        self.data = data

    def _action(self):
        rank = []
        long_short = []
        for i in self._quoted:
            rank.append((i,self._predict(i)))
        # rank the symbols by their expected return
        rank.sort(key = lambda x: x[1],reverse = True)
        # the first element in long_short is the one with the highest expected return, which we are going to long, and the second one is going to be shorted.
        long_short.append(rank[0])
        long_short.append(rank[-1])
        self.liquidate()

        # the product < 0 means the expected return of the first one is positive and that of the second one is negative--we are going to long and short.
        if long_short[0][1]*long_short[1][1] < 0:
            self.set_holdings(long_short[0][0],1)
            self.set_holdings(long_short[1][0],-1)
        # this means we long only because all of the expected return is positive
        elif long_short[0][1] > 0 and long_short[1][1] > 0:
            self.set_holdings(long_short[0][0],1)
        # short only
        else:
            self.set_holdings(long_short[1][0],-1)