Overall Statistics |
Total Trades 237 Average Win 0.46% Average Loss -0.28% Compounding Annual Return -1.717% Drawdown 9.100% Expectancy -0.112 Net Profit -3.647% Sharpe Ratio -1.525 Sortino Ratio -1.719 Probabilistic Sharpe Ratio 0.789% Loss Rate 66% Win Rate 34% Profit-Loss Ratio 1.62 Alpha 0 Beta 0 Annual Standard Deviation 0.032 Annual Variance 0.001 Information Ratio -0.362 Tracking Error 0.032 Treynor Ratio 0 Total Fees $585.39 Estimated Strategy Capacity $620000000.00 Lowest Capacity Asset CL YG5JR83D8NNL Portfolio Turnover 2.65% |
from AlgorithmImports import *
from QuantConnect.DataSource import *


class USFuturesSecurityMasterDataClassicAlgorithm(QCAlgorithm):
    """Bollinger Band breakout on the continuous front-month CL future.

    Signals are computed on the continuous contract's daily Bollinger Bands,
    while orders are placed on the contract the continuous series is currently
    mapped to. Open positions are carried across contract rollovers.
    """

    def Initialize(self) -> None:
        """Configure cash, brokerage, the CL subscription and the indicator."""
        self.SetCash(1000000)
        self.SetStartDate(2022, 1, 1)

        # Brokerage model, plus a seeder so newly added securities start with
        # their last known price.
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetSecurityInitializer(BrokerageModelSecurityInitializer(
            self.BrokerageModel, FuncSecuritySeeder(self.GetLastKnownPrices)))

        # Continuous front-month contract, ratio back-adjusted, rolled on
        # open interest.
        self.continuous_contract = self.AddFuture(
            "CL",
            dataNormalizationMode=DataNormalizationMode.BackwardsRatio,
            dataMappingMode=DataMappingMode.OpenInterest,
            contractDepthOffset=0)
        self.symbol = self.continuous_contract.Symbol

        history = self.History(self.symbol, 100, Resolution.Daily)
        self.Debug(f"We got {len(history)} items from our history request")

        self.bollinger_bands = self.BB(self.symbol, 10, 2, MovingAverageType.Simple, Resolution.Daily)

        # Seed the indicator from history. The futures history frame is
        # indexed by (expiry, symbol, time); drop the expiry level and select
        # our symbol so the remaining index is the bar timestamp. (Reading
        # index[1] of the raw frame would yield the symbol, not the time.)
        if not history.empty:
            for time, row in history.droplevel(0).loc[self.symbol].iterrows():
                self.bollinger_bands.Update(time, float(row["close"]))

    def OnData(self, slice: Slice) -> None:
        """Process rollovers, then trade band breakouts on the mapped contract."""
        for _, changed_event in slice.SymbolChangedEvents.items():
            self.HandleRollover(changed_event)

        mapped_symbol = self.continuous_contract.Mapped
        if not (slice.Bars.ContainsKey(self.symbol) and self.bollinger_bands.IsReady and mapped_symbol):
            return

        price = slice.Bars[self.symbol].Close
        holding = self.Portfolio[mapped_symbol]

        # Buy one contract above the upper band unless already long; sell one
        # contract below the lower band unless already short.
        if price > self.bollinger_bands.UpperBand.Current.Value and not holding.IsLong:
            self.MarketOrder(mapped_symbol, 1)
        elif price < self.bollinger_bands.LowerBand.Current.Value and not holding.IsShort:
            self.MarketOrder(mapped_symbol, -1)

    def HandleRollover(self, changed_event):
        """Move any open position from the expiring contract to the new one.

        Args:
            changed_event: Symbol-changed event exposing ``OldSymbol`` and
                ``NewSymbol``.
        """
        old_symbol = changed_event.OldSymbol
        new_symbol = changed_event.NewSymbol
        tag = f"Rollover at {self.Time}: {old_symbol} -> {new_symbol}"

        # Capture the quantity before liquidating so it can be re-opened.
        quantity = self.Portfolio[old_symbol].Quantity

        self.Liquidate(old_symbol, tag=tag)
        if quantity != 0:
            self.MarketOrder(new_symbol, quantity, tag=tag)
        self.Log(tag)
