Overall Statistics
Total Trades
8
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Net Profit
0%
Sharpe Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
0
Tracking Error
0
Treynor Ratio
0
Total Fees
$8.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
AAPL.EODDATA 2S
from asyncio import FastChildWatcher
from AlgorithmImports import *
import numpy as np
from dateutil.parser import parse


class FormalRedChicken(QCAlgorithm):
    """Volume-bar momentum strategy on a custom minute feed.

    Incoming data points are aggregated into volume bars ("ticks"): each time
    the accumulated volume reaches ``tick_size`` the bar is labelled +1 when
    its last price is above its first price and -1 otherwise.  On every
    completed bar, while flat, the algorithm goes long (short) when the last
    ``window_size`` labels are all +1 (-1) and the estimated probability of
    such a run continuing exceeds 0.6.  An open position is closed on the
    first opposite label, provided the same probability gate is still open.

    Cash interest, leverage interest and short costs are simulated by
    scheduled events that adjust the USD cash book directly.

    Parameters read through ``GetParameter``: ``tick_size``, ``window_size``,
    ``qt`` and ``prob_rebalance``.
    """

    def Initialize(self):
        """Set up dates, cash, data feeds, reality models and schedules."""
        self.SetStartDate(2021, 11, 1)  # Set Start Date
        self.SetEndDate(2021, 11, 1)
        self.SetCash(10000)  # Set Strategy Cash
        self.symbol = self.AddData(EODDATA, "AAPL", Resolution.Minute).Symbol

        # Volume-bar state: running volume, prices of the bar being built and
        # the sequence of +1/-1 labels of completed bars.
        self.sum = 0
        self.tick_list = []
        self.tick_seq = []
        self.ts = int(self.GetParameter("tick_size"))
        self.window_size = int(self.GetParameter("window_size"))
        self.next_positive_prob = 0
        self.next_negative_prob = 0

        # Reality modeling: interest-rate feed, benchmark, fees, slippage
        # and margin.
        self.interests = self.AddData(DFFData, "INT", Resolution.Daily).Symbol
        self.SetBenchmark(self.AddEquity("SPY").Symbol)
        security = self.Securities[self.symbol]
        security.SetFeeModel(CustomFeeModel())
        security.SetSlippageModel(ConstantSlippageModel(0.0002))
        security.MarginModel = BuyingPowerModel(2.0, 0.05)

        # Spreads applied around the interest-rate feed, and running totals.
        self.cash_rate = 0.005
        self.leverage_rate = .0075
        self.short_rate = 0.0025
        self.leverage_costs = 0
        self.cash_interests = 0
        self.short_costs = 0
        self.max_leverage = 2

        # NOTE(review): the "qt" parameter is parsed (so it must exist and be
        # numeric) but the sizing factor is then pinned to 1.5, exactly as the
        # original code did.  Confirm which of the two values is intended.
        self.qt_factor = float(self.GetParameter("qt"))
        self.qt_factor = 1.5

        # Each event is registered exactly once.  Registering them twice made
        # every interest/cost adjustment run twice per day.
        prob_every_hours = int(self.GetParameter("prob_rebalance"))
        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(0, 0), self.Cash_Rebalance)
        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(20, 0), self.Leverage_Rebalance)
        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(20, 0), self.Short_Rebalance)
        self.Schedule.On(
            self.DateRules.EveryDay(),
            self.TimeRules.Every(TimeSpan.FromHours(prob_every_hours)),
            self.Get_Probabilities,
        )
        self.debug = True

    def OnData(self, data):
        """Accumulate the new data point and trade when a volume bar completes.

        Args:
            data: Slice delivered by the engine; ignored unless it contains
                the traded custom symbol.
        """
        if not data.ContainsKey(self.symbol):
            return

        point = data[self.symbol]
        price = point.Value
        volume = point.GetProperty('Volume')

        if self.debug:
            self.Debug(f"Last price: {price}")
            self.Debug(f"Last qty: {volume}")

        self.tick_list.append(price)
        self.sum = self.sum + volume

        if self.sum >= self.ts:
            self._close_volume_bar()
            self._trade_on_new_tick(price)

    def _close_volume_bar(self):
        """Label the finished volume bar (+1/-1) and reset the accumulators."""
        self.sum = 0
        # tick_list is never empty here: OnData appends before calling us.
        label = 1 if self.tick_list[0] < self.tick_list[-1] else -1
        self.tick_seq.append(label)
        self.tick_list.clear()

        if self.debug:
            self.Debug('New Tick! (pos)')
            self.Debug('Tick seq ' + str(self.tick_seq))
            self.Debug('Sum ' + str(sum(self.tick_seq[-(self.window_size):])))

    def _trade_on_new_tick(self, price):
        """Open or close positions after a completed bar at the given price."""
        window_sum = sum(self.tick_seq[-(self.window_size):])
        last_tick = self.tick_seq[-1]

        total_leverage = float(self.Portfolio.TotalPortfolioValue) * self.max_leverage
        qt = (total_leverage * (float(self.qt_factor) / self.max_leverage)) / price

        # Long side: enter on a full run of +1 labels, exit on the first -1.
        if self.next_positive_prob > 0.6:
            if not self.Portfolio.Invested:
                if window_sum == self.window_size:
                    self.MarketOrder(self.symbol, qt)
                    self.Debug('Bought ' + str(qt) + ' stocks at ' + str(price))
            elif self.Portfolio[self.symbol].IsLong and last_tick == -1:
                self.Liquidate()
                self.Debug('Liqudated')

        # Short side: enter on a full run of -1 labels, exit on the first +1.
        # Portfolio state is re-read because the long side may just have
        # changed it.
        if self.next_negative_prob > 0.6:
            if not self.Portfolio.Invested:
                if window_sum == -self.window_size:
                    self.MarketOrder(self.symbol, -qt)
                    self.Debug('Shorted ' + str(qt) + ' stocks at ' + str(price))
            elif self.Portfolio[self.symbol].IsShort and last_tick == 1:
                self.SetHoldings(self.symbol, 0)
                if self.debug:
                    self.Debug('Stoped short')

    def OnEndOfAlgorithm(self):
        """Flatten the portfolio and report accumulated costs at the end."""
        self.ExitPositions()

    def ExitPositions(self):
        """Liquidate everything and log fees, interest and cost totals."""
        self.Liquidate()
        self.Debug('TRADING FEES: ' + str(self.Portfolio[self.symbol].TotalFees))
        self.Debug('LEVERAGE COSTS: ' + str(self.leverage_costs))
        self.Debug('CASH INTERESTS: ' + str(self.cash_interests))
        self.Debug('SHORT COSTS: ' + str(self.short_costs))

    def Cash_Rebalance(self):
        """Credit one day of interest on cash while the portfolio is flat.

        The daily rate is the latest interest-feed value (divided by 100)
        minus ``cash_rate``, over a 360-day year.  Nothing is credited when
        that rate is negative.
        """
        if self.Portfolio.Invested:
            return
        data = self.Securities[self.interests].GetLastData()
        if not data:
            return

        cash = self.Portfolio.Cash
        interest_rate = (data.Value / 100 - self.cash_rate) / 360
        if interest_rate < 0:
            return

        self.Portfolio.CashBook["USD"].AddAmount(cash * interest_rate)
        self.cash_interests += cash * interest_rate
        if self.debug:
            self.Debug('Interest ' + str(interest_rate))
            self.Debug("Cash int " + str(cash * interest_rate))

    def Leverage_Rebalance(self):
        """Charge one day of interest on borrowed cash while invested.

        The daily rate is the latest interest-feed value (divided by 100)
        plus ``leverage_rate``, over a 360-day year.  It is applied to the
        cash balance only when that balance is negative.
        """
        if not self.Portfolio.Invested:
            return
        data = self.Securities[self.interests].GetLastData()
        if not data:
            return

        cash = self.Portfolio.Cash
        # Only a negative balance is borrowed money.  Without this guard a
        # positive balance (short-sale proceeds, or an unlevered long) was
        # credited interest at the borrow rate and booked as a "cost".
        if cash >= 0:
            return

        interest_rate = (data.Value / 100 + self.leverage_rate) / 360
        self.Portfolio.CashBook["USD"].AddAmount(cash * interest_rate)
        self.leverage_costs += cash * interest_rate
        if self.debug:
            self.Debug('Interest ' + str(interest_rate))
            self.Debug("Leverage int " + str(cash * interest_rate))

    def Short_Rebalance(self):
        """Apply one day of short cost while the symbol is held short.

        The cost base is the fill value of the most recent order.
        """
        if not self.Portfolio[self.symbol].IsShort:
            return

        # NOTE(review): assumes the latest order is the one that opened the
        # short and that QuantityFilled is signed (negative for sells), which
        # would make the amount added below a deduction - confirm.
        ticket = self.Transactions.GetOrderTicket(self.Transactions.LastOrderId)
        mkt_value = ticket.AverageFillPrice * ticket.QuantityFilled
        interest = self.short_rate / 360
        self.short_costs += mkt_value * interest
        self.Portfolio.CashBook["USD"].AddAmount(mkt_value * interest)
        if self.debug:
            self.Debug('Portfolio is short')
            self.Debug('Short Interest ' + str(mkt_value * interest))

    def Get_Probabilities(self):
        """Re-estimate run-continuation probabilities from the label history.

        For every position with at least ``window_size`` labels behind it,
        the preceding ``window_size - 1`` labels are summed.  If they are all
        +1 (all -1) the position counts as a positive (negative) run, and as
        a continuation when the next label has the same sign.  The results
        are stored in ``next_positive_prob`` and ``next_negative_prob``, each
        0 when no run of that sign was seen.
        """
        if not self.tick_seq:
            return

        self.tick_idx = list(range(len(self.tick_seq)))
        run_length = self.window_size - 1
        positive_ticks = 0
        next_positive = 0
        negative_ticks = 0
        next_negative = 0

        for end in range(self.window_size, len(self.tick_seq) + 1):
            # Sum of the run preceding label end-1 (that label is excluded).
            window_total = np.sum(self.tick_seq[end - self.window_size:end - 1])
            if window_total == run_length:
                positive_ticks += 1
                if self.tick_seq[end - 1] == 1:
                    next_positive += 1
            elif -window_total == run_length:
                negative_ticks += 1
                if self.tick_seq[end - 1] == -1:
                    next_negative += 1

        self.next_positive_prob = next_positive / positive_ticks if positive_ticks else 0
        self.next_negative_prob = next_negative / negative_ticks if negative_ticks else 0

        if self.debug:
            total_ticks = len(self.tick_seq)
            self.Debug('Window size ' + str(self.window_size))
            self.Debug('Tick size ' + str(self.ts))
            self.Debug('Total ticks ' + str(total_ticks))
            self.Debug(str(self.window_size-1) + ' positive ticks in a row probability: ' + str(positive_ticks/total_ticks) + ', '+ str(self.window_size-1) + ' negative ticks in a row prob. ' + str(negative_ticks/total_ticks))
            self.Debug(str(self.window_size) +' positive probability: ' + str(self.next_positive_prob) +' ' + str(self.window_size) +' negative probability ' + str(self.next_negative_prob))
            
