Skip to content

Commit 7da9df0

Browse files
committed
Continue support for GLBX exchange venue mappings
Not yet operational for continuous contracts and parent symbols.
1 parent 7877070 commit 7da9df0

File tree

5 files changed

+88
-46
lines changed

5 files changed

+88
-46
lines changed

examples/live/databento/databento_subscriber.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@
4545
# For correct subscription operation, you must specify all instruments to be immediately
4646
# subscribed for as part of the data client configuration
4747
instrument_ids = [
48-
InstrumentId.from_str("ES.c.0.GLBX"),
49-
# InstrumentId.from_str("ESZ4.GLBX"),
50-
# InstrumentId.from_str("ES.FUT.GLBX"),
51-
# InstrumentId.from_str("CL.FUT.GLBX"),
52-
# InstrumentId.from_str("LO.OPT.GLBX"),
48+
InstrumentId.from_str("ES.c.0.GLBX"), # TODO: Continuous contracts only work with GLBX for now
49+
# InstrumentId.from_str("ESZ4.XCME"),
50+
# InstrumentId.from_str("ES.FUT.XCME"),
51+
# InstrumentId.from_str("CL.FUT.NYMEX"),
52+
# InstrumentId.from_str("LO.OPT.NYMEX"),
5353
# InstrumentId.from_str("AAPL.XNAS"),
5454
# InstrumentId.from_str("AAPL.IEXG"),
5555
]
@@ -87,9 +87,10 @@
8787
api_key=None, # 'DATABENTO_API_KEY' env var
8888
http_gateway=None,
8989
instrument_provider=InstrumentProviderConfig(load_all=True),
90+
use_exchange_as_venue=True,
91+
mbo_subscriptions_delay=10.0,
9092
instrument_ids=instrument_ids,
9193
parent_symbols={"GLBX.MDP3": {"ES.FUT"}},
92-
mbo_subscriptions_delay=10.0,
9394
),
9495
},
9596
timeout_connection=30.0,
@@ -154,6 +155,7 @@ def on_start(self) -> None:
154155
# interval_ms=1000,
155156
# )
156157

158+
# self.subscribe_instrument(parent_symbol, client_id=DATABENTO_CLIENT_ID)
157159
self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
158160
self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
159161
# self.subscribe_bars(BarType.from_str(f"{instrument_id}-1-SECOND-LAST-EXTERNAL"))

nautilus_core/adapters/databento/src/live.rs

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,11 @@ use std::{
1919
};
2020

