Overall Statistics |
Total Trades 4 Average Win 0% Average Loss 0% Compounding Annual Return 13.645% Drawdown 10.300% Expectancy 0 Net Profit 7.221% Sharpe Ratio 0.832 Probabilistic Sharpe Ratio 43.012% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0.113 Beta 0.058 Annual Standard Deviation 0.147 Annual Variance 0.022 Information Ratio -0.093 Tracking Error 0.391 Treynor Ratio 2.099 Total Fees $4.67 |
from BankingIndustryStocks import BankingIndustryStocks
from datetime import datetime, timedelta
from numpy import sum
import pandas as pd
# from grid_class import *
from gid_func import *
from collections import deque

# Number of rows in one projected grid. Must equal the `csv_records` default
# of get_price_series (150), because SymbolData builds its grid with that
# default and Update indexes the grid by `series_index`.
GRID_LENGTH = 150


def get_change(current, previous):
    """Return the percentage difference between two values, rounded to 2 places.

    Returns 0 when both values are equal and ``float('inf')`` when
    ``previous`` is zero. The numerator is an absolute difference but the
    denominator is not, so a negative ``previous`` yields a negative result.
    """
    if current == previous:
        return 0
    try:
        return round((abs(current - previous) / previous) * 100.0, 2)
    except ZeroDivisionError:
        return float('inf')


def _ohlc(source):
    """Collect the Open/High/Low/Close attributes of `source` into a dict."""
    return {'Open': source.Open,
            'High': source.High,
            'Low': source.Low,
            'Close': source.Close}


class CalibratedParticleAtmosphericScrubbers(QCAlgorithm):
    """Algorithm entry point: bank-stock universe plus the grid alpha model."""

    def Initialize(self):
        self.SetStartDate(2020, 2, 11)  # Set Start Date
        self.SetEndDate(2020, 8, 27)  # Set End Date
        self.SetCash(100000)  # Set Strategy Cash
        self.UniverseSettings.Resolution = Resolution.Daily
        # self.UniverseSettings.Resolution = Resolution.Minute
        self.SetUniverseSelection(BankingIndustryStocks())
        self.AddAlpha(Grid_indicator("Grid"))


