Overall Statistics |
Total Trades 126 Average Win 0.07% Average Loss -0.09% Compounding Annual Return -24.263% Drawdown 2.600% Expectancy -0.382 Net Profit -2.258% Sharpe Ratio -6.879 Probabilistic Sharpe Ratio 0.178% Loss Rate 64% Win Rate 36% Profit-Loss Ratio 0.73 Alpha -0.235 Beta -0.118 Annual Standard Deviation 0.03 Annual Variance 0.001 Information Ratio 0.271 Tracking Error 0.17 Treynor Ratio 1.747 Total Fees $126.84 Estimated Strategy Capacity $4200000.00 |
#from riskManagement import * from datetime import timedelta class EMAMomentumUniverse(QCAlgorithm): def Initialize(self): self.SetStartDate(2015, 1, 1) self.SetEndDate(2015, 2, 1) #self.SetEndDate(2019, 4, 1) self.SetCash(100000) self.SetBenchmark("SPY") self.UniverseSettings.Resolution = Resolution.Minute #setting the coarse filter for investment universe self.AddUniverse(self.CoarseSelectionFunction) #self.AddUniverseSelection(LiquidValueUniverseSelectionModel()) #self.AddRiskManagement( ProtecOptions() ) #declaring dictionary averages #self.SetRiskManagement(MaximumDrawdownPercentPerSecurityCustom(0.10)) self.SetExecution(ImmediateExecutionModel()) self.averages = { } # self.underlyingsymbol = equity.Symbol self.hist = RollingWindow[float](390*22) self.contract = None self.SetSecurityInitializer(self.security_initializer) self.buys = [] self.sells = [] self.contract_by_equity = {} def security_initializer(self, security): if security.Type == SecurityType.Equity: security.SetDataNormalizationMode(DataNormalizationMode.Raw) elif security.Type == SecurityType.Option: security.SetMarketPrice(self.GetLastKnownPrice(security)) def CoarseSelectionFunction(self, universe): #Main output, creating a list where the below applies selected = [] #Sort by dollar volume using lambda function, declare universe as EQTY > $10 universe = sorted(universe, key=lambda c: c.DollarVolume, reverse=True) universe = [c for c in universe if c.Price > 10][:100] #loop for all stocks in universe, uses all coarse data for coarse in universe: symbol = coarse.Symbol #Check for instance of SelectionData for this symbol in averages dictionary if symbol not in self.averages: # 1. Call history to get an array of 200 days of history data history = self.History(symbol, 300, Resolution.Daily) if history.empty or 'close' not in history.columns: continue #2. Create new instance of SelectionData with the 'history' result self.averages[symbol] = SelectionData(history.loc[symbol].close) #Update symbol with latest coarse.AdjustedPrice data \\ accesing method and pass params self.averages[symbol].update(self.Time, coarse.AdjustedPrice) #Check if indicators are ready, and that the 50 day EMA is > the 200 day EMA; then add to list 'selected' #Access property of class as dictionary item if self.averages[symbol].is_ready() and self.averages[symbol].fast > self.averages[symbol].slow: selected.append(symbol) #update the selected list with the top 10 results return selected[:10] def OnData(self, data): sells = self.sells.copy() for security in sells: self.Liquidate(security.Symbol) # Is this necessary?? if security.Symbol in self.contract_by_equity: contract = self.contract_by_equity.pop(security.Symbol) self.Liquidate(contract) self.sells.remove(security) buys = self.buys.copy() for security in buys: if data.ContainsKey(security.Symbol) and data[security.Symbol] is not None and security.Type == SecurityType.Equity: self.SetHoldings(security.Symbol, 0.05) self.contract_by_equity[security.Symbol] = self.BuyPut(security.Symbol) self.buys.remove(security) #Method for monitoring if universe has changed def OnSecuritiesChanged(self, changes): for security in changes.AddedSecurities: self.buys.append(security) for security in changes.RemovedSecurities: self.sells.append(security) #def SellCall(self, sec): '''def SellCall(self): #self.symbol = sec contracts = self.OptionChainProvider.GetOptionContractList(self.symbol, self.Time) self.Debug(f"SellCall: {len(contracts)}") if len(contracts) == 0: return min_expiry = 0 max_expiry = 40 filtered_contracts = [i for i in contracts if min_expiry <= (i.ID.Date.date() - self.Time.date()).days <= max_expiry] call = [x for x in filtered_contracts if x.ID.OptionRight == 0] if len(call) == 0: return # sorted the contracts according to their expiration dates and choose the ATM options price = self.Securities[self.symbol].Price self.contract = sorted(sorted(call, key = lambda x: abs(price - x.ID.StrikePrice)), key = lambda x: x.ID.Date, reverse=True)[0] self.AddOptionContract(self.contract, Resolution.Minute) self.MarketOrder(self.contract, -1) ''' #def BuyPut(self, sec): def BuyPut(self, symbol): contracts = self.OptionChainProvider.GetOptionContractList(symbol, self.Time) self.Debug(f"BuyPut: {len(contracts)}") #contracts = self.OptionChainProvider.GetOptionChains(self.Symbol, self.Time.date()) if len(contracts) == 0: return min_expiry = 0 max_expiry = 40 filtered_contracts = [i for i in contracts if min_expiry <= (i.ID.Date.date() - self.Time.date()).days <= max_expiry] put = [x for x in filtered_contracts if x.ID.OptionRight == 1] if len(put) == 0: return price = self.Securities[symbol].Price # sorted the contracts according to their expiration dates and choose the ATM options self.contract = sorted(sorted(put, key = lambda x: abs(price - x.ID.StrikePrice)), key = lambda x: x.ID.Date, reverse=True)[0] self.AddOptionContract(self.contract, Resolution.Minute) self.MarketOrder(self.contract, 1) return self.contract class SelectionData(object): def __init__(self, closes): self.tolerance = 1.01 self.fast = ExponentialMovingAverage(100) self.slow = ExponentialMovingAverage(300) self.is_uptrend = False self.scale = 0 for time, close in closes.iteritems(): self.fast.Update(time, close) self.slow.Update(time, close) def update(self, time, value): if self.fast.Update(time, value) and self.slow.Update(time, value): fast = self.fast.Current.Value slow = self.slow.Current.Value self.is_uptrend = fast > slow * self.tolerance if self.is_uptrend: self.scale = (fast - slow) / ((fast + slow) / 2.0) def is_ready(self): return self.slow.IsReady