Skip to content

Commit 436143f

Browse files
authored
Fix databento venue conversion for GLBX (#2121)
* Fix databento venue conversion for GLBX * Add start as params for databento market data subscriptions
1 parent 4ef754f commit 436143f

File tree

7 files changed

+84
-27
lines changed

7 files changed

+84
-27
lines changed

examples/live/databento/databento_historical_data.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def on_start(self) -> None:
9797
start_time = time_object_to_dt("2024-05-09T10:00")
9898
end_time = time_object_to_dt("2024-05-09T10:05")
9999
self.request_quote_ticks(
100-
InstrumentId.from_str("ESM4.GLBX"),
100+
InstrumentId.from_str("ESM4.XCME"), # or "ESM4.GLBX"
101101
start_time,
102102
end_time,
103103
params={"schema": "bbo-1m"},

nautilus_core/adapters/databento/src/python/historical.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,9 @@ impl DatabentoHistoricalClient {
119119
})
120120
}
121121

122+
#[allow(clippy::too_many_arguments)]
122123
#[pyo3(name = "get_range_instruments")]
123-
#[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
124+
#[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, use_exchange_as_venue=false))]
124125
fn py_get_range_instruments<'py>(
125126
&self,
126127
py: Python<'py>,
@@ -129,6 +130,7 @@ impl DatabentoHistoricalClient {
129130
start: u64,
130131
end: Option<u64>,
131132
limit: Option<u64>,
133+
use_exchange_as_venue: bool,
132134
) -> PyResult<Bound<'py, PyAny>> {
133135
let client = self.inner.clone();
134136
let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
@@ -172,14 +174,21 @@ impl DatabentoHistoricalClient {
172174

173175
while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
174176
let record = dbn::RecordRef::from(msg);
175-
let instrument_id = decode_nautilus_instrument_id(
177+
let mut instrument_id = decode_nautilus_instrument_id(
176178
&record,
177179
&metadata,
178180
&publisher_venue_map,
179181
&symbol_venue_map.read().unwrap(),
180182
)
181183
.map_err(to_pyvalue_err)?;
182184

185+
if use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
186+
let exchange = msg.exchange().unwrap();
187+
let venue = Venue::from_code(exchange)
188+
.unwrap_or_else(|_| panic!("`Venue` not found for exchange {exchange}"));
189+
instrument_id.venue = venue;
190+
}
191+
183192
let result = decode_instrument_def_msg(msg, instrument_id, ts_init);
184193
match result {
185194
Ok(instrument) => instruments.push(instrument),

nautilus_core/adapters/databento/src/symbology.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,16 @@ pub fn instrument_id_to_symbol_string(
2626
instrument_id: InstrumentId,
2727
symbol_venue_map: &mut HashMap<Symbol, Venue>,
2828
) -> String {
29-
if let Some(venue) = symbol_venue_map.get(&instrument_id.symbol) {
30-
if venue == &Venue::CBCM()
31-
|| venue == &Venue::NYUM()
32-
|| venue == &Venue::XCBT()
33-
|| venue == &Venue::XCEC()
34-
|| venue == &Venue::XCME()
35-
|| venue == &Venue::XFXS()
36-
|| venue == &Venue::XNYM()
37-
{
38-
symbol_venue_map.insert(instrument_id.symbol, Venue::GLBX());
39-
}
29+
let venue = instrument_id.venue;
30+
if venue == Venue::CBCM()
31+
|| venue == Venue::NYUM()
32+
|| venue == Venue::XCBT()
33+
|| venue == Venue::XCEC()
34+
|| venue == Venue::XCME()
35+
|| venue == Venue::XFXS()
36+
|| venue == Venue::XNYM()
37+
{
38+
symbol_venue_map.insert(instrument_id.symbol, venue);
4039
}
4140

4241
instrument_id.symbol.to_string()
@@ -55,8 +54,6 @@ pub fn decode_nautilus_instrument_id(
5554
.ok_or_else(|| anyhow::anyhow!("`Venue` not found for `publisher_id` {publisher_id}"))?;
5655
let mut instrument_id = get_nautilus_instrument_id_for_record(record, metadata, *venue)?;
5756
if publisher == Publisher::GlbxMdp3Glbx {
58-
// Source actual exchange from GLBX instrument
59-
// definitions if they were loaded.
6057
if let Some(venue) = symbol_venue_map.get(&instrument_id.symbol) {
6158
instrument_id.venue = *venue;
6259
}

nautilus_trader/adapters/databento/data.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,11 +612,16 @@ async def _subscribe_quote_ticks(
612612
]:
613613
schema = DatabentoSchema.MBP_1.value
614614

615+
start = params.get("start") if params else None
616+
start = start.value + 1 if start else None # time in nanoseconds from pd.Timestamp
617+
start = start if params and params.get("subscribe_from_start") else None
618+
615619
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
616620
live_client = self._get_live_client(dataset)
617621
live_client.subscribe(
618622
schema=schema,
619623
instrument_ids=[instrument_id_to_pyo3(instrument_id)],
624+
start=start,
620625
)
621626

622627
# Add trade tick subscriptions for instrument (MBP-1 data includes trades)
@@ -637,11 +642,16 @@ async def _subscribe_trade_ticks(
637642

638643
await self._ensure_subscribed_for_instrument(instrument_id)
639644

645+
start = params.get("start") if params else None
646+
start = start.value + 1 if start else None # time in nanoseconds from pd.Timestamp
647+
start = start if params and params.get("subscribe_from_start") else None
648+
640649
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
641650
live_client = self._get_live_client(dataset)
642651
live_client.subscribe(
643652
schema=DatabentoSchema.TRADES.value,
644653
instrument_ids=[instrument_id_to_pyo3(instrument_id)],
654+
start=start,
645655
)
646656
await self._check_live_client_started(dataset, live_client)
647657
except asyncio.CancelledError:
@@ -661,10 +671,15 @@ async def _subscribe_bars(
661671
self._log.error(f"Cannot subscribe: {e}")
662672
return
663673

674+
start = params.get("start") if params else None
675+
start = start.value + 1 if start else None # time in nanoseconds from pd.Timestamp
676+
start = start if params and params.get("subscribe_from_start") else None
677+
664678
live_client = self._get_live_client(dataset)
665679
live_client.subscribe(
666680
schema=schema.value,
667681
instrument_ids=[instrument_id_to_pyo3(bar_type.instrument_id)],
682+
start=start,
668683
)
669684
await self._check_live_client_started(dataset, live_client)
670685
except asyncio.CancelledError:
@@ -941,11 +956,14 @@ async def _request_instruments(
941956
LogColor.BLUE,
942957
)
943958

959+
use_exchange_as_venue = params is not None and params.get("use_exchange_as_venue", False)
960+
944961
pyo3_instruments = await self._http_client.get_range_instruments(
945962
dataset=dataset,
946963
instrument_ids=[instrument_id_to_pyo3(InstrumentId.from_str(f"{ALL_SYMBOLS}.{venue}"))],
947964
start=start.value,
948965
end=end.value,
966+
use_exchange_as_venue=use_exchange_as_venue,
949967
)
950968

951969
instruments = instruments_from_pyo3(pyo3_instruments)

nautilus_trader/common/actor.pyx

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import asyncio
2828
from concurrent.futures import Executor
2929

3030
import cython
31-
import msgspec
3231

3332
from nautilus_trader.common.config import ActorConfig
3433
from nautilus_trader.common.config import ImportableActorConfig

nautilus_trader/core/nautilus_pyo3.pyi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4235,6 +4235,7 @@ class DatabentoHistoricalClient:
42354235
start: int,
42364236
end: int | None = None,
42374237
limit: int | None = None,
4238+
use_exchange_as_venue: bool = False
42384239
) -> list[Instrument]: ...
42394240
async def get_range_quotes(
42404241
self,

nautilus_trader/data/engine.pyx

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,15 @@ cdef class DataEngine(Component):
984984
return
985985
Condition.not_none(client, "client")
986986

987+
if not params.get("start"):
988+
last_timestamp: datetime | None = self._catalogs_last_timestamp(
989+
QuoteTick,
990+
instrument_id,
991+
)[0]
992+
993+
params = params if params else {}
994+
params["start"] = last_timestamp
995+
987996
if instrument_id not in client.subscribed_quote_ticks():
988997
client.subscribe_quote_ticks(instrument_id, params)
989998

@@ -1025,6 +1034,15 @@ cdef class DataEngine(Component):
10251034
return
10261035
Condition.not_none(client, "client")
10271036

1037+
if not params.get("start"):
1038+
last_timestamp: datetime | None = self._catalogs_last_timestamp(
1039+
TradeTick,
1040+
instrument_id,
1041+
)[0]
1042+
1043+
params = params if params else {}
1044+
params["start"] = last_timestamp
1045+
10281046
if instrument_id not in client.subscribed_trade_ticks():
10291047
client.subscribe_trade_ticks(instrument_id, params)
10301048

@@ -1076,6 +1094,15 @@ cdef class DataEngine(Component):
10761094
)
10771095
return
10781096

1097+
if not params.get("start"):
1098+
last_timestamp: datetime | None = self._catalogs_last_timestamp(
1099+
Bar,
1100+
bar_type=bar_type,
1101+
)[0]
1102+
1103+
params = params if params else {}
1104+
params["start"] = last_timestamp
1105+
10791106
if bar_type not in client.subscribed_bars():
10801107
client.subscribe_bars(bar_type, params)
10811108

@@ -1090,6 +1117,12 @@ cdef class DataEngine(Component):
10901117

10911118
try:
10921119
if data_type not in client.subscribed_custom_data():
1120+
if not params.get("start"):
1121+
last_timestamp: datetime | None = self._catalogs_last_timestamp(data_type.type)[0]
1122+
1123+
params = params if params else {}
1124+
params["start"] = last_timestamp
1125+
10931126
client.subscribe(data_type, params)
10941127
except NotImplementedError:
10951128
self._log.error(
@@ -1419,10 +1452,10 @@ cdef class DataEngine(Component):
14191452
datetime end,
14201453
dict params,
14211454
):
1422-
last_timestamp, _ = self._catalogs_last_timestamp(
1455+
last_timestamp = self._catalogs_last_timestamp(
14231456
Instrument,
14241457
instrument_id,
1425-
)
1458+
)[0]
14261459

14271460
if last_timestamp:
14281461
self._query_catalog(request)
@@ -1474,10 +1507,10 @@ cdef class DataEngine(Component):
14741507
):
14751508
instrument_id = request.data_type.metadata.get("instrument_id")
14761509

1477-
last_timestamp, _ = self._catalogs_last_timestamp(
1510+
last_timestamp = self._catalogs_last_timestamp(
14781511
QuoteTick,
14791512
instrument_id,
1480-
)
1513+
)[0]
14811514

14821515
if last_timestamp:
14831516
if (now <= last_timestamp) or (end and end <= last_timestamp):
@@ -1515,10 +1548,10 @@ cdef class DataEngine(Component):
15151548
):
15161549
instrument_id = request.data_type.metadata.get("instrument_id")
15171550

1518-
last_timestamp, _ = self._catalogs_last_timestamp(
1551+
last_timestamp = self._catalogs_last_timestamp(
15191552
TradeTick,
15201553
instrument_id,
1521-
)
1554+
)[0]
15221555

15231556
if last_timestamp:
15241557
if (now <= last_timestamp) or (end and end <= last_timestamp):
@@ -1556,10 +1589,10 @@ cdef class DataEngine(Component):
15561589
):
15571590
bar_type = request.data_type.metadata.get("bar_type")
15581591

1559-
last_timestamp, _ = self._catalogs_last_timestamp(
1592+
last_timestamp = self._catalogs_last_timestamp(
15601593
Bar,
15611594
bar_type=bar_type,
1562-
)
1595+
)[0]
15631596

15641597
if last_timestamp:
15651598
if (now <= last_timestamp) or (end and end <= last_timestamp):
@@ -1595,9 +1628,9 @@ cdef class DataEngine(Component):
15951628
datetime now,
15961629
dict params,
15971630
):
1598-
last_timestamp, _ = self._catalogs_last_timestamp(
1631+
last_timestamp = self._catalogs_last_timestamp(
15991632
request.data_type.type,
1600-
)
1633+
)[0]
16011634

16021635
if last_timestamp:
16031636
if (now <= last_timestamp) or (end and end <= last_timestamp):

0 commit comments

Comments
 (0)