from BankingIndustryStocks import BankingIndustryStocks
from datetime import datetime, timedelta
from numpy import sum
# import pandas as pd
# from grid_class import *
from gid_func import *
from collections import deque
class CalibratedParticleAtmosphericScrubbers(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2020, 2, 11) # Set Start Date
self.SetEndDate(2020, 2, 12) # Set End Date
self.SetCash(100000) # Set Strategy Cash
# self.AddEquity("SPY", Resolution.Minute)
self.UniverseSettings.Resolution = Resolution.Daily
self.SetUniverseSelection(BankingIndustryStocks())
self.symbolDataBySymbol = {}
self.AddAlpha(Grid_indicator)
# def CoarseSelectionFunction(self, coarse):
# self.universe = coarse
# return self.universe
# def OnData(self, data):
# '''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
# Arguments:
# data: Slice object keyed by symbol containing the stock data
# '''
# # for key in data.Keys:
# # self.Log(key)
# # self.Debug(f"Last price: {data[key].LastPrice}")
# # # self.Debug(key.Open)
# for kvp in data.Bars:
# symbol = kvp.Key
# value = kvp.Value
# self.Log(f"{self.Time}: {symbol.Value} - Close: {round(value.Close,2)}")
# self.symbolDataBySymbol[symbol] = SymbolData(symbol, input, algorithm, self.Name)
class Grid_indicator(PythonIndicator):
def __init__(self, name):
self.symbolDataBySymbol = {}
self.Name = name
self.Time = datetime.min
self.Value = 0
def __repr__(self):
return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(self.Name, self.IsReady, self.Time, self.Value)
def Update(self, input, algorithm):
insights = []
for symbol, item in self.symbolDataBySymbol.items():
self.Debug(symbol)
sma = item.indicator
if sma > 1:
insights.append(Insight.Price(symbol,timedelta(hours=5), InsightDirection.Up))
return insights
# def OnSecuritiesChanged(self, algorithm, changes):
def OnSecuritiesChanged(self, changes):
self.Log(changes)
for removed in changes.RemovedSecurities:
self.symbolDataBySymbol.pop(y.Symbol, None)
for added in changes.AddedSecurities:
if added.Symbol not in self.symbolDataBySymbol:
self.symbolDataBySymbol[symbol] = SymbolData(symbol, input, algorithm, self.Name)
self.Log(self.symbolDataBySymbol[symbol])
class SymbolData:
def __init__(self, symbol, input, algorithm, name):
self.Symbol = symbol
self.indicator = makeIndicator(symbol, input)
self.Name = name
self.Time = datetime.min
self.Value = 0
self.blue_neg_one = None
self.blue_neg_two = None
self.blue_neg_three = None
self.blue_pos_one = None
self.blue_pos_two = None
self.blue_pos_three = None
self.green_neg_one = None
self.green_neg_two = None
self.green_neg_three = None
self.green_pos_one = None
self.green_pos_two = None
self.green_pos_three = None
self.green_zero = None
self.blue_zero = None
def makeIndicator(symbol, input):
#just a random data here
data_IN = {'Date': ["2020-08-31 09:30:00"],
'Open': [input.Open],
'High': [input.High],
'Low': [input.Low],
'Close': [input.Close]}
data_IN = pd.DataFrame(data=data_IN)
data_IN.set_index("Date", inplace=True)
data_IN.index = pd.to_datetime(data_IN.index).strftime('%Y-%m-%d %H:%M:%S')
data_IN.index = pd.to_datetime(data_IN.index)
self.LOW_series = get_price_series(price_point="Low", interval="1d", data=data_IN)
self.blue_neg_one = self.LOW_series['blue neg1']
self.blue_neg_two = self.LOW_series['blue neg2']
self.blue_neg_three = self.LOW_series['blue neg3']
self.blue_pos_one = self.LOW_series['blue pos1']
self.blue_pos_two = self.LOW_series['blue pos2']
self.blue_pos_three = self.LOW_series['blue pos3']
self.green_neg_one = self.LOW_series['green neg1']
self.green_neg_two = self.LOW_series['green neg2']
self.green_neg_three = self.LOW_series['green neg3']
self.green_pos_one = self.LOW_series['green pos1']
self.green_pos_two = self.LOW_series['green pos2']
self.green_pos_three = self.LOW_series['green pos3']
self.green_zero = self.LOW_series['green 0']
self.blue_zero = self.LOW_series['blue 0']
self.Time = input.EndTime
self.Value = self.blue_zero.iloc[0].values[0]
#This check does not seam to do anything
return len(self.blue_zero) == 150
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
class BankingIndustryStocks(FundamentalUniverseSelectionModel):
'''
This module selects the most liquid stocks listed on the Nasdaq Stock Exchange.
'''
def __init__(self, filterFineData = True, universeSettings = None, securityInitializer = None):
'''Initializes a new default instance of the TechnologyUniverseModule'''
super().__init__(filterFineData, universeSettings, securityInitializer)
self.numberOfSymbolsCoarse = 1000
self.numberOfSymbolsFine = 100
self.dollarVolumeBySymbol = {}
self.lastMonth = -1
def SelectCoarse(self, algorithm, coarse):
'''
Performs a coarse selection:
-The stock must have fundamental data
-The stock must have positive previous-day close price
-The stock must have positive volume on the previous trading day
'''
if algorithm.Time.month == self.lastMonth:
return Universe.Unchanged
sortedByDollarVolume = sorted([x for x in coarse if x.HasFundamentalData and x.Volume > 0 and x.Price > 0],
key = lambda x: x.DollarVolume, reverse=True)[:self.numberOfSymbolsCoarse]
self.dollarVolumeBySymbol = {x.Symbol:x.DollarVolume for x in sortedByDollarVolume}
# If no security has met the QC500 criteria, the universe is unchanged.
if len(self.dollarVolumeBySymbol) == 0:
return Universe.Unchanged
return list(self.dollarVolumeBySymbol.keys())
def SelectFine(self, algorithm, fine):
'''
Performs a fine selection for companies in the Morningstar Banking Sector
'''
# Filter stocks and sort on dollar volume
sortedByDollarVolume = sorted([x for x in fine if x.AssetClassification.MorningstarIndustryGroupCode == MorningstarIndustryGroupCode.Banks],
key = lambda x: self.dollarVolumeBySymbol[x.Symbol], reverse=True)
if len(sortedByDollarVolume) == 0:
return Universe.Unchanged
self.lastMonth = algorithm.Time.month
return [x.Symbol for x in sortedByDollarVolume[:self.numberOfSymbolsFine]]
import matplotlib
import pandas as pd
import numpy as np
import re
from math import *
import os
def get_price_series(price_point: str, interval: str = "1d",
csv_records: int = 150, data=None):
class Line:
slope = 0.0
intercept = 0.0
point = (0, 0)
color = ""
def __init__(self, slope: float, intercept: float, color: str, point: tuple, label: str):
self.slope = slope
self.intercept = intercept
self.label = label
self.point = point
self.color = color
def get_y(self, x: float):
return self.slope * float(x) + self.intercept
# print("Initializing data...")
lines = []
date_price_series = {}
grid_lines = []
col = price_point
#This is for GE calibration
blue_d = 99.0
green_d = 3.0
blue_deg = 4.0
green_deg = 13.0
const_blue_d = 1
const_green_d = 1
const_blue_angle = 1
const_green_angle = 1
xmin = -7
xmax = 13
ymin = 58
ymax = 113
xpx = 46
dpi = 100.0
# We will be using x_y_angle everywhere since it is in radians
blue_angle = radians(blue_deg)
green_angle = radians(green_deg)
frequency_unit = re.findall(r'[A-Za-z]+|\d+', interval)[1]
frequency = re.findall(r'[A-Za-z]+|\d+', interval)[0]
interval = frequency + frequency_unit
if interval != "1d":
matplotlib.rcParams['timezone'] = 'US/Eastern'
else:
matplotlib.rcParams['timezone'] = 'UTC'
'''
This is because it the one and only row of a dataframe
'''
xp = 0.0
yp = data[col][int(xp)]
point = (xp, yp)
ratio = (ymax - ymin)/(xmax - xmin)
# print(ratio)
y_line_slope = 1.0 / tan(green_angle * const_green_angle) * ratio
y_line_intercept = yp - y_line_slope * xp
x_line_slope = tan(blue_angle * const_blue_angle) * ratio
x_line_intercept = yp - x_line_slope * xp
lines.append(Line(x_line_slope, x_line_intercept, 'blue', point, "blue 0"))
lines.append(Line(y_line_slope, y_line_intercept, "green", point, "green 0"))
for line in lines:
original_intercept = line.intercept
if line.color == "green":
g_d_inches = green_d / 25.4
inches = g_d_inches / sin(green_angle - pi / 2.0)
xp = inches * dpi / xpx * const_green_d
else:
b_d_inches = blue_d / 25.4
inches = b_d_inches / sin(blue_angle)
xp = inches * dpi / xpx * const_blue_d
for direction in (-1.0, 1.0):
intercept = original_intercept
for x in range(15):
yp = intercept
intercept = yp + abs(line.slope) * xp * direction
point = (xp, yp)
if direction == 1: # going up
pos = "pos" + str(x + 1)
else:
pos = "neg" + str(x + 1)
label = line.color + " " + pos
li = Line(line.slope, intercept, line.color, point, label)
grid_lines.append(li)
all_lines = lines + grid_lines
xp = lines[0].point[0]
for line in all_lines:
date = data.index[int(xp)]
new_intercept = xp * line.slope + line.intercept
prices = []
for x in range(csv_records):
price = round(line.slope * x + new_intercept, 18)
prices.append(price)
df = {col: prices}
df = pd.DataFrame(df)
df.index.name = "Date"
# print("Making:", line.label)
date_price_series[line.label] = df
return date_price_series
# Your New Python File
from gid_func import *
from collections import deque
# Python implementation of SimpleMovingAverage.
# Represents the traditional simple moving average indicator (SMA).
class Grid_indicator(PythonIndicator):
def __init__(self, name):
self.Name = name
self.Time = datetime.min
self.Value = 0
# self.IsReady = False
# self.queue = deque(maxlen=period)
self.blue_neg_one = None
self.blue_neg_two = None
self.blue_neg_three = None
self.blue_pos_one = None
self.blue_pos_two = None
self.blue_pos_three = None
self.green_neg_one = None
self.green_neg_two = None
self.green_neg_three = None
self.green_pos_one = None
self.green_pos_two = None
self.green_pos_three = None
self.green_zero = None
self.blue_zero = None
def __repr__(self):
return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(self.Name, self.IsReady, self.Time, self.Value)
# Update method is mandatory
def Update(self, input):
# if self.Value == 0:
#just a random data here
data_IN = {'Date': ["2020-08-31 09:30:00"],
'Open': [input.Open],
'High': [input.High],
'Low': [input.Low],
'Close': [input.Close]}
data_IN = pd.DataFrame(data=data_IN)
data_IN.set_index("Date", inplace=True)
data_IN.index = pd.to_datetime(data_IN.index).strftime('%Y-%m-%d %H:%M:%S')
data_IN.index = pd.to_datetime(data_IN.index)
self.LOW_series = get_price_series(price_point="Low", interval="1d", data=data_IN)
self.blue_neg_one = self.LOW_series['blue neg1']
self.blue_neg_two = self.LOW_series['blue neg2']
self.blue_neg_three = self.LOW_series['blue neg3']
self.blue_pos_one = self.LOW_series['blue pos1']
self.blue_pos_two = self.LOW_series['blue pos2']
self.blue_pos_three = self.LOW_series['blue pos3']
self.green_neg_one = self.LOW_series['green neg1']
self.green_neg_two = self.LOW_series['green neg2']
self.green_neg_three = self.LOW_series['green neg3']
self.green_pos_one = self.LOW_series['green pos1']
self.green_pos_two = self.LOW_series['green pos2']
self.green_pos_three = self.LOW_series['green pos3']
self.green_zero = self.LOW_series['green 0']
self.blue_zero = self.LOW_series['blue 0']
self.Time = input.EndTime
self.Value = self.blue_zero.iloc[0].values[0]
#This check does not seam to do anything
return len(self.blue_zero) == 150