from AlgorithmImports import *
from QuantConnect.DataSource import *


class USFuturesSecurityMasterDataClassicAlgorithm(QCAlgorithm):
    """SMA-envelope strategy on the continuous front-month CL future.

    The signal compares the continuous contract's price with a 10-period daily
    SMA widened by ``threshold``; orders go to the currently mapped contract.
    """

    threshold = 0.01  # 1%

    def Initialize(self) -> None:
        """Set up cash, brokerage, the CL subscription and a pre-seeded SMA."""
        self.SetCash(1000000)
        self.SetStartDate(2022, 1, 1)

        # Brokerage model and security seeding.
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        seeder = FuncSecuritySeeder(self.GetLastKnownPrices)
        self.SetSecurityInitializer(BrokerageModelSecurityInitializer(self.BrokerageModel, seeder))

        # Continuous contract subscription.
        future = self.AddFuture(
            "CL",
            dataNormalizationMode=DataNormalizationMode.BackwardsRatio,
            dataMappingMode=DataMappingMode.OpenInterest,
            contractDepthOffset=0,
        )
        self.continuous_contract = future
        self.symbol = future.Symbol

        # Daily history used to seed the SMA.
        history = self.History(self.symbol, 100, Resolution.Daily)
        self.Debug(f"We got {len(history)} items from our history request")

        self.sma = self.SMA(self.symbol, 10, Resolution.Daily)
        if history.empty:
            return

        bars = history.droplevel(0).loc[self.symbol]
        for timestamp, bar in bars.iterrows():
            self.sma.Update(IndicatorDataPoint(timestamp, bar.close))

    def OnData(self, slice: Slice) -> None:
        """Handle rollovers first, then act on SMA-envelope breakouts."""
        for _, rollover in slice.SymbolChangedEvents.items():
            self.HandleRollover(rollover)

        mapped_symbol = self.continuous_contract.Mapped
        if not slice.Bars.ContainsKey(self.symbol) or not self.sma.IsReady or not mapped_symbol:
            return

        # Decide the order direction; leave without trading when neither
        # envelope side is breached or the position already points that way.
        if (slice.Bars[self.symbol].Price > self.sma.Current.Value * (1+self.threshold)
                and not self.Portfolio[mapped_symbol].IsLong):
            direction = 1
        elif (slice.Bars[self.symbol].Price < self.sma.Current.Value * (1-self.threshold)
                and not self.Portfolio[mapped_symbol].IsShort):
            direction = -1
        else:
            return

        self.MarketOrder(mapped_symbol, direction)

    def HandleRollover(self, changed_event):
        """Close the position in the old contract and re-open it in the new one.

        Args:
            changed_event: Symbol-changed event exposing ``OldSymbol`` and
                ``NewSymbol``.
        """
        old_symbol, new_symbol = changed_event.OldSymbol, changed_event.NewSymbol
        tag = f"Rollover at {self.Time}: {old_symbol} -> {new_symbol}"

        # Read the held quantity before the liquidation order goes out.
        carried = self.Portfolio[old_symbol].Quantity
        self.Liquidate(old_symbol, tag=tag)

        if carried != 0:
            self.MarketOrder(new_symbol, carried, tag=tag)

        self.Log(tag)
from AlgorithmImports import *
from QuantConnect.DataSource import *
from datetime import timedelta


