Skip to content

Fix composite bars subscription #2337

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 27 additions & 22 deletions examples/backtest/databento_test_request_bars.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,19 @@ def on_start(self):
end_historical_bars = utc_now - pd.Timedelta(minutes=self.config.historical_end_delay)
self.user_log(f"on_start: {start_historical_bars=}, {end_historical_bars=}")

# external_bar_type = BarType.from_str(f"{self.config.symbol_id}-1-MINUTE-LAST-EXTERNAL")
# self.subscribe_bars(external_bar_type)

bar_type_1 = BarType.from_str(
self.external_bar_type = BarType.from_str(f"{self.config.symbol_id}-1-MINUTE-LAST-EXTERNAL")
self.bar_type_1 = BarType.from_str(
f"{self.config.symbol_id}-2-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL",
)
bar_type_2 = BarType.from_str(
self.bar_type_2 = BarType.from_str(
f"{self.config.symbol_id}-4-MINUTE-LAST-INTERNAL@2-MINUTE-INTERNAL",
)
bar_type_3 = BarType.from_str(
self.bar_type_3 = BarType.from_str(
f"{self.config.symbol_id}-5-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL",
)

self.request_aggregated_bars(
[bar_type_1, bar_type_2, bar_type_3],
[self.bar_type_1, self.bar_type_2, self.bar_type_3],
start=start_historical_bars,
end=end_historical_bars,
update_subscriptions=True,
Expand All @@ -143,15 +141,16 @@ def on_start(self):

self.user_log("request_aggregated_bars done")

self.subscribe_bars(bar_type_1)
self.subscribe_bars(bar_type_2)
self.subscribe_bars(bar_type_3)
self.subscribe_bars(self.external_bar_type)
self.subscribe_bars(self.bar_type_1)
self.subscribe_bars(self.bar_type_2)
self.subscribe_bars(self.bar_type_3)

self.user_log("subscribe_bars done")

#### for testing indicators with bars
# self.register_indicator_for_bars(external_bar_type, self.external_sma)
# self.register_indicator_for_bars(composite_bar_type, self.composite_sma)
# self.register_indicator_for_bars(self.external_bar_type, self.external_sma)
# self.register_indicator_for_bars(self.bar_type_1, self.composite_sma)

######### for testing quotes
# utc_now = self._clock.utc_now()
Expand All @@ -162,19 +161,19 @@ def on_start(self):
# )
# self.user_log(f"on_start: {start_historical_bars=}, {end_historical_bars=}")

# bar_type_1 = BarType.from_str(f"{self.config.symbol_id}-1-MINUTE-BID-INTERNAL")
# bar_type_2 = BarType.from_str(f"{self.config.symbol_id}-2-MINUTE-BID-INTERNAL@1-MINUTE-INTERNAL")
# self.bar_type_1 = BarType.from_str(f"{self.config.symbol_id}-1-MINUTE-BID-INTERNAL")
# self.bar_type_2 = BarType.from_str(f"{self.config.symbol_id}-2-MINUTE-BID-INTERNAL@1-MINUTE-INTERNAL")

# self.request_aggregated_bars(
# [bar_type_1, bar_type_2],
# [self.bar_type_1, self.bar_type_2],
# start=start_historical_bars,
# end=end_historical_bars,
# update_subscriptions=True,
# include_external_data=False,
# )

# self.subscribe_bars(bar_type_1)
# self.subscribe_bars(bar_type_2)
# self.subscribe_bars(self.bar_type_1)
# self.subscribe_bars(self.bar_type_2)

######### for testing trades
# utc_now = self._clock.utc_now()
Expand All @@ -185,19 +184,19 @@ def on_start(self):
# )
# self.user_log(f"on_start: {start_historical_bars=}, {end_historical_bars=}")

# bar_type_1 = BarType.from_str(f"{self.config.symbol_id}-1-MINUTE-LAST-INTERNAL")
# bar_type_2 = BarType.from_str(f"{self.config.symbol_id}-2-MINUTE-LAST-INTERNAL@1-MINUTE-INTERNAL")
# self.bar_type_1 = BarType.from_str(f"{self.config.symbol_id}-1-MINUTE-LAST-INTERNAL")
# self.bar_type_2 = BarType.from_str(f"{self.config.symbol_id}-2-MINUTE-LAST-INTERNAL@1-MINUTE-INTERNAL")

# self.request_aggregated_bars(
# [bar_type_1, bar_type_2],
# [self.bar_type_1, self.bar_type_2],
# start=start_historical_bars,
# end=end_historical_bars,
# update_subscriptions=True,
# include_external_data=False,
# )

# self.subscribe_bars(bar_type_1)
# self.subscribe_bars(bar_type_2)
# self.subscribe_bars(self.bar_type_1)
# self.subscribe_bars(self.bar_type_2)

def on_historical_data(self, data):
if type(data) is Bar:
Expand All @@ -219,6 +218,12 @@ def on_bar(self, bar):
def user_log(self, msg):
self.log.warning(str(msg), color=LogColor.GREEN)

def on_stop(self):
# self.subscribe_bars(self.external_bar_type)
self.unsubscribe_bars(self.bar_type_1)
self.unsubscribe_bars(self.bar_type_2)
# self.subscribe_bars(self.bar_type_3)


# %% [markdown]
# ## backtest node
Expand Down
63 changes: 55 additions & 8 deletions nautilus_trader/data/engine.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,6 @@ cdef class DataEngine(Component):
self._log.error("Cannot subscribe for synthetic instrument `OrderBook` data")
return


cdef:
uint64_t interval_ns
uint64_t timestamp_ns
Expand Down Expand Up @@ -2063,7 +2062,16 @@ cdef class DataEngine(Component):
topic=f"data.bars.{composite_bar_type}",
handler=aggregator.handle_bar,
)
self._handle_subscribe_bars(client, command)
bars_subscription = SubscribeBars(
command.id,
composite_bar_type,
command.await_partial,
command.client_id,
command.venue,
command.ts_init,
command.params
)
self._handle_subscribe_bars(client, bars_subscription)
elif command.bar_type.spec.price_type == PriceType.LAST:
self._msgbus.subscribe(
topic=f"data.trades"
Expand All @@ -2072,7 +2080,15 @@ cdef class DataEngine(Component):
handler=aggregator.handle_trade_tick,
priority=5,
)
self._handle_subscribe_trade_ticks(client, SubscribeTradeTicks(command.id, command.bar_type.instrument_id, command.client_id, command.venue, command.ts_init, command.params))
trade_ticks_subscription = SubscribeTradeTicks(
command.id,
command.bar_type.instrument_id,
command.client_id,
command.venue,
command.ts_init,
command.params
)
self._handle_subscribe_trade_ticks(client, trade_ticks_subscription)
else:
self._msgbus.subscribe(
topic=f"data.quotes"
Expand All @@ -2081,7 +2097,15 @@ cdef class DataEngine(Component):
handler=aggregator.handle_quote_tick,
priority=5,
)
self._handle_subscribe_quote_ticks(client, SubscribeQuoteTicks(command.id, command.bar_type.instrument_id, command.client_id, command.venue, command.ts_init, command.params))
quote_ticks_subscription = SubscribeQuoteTicks(
command.id,
command.bar_type.instrument_id,
command.client_id,
command.venue,
command.ts_init,
command.params
)
self._handle_subscribe_quote_ticks(client, quote_ticks_subscription)

aggregator.is_running = True

Expand All @@ -2105,24 +2129,47 @@ cdef class DataEngine(Component):
topic=f"data.bars.{composite_bar_type}",
handler=aggregator.handle_bar,
)
command.bar_type = composite_bar_type
self._handle_unsubscribe_bars(client, command)
bars_unsubscription = UnsubscribeBars(
command.id,
composite_bar_type,
command.client_id,
command.venue,
command.ts_init,
command.params
)
self._handle_unsubscribe_bars(client, bars_unsubscription)
elif command.bar_type.spec.price_type == PriceType.LAST:
self._msgbus.unsubscribe(
topic=f"data.trades"
f".{command.bar_type.instrument_id.venue}"
f".{command.bar_type.instrument_id.symbol}",
handler=aggregator.handle_trade_tick,
)
self._handle_unsubscribe_trade_ticks(client, UnsubscribeTradeTicks(command.id, command.bar_type.instrument_id, command.client_id, command.venue, command.ts_init, command.params))
trade_ticks_unsubscription =UnsubscribeTradeTicks(
command.id,
command.bar_type.instrument_id,
command.client_id,
command.venue,
command.ts_init,
command.params
)
self._handle_unsubscribe_trade_ticks(client, trade_ticks_unsubscription)
else:
self._msgbus.unsubscribe(
topic=f"data.quotes"
f".{command.bar_type.instrument_id.venue}"
f".{command.bar_type.instrument_id.symbol}",
handler=aggregator.handle_quote_tick,
)
self._handle_unsubscribe_quote_ticks(client, UnsubscribeQuoteTicks(command.id, command.bar_type.instrument_id, command.client_id, command.venue, command.ts_init, command.params))
quote_ticks_unsubscription =UnsubscribeQuoteTicks(
command.id,
command.bar_type.instrument_id,
command.client_id,
command.venue,
command.ts_init,
command.params
)
self._handle_unsubscribe_quote_ticks(client, quote_ticks_unsubscription)

# Remove from aggregators
del self._bar_aggregators[command.bar_type.standard()]
Expand Down
Loading