class Grid_indicator(AlphaModel):
    """Alpha model that keeps one projected price grid (SymbolData) per symbol.

    Each Update call logs the current 'blue 0' grid value next to the bar's
    close, opens a -10% position for symbols that are not yet invested, and
    advances that symbol's position on its grid. When a symbol reaches the
    end of its grid, only that symbol's grid is rebuilt from the current bar.
    """

    def __init__(self, name):
        self.symbolDataBySymbol = {}
        self.Name = name
        self.Time = datetime.min
        self.Value = 0

    def Update(self, algorithm, data):
        """Process the current slice for every tracked symbol.

        Returns an empty list: orders are placed directly through
        ``algorithm.SetHoldings`` and no insights are emitted.
        """
        # Kept because the attribute is publicly visible; nothing sets it to True.
        self.HAS_RESET = False
        insights = []

        # Iterate over a snapshot: a grid redraw replaces entries in the dict.
        for symbol, item in list(self.symbolDataBySymbol.items()):
            # Without a bar there is nothing to log, trade on, or rebuild from.
            if not (data.ContainsKey(symbol) and data[symbol] is not None):
                continue
            bar = data[symbol]

            if item.series_index >= GRID_LENGTH:
                # Continue with the NEW grid. Using the exhausted one would
                # index past the end of blue_zero below, and the increment
                # would be applied to an object that is no longer stored.
                item = self._redraw_grid(algorithm, symbol, item, bar)

            # https://www.quantconnect.com/forum/discussion/7956/rolling-window-questions/p1
            # NOTE(review): the first message is labelled "Low:" but prints the
            # bar's Close - confirm which value is intended.
            algorithm.Log(f"{symbol.Value}: Blue 0 @ index {item.series_index} - {round(item.blue_zero.values[item.series_index][0], 2)} Low: {round(bar.Close, 2)}")
            algorithm.Log(f"{symbol.Value}: Index: {item.series_index}")
            algorithm.Log(f"{symbol.Value}: Close: {round(bar.Close, 2)}")

            if not algorithm.Portfolio[symbol].Invested:
                algorithm.SetHoldings(symbol, -0.1)
                # insights.append(Insight.Price(symbol, timedelta(days=150), InsightDirection.Down))

            # Advance along the grid only when a bar was actually consumed.
            item.series_index += 1

        return insights

    def _redraw_grid(self, algorithm, symbol, item, bar):
        """Rebuild the grid of one symbol from `bar` and return the new SymbolData."""
        algorithm.Debug(f"HIT max grid length of {item.series_index} for {symbol.Value}, REDRAW GRID!")
        data_IN = _ohlc(bar)
        # Visually check the data in, make sure it is different for each symbol
        algorithm.Debug(data_IN)

        # Replace only this symbol's entry with a freshly anchored grid.
        fresh = SymbolData(symbol, data_IN, algorithm, self.Name)
        self.symbolDataBySymbol[symbol] = fresh

        algorithm.Debug(f"mod symbol: {symbol.Value}")
        # Check out the new grid
        algorithm.Debug(fresh.blue_zero.head())
        # Just print out one at blue 0 to visually check
        algorithm.Debug(f"testing out blue0 @ Index 1: {fresh.blue_zero.values[1][0]}")
        return fresh

    def OnSecuritiesChanged(self, algorithm, changes):
        """Drop grids of removed securities and create grids for new ones."""
        for removed in changes.RemovedSecurities:
            self.symbolDataBySymbol.pop(removed.Symbol, None)
            algorithm.Debug(f"REMOVED Symbol in OnSecuritiesChanged: {removed.Symbol.Value}")

        for added in changes.AddedSecurities:
            if added.Symbol not in self.symbolDataBySymbol:
                algorithm.Debug(f"Added Symbol in OnSecuritiesChanged: {added.Symbol.Value}")
                # NOTE(review): the grid is anchored on the security's prices
                # at the moment it is added - confirm they are populated then.
                data_IN = _ohlc(added)
                algorithm.Debug(data_IN)
                self.symbolDataBySymbol[added.Symbol] = SymbolData(added.Symbol, data_IN, algorithm, self.Name)
                algorithm.Debug(f"blue_zero for {added.Symbol.Value} :{self.symbolDataBySymbol[added.Symbol].blue_zero.head(2)}")


class SymbolData:
    """Projected blue/green grid series for a single symbol.

    `series_index` is the row of the grid that corresponds to the current bar;
    it starts at 0 and is advanced by Grid_indicator.Update.
    """

    # (attribute name, key in the dict returned by get_price_series)
    _GRID_ATTRIBUTES = (
        ("blue_neg_one", "blue neg1"),
        ("blue_neg_two", "blue neg2"),
        ("blue_neg_three", "blue neg3"),
        ("blue_pos_one", "blue pos1"),
        ("blue_pos_two", "blue pos2"),
        ("blue_pos_three", "blue pos3"),
        ("green_neg_one", "green neg1"),
        ("green_neg_two", "green neg2"),
        ("green_neg_three", "green neg3"),
        ("green_pos_one", "green pos1"),
        ("green_pos_two", "green pos2"),
        ("green_pos_three", "green pos3"),
        ("green_zero", "green 0"),
        ("blue_zero", "blue 0"),
    )

    def __init__(self, symbol, data_IN, algorithm, name):
        self.Symbol = symbol
        self.Name = name
        self.Time = datetime.min
        self.Value = 0

        # Declare every grid attribute up front; makeIndicator fills them in.
        for attribute, _ in self._GRID_ATTRIBUTES:
            setattr(self, attribute, None)

        self.series_index = 0
        self.indicator = self.makeIndicator(symbol, data_IN)

    def makeIndicator(self, symbol, data_IN):
        """Build the grid from one OHLC dict and store each series on `self`.

        Returns True when the 'blue 0' series has GRID_LENGTH rows. The
        result is stored in `self.indicator` and is not read anywhere in
        this module.
        """
        # The timestamp is a placeholder; get_price_series only reads the
        # single row's price and does not use the date.
        frame = pd.DataFrame(
            {'Open': [data_IN["Open"]],
             'High': [data_IN["High"]],
             'Low': [data_IN["Low"]],
             'Close': [data_IN["Close"]]},
            index=pd.DatetimeIndex(["2020-08-31 09:30:00"], name="Date"),
        )

        self.LOW_series = get_price_series(price_point="Low", interval="1d", data=frame)
        for attribute, label in self._GRID_ATTRIBUTES:
            setattr(self, attribute, self.LOW_series[label])

        # self.Time = input.EndTime
        self.Value = self.blue_zero.iloc[0].values[0]
        return len(self.blue_zero) == GRID_LENGTH
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel


class BankingIndustryStocks(FundamentalUniverseSelectionModel):
    '''
    Universe selection model that picks the highest dollar-volume stocks
    whose Morningstar industry group is Banks. The selection is refreshed
    at most once per calendar month.
    '''

    def __init__(self, filterFineData = True, universeSettings = None, securityInitializer = None):
        '''Initializes the model with its coarse/fine size limits and an empty dollar-volume cache'''
        super().__init__(filterFineData, universeSettings, securityInitializer)

        self.numberOfSymbolsCoarse = 1000
        self.numberOfSymbolsFine = 100
        self.dollarVolumeBySymbol = {}
        self.lastMonth = -1

    def SelectCoarse(self, algorithm, coarse):
        '''
        Coarse selection. A stock is kept only when it:
            -has fundamental data
            -has a positive price
            -has a positive volume
        The survivors are ranked by dollar volume and capped at
        numberOfSymbolsCoarse; their dollar volume is cached for SelectFine.
        '''
        # Already selected during this calendar month: keep the universe.
        if algorithm.Time.month == self.lastMonth:
            return Universe.Unchanged

        eligible = [security for security in coarse
                    if security.HasFundamentalData and security.Volume > 0 and security.Price > 0]
        eligible.sort(key = lambda security: security.DollarVolume, reverse=True)
        mostLiquid = eligible[:self.numberOfSymbolsCoarse]

        self.dollarVolumeBySymbol = {security.Symbol: security.DollarVolume for security in mostLiquid}

        # Nothing passed the filters, so the universe is left unchanged.
        if not self.dollarVolumeBySymbol:
            return Universe.Unchanged

        return list(self.dollarVolumeBySymbol)

    def SelectFine(self, algorithm, fine):
        '''
        Fine selection: keeps companies in the Morningstar Banks industry
        group, ranked by the dollar volume cached in SelectCoarse.
        '''
        banks = [security for security in fine
                 if security.AssetClassification.MorningstarIndustryGroupCode == MorningstarIndustryGroupCode.Banks]

        if not banks:
            return Universe.Unchanged

        banks.sort(key = lambda security: self.dollarVolumeBySymbol[security.Symbol], reverse=True)

        # Record the month only after a successful selection so that an empty
        # result is retried on the next call.
        self.lastMonth = algorithm.Time.month

        # After the numberOfSymbolsFine cap, only the top two symbols are returned.
        shortlisted = banks[:self.numberOfSymbolsFine]
        return [security.Symbol for security in shortlisted][:2]
try:
    import matplotlib
except ImportError:
    # matplotlib is only used to set the 'timezone' rcParam below; the
    # returned series do not depend on it, so the module stays importable.
    matplotlib = None
import pandas as pd
import numpy as np
import re
from math import *
import os