class USFuturesSecurityMasterDataClassicAlgorithm(QCAlgorithm):
    """SMA-envelope strategy on the continuous GC future with ATR-based sizing.

    The signal compares the continuous contract's price with a 10-period daily
    SMA widened by ``threshold``. The order size is derived from the ATR, the
    contract multiplier and a fixed fraction of portfolio equity. Orders are
    placed on the currently mapped contract and carried across rollovers.
    """

    threshold = 0.01  # 1%
    atr_period = 100  # ATR calculation period
    risk_factor = 0.002  # 0.2% risk per trade

    def Initialize(self) -> None:
        """Configure cash, brokerage, the GC subscription, indicators and warm-up."""
        self.SetCash(1000000)
        self.SetStartDate(2022, 1, 1)
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)

        # Add the continuous future and read its contract multiplier.
        self.continuous_contract = self.AddFuture(
            "GC",
            dataNormalizationMode=DataNormalizationMode.BackwardsRatio,
            dataMappingMode=DataMappingMode.OpenInterest,
            contractDepthOffset=0)
        self.symbol = self.continuous_contract.Symbol
        properties = self.Securities[self.symbol].SymbolProperties
        self.contractMultiplier = properties.ContractMultiplier
        self.Debug(f"Contract Multiplier: {self.contractMultiplier}")

        self.atr = self.ATR(self.symbol, self.atr_period, MovingAverageType.Simple, Resolution.Daily)
        self.sma = self.SMA(self.symbol, 10, Resolution.Daily)
        self.Debug("ATR and SMA indicators have been initialized.")

        # Warm up by bar count rather than calendar days: 100 calendar days
        # hold fewer than 100 daily bars, which would leave the ATR not ready
        # when warm-up ends. The warm-up feeds both indicators, so no separate
        # manual history replay is needed.
        self.SetWarmUp(self.atr_period, Resolution.Daily)

    def OnData(self, slice: Slice) -> None:
        """Handle rollovers, size the position and trade envelope breakouts."""
        if self.IsWarmingUp:
            return

        for _, changed_event in slice.SymbolChangedEvents.items():
            self.HandleRollover(changed_event)

        mapped_symbol = self.continuous_contract.Mapped
        if not (slice.Bars.ContainsKey(self.symbol) and self.sma.IsReady
                and self.atr.IsReady and mapped_symbol):
            return

        # Size the order from current equity, ATR and the contract multiplier.
        equity = self.Portfolio.TotalPortfolioValue
        position_size = self.CalculatePositionSize(equity, self.atr.Current.Value, self.contractMultiplier)

        if position_size <= 0:
            self.Debug("Position size is zero, no trade executed.")
            return

        price = slice.Bars[self.symbol].Price
        if price > self.sma.Current.Value * (1 + self.threshold) and not self.Portfolio[mapped_symbol].IsLong:
            self.Debug(f"Going long: {position_size} contracts.")
            self.MarketOrder(mapped_symbol, position_size)
        elif price < self.sma.Current.Value * (1 - self.threshold) and not self.Portfolio[mapped_symbol].IsShort:
            self.Debug(f"Going short: {position_size} contracts.")
            self.MarketOrder(mapped_symbol, -position_size)

    def CalculatePositionSize(self, equity, atr, contractMultiplier):
        """Return the whole number of contracts for the configured risk budget.

        Computes ``risk_factor * equity / (atr * contractMultiplier)`` and
        truncates it to an integer.

        Args:
            equity: Portfolio value the risk fraction is applied to.
            atr: Current ATR value.
            contractMultiplier: Multiplier of the futures contract.

        Returns:
            The truncated contract count, or 0 when the dollar volatility is
            not positive or the computed count is not positive.
        """
        dollar_volatility = atr * contractMultiplier

        # Guard against a zero (or negative) denominator instead of raising
        # ZeroDivisionError inside OnData.
        if dollar_volatility <= 0:
            self.Debug(f"Calculated position size is zero. Equity: {equity}, ATR: {atr}, Contract Multiplier: {contractMultiplier}")
            return 0

        num_contracts = (self.risk_factor * equity) / dollar_volatility
        self.Debug(f"Dollar volatility: {dollar_volatility}, Number of contracts: {num_contracts}")

        if num_contracts > 0:
            return int(num_contracts)  # Only whole contracts can be traded

        self.Debug(f"Calculated position size is zero. Equity: {equity}, ATR: {atr}, Contract Multiplier: {contractMultiplier}")
        return 0

    def HandleRollover(self, changed_event):
        """Move any open position from the expiring contract to the new one.

        Args:
            changed_event: Symbol-changed event exposing ``OldSymbol`` and
                ``NewSymbol``.
        """
        old_symbol = changed_event.OldSymbol
        new_symbol = changed_event.NewSymbol
        tag = f"Rollover at {self.Time}: {old_symbol} -> {new_symbol}"

        # Capture the quantity before liquidating so it can be re-opened.
        quantity = self.Portfolio[old_symbol].Quantity

        self.Liquidate(old_symbol, tag=tag)
        if quantity != 0:
            self.Debug(f"Handling rollover from {old_symbol} to {new_symbol}, quantity: {quantity}")
            self.MarketOrder(new_symbol, quantity, tag=tag)
        self.Log(tag)
