Overall Statistics |
Total Trades 6 Average Win 32.98% Average Loss -7.91% Compounding Annual Return 1.954% Drawdown 28.000% Expectancy 0.034 Net Profit 10.102% Sharpe Ratio 0.22 Probabilistic Sharpe Ratio 4.194% Loss Rate 80% Win Rate 20% Profit-Loss Ratio 4.17 Alpha -0.012 Beta 0.332 Annual Standard Deviation 0.122 Annual Variance 0.015 Information Ratio -0.628 Tracking Error 0.145 Treynor Ratio 0.081 Total Fees $24.83 |
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")

from System import *
from QuantConnect import *
from QuantConnect.Orders import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Execution import *
from QuantConnect.Algorithm.Framework.Portfolio import *

import numpy as np


class ImmediateExecutionWithLogsModel(ExecutionModel):

    '''
    Description:
        Custom implementation of IExecutionModel that immediately submits market orders
        to achieve the desired portfolio targets
    Details:
        Every order is accompanied by a log line with the number of shares traded, prices,
        profit percent and dollar profit, for both long and short positions.
    '''

    def __init__(self):
        ''' Initializes a new instance of the ImmediateExecutionWithLogsModel class '''
        # targets that still have quantity left to be ordered
        self.targetsCollection = PortfolioTargetCollection()

    def Execute(self, algorithm, targets):
        '''
        Description:
            Immediately submits market orders for the specified portfolio targets
            and logs a description of each trade
        Args:
            algorithm: The algorithm instance
            targets: The portfolio targets to be ordered
        '''

        self.targetsCollection.AddRange(targets)
        if self.targetsCollection.Count == 0:
            return

        for target in self.targetsCollection.OrderByMarginImpact(algorithm):
            # remaining quantity to be ordered; nothing to do when it is zero
            quantity = OrderSizing.GetUnorderedQuantity(algorithm, target)
            if quantity == 0:
                continue

            security = algorithm.ActiveSecurities[target.Symbol]

            # snapshot of the holdings before the order is placed
            beforeHoldingsQuantity = security.Holdings.Quantity
            beforeHoldingsAvgPrice = security.Holdings.AveragePrice
            beforeHoldingsCost = security.Holdings.HoldingsCost

            algorithm.MarketOrder(target.Symbol, quantity)

            # holdings after the order
            newHoldingsQuantity = beforeHoldingsQuantity + quantity
            newHoldingsAvgPrice = security.Holdings.AveragePrice
            newHoldingsCost = security.Holdings.HoldingsCost

            # for market on open orders the avg price and cost won't update until the order gets filled,
            # so to avoid reporting the previous values we report zeros instead
            if newHoldingsAvgPrice == beforeHoldingsAvgPrice and newHoldingsCost == beforeHoldingsCost:
                newHoldingsAvgPrice = 0
                newHoldingsCost = 0

            message = self._BuildTradeLog(str(target.Symbol.Value), quantity,
                                          beforeHoldingsQuantity, beforeHoldingsAvgPrice,
                                          newHoldingsQuantity, newHoldingsAvgPrice, newHoldingsCost,
                                          security.Price)
            if message is not None:
                algorithm.Log(message)

        self.targetsCollection.ClearFulfilled(algorithm)

    @staticmethod
    def _BuildTradeLog(ticker, quantity, beforeHoldingsQuantity, beforeHoldingsAvgPrice,
                       newHoldingsQuantity, newHoldingsAvgPrice, newHoldingsCost, lastPrice):
        '''
        Description:
            Build the log message describing one order
        Args:
            ticker: Ticker of the traded security
            quantity: Signed quantity that was ordered (never zero)
            beforeHoldingsQuantity: Signed holdings before the order
            beforeHoldingsAvgPrice: Average holdings price before the order
            newHoldingsQuantity: Signed holdings after the order (before + quantity)
            newHoldingsAvgPrice: Average holdings price after the order (zero when not yet updated)
            newHoldingsCost: Holdings cost after the order (zero when not yet updated)
            lastPrice: Last price of the security, used as the approximate fill price
        Returns:
            The message to log, or None when no case matches
        '''

        # profit percent and dollar profit only make sense when both prices are known
        if beforeHoldingsAvgPrice != 0 and lastPrice != 0:
            # profit/loss percent for the trade
            tradeProfitPercent = (((lastPrice / beforeHoldingsAvgPrice) - 1) * np.sign(beforeHoldingsQuantity)) * 100
            # dollar profit/loss on the shares actually closed by this order:
            # a partial close only realizes profit on the traded shares, and a full close or
            # a reversal realizes it on the whole previous position
            closedQuantity = min(abs(beforeHoldingsQuantity), abs(quantity))
            if beforeHoldingsQuantity < 0:
                closedQuantity = -closedQuantity
            tradeDollarProfit = (lastPrice - beforeHoldingsAvgPrice) * closedQuantity
        else:
            tradeProfitPercent = 0
            tradeDollarProfit = 0

        # fragments shared by several messages
        newHoldingsInfo = ('; current total holdings: ' + str(round(newHoldingsQuantity, 0))
                           + '; current average price: ' + str(round(newHoldingsAvgPrice, 4))
                           + '; current total holdings cost: ' + str(round(newHoldingsCost, 2)))
        profitInfo = ('; profit percent: ' + str(round(tradeProfitPercent, 4))
                      + '; dollar profit: ' + str(round(tradeDollarProfit, 2)))

        ### not invested yet: new holdings > 0 => going long, otherwise going short
        if beforeHoldingsQuantity == 0:
            if newHoldingsQuantity > 0:
                return (ticker + ': going long!'
                        + ' current total holdings: ' + str(round(quantity, 0))
                        + '; current average price: ' + str(round(newHoldingsAvgPrice, 4))
                        + '; current total holdings cost: ' + str(round(newHoldingsCost, 2)))
            return (ticker + ': going short!'
                    + ' current total holdings: ' + str(round(quantity, 0))
                    + '; average price: ' + str(round(newHoldingsAvgPrice, 4))
                    + '; current total holdings cost: ' + str(round(newHoldingsCost, 2)))

        ### already long:
        # quantity > 0 => adding to long position
        # quantity < 0 and 0 < new holdings < before holdings => partially selling long position
        # quantity < 0 and new holdings = 0 => closing entire long position
        # quantity < 0 and new holdings < 0 => closing entire long position and going short
        if beforeHoldingsQuantity > 0:
            if quantity > 0:
                return (ticker + ': adding to current long position!'
                        + ' additional shares: ' + str(round(quantity, 0))
                        + newHoldingsInfo)
            if newHoldingsQuantity > 0 and newHoldingsQuantity < beforeHoldingsQuantity:
                return (ticker + ': selling part of current long position!'
                        + ' selling shares: ' + str(round(-quantity, 0))
                        + '; current total holdings: ' + str(round(newHoldingsQuantity, 0))
                        + '; buying average price was: ' + str(round(beforeHoldingsAvgPrice, 4))
                        + '; approx. selling average price is: ' + str(round(lastPrice, 4))
                        + profitInfo)
            if newHoldingsQuantity == 0:
                return (ticker + ': closing down entire current long position!'
                        + ' selling shares: ' + str(round(-quantity, 0))
                        + '; current total holdings: ' + str(round(newHoldingsQuantity, 0))
                        + '; buying average price was: ' + str(round(beforeHoldingsAvgPrice, 4))
                        + '; approx. selling average price is: ' + str(round(lastPrice, 4))
                        + profitInfo)
            if newHoldingsQuantity < 0:
                return (ticker + ': closing down entire current long position and going short!'
                        + ' selling shares to close long: ' + str(round(beforeHoldingsQuantity, 0))
                        + '; profit percent on long position: ' + str(round(tradeProfitPercent, 4))
                        + '; dollar profit on long position: ' + str(round(tradeDollarProfit, 2))
                        + '; selling shares to go short: ' + str(round(-newHoldingsQuantity, 0))
                        + newHoldingsInfo)
            return None

        ### already short:
        # quantity < 0 => adding to short position
        # quantity > 0 and before holdings < new holdings < 0 => partially buying back short position
        # quantity > 0 and new holdings = 0 => closing entire short position
        # quantity > 0 and new holdings > 0 => closing entire short position and going long
        if beforeHoldingsQuantity < 0:
            if quantity < 0:
                return (ticker + ': adding to current short position!'
                        + ' additional shares: ' + str(round(quantity, 0))
                        + newHoldingsInfo)
            if newHoldingsQuantity < 0 and newHoldingsQuantity > beforeHoldingsQuantity:
                return (ticker + ': buying back part of current short position!'
                        + ' buying back shares: ' + str(round(quantity, 0))
                        + '; current total holdings: ' + str(round(newHoldingsQuantity, 0))
                        + '; shorting average price was: ' + str(round(beforeHoldingsAvgPrice, 4))
                        + '; approx. buying back average price is: ' + str(round(lastPrice, 4))
                        + profitInfo)
            if newHoldingsQuantity == 0:
                return (ticker + ': closing down entire current short position!'
                        + ' buying back shares: ' + str(round(quantity, 0))
                        + '; current total holdings: ' + str(round(newHoldingsQuantity, 0))
                        + '; shorting average price was: ' + str(round(beforeHoldingsAvgPrice, 4))
                        + '; approx. buying back average price is: ' + str(round(lastPrice, 4))
                        + profitInfo)
            if newHoldingsQuantity > 0:
                return (ticker + ': closing down entire current short position and going long!'
                        + ' buying back shares to close short: ' + str(round(-beforeHoldingsQuantity, 0))
                        + '; profit percent on short position: ' + str(round(tradeProfitPercent, 4))
                        + '; dollar profit on short position: ' + str(round(tradeDollarProfit, 2))
                        + '; buying shares to go long: ' + str(round(newHoldingsQuantity, 0))
                        + newHoldingsInfo)

        return None
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")

