Skip to content

Commit eacbd85

Browse files
committed
Add start as params for databento market data subscriptions
1 parent bb8d103 commit eacbd85

File tree

3 files changed

+58
-11
lines changed

3 files changed

+58
-11
lines changed

nautilus_trader/adapters/databento/data.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,11 +612,16 @@ async def _subscribe_quote_ticks(
612612
]:
613613
schema = DatabentoSchema.MBP_1.value
614614

615+
start = params.get("start") if params else None
616+
start = start.value + 1 if start else None # time in nanoseconds from pd.Timestamp
617+
start = start if params and params.get("subscribe_from_start") else None
618+
615619
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
616620
live_client = self._get_live_client(dataset)
617621
live_client.subscribe(
618622
schema=schema,
619623
instrument_ids=[instrument_id_to_pyo3(instrument_id)],
624+
start=start,
620625
)
621626

622627
# Add trade tick subscriptions for instrument (MBP-1 data includes trades)
@@ -637,11 +642,16 @@ async def _subscribe_trade_ticks(
637642

638643
await self._ensure_subscribed_for_instrument(instrument_id)
639644

645+
start = params.get("start") if params else None
646+
start = start.value + 1 if start else None # time in nanoseconds from pd.Timestamp
647+
start = start if params and params.get("subscribe_from_start") else None
648+
640649
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
641650
live_client = self._get_live_client(dataset)
642651
live_client.subscribe(
643652
schema=DatabentoSchema.TRADES.value,
644653
instrument_ids=[instrument_id_to_pyo3(instrument_id)],
654+
start=start,
645655
)
646656
await self._check_live_client_started(dataset, live_client)
647657
except asyncio.CancelledError:
@@ -661,10 +671,15 @@ async def _subscribe_bars(
661671
self._log.error(f"Cannot subscribe: {e}")
662672
return
663673

674+
start = params.get("start") if params else None
675+
start = start.value + 1 if start else None # time in nanoseconds from pd.Timestamp
676+
start = start if params and params.get("subscribe_from_start") else None
677+
664678
live_client = self._get_live_client(dataset)
665679
live_client.subscribe(
666680
schema=schema.value,
667681
instrument_ids=[instrument_id_to_pyo3(bar_type.instrument_id)],
682+
start=start,
668683
)
669684
await self._check_live_client_started(dataset, live_client)
670685
except asyncio.CancelledError:

nautilus_trader/common/actor.pyx

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import asyncio
2828
from concurrent.futures import Executor
2929

3030
import cython
31-
import msgspec
3231

3332
from nautilus_trader.common.config import ActorConfig
3433
from nautilus_trader.common.config import ImportableActorConfig

nautilus_trader/data/engine.pyx

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,15 @@ cdef class DataEngine(Component):
984984
return
985985
Condition.not_none(client, "client")
986986

987+
if not params.get("start"):
988+
last_timestamp: datetime | None = self._catalogs_last_timestamp(
989+
QuoteTick,
990+
instrument_id,
991+
)[0]
992+
993+
params = params if params else {}
994+
params["start"] = last_timestamp
995+
987996
if instrument_id not in client.subscribed_quote_ticks():
988997
client.subscribe_quote_ticks(instrument_id, params)
989998

@@ -1025,6 +1034,15 @@ cdef class DataEngine(Component):
10251034
return
10261035
Condition.not_none(client, "client")
10271036

1037+
if not params.get("start"):
1038+
last_timestamp: datetime | None = self._catalogs_last_timestamp(
1039+
TradeTick,
1040+
instrument_id,
1041+
)[0]
1042+
1043+
params = params if params else {}
1044+
params["start"] = last_timestamp
1045+
10281046
if instrument_id not in client.subscribed_trade_ticks():
10291047
client.subscribe_trade_ticks(instrument_id, params)
10301048

@@ -1076,6 +1094,15 @@ cdef class DataEngine(Component):
10761094
)
10771095
return
10781096

1097+
if not params.get("start"):
1098+
last_timestamp: datetime | None = self._catalogs_last_timestamp(
1099+
Bar,
1100+
bar_type=bar_type,
1101+
)[0]
1102+
1103+
params = params if params else {}
1104+
params["start"] = last_timestamp
1105+
10791106
if bar_type not in client.subscribed_bars():
10801107
client.subscribe_bars(bar_type, params)
10811108

@@ -1090,6 +1117,12 @@ cdef class DataEngine(Component):
10901117

10911118
try:
10921119
if data_type not in client.subscribed_custom_data():
1120+
if not params.get("start"):
1121+
last_timestamp: datetime | None = self._catalogs_last_timestamp(data_type.type)[0]
1122+
1123+
params = params if params else {}
1124+
params["start"] = last_timestamp
1125+
10931126
client.subscribe(data_type, params)
10941127
except NotImplementedError:
10951128
self._log.error(
@@ -1419,10 +1452,10 @@ cdef class DataEngine(Component):
14191452
datetime end,
14201453
dict params,
14211454
):
1422-
last_timestamp, _ = self._catalogs_last_timestamp(
1455+
last_timestamp = self._catalogs_last_timestamp(
14231456
Instrument,
14241457
instrument_id,
1425-
)
1458+
)[0]
14261459

14271460
if last_timestamp:
14281461
self._query_catalog(request)
@@ -1474,10 +1507,10 @@ cdef class DataEngine(Component):
14741507
):
14751508
instrument_id = request.data_type.metadata.get("instrument_id")
14761509

1477-
last_timestamp, _ = self._catalogs_last_timestamp(
1510+
last_timestamp = self._catalogs_last_timestamp(
14781511
QuoteTick,
14791512
instrument_id,
1480-
)
1513+
)[0]
14811514

14821515
if last_timestamp:
14831516
if (now <= last_timestamp) or (end and end <= last_timestamp):
@@ -1515,10 +1548,10 @@ cdef class DataEngine(Component):
15151548
):
15161549
instrument_id = request.data_type.metadata.get("instrument_id")
15171550

1518-
last_timestamp, _ = self._catalogs_last_timestamp(
1551+
last_timestamp = self._catalogs_last_timestamp(
15191552
TradeTick,
15201553
instrument_id,
1521-
)
1554+
)[0]
15221555

15231556
if last_timestamp:
15241557
if (now <= last_timestamp) or (end and end <= last_timestamp):
@@ -1556,10 +1589,10 @@ cdef class DataEngine(Component):
15561589
):
15571590
bar_type = request.data_type.metadata.get("bar_type")
15581591

1559-
last_timestamp, _ = self._catalogs_last_timestamp(
1592+
last_timestamp = self._catalogs_last_timestamp(
15601593
Bar,
15611594
bar_type=bar_type,
1562-
)
1595+
)[0]
15631596

15641597
if last_timestamp:
15651598
if (now <= last_timestamp) or (end and end <= last_timestamp):
@@ -1595,9 +1628,9 @@ cdef class DataEngine(Component):
15951628
datetime now,
15961629
dict params,
15971630
):
1598-
last_timestamp, _ = self._catalogs_last_timestamp(
1631+
last_timestamp = self._catalogs_last_timestamp(
15991632
request.data_type.type,
1600-
)
1633+
)[0]
16011634

16021635
if last_timestamp:
16031636
if (now <= last_timestamp) or (end and end <= last_timestamp):

0 commit comments

Comments
 (0)