2121
use databento::{
22-
dbn,
23-
dbn::{PitSymbolMap, Record, SymbolIndex, VersionUpgradePolicy},
22+
dbn::{self, PitSymbolMap, Publisher, Record, SymbolIndex, VersionUpgradePolicy},
2423
live::Subscription,
2524
};
2625
use indexmap::IndexMap;
27-
use nautilus_core::{
28-
python::to_pyruntime_err,
29-
time::{get_atomic_clock_realtime, AtomicTime},
30-
};
26+
use nautilus_core::{nanos::UnixNanos, python::to_pyruntime_err, time::get_atomic_clock_realtime};
3127
use nautilus_model::{
3228
data::{
3329
delta::OrderBookDelta,
@@ -209,6 +205,8 @@ impl DatabentoFeedHandler {
209205
}
210206
};
211207

208+
let ts_init = clock.get_time_ns();
209+
212210
// Decode record
213211
if let Some(msg) = record.get::<dbn::ErrorMsg>() {
214212
handle_error_msg(msg);
@@ -219,14 +217,25 @@ impl DatabentoFeedHandler {
219217
instrument_id_map.remove(&msg.hd.instrument_id);
220218
handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map);
221219
} else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
220+
// TODO: Make this configurable so that exchange can be used as venue
221+
let use_exchange_as_venue = false;
222+
if use_exchange_as_venue && msg.publisher()? == Publisher::GlbxMdp3Glbx {
223+
update_instrument_id_map_with_exchange(
224+
&symbol_map,
225+
&self.symbol_venue_map,
226+
&mut instrument_id_map,
227+
msg.hd.instrument_id,
228+
msg.exchange()?,
229+
);
230+
};
222231
let data = handle_instrument_def_msg(
223232
msg,
224233
&record,
225234
&symbol_map,
226235
&self.publisher_venue_map,
227236
&self.symbol_venue_map.read().unwrap(),
228237
&mut instrument_id_map,
229-
clock,
238+
ts_init,
230239
)?;
231240
self.send_msg(LiveMessage::Instrument(data)).await;
232241
} else if let Some(msg) = record.get::<dbn::StatusMsg>() {
@@ -237,7 +246,7 @@ impl DatabentoFeedHandler {
237246
&self.publisher_venue_map,
238247
&self.symbol_venue_map.read().unwrap(),
239248
&mut instrument_id_map,
240-
clock,
249+
ts_init,
241250
)?;
242251
self.send_msg(LiveMessage::Status(data)).await;
243252
} else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
@@ -248,7 +257,7 @@ impl DatabentoFeedHandler {
248257
&self.publisher_venue_map,
249258
&self.symbol_venue_map.read().unwrap(),
250259
&mut instrument_id_map,
251-
clock,
260+
ts_init,
252261
)?;
253262
self.send_msg(LiveMessage::Imbalance(data)).await;
254263
} else if let Some(msg) = record.get::<dbn::StatMsg>() {
@@ -259,17 +268,17 @@ impl DatabentoFeedHandler {
259268
&self.publisher_venue_map,
260269
&self.symbol_venue_map.read().unwrap(),
261270
&mut instrument_id_map,
262-
clock,
271+
ts_init,
263272
)?;
264273
self.send_msg(LiveMessage::Statistics(data)).await;
265274
} else {
266275
let (mut data1, data2) = match handle_record(
267276
record,
268277
&symbol_map,
269278
&self.publisher_venue_map,
270-
&self.symbol_venue_map.read().unwrap(), // TODO: Optimize this bottleneck
279+
&self.symbol_venue_map.read().unwrap(),
271280
&mut instrument_id_map,
272-
clock,
281+
ts_init,
273282
) {
274283
Ok(decoded) => decoded,
275284
Err(e) => {
@@ -363,6 +372,24 @@ fn handle_symbol_mapping_msg(
363372
instrument_id_map.remove(&msg.header().instrument_id);
364373
}
365374

375+
fn update_instrument_id_map_with_exchange(
376+
symbol_map: &PitSymbolMap,
377+
symbol_venue_map: &RwLock<HashMap<Symbol, Venue>>,
378+
instrument_id_map: &mut HashMap<u32, InstrumentId>,
379+
raw_instrument_id: u32,
380+
exchange: &str,
381+
) -> InstrumentId {
382+
let raw_symbol = symbol_map
383+
.get(raw_instrument_id)
384+
.expect("Cannot resolve `raw_symbol` from `symbol_map`");
385+
let symbol = Symbol::from(raw_symbol.as_str());
386+
let venue = Venue::from(exchange);
387+
let instrument_id = InstrumentId::new(symbol, venue);
388+
symbol_venue_map.write().unwrap().insert(symbol, venue);
389+
instrument_id_map.insert(raw_instrument_id, instrument_id);
390+
instrument_id
391+
}
392+
366393
fn update_instrument_id_map(
367394
record: &dbn::RecordRef,
368395
symbol_map: &PitSymbolMap,
@@ -403,7 +430,7 @@ fn handle_instrument_def_msg(
403430
publisher_venue_map: &IndexMap<PublisherId, Venue>,
404431
symbol_venue_map: &HashMap<Symbol, Venue>,
405432
instrument_id_map: &mut HashMap<u32, InstrumentId>,
406-
clock: &AtomicTime,
433+
ts_init: UnixNanos,
407434
) -> anyhow::Result<InstrumentAny> {
408435
let instrument_id = update_instrument_id_map(
409436
record,
@@ -412,7 +439,6 @@ fn handle_instrument_def_msg(
412439
symbol_venue_map,
413440
instrument_id_map,
414441
);
415-
let ts_init = clock.get_time_ns();
416442

417443
decode_instrument_def_msg(msg, instrument_id, ts_init)
418444
}
@@ -424,7 +450,7 @@ fn handle_status_msg(
424450
publisher_venue_map: &IndexMap<PublisherId, Venue>,
425451
symbol_venue_map: &HashMap<Symbol, Venue>,
426452
instrument_id_map: &mut HashMap<u32, InstrumentId>,
427-
clock: &AtomicTime,
453+
ts_init: UnixNanos,
428454
) -> anyhow::Result<InstrumentStatus> {
429455
let instrument_id = update_instrument_id_map(
430456
record,
@@ -433,7 +459,6 @@ fn handle_status_msg(
433459
symbol_venue_map,
434460
instrument_id_map,
435461
);
436-
let ts_init = clock.get_time_ns();
437462

438463
decode_status_msg(msg, instrument_id, ts_init)
439464
}
@@ -445,7 +470,7 @@ fn handle_imbalance_msg(
445470
publisher_venue_map: &IndexMap<PublisherId, Venue>,
446471
symbol_venue_map: &HashMap<Symbol, Venue>,
447472
instrument_id_map: &mut HashMap<u32, InstrumentId>,
448-
clock: &AtomicTime,
473+
ts_init: UnixNanos,
449474
) -> anyhow::Result<DatabentoImbalance> {
450475
let instrument_id = update_instrument_id_map(
451476
record,
@@ -456,7 +481,6 @@ fn handle_imbalance_msg(
456481
);
457482

458483
let price_precision = 2; // Hard-coded for now
459-
let ts_init = clock.get_time_ns();
460484

461485
decode_imbalance_msg(msg, instrument_id, price_precision, ts_init)
462486
}
@@ -468,7 +492,7 @@ fn handle_statistics_msg(
468492
publisher_venue_map: &IndexMap<PublisherId, Venue>,
469493
symbol_venue_map: &HashMap<Symbol, Venue>,
470494
instrument_id_map: &mut HashMap<u32, InstrumentId>,
471-
clock: &AtomicTime,
495+
ts_init: UnixNanos,
472496
) -> anyhow::Result<DatabentoStatistics> {
473497
let instrument_id = update_instrument_id_map(
474498
record,
@@ -479,7 +503,6 @@ fn handle_statistics_msg(
479503
);
480504

481505
let price_precision = 2; // Hard-coded for now
482-
let ts_init = clock.get_time_ns();
483506

484507
decode_statistics_msg(msg, instrument_id, price_precision, ts_init)
485508
}
@@ -490,7 +513,7 @@ fn handle_record(
490513
publisher_venue_map: &IndexMap<PublisherId, Venue>,
491514
symbol_venue_map: &HashMap<Symbol, Venue>,
492515
instrument_id_map: &mut HashMap<u32, InstrumentId>,
493-
clock: &AtomicTime,
516+
ts_init: UnixNanos,
494517
) -> anyhow::Result<(Option<Data>, Option<Data>)> {
495518
let instrument_id = update_instrument_id_map(
496519
&record,
@@ -501,7 +524,6 @@ fn handle_record(
501524
);
502525

503526
let price_precision = 2; // Hard-coded for now
504-
let ts_init = clock.get_time_ns();
505527

506528
decode_record(
507529
&record,

nautilus_trader/adapters/databento/config.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,25 +30,28 @@ class DatabentoDataClientConfig(LiveDataClientConfig, frozen=True):
3030
The historical HTTP client gateway override.
3131
live_gateway : str, optional
3232
The live client gateway override.
33-
parent_symbols : dict[str, set[str]], optional
34-
The Databento parent symbols to subscribe to instrument definitions for on start.
35-
This is a map of Databento dataset keys -> to a sequence of the parent symbols,
36-
e.g. {'GLBX.MDP3', ['ES.FUT', 'ES.OPT']} (for all E-mini S&P 500 futures and options products).
37-
instrument_ids : list[InstrumentId], optional
38-
The instrument IDs to request instrument definitions for on start.
33+
use_exchange_as_venue : bool, default True
34+
If the `exchange` field will be used as the venue for instrument IDs.
3935
timeout_initial_load : float, default 15.0
4036
The timeout (seconds) to wait for instruments to load (concurrently per dataset).
4137
mbo_subscriptions_delay : float, default 3.0
4238
The timeout (seconds) to wait for MBO/L3 subscriptions (concurrently per dataset).
4339
After the timeout the MBO order book feed will start and replay messages from the initial
4440
snapshot and then all deltas.
41+
parent_symbols : dict[str, set[str]], optional
42+
The Databento parent symbols to subscribe to instrument definitions for on start.
43+
This is a map of Databento dataset keys to a set of the parent symbols,
44+
e.g. {'GLBX.MDP3': {'ES.FUT', 'ES.OPT'}} (for all E-mini S&P 500 futures and options products).
45+
instrument_ids : list[InstrumentId], optional
46+
The instrument IDs to request instrument definitions for on start.
4547
4648
"""
4749

4850
api_key: str | None = None
4951
http_gateway: str | None = None
5052
live_gateway: str | None = None
51-
instrument_ids: list[InstrumentId] | None = None
52-
parent_symbols: dict[str, set[str]] | None = None
53+
use_exchange_as_venue: bool = True
5354
timeout_initial_load: float | None = 15.0
5455
mbo_subscriptions_delay: float | None = 3.0 # Need to have received all definitions
56+
instrument_ids: list[InstrumentId] | None = None
57+
parent_symbols: dict[str, set[str]] | None = None

nautilus_trader/adapters/databento/data.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,15 @@ def __init__(
115115
# Configuration
116116
self._live_api_key: str = config.api_key or http_client.key
117117
self._live_gateway: str | None = config.live_gateway
118-
self._parent_symbols: dict[Dataset, set[str]] = defaultdict(set)
119-
self._instrument_ids: dict[Dataset, set[InstrumentId]] = defaultdict(set)
118+
self._use_exchange_as_venue: bool = config.use_exchange_as_venue
120119
self._timeout_initial_load: float | None = config.timeout_initial_load
121120
self._mbo_subscriptions_delay: float | None = config.mbo_subscriptions_delay
121+
self._parent_symbols: dict[Dataset, set[str]] = defaultdict(set)
122+
self._instrument_ids: dict[Dataset, set[InstrumentId]] = defaultdict(set)
123+
124+
self._log.info(f"{config.use_exchange_as_venue=}", LogColor.BLUE)
125+
self._log.info(f"{config.timeout_initial_load=}", LogColor.BLUE)
126+
self._log.info(f"{config.mbo_subscriptions_delay=}", LogColor.BLUE)
122127

123128
# Clients
124129
self._http_client = http_client
@@ -161,7 +166,10 @@ async def _connect(self) -> None:
161166
coros: list[Coroutine] = []
162167
for dataset, instrument_ids in self._instrument_ids.items():
163168
loading_ids: list[InstrumentId] = sorted(instrument_ids)
164-
filters = {"parent_symbols": list(self._parent_symbols.get(dataset, []))}
169+
filters = {
170+
"use_exchange_as_venue": self._use_exchange_as_venue,
171+
"parent_symbols": list(self._parent_symbols.get(dataset, [])),
172+
}
165173
coro = self._instrument_provider.load_ids_async(
166174
instrument_ids=loading_ids,
167175
filters=filters,
@@ -424,10 +432,13 @@ async def _subscribe_instrument(
424432
) -> None:
425433
try:
426434
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
435+
start: int | None = params.get("start") if params else None
436+
427437
live_client = self._get_live_client(dataset)
428438
live_client.subscribe(
429439
schema=DatabentoSchema.DEFINITION.value,
430440
instrument_ids=[instrument_id_to_pyo3(instrument_id)],
441+
start=start,
431442
)
432443
await self._check_live_client_started(dataset, live_client)
433444
except asyncio.CancelledError:
@@ -436,17 +447,14 @@ async def _subscribe_instrument(
436447
async def _subscribe_parent_symbols(
437448
self,
438449
dataset: Dataset,
439-
parent_instrument_ids: set[InstrumentId],
450+
parent_symbols: set[InstrumentId],
440451
) -> None:
441452
try:
442453
live_client = self._get_live_client(dataset)
443454
live_client.subscribe(
444455
schema=DatabentoSchema.DEFINITION.value,
445456
instrument_ids=sorted( # type: ignore[type-var]
446-
[
447-
instrument_id_to_pyo3(instrument_id)
448-
for instrument_id in parent_instrument_ids
449-
],
457+
[instrument_id_to_pyo3(instrument_id) for instrument_id in parent_symbols],
450458
),
451459
stype_in="parent",
452460
)
@@ -919,6 +927,7 @@ async def _request_instrument(
919927
instrument_ids=[instrument_id_to_pyo3(instrument_id)],
920928
start=start.value,
921929
end=end.value,
930+
use_exchange_as_venue=self._use_exchange_as_venue,
922931
)
923932

924933
instruments = instruments_from_pyo3(pyo3_instruments)

nautilus_trader/adapters/databento/providers.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ async def load_ids_async( # noqa: C901 (too complex)
126126
)
127127

128128
parent_symbols = list(filters.get("parent_symbols", [])) if filters is not None else None
129+
# use_exchange_as_venue = filters.get("use_exchange_as_venue", True) if filters else True
129130

130131
pyo3_instruments = []
131132

@@ -157,6 +158,7 @@ def receive_instruments(pyo3_instrument: Any) -> None:
157158
[instrument_id_to_pyo3(instrument_id) for instrument_id in instrument_ids],
158159
),
159160
start=0, # From start of current week (latest definitions)
161+
# use_exchange_as_venue=use_exchange_as_venue, # TODO
160162
)
161163

162164
if parent_symbols:
@@ -226,7 +228,7 @@ async def load_async(
226228
Calling this method will incur a cost to your Databento account in USD.
227229
228230
"""
229-
await self.load_ids_async([instrument_id])
231+
await self.load_ids_async([instrument_id], filters=filters)
230232

231233
async def get_range(
232234
self,
@@ -262,12 +264,16 @@ async def get_range(
262264
263265
"""
264266
dataset = self._check_all_datasets_equal(instrument_ids)
267+
use_exchange_as_venue = filters.get("use_exchange_as_venue", True) if filters else True
265268

269+
# Here the NULL venue is overridden and so is used as a
270+
# placeholder to conform to instrument ID conventions.
266271
pyo3_instruments = await self._http_client.get_range_instruments(
267272
dataset=dataset,
268-
instrument_ids=[instrument_id_to_pyo3(InstrumentId.from_str(f"{ALL_SYMBOLS}.GLBX"))],
273+
instrument_ids=[instrument_id_to_pyo3(InstrumentId.from_str(f"{ALL_SYMBOLS}.NULL"))],
269274
start=pd.Timestamp(start, tz=pytz.utc).value,
270275
end=pd.Timestamp(end, tz=pytz.utc).value if end is not None else None,
276+
use_exchange_as_venue=use_exchange_as_venue,
271277
)
272278

273279
instruments = instruments_from_pyo3(pyo3_instruments)

0 commit comments

Comments
 (0)