from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
UTCMIN = datetime.min.replace(tzinfo=utc)


class CustomEqualWeightingPortfolioConstructionModel(PortfolioConstructionModel):

    '''
    Description:
        Provide a custom implementation of IPortfolioConstructionModel that gives equal weighting to all active securities
    Details:
        - The target percent holdings of each security is 1/N where N is the number of securities with active Up/Down insights
        - For InsightDirection.Up, long targets are returned
        - For InsightDirection.Down, short targets are returned
        - For InsightDirection.Flat, closing position targets are returned
    '''

    def __init__(self, rebalancingParam = False):
        '''
        Description:
            Initialize a new instance of CustomEqualWeightingPortfolioConstructionModel
        Args:
            rebalancingParam: Integer indicating the number of days for rebalancing (default set to False, no rebalance)
                - Independent of this parameter, the portfolio will be rebalanced when a security is added/removed/changed direction
        '''

        self.insightCollection = InsightCollection()
        self.removedSymbols = []
        self.nextExpiryTime = UTCMIN
        self.rebalancingTime = UTCMIN

        # if the rebalancing parameter is not False but a positive integer,
        # convert rebalancingParam to timedelta and create rebalancingFunc
        if rebalancingParam > 0:
            self.rebalancing = True
            rebalancingParam = timedelta(days = rebalancingParam)
            self.rebalancingFunc = lambda dt: dt + rebalancingParam
        else:
            self.rebalancing = rebalancingParam

    def CreateTargets(self, algorithm, insights):
        '''
        Description:
            Create portfolio targets from the specified insights
        Args:
            algorithm: The algorithm instance
            insights: The insights to create portfolio targets from
        Returns:
            A list of portfolio targets to be sent to the execution model
        '''

        targets = []

        # nothing to do when there are no new insights, no insight has expired yet
        # and no symbols were removed from the universe
        if (len(insights) == 0 and
            algorithm.UtcTime <= self.nextExpiryTime and
            self.removedSymbols is None):
            return targets

        # add the new insights to our insight collection
        for insight in insights:
            self.insightCollection.Add(insight)

        # create flatten target for each security that was removed from the universe
        if self.removedSymbols is not None:
            targets.extend(PortfolioTarget(symbol, 0) for symbol in self.removedSymbols)
            self.removedSymbols = None

        # get insights that haven't expired for each symbol that is still in the universe
        activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)

        # keep the last generated active insight for each symbol
        # (a dict keyed by symbol is used instead of itertools.groupby because groupby only groups
        # consecutive items, so an unsorted collection would yield several insights per symbol;
        # on equal generated times the later item wins, like a stable sort followed by [-1])
        latestBySymbol = {}
        for insight in activeInsights:
            current = latestBySymbol.get(insight.Symbol)
            if current is None or insight.GeneratedTimeUtc >= current.GeneratedTimeUtc:
                latestBySymbol[insight.Symbol] = insight
        lastActiveInsights = list(latestBySymbol.values())

        # determine target percent for the given insights (check function DetermineTargetPercent for details)
        percents = self.DetermineTargetPercent(lastActiveInsights)

        # symbols for which no percent target could be created
        errorSymbols = set()

        # check if we actually want to create new targets for the securities (check function ShouldCreateTargets for details)
        if self.ShouldCreateTargets(algorithm, lastActiveInsights):
            for insight in lastActiveInsights:
                target = PortfolioTarget.Percent(algorithm, insight.Symbol, percents[insight])
                if target is not None:
                    targets.append(target)
                else:
                    errorSymbols.add(insight.Symbol)

            # update rebalancing time
            if self.rebalancing:
                self.rebalancingTime = self.rebalancingFunc(algorithm.UtcTime)

        # get expired insights and create one flatten target per symbol without remaining active insights
        expiredInsights = self.insightCollection.RemoveExpiredInsights(algorithm.UtcTime)

        checkedSymbols = set()
        for insight in expiredInsights:
            symbol = insight.Symbol
            if symbol in checkedSymbols:
                continue
            checkedSymbols.add(symbol)
            if not self.insightCollection.HasActiveInsights(symbol, algorithm.UtcTime) and symbol not in errorSymbols:
                targets.append(PortfolioTarget(symbol, 0))

        # update the next expiry date from the insight collection
        self.nextExpiryTime = self.insightCollection.GetNextExpiryTime()
        if self.nextExpiryTime is None:
            self.nextExpiryTime = UTCMIN

        return targets

    def DetermineTargetPercent(self, lastActiveInsights):
        '''
        Description:
            Determine the target percent from each insight
        Args:
            lastActiveInsights: The active insights to generate a target from
        Returns:
            A dictionary mapping each insight to its signed target percent
        '''

        # give equal weighting to each security with a non-flat insight
        count = sum(x.Direction != InsightDirection.Flat for x in lastActiveInsights)
        percent = 0 if count == 0 else 1.0 / count

        return {insight: insight.Direction * percent for insight in lastActiveInsights}

    def ShouldCreateTargets(self, algorithm, lastActiveInsights):
        '''
        Description:
            Determine whether we should rebalance the portfolio to keep equal weighting when:
                - It is time to rebalance regardless
                - We want to include some new security in the portfolio
                - We want to modify the direction of some existing security
        Args:
            algorithm: The algorithm instance
            lastActiveInsights: The last active insights to check
        Returns:
            True when new targets should be created, False otherwise
        '''

        # it is time to rebalance
        if self.rebalancing and algorithm.UtcTime >= self.rebalancingTime:
            return True

        for insight in lastActiveInsights:
            holding = algorithm.Portfolio[insight.Symbol]
            # if there is an insight for a new security that's not invested, then rebalance
            if not holding.Invested and insight.Direction != InsightDirection.Flat:
                return True
            # if there is an insight to close a long position, then rebalance
            elif holding.IsLong and insight.Direction != InsightDirection.Up:
                return True
            # if there is an insight to close a short position, then rebalance
            elif holding.IsShort and insight.Direction != InsightDirection.Down:
                return True

        return False

    def OnSecuritiesChanged(self, algorithm, changes):
        '''
        Description:
            Event fired each time we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm
        '''

        # get removed symbols and invalidate them in the insight collection
        self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities]
        self.insightCollection.Clear(self.removedSymbols)