from AlgorithmImports import *
from QuantConnect.DataSource import *


class USFuturesSecurityMasterDataClassicAlgorithm(QCAlgorithm):
    """Trade the mapped CL contract when price leaves a 1% band around its SMA.

    Indicator values come from the continuous contract; orders are sent to the
    contract it is currently mapped to, and positions follow rollovers.
    """

    threshold = 0.01  # 1%

    def Initialize(self) -> None:
        """Prepare the backtest: cash, brokerage, subscription and SMA."""
        self.SetCash(1000000)
        self.SetStartDate(2022, 1, 1)

        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.SetSecurityInitializer(
            BrokerageModelSecurityInitializer(
                self.BrokerageModel,
                FuncSecuritySeeder(self.GetLastKnownPrices),
            )
        )

        self.continuous_contract = self.AddFuture(
            "CL",
            dataNormalizationMode=DataNormalizationMode.BackwardsRatio,
            dataMappingMode=DataMappingMode.OpenInterest,
            contractDepthOffset=0,
        )
        self.symbol = self.continuous_contract.Symbol

        history = self.History(self.symbol, 100, Resolution.Daily)
        self.Debug(f"We got {len(history)} items from our history request")

        self.sma = self.SMA(self.symbol, 10, Resolution.Daily)
        self._seed_sma(history)

    def _seed_sma(self, history) -> None:
        """Feed the historical closes of ``self.symbol`` into the SMA."""
        if history.empty:
            return
        per_symbol = history.droplevel(0).loc[self.symbol]
        for moment, record in per_symbol.iterrows():
            point = IndicatorDataPoint(moment, record.close)
            self.sma.Update(point)

    def _order_direction(self, slice: Slice, mapped_symbol) -> int:
        """Return +1 to buy, -1 to sell, or 0 when no order is due."""
        if (slice.Bars[self.symbol].Price > self.sma.Current.Value * (1+self.threshold)
                and not self.Portfolio[mapped_symbol].IsLong):
            return 1
        if (slice.Bars[self.symbol].Price < self.sma.Current.Value * (1-self.threshold)
                and not self.Portfolio[mapped_symbol].IsShort):
            return -1
        return 0

    def OnData(self, slice: Slice) -> None:
        """Roll positions when the mapping changes, then evaluate the signal."""
        for _, event in slice.SymbolChangedEvents.items():
            self.HandleRollover(event)

        mapped_symbol = self.continuous_contract.Mapped

        # Nothing to do without a bar, a ready SMA and a mapped contract.
        if not slice.Bars.ContainsKey(self.symbol):
            return
        if not self.sma.IsReady:
            return
        if not mapped_symbol:
            return

        direction = self._order_direction(slice, mapped_symbol)
        if direction != 0:
            self.MarketOrder(mapped_symbol, direction)

    def HandleRollover(self, changed_event):
        """Liquidate the old contract and re-establish the position in the new one.

        Args:
            changed_event: Symbol-changed event exposing ``OldSymbol`` and
                ``NewSymbol``.
        """
        old_symbol = changed_event.OldSymbol
        new_symbol = changed_event.NewSymbol
        tag = f"Rollover at {self.Time}: {old_symbol} -> {new_symbol}"

        # Quantity is read up front so it survives the liquidation.
        position = self.Portfolio[old_symbol].Quantity
        self.Liquidate(old_symbol, tag=tag)

        has_position = position != 0
        if has_position:
            self.MarketOrder(new_symbol, position, tag=tag)

        self.Log(tag)