def get_price_series(price_point: str, interval: str = "1d", csv_records: int = 150, data=None):
    """Project a grid of straight price lines forward from one anchor price.

    The anchor is the first row of ``data[price_point]``. Two base lines
    ("blue 0" and "green 0") start at the anchor, and five parallel lines are
    added above ("pos1".."pos5") and below ("neg1".."neg5") each of them.

    Args:
        price_point: Column of ``data`` to anchor on (e.g. "Low"); also the
            column name of every returned frame.
        interval: Number plus unit, e.g. "1d". It only decides which
            timezone is written to matplotlib's rcParams.
        csv_records: Number of rows generated for every line.
        data: DataFrame with at least one row and a ``price_point`` column.

    Returns:
        dict mapping a label ("blue 0", "green pos2", ...) to a one-column
        DataFrame of ``csv_records`` prices rounded to 8 decimals. The index
        is a plain 0..n-1 range named "Date".

    Raises:
        ValueError: if ``data`` is missing/empty or ``interval`` has no
            number-and-unit pair.
        KeyError: if ``data`` has no ``price_point`` column.
    """
    if data is None or len(data) == 0:
        raise ValueError("get_price_series requires a non-empty DataFrame in 'data'")

    print("Initializing data...")
    col = price_point

    # This is for GE calibration
    # NOTE(review): the distances are divided by 25.4 and multiplied by dpi
    # below, so they look like millimetres on a chart - confirm.
    blue_d = 30.0
    green_d = 36.0
    blue_deg = 45.0
    green_deg = 103.0
    const_blue_d = 1.06
    const_green_d = 0.66
    const_blue_angle = 1.24
    const_green_angle = 1.09
    ratio = 0.14001806684733514
    xpx = 6.692936331960709
    dpi = 100.0
    levels_per_side = 5

    # Angles are used in radians everywhere below.
    blue_angle = radians(blue_deg)
    green_angle = radians(green_deg)

    # Normalise the interval to "<number><unit>" before comparing it.
    tokens = re.findall(r'[A-Za-z]+|\d+', interval)
    if len(tokens) < 2:
        raise ValueError(f"interval must look like '1d', got {interval!r}")
    interval = tokens[0] + tokens[1]
    if matplotlib is not None:
        matplotlib.rcParams['timezone'] = 'UTC' if interval == "1d" else 'US/Eastern'

    # Only the first row is used. Read it by position: the frame may carry a
    # datetime index, where an integer key is not a valid label.
    anchor_price = data[col].iloc[0]

    blue_slope = tan(blue_angle * const_blue_angle) * ratio
    green_slope = 1.0 / tan(green_angle * const_green_angle) * ratio

    # Spacing between neighbouring parallel lines, per colour.
    blue_step = (blue_d / 25.4) / sin(blue_angle) * dpi / xpx * const_blue_d
    green_step = (green_d / 25.4) / sin(green_angle - pi / 2.0) * dpi / xpx * const_green_d

    # (colour, slope, spacing); both base lines pass through the anchor at x = 0.
    base_lines = (
        ("blue", blue_slope, blue_step),
        ("green", green_slope, green_step),
    )

    # (label, slope, intercept) in output order: base lines first, then for
    # each colour the lines below the base followed by the lines above it.
    lines = [(f"{color} 0", slope, anchor_price) for color, slope, _ in base_lines]
    for color, slope, step in base_lines:
        for direction, side in ((-1.0, "neg"), (1.0, "pos")):
            intercept = anchor_price
            for level in range(1, levels_per_side + 1):
                intercept = intercept + abs(slope) * step * direction
                lines.append((f"{color} {side}{level}", slope, intercept))

    date_price_series = {}
    for label, slope, intercept in lines:
        prices = [round(slope * x + intercept, 8) for x in range(csv_records)]
        frame = pd.DataFrame({col: prices})
        frame.index.name = "Date"
        print("Making:", label)
        date_price_series[label] = frame

    return date_price_series

# Your New Python File