### PRODUCT INFORMATION --------------------------------------------------------------------------------
# Copyright InnoQuantivity.com, granted to the public domain.
# Use entirely at your own risk.
# This algorithm contains open source code from other sources and no claim is being made to such code.
# Do not remove this copyright notice.
### ----------------------------------------------------------------------------------------------------

from LongShortMovingAverageCrossoverAlphaCreation import LongShortMovingAverageCrossoverAlphaCreationModel
from CustomEqualWeightingPortfolioConstruction import CustomEqualWeightingPortfolioConstructionModel
from ImmediateExecutionWithLogs import ImmediateExecutionWithLogsModel

from System.Drawing import Color


class LongOnlyMovingAverageCrossoverFrameworkAlgorithm(QCAlgorithmFramework):

    '''
    Trading Logic:
        - Long-short market timing strategy: buy when a Short Moving Average crosses above
            a Long Moving Average and sell short when it crosses below
            (note: despite "LongOnly" in the class name, the alpha model used here emits both Up and Down insights)
        - This is a simple technique commonly used to time the market, and can be combined with other strategies
            to reduce drawdown and improve Sharpe Ratio
    Modules:
        Universe: Manual input of tickers
        Alpha: Creation of Up/Down Insights based on Moving Average Crossover
            - Up Insights when Short Moving Average crosses above Long Moving Average (to go Long)
            - Down Insights when Short Moving Average crosses below Long Moving Average (to go Short)
        Portfolio: Equal Weighting (allocate equal amounts of portfolio % to each security)
            - If some of the tickers did not exist at the start date, it will start processing them when they first appeared in the market
            - To rebalance the portfolio periodically to ensure equal weighting, change the rebalancingParam below
        Execution: Immediate Execution with Market Orders
        Risk: Null
    '''

    def Initialize(self):
        ''' Configure the backtest inputs, the charts and the framework modules '''

        ### user-defined inputs --------------------------------------------------------------

        # backtest timeframe and starting cash
        self.SetStartDate(2015, 1, 1)   # set start date
        #self.SetEndDate(2019, 1, 1)    # set end date
        self.SetCash(100000)            # set strategy cash

        # data resolution (Resolution.Daily, Resolution.Hour, Resolution.Minute)
        resolution = Resolution.Daily

        # tickers to trade
        tickers = ['SPY']

        # periods for the moving averages
        shortPeriodSMA = 50
        longPeriodSMA = 200

        # rebalancing period (to enable rebalancing enter an integer for number of days, e.g. 1, 7, 30, 365)
        rebalancingParam = False

        ### -----------------------------------------------------------------------------------

        # brokerage model for slippage and fees
        self.SetSecurityInitializer(self.CustomSecurityInitializer)
        self.SetBrokerageModel(AlphaStreamsBrokerageModel())

        # requested data resolution, with fill forward disabled
        self.UniverseSettings.Resolution = resolution
        self.UniverseSettings.FillForward = False

        # moving average crossover plots are only created when there are
        # less than 5 tickers, to avoid creating too many charts
        allowPlots = len(tickers) < 5
        if allowPlots:
            smaLines = (('Short SMA', Color.Blue), ('Long SMA', Color.Black))
            tradeMarkers = (('Buy', Color.Green), ('Sell Short', Color.Red))
            for ticker in tickers:
                smaPlot = Chart('Moving Average Crossover ' + str(ticker))
                for seriesName, seriesColor in smaLines:
                    smaPlot.AddSeries(Series(seriesName, SeriesType.Line, '$', seriesColor))
                for seriesName, seriesColor in tradeMarkers:
                    smaPlot.AddSeries(Series(seriesName, SeriesType.Scatter, '$', seriesColor, ScatterMarkerSymbol.Triangle))
                self.AddChart(smaPlot)

        # symbols for the manual universe
        symbols = [Symbol.Create(ticker, SecurityType.Equity, Market.USA) for ticker in tickers]

        # select modules
        self.SetUniverseSelection(ManualUniverseSelectionModel(symbols))
        self.SetAlpha(LongShortMovingAverageCrossoverAlphaCreationModel(shortPeriodSMA = shortPeriodSMA,
                                                                        longPeriodSMA = longPeriodSMA,
                                                                        resolution = resolution,
                                                                        allowPlots = allowPlots))
        self.SetPortfolioConstruction(CustomEqualWeightingPortfolioConstructionModel(rebalancingParam = rebalancingParam))
        self.SetExecution(ImmediateExecutionWithLogsModel())
        self.SetRiskManagement(NullRiskManagementModel())

    def CustomSecurityInitializer(self, security):
        '''
        Description:
            Set the data normalization mode of the security to adjusted prices
        Args:
            security: Security which characteristics we want to change
        '''

        security.SetDataNormalizationMode(DataNormalizationMode.Adjusted)
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")