class CustomFeeModel:
    """Per-share commission: 0.005 per share, at least 1, at most 1% of value."""

    def GetOrderFee(self, parameters):
        """Return the USD fee for the order described by ``parameters``.

        The fee is ``max(1, shares * 0.005)`` capped at 1% of
        ``price * shares``.  An order with zero quantity that is not tagged
        'Liquidated' is charged nothing.
        """
        order = parameters.Order
        if order.Quantity != 0 or order.Tag == 'Liquidated':
            per_share_fee = max(1, order.AbsoluteQuantity * 0.005)
            value_cap = parameters.Security.Price * order.AbsoluteQuantity * 0.01
            fee = min(per_share_fee, value_cap)
        else:
            fee = 0

        return OrderFee(CashAmount(fee, 'USD'))

class DFFData(PythonData):
    """Custom data type backed by the remote ``DFF.csv`` file."""

    def GetSource(self, config, date, isLiveMode):
        """Return the remote-file source the engine downloads the CSV from."""
        url = "https://www.dropbox.com/s/n4beqe9zlcv0y4t/DFF.csv?dl=1"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config, line, date, isLiveMode):
        """Turn one CSV line into a ``DFFData`` point.

        Lines that are blank or do not start with a digit (such as a header
        row) yield ``None``.  The first column is parsed as the timestamp and
        the second as the value.
        """
        is_data_row = line.strip() and line[0].isdigit()
        if not is_data_row:
            return None

        fields = line.split(',')
        point = DFFData()
        point.Symbol = config.Symbol
        point.Time = parse(fields[0])
        point.Value = float(fields[1])
        return point
        
class EODDATA(PythonData):
    """Custom price/volume data type backed by a remote one-minute CSV file."""

    def GetSource(self, config, date, isLiveMode):
        """Return the remote-file source the engine downloads the CSV from."""
        # NOTE(review): the file name says AMZN, yet the algorithm in this
        # file subscribes to this type under the ticker "AAPL" - confirm the
        # intended instrument.
        url = "https://www.dropbox.com/s/6hma23a7tnrkbph/AMZN_1m_2004-01-01_2022-03-01.csv?dl=1"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config, line, date, isLiveMode):
        """Turn one CSV line into an ``EODDATA`` point.

        Lines that are blank or do not start with a digit (such as a header
        row) yield ``None``.  Column 0 is parsed as the timestamp, column 4
        becomes ``Value`` (presumably the close - confirm against the file)
        and column 5 is stored under the ``"Volume"`` key.
        """
        is_data_row = line.strip() and line[0].isdigit()
        if not is_data_row:
            return None

        fields = line.split(',')
        bar = EODDATA()
        bar.Symbol = config.Symbol
        bar.Time = parse(fields[0])
        bar.Value = float(fields[4])
        bar["Volume"] = float(fields[5])
        return bar