Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -10.468 Tracking Error 0.053 Treynor Ratio 0 Total Fees $0.00 |
from BankingIndustryStocks import BankingIndustryStocks from datetime import datetime, timedelta from numpy import sum # import pandas as pd # from grid_class import * from gid_func import * from collections import deque class CalibratedParticleAtmosphericScrubbers(QCAlgorithm): def Initialize(self): self.SetStartDate(2020, 2, 11) # Set Start Date self.SetEndDate(2020, 2, 15) # Set End Date self.SetCash(100000) # Set Strategy Cash # self.AddEquity("SPY", Resolution.Minute) self.UniverseSettings.Resolution = Resolution.Daily self.SetUniverseSelection(BankingIndustryStocks()) self.AddAlpha(Grid_indicator("Grid")) class Grid_indicator(AlphaModel): def __init__(self, name): self.symbolDataBySymbol = {} self.LOW_index = 0 self.Name = name self.Time = datetime.min self.Value = 0 def __repr__(self): return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(self.Name, self.IsReady, self.Time, self.Value) def Update(self, algorithm, data): for symbol, item in self.symbolDataBySymbol.items(): #https://www.quantconnect.com/forum/discussion/7956/rolling-window-questions/p1 if data.ContainsKey(symbol) and data[symbol] is not None: data_IN = {'Open' : data[symbol].Open, 'High' : data[symbol].High, 'Low' : data[symbol].Low, 'Close': data[symbol].Close } item.series_index += 1 algorithm.Log(f"{algorithm.Time} {symbol.Value}: Index: {item.series_index}") algorithm.Log(f"{algorithm.Time} {symbol.Value}: Close: {round(data[symbol].Close, 2)}") algorithm.Log(f"{algorithm.Time} {symbol.Value}: blue0 @ index: {item.series_index}: {round(item.blue_zero.values[item.series_index][0], 2)}") if item.series_index == 5: algorithm.Debug("*** Trial redo of indicator*** ") for mod in list(self.symbolDataBySymbol.values()): data_IN = {'Open' : data[symbol].Open, 'High' : data[symbol].High, 'Low' : data[symbol].Low, 'Close': data[symbol].Close } algorithm.Debug(mod.Symbol.Value) algorithm.Debug(mod.blue_zero.head()) algorithm.Debug(f"testing out blue0 @ Index {item.series_index}: {SymbolData(symbol.Value, data_IN, algorithm, self.Name).blue_zero.values[item.series_index][0]}") # self.symbolDataBySymbol[symbol.Value] = SymbolData(symbol.Value, data_IN, algorithm, self.Name) insights = [] for symbol, item in self.symbolDataBySymbol.items(): sma = item.indicator if sma > 1: insights.append(Insight.Price(symbol,timedelta(hours=5), InsightDirection.Up)) return insights def OnSecuritiesChanged(self, algorithm, changes): for removed in changes.RemovedSecurities: self.symbolDataBySymbol.pop(removed.Symbol, None) for added in changes.AddedSecurities: if added.Symbol not in self.symbolDataBySymbol: algorithm.Debug(f"Added Symbol in OnSecuritiesChanged: {added.Symbol.Value}") data_IN = {'Open' : added.Open, 'High' : added.High, 'Low' : added.Low, 'Close': added.Close } algorithm.Debug(data_IN) self.symbolDataBySymbol[added.Symbol] = SymbolData(added.Symbol, data_IN, algorithm, self.Name) algorithm.Debug(f"blue_zero for {added.Symbol.Value} :{self.symbolDataBySymbol[added.Symbol].blue_zero.head(2)}") class SymbolData: def __init__(self, symbol, data_IN, algorithm, name): self.Symbol = symbol # algorithm.Debug(f"Added Symbol in SymbolData: {self.Symbol.Value}") self.Name = name self.Time = datetime.min self.Value = 0 self.blue_neg_one = None self.blue_neg_two = None self.blue_neg_three = None self.blue_pos_one = None self.blue_pos_two = None self.blue_pos_three = None self.green_neg_one = None self.green_neg_two = None self.green_neg_three = None self.green_pos_one = None self.green_pos_two = None self.green_pos_three = None self.green_zero = None self.blue_zero = None self.series_index = 0 self.indicator = self.makeIndicator(symbol, data_IN) def makeIndicator(self, symbol, data_IN): #just a random data here data_IN = {'Date' : ["2020-08-31 09:30:00"], 'Open' : [data_IN["Open"]], 'High' : [data_IN["High"]], 'Low' : [data_IN["Low"]], 'Close': [data_IN["Close"]]} data_IN = pd.DataFrame(data=data_IN) data_IN.set_index("Date", inplace=True) data_IN.index = pd.to_datetime(data_IN.index).strftime('%Y-%m-%d %H:%M:%S') data_IN.index = pd.to_datetime(data_IN.index) self.LOW_series = get_price_series(price_point="Low", interval="1d", data=data_IN) self.blue_neg_one = self.LOW_series['blue neg1'] self.blue_neg_two = self.LOW_series['blue neg2'] self.blue_neg_three = self.LOW_series['blue neg3'] self.blue_pos_one = self.LOW_series['blue pos1'] self.blue_pos_two = self.LOW_series['blue pos2'] self.blue_pos_three = self.LOW_series['blue pos3'] self.green_neg_one = self.LOW_series['green neg1'] self.green_neg_two = self.LOW_series['green neg2'] self.green_neg_three = self.LOW_series['green neg3'] self.green_pos_one = self.LOW_series['green pos1'] self.green_pos_two = self.LOW_series['green pos2'] self.green_pos_three = self.LOW_series['green pos3'] self.green_zero = self.LOW_series['green 0'] self.blue_zero = self.LOW_series['blue 0'] # self.Time = input.EndTime self.Value = self.blue_zero.iloc[0].values[0] #This check does not seem to do anything return len(self.blue_zero) == 150
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel class BankingIndustryStocks(FundamentalUniverseSelectionModel): ''' This module selects the most liquid stocks listed on the Nasdaq Stock Exchange. ''' def __init__(self, filterFineData = True, universeSettings = None, securityInitializer = None): '''Initializes a new default instance of the TechnologyUniverseModule''' super().__init__(filterFineData, universeSettings, securityInitializer) self.numberOfSymbolsCoarse = 1000 self.numberOfSymbolsFine = 100 self.dollarVolumeBySymbol = {} self.lastMonth = -1 def SelectCoarse(self, algorithm, coarse): ''' Performs a coarse selection: -The stock must have fundamental data -The stock must have positive previous-day close price -The stock must have positive volume on the previous trading day ''' if algorithm.Time.month == self.lastMonth: return Universe.Unchanged sortedByDollarVolume = sorted([x for x in coarse if x.HasFundamentalData and x.Volume > 0 and x.Price > 0], key = lambda x: x.DollarVolume, reverse=True)[:self.numberOfSymbolsCoarse] self.dollarVolumeBySymbol = {x.Symbol:x.DollarVolume for x in sortedByDollarVolume} # If no security has met the QC500 criteria, the universe is unchanged. if len(self.dollarVolumeBySymbol) == 0: return Universe.Unchanged return list(self.dollarVolumeBySymbol.keys()) def SelectFine(self, algorithm, fine): ''' Performs a fine selection for companies in the Morningstar Banking Sector ''' # Filter stocks and sort on dollar volume sortedByDollarVolume = sorted([x for x in fine if x.AssetClassification.MorningstarIndustryGroupCode == MorningstarIndustryGroupCode.Banks], key = lambda x: self.dollarVolumeBySymbol[x.Symbol], reverse=True) if len(sortedByDollarVolume) == 0: return Universe.Unchanged self.lastMonth = algorithm.Time.month return [x.Symbol for x in sortedByDollarVolume[:self.numberOfSymbolsFine]][:2]
import matplotlib import pandas as pd import numpy as np import re from math import * import os def get_price_series(price_point: str, interval: str = "1d", csv_records: int = 150, data=None): class Line: slope = 0.0 intercept = 0.0 point = (0, 0) color = "" def __init__(self, slope: float, intercept: float, color: str, point: tuple, label: str): self.slope = slope self.intercept = intercept self.label = label self.point = point self.color = color def get_y(self, x: float): return self.slope * float(x) + self.intercept # print("Initializing data...") lines = [] date_price_series = {} grid_lines = [] col = price_point #This is for GE calibration blue_d = 99.0 green_d = 3.0 blue_deg = 4.0 green_deg = 13.0 const_blue_d = 1 const_green_d = 1 const_blue_angle = 1 const_green_angle = 1 xmin = -7 xmax = 13 ymin = 58 ymax = 113 xpx = 46 dpi = 100.0 # We will be using x_y_angle everywhere since it is in radians blue_angle = radians(blue_deg) green_angle = radians(green_deg) frequency_unit = re.findall(r'[A-Za-z]+|\d+', interval)[1] frequency = re.findall(r'[A-Za-z]+|\d+', interval)[0] interval = frequency + frequency_unit if interval != "1d": matplotlib.rcParams['timezone'] = 'US/Eastern' else: matplotlib.rcParams['timezone'] = 'UTC' ''' This is because it the one and only row of a dataframe ''' xp = 0.0 yp = data[col][int(xp)] point = (xp, yp) ratio = (ymax - ymin)/(xmax - xmin) # print(ratio) y_line_slope = 1.0 / tan(green_angle * const_green_angle) * ratio y_line_intercept = yp - y_line_slope * xp x_line_slope = tan(blue_angle * const_blue_angle) * ratio x_line_intercept = yp - x_line_slope * xp lines.append(Line(x_line_slope, x_line_intercept, 'blue', point, "blue 0")) lines.append(Line(y_line_slope, y_line_intercept, "green", point, "green 0")) for line in lines: original_intercept = line.intercept if line.color == "green": g_d_inches = green_d / 25.4 inches = g_d_inches / sin(green_angle - pi / 2.0) xp = inches * dpi / xpx * const_green_d else: b_d_inches = blue_d / 25.4 inches = b_d_inches / sin(blue_angle) xp = inches * dpi / xpx * const_blue_d for direction in (-1.0, 1.0): intercept = original_intercept for x in range(15): yp = intercept intercept = yp + abs(line.slope) * xp * direction point = (xp, yp) if direction == 1: # going up pos = "pos" + str(x + 1) else: pos = "neg" + str(x + 1) label = line.color + " " + pos li = Line(line.slope, intercept, line.color, point, label) grid_lines.append(li) all_lines = lines + grid_lines xp = lines[0].point[0] for line in all_lines: date = data.index[int(xp)] new_intercept = xp * line.slope + line.intercept prices = [] for x in range(csv_records): price = round(line.slope * x + new_intercept, 18) prices.append(price) df = {col: prices} df = pd.DataFrame(df) df.index.name = "Date" # print("Making:", line.label) date_price_series[line.label] = df return date_price_series # Your New Python File
from gid_func import * from collections import deque from datetime import datetime, timedelta from numpy import sum # Python implementation of SimpleMovingAverage. # Represents the traditional simple moving average indicator (SMA). class Grid_indicator(PythonIndicator): def __init__(self, name): self.Name = name self.Time = datetime.min self.Value = 0 # self.IsReady = False # self.queue = deque(maxlen=period) self.blue_neg_one = None self.blue_neg_two = None self.blue_neg_three = None self.blue_pos_one = None self.blue_pos_two = None self.blue_pos_three = None self.green_neg_one = None self.green_neg_two = None self.green_neg_three = None self.green_pos_one = None self.green_pos_two = None self.green_pos_three = None self.green_zero = None self.blue_zero = None def __repr__(self): return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(self.Name, self.IsReady, self.Time, self.Value) # Update method is mandatory def Update(self, input): # if self.Value == 0: #just a random data here data_IN = {'Date': ["2020-08-31 09:30:00"], 'Open': [input.Open], 'High': [input.High], 'Low': [input.Low], 'Close': [input.Close]} data_IN = pd.DataFrame(data=data_IN) data_IN.set_index("Date", inplace=True) data_IN.index = pd.to_datetime(data_IN.index).strftime('%Y-%m-%d %H:%M:%S') data_IN.index = pd.to_datetime(data_IN.index) self.LOW_series = get_price_series(price_point="Low", interval="1d", data=data_IN) self.blue_neg_one = self.LOW_series['blue neg1'] self.blue_neg_two = self.LOW_series['blue neg2'] self.blue_neg_three = self.LOW_series['blue neg3'] self.blue_pos_one = self.LOW_series['blue pos1'] self.blue_pos_two = self.LOW_series['blue pos2'] self.blue_pos_three = self.LOW_series['blue pos3'] self.green_neg_one = self.LOW_series['green neg1'] self.green_neg_two = self.LOW_series['green neg2'] self.green_neg_three = self.LOW_series['green neg3'] self.green_pos_one = self.LOW_series['green pos1'] self.green_pos_two = self.LOW_series['green pos2'] self.green_pos_three = self.LOW_series['green pos3'] self.green_zero = self.LOW_series['green 0'] self.blue_zero = self.LOW_series['blue 0'] self.Time = input.EndTime self.Value = self.blue_zero.iloc[0].values[0] #This check does not seam to do anything return len(self.blue_zero) == 150