from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import AlphaModel, Insight, InsightType, InsightDirection

import numpy as np


class LongShortMovingAverageCrossoverAlphaCreationModel(AlphaModel):

    '''
    * Refer to the research notebook for a visual explanation of this alpha logic
    Description:
        This Alpha model creates InsightDirection.Up to go Long when a Short Moving Average crosses above a Long Moving Average,
        and InsightDirection.Down to go Short when it crosses below
    Details:
        The important things to understand here are:
            - We can retrieve historical data by calling algorithm.History(symbol, bar_count, resolution)
            - We can easily organise the code in Python with a class to store calculations for indicators for each symbol
            - We can use InsightDirection.Up/InsightDirection.Down to go Long/Short
    '''

    def __init__(self, shortPeriodSMA = 50, longPeriodSMA = 200, resolution = Resolution.Daily, allowPlots = False):
        '''
        Description:
            Initialize a new instance of LongShortMovingAverageCrossoverAlphaCreationModel
        Args:
            shortPeriodSMA: Period for the short moving average
            longPeriodSMA: Period for the long moving average
            resolution: Resolution for historical data
            allowPlots: Boolean to allow plots or not
        '''

        self.shortPeriodSMA = shortPeriodSMA # period for short moving average
        self.longPeriodSMA = longPeriodSMA # period for long moving average
        self.resolution = resolution # resolution for historical data
        self.allowPlots = allowPlots # boolean to allow plots or not

        self.securities = [] # list to store securities to consider
        self.calculations = {} # store calculations (SymbolData) per symbol

        self.insightExpiry = Time.Multiply(Extensions.ToTimeSpan(resolution), 0.25) # insight duration

    def Update(self, algorithm, data):
        '''
        Description:
            Update the per-symbol calculations with the latest history and generate insights
        Args:
            algorithm: The algorithm instance
            data: The new data available
        Returns:
            The list of new insights
        '''

        # symbols for which we have already calculated indicators only need the last data point;
        # we separate this from new symbols to avoid calling full history for all securities every time
        currentSymbols = [x.Symbol for x in self.securities if x.Symbol in self.calculations]
        if len(currentSymbols) > 0:
            historyCurrentSymbols = algorithm.History(currentSymbols, 1, self.resolution)

        # new symbols need their indicators warmed up from scratch
        newSymbols = [x.Symbol for x in self.securities if x.Symbol not in self.calculations]
        if len(newSymbols) > 0:
            historyNewSymbols = algorithm.History(newSymbols, self.longPeriodSMA + 1, self.resolution)

        # now loop through securities to create/update indicators
        for security in self.securities:
            if security.Symbol in newSymbols:
                self.calculations[security.Symbol] = SymbolData(security.Symbol, self.shortPeriodSMA, self.longPeriodSMA)
                history = historyNewSymbols
            else:
                history = historyCurrentSymbols
            try:
                self.calculations[security.Symbol].UpdateIndicators(history)
            except Exception as e:
                # best effort: drop the symbol now, it will be warmed up again on the next call
                algorithm.Log('removing from calculations due to ' + str(e))
                self.calculations.pop(security.Symbol, None)
                continue

        ### generate insights ------------------------------------------------------------------------------------------------------

        insights = [] # list to store the new insights to be created

        # loop through active securities and generate insights
        for symbol, symbolData in self.calculations.items():
            # check if there's new data for the security or we're already invested
            # if there's no new data but we're invested, we keep updating the insight since we don't really need to place orders
            if not (data.ContainsKey(symbol) or algorithm.Portfolio[symbol].Invested):
                algorithm.Log('excluding this security due to missing data: ' + str(symbol.Value))
                continue

            chartName = 'Moving Average Crossover ' + str(symbol.Value)

            # if short sma just crossed above long sma, we go long with an InsightDirection.Up
            if symbolData.crossAbove:
                insightDirection = InsightDirection.Up
                if self.allowPlots:
                    algorithm.Plot(chartName, 'Buy', float(symbolData.currentShortSMA))

            # if short sma just crossed below long sma, we go short with an InsightDirection.Down
            elif symbolData.crossBelow:
                insightDirection = InsightDirection.Down
                if self.allowPlots:
                    algorithm.Plot(chartName, 'Sell Short', float(symbolData.currentShortSMA))

            # if no cross happened but we are currently Long, update the InsightDirection.Up to stay Long for another bar
            elif algorithm.Portfolio[symbol].IsLong:
                insightDirection = InsightDirection.Up

            # if no cross happened but we are currently Short, update the InsightDirection.Down to stay Short for another bar
            elif algorithm.Portfolio[symbol].IsShort:
                insightDirection = InsightDirection.Down

            # if no cross has happened and we are not invested, emit an InsightDirection.Flat to stay in cash for another bar
            else:
                insightDirection = InsightDirection.Flat

            # append the insights list with the prediction for each symbol
            insights.append(Insight.Price(symbol, self.insightExpiry, insightDirection))

            # update the charts
            if self.allowPlots and symbolData.closePrices.IsReady:
                algorithm.Plot(chartName, 'Short SMA', float(symbolData.currentShortSMA))
                algorithm.Plot(chartName, 'Long SMA', float(symbolData.currentLongSMA))

        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''
        Description:
            Event fired each time we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm
        '''

        # add new securities (skipping ones we already track, so none is processed twice per update)
        for added in changes.AddedSecurities:
            if added not in self.securities:
                self.securities.append(added)

        # remove securities together with their calculations
        for removed in changes.RemovedSecurities:
            if removed in self.securities:
                self.securities.remove(removed)
                self.calculations.pop(removed.Symbol, None)


# this class is coming from the research notebook (check its logic there)
class SymbolData:

    '''
    Make all the calculations needed for each symbol, including the moving averages
    and whether a moving average crossover just happened
    '''

    def __init__(self, symbol, shortPeriodSMA, longPeriodSMA):
        '''
        Args:
            symbol: The symbol these calculations belong to
            shortPeriodSMA: Period for the short moving average
            longPeriodSMA: Period for the long moving average
        '''
        self.Symbol = symbol
        self.shortPeriod = shortPeriodSMA
        self.longPeriod = longPeriodSMA
        # one extra slot so the SMAs of the previous bar can be computed as well
        self.closePrices = RollingWindow[float](longPeriodSMA + 1)
        # index label of the last history row added to the window (None until the first row is added)
        self.lastBarTime = None

    # method to update the rolling window
    def UpdateIndicators(self, history):
        '''
        Description:
            Add the close prices found in history for this symbol to the rolling window
        Args:
            history: History dataframe; rows are looked up with history.loc[str(symbol)]
                     (presumably indexed by symbol and bar time - confirm against the History API)
        Raises:
            Exception: when the symbol is not in the history index or a row has no close price
        '''
        if str(self.Symbol) not in history.index:
            raise Exception('symbol not in history index: ' + str(self.Symbol.Value))

        for barTime, row in history.loc[str(self.Symbol)].iterrows():
            if 'close' not in row:
                raise Exception('missing some close prices for: ' + str(self.Symbol.Value))
            # the last bar is requested on every update, so when no new bar was produced
            # in between the same row comes back: skip it instead of adding it twice
            if self.lastBarTime is not None and barTime <= self.lastBarTime:
                continue
            self.closePrices.Add(row['close'])
            self.lastBarTime = barTime

    # convert the rolling window to list for easier manipulation
    @property
    def listClosePrices(self):
        ''' Close prices in window order as floats, or [0] while the window is not ready '''
        if self.closePrices.IsReady:
            return [float(x) for x in self.closePrices]
        else:
            return [0]

    # update short and long current SMA
    @property
    def currentShortSMA(self):
        ''' Mean of the first shortPeriod window values '''
        return np.mean(self.listClosePrices[:self.shortPeriod])

    @property
    def currentLongSMA(self):
        ''' Mean of the first longPeriod window values '''
        return np.mean(self.listClosePrices[:self.longPeriod])

    # update short and long before SMA (the SMA from the previous trading bar)
    @property
    def beforeShortSMA(self):
        ''' Same as currentShortSMA but shifted by one window position '''
        return np.mean(self.listClosePrices[1:][:self.shortPeriod])

    @property
    def beforeLongSMA(self):
        ''' Same as currentLongSMA but shifted by one window position '''
        return np.mean(self.listClosePrices[1:][:self.longPeriod])

    # update boolean for cross above/below of moving averages
    @property
    def crossAbove(self):
        ''' True when the short SMA is above the long SMA now and was below it one position earlier '''
        return (self.currentShortSMA > self.currentLongSMA) and (self.beforeShortSMA < self.beforeLongSMA)

    @property
    def crossBelow(self):
        ''' True when the short SMA is below the long SMA now and was above it one position earlier '''
        return (self.currentShortSMA < self.currentLongSMA) and (self.beforeShortSMA > self.beforeLongSMA)