Skip to content

Request order books snapshots with HTTP for dYdX #2393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 46 additions & 21 deletions nautilus_trader/adapters/dydx/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,9 @@ def __init__(
self._update_instruments_interval_mins: int | None = config.update_instruments_interval_mins
self._update_orderbook_interval_secs: int = 60 # Once every 60 seconds (hard-coded for now)
self._update_instruments_task: asyncio.Task | None = None
self._resubscribe_orderbook_task: asyncio.Task | None = None
self._fetch_orderbook_task: asyncio.Task | None = None
self._last_quotes: dict[InstrumentId, QuoteTick] = {}
self._orderbook_subscriptions: set[str] = set()
self._resubscribe_orderbook_lock = asyncio.Lock()

# Hot caches
self._bars: dict[BarType, Bar] = {}
Expand All @@ -180,8 +179,8 @@ async def _connect(self) -> None:
self._update_instruments_task = self.create_task(
self._update_instruments(self._update_instruments_interval_mins),
)
self._resubscribe_orderbook_task = self.create_task(
self._resubscribe_orderbooks_on_interval(),
self._fetch_orderbook_task = self.create_task(
self._fetch_orderbooks_on_interval(),
)

self._log.info("Initializing websocket connection")
Expand All @@ -195,10 +194,10 @@ async def _disconnect(self) -> None:
self._update_instruments_task.cancel()
self._update_instruments_task = None

if self._resubscribe_orderbook_task:
self._log.debug("Cancelling 'resubscribe_orderbook' task")
self._resubscribe_orderbook_task.cancel()
self._resubscribe_orderbook_task = None
if self._fetch_orderbook_task:
self._log.debug("Cancelling 'fetch_orderbook' task")
self._fetch_orderbook_task.cancel()
self._fetch_orderbook_task = None

await self._ws_client.unsubscribe_markets()
await self._ws_client.disconnect()
Expand All @@ -215,32 +214,58 @@ async def _update_instruments(self, interval_mins: int) -> None:
except asyncio.CancelledError:
self._log.debug("Canceled task 'update_instruments'")

async def _resubscribe_orderbooks_on_interval(self) -> None:
async def _fetch_orderbooks_on_interval(self) -> None:
"""
Resubscribe to the orderbook on a fixed interval `update_orderbook_interval` to
ensure it does not become outdated.
Fetch the orderbook on a fixed interval `update_orderbook_interval` to ensure it
does not become outdated.
"""
try:
while True:
self._log.debug(
f"Scheduled `resubscribe_order_book` to run in {self._update_orderbook_interval_secs}s",
)
await asyncio.sleep(self._update_orderbook_interval_secs)
await self._resubscribe_orderbooks()
await self._fetch_orderbooks()
except asyncio.CancelledError:
self._log.debug("Canceled task 'resubscribe_orderbook'")
self._log.debug("Canceled task 'fetch_orderbook'")

async def _resubscribe_orderbooks(self) -> None:
async def _fetch_orderbooks(self) -> None:
"""
Resubscribe to the orderbook.
Request a new orderbook snapshot for all order book subscriptions.
"""
async with self._resubscribe_orderbook_lock:
for symbol in self._orderbook_subscriptions:
await self._ws_client.unsubscribe_order_book(symbol, remove_subscription=False)
await self._ws_client.subscribe_order_book(
symbol,
bypass_subscription_validation=True,
tasks = []

for symbol in self._orderbook_subscriptions:
tasks.append(self._fetch_orderbook(symbol))

await asyncio.gather(*tasks)

async def _fetch_orderbook(self, symbol: str) -> None:
"""
Request a new orderbook snapshot.
"""
msg = await self._http_market.get_orderbook(symbol=symbol)

if msg is not None:
instrument_id: InstrumentId = self._get_cached_instrument_id(symbol)
instrument = self._cache.instrument(instrument_id)

if instrument is None:
self._log.error(
f"Cannot parse orderbook snapshot: no instrument for {instrument_id}",
)
return

ts_init = self._clock.timestamp_ns()
deltas = msg.parse_to_snapshot(
instrument_id=instrument_id,
price_precision=instrument.price_precision,
size_precision=instrument.size_precision,
ts_event=ts_init,
ts_init=ts_init,
)

self._handle_deltas(instrument_id=instrument_id, deltas=deltas)

def _send_all_instruments_to_data_engine(self) -> None:
for instrument in self._instrument_provider.get_all().values():
Expand Down
3 changes: 2 additions & 1 deletion nautilus_trader/adapters/dydx/endpoints/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from nautilus_trader.adapters.dydx.http.errors import DYDXError
from nautilus_trader.adapters.dydx.http.errors import should_retry
from nautilus_trader.common.component import Logger
from nautilus_trader.core.nautilus_pyo3 import HttpError
from nautilus_trader.core.nautilus_pyo3 import HttpMethod
from nautilus_trader.core.nautilus_pyo3 import HttpTimeoutError
from nautilus_trader.live.retry import RetryManagerPool
Expand Down Expand Up @@ -63,7 +64,7 @@ def __init__(
max_retries=5,
retry_delay_secs=1.0,
logger=Logger(name="DYDXHttpEndpoint"),
exc_types=(HttpTimeoutError, DYDXError),
exc_types=(HttpTimeoutError, HttpError, DYDXError),
retry_check=should_retry,
)

Expand Down
8 changes: 4 additions & 4 deletions nautilus_trader/adapters/dydx/endpoints/market/candles.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,20 @@ class DYDXCandlesGetParams(msgspec.Struct, omit_defaults=True):

class DYDXCandlesResponse(msgspec.Struct, forbid_unknown_fields=True):
"""
Represent the dYdX list perpetual markets response object.
Represent the dYdX candles response object.
"""

candles: list[DYDXCandle]


class DYDXCandlesEndpoint(DYDXHttpEndpoint):
"""
Define the instrument info endpoint.
Define the bars endpoint.
"""

def __init__(self, client: DYDXHttpClient) -> None:
"""
Define the instrument info endpoint.
Define the bars endpoint.
"""
url_path = "/candles/perpetualMarkets/"
super().__init__(
Expand All @@ -70,7 +70,7 @@ def __init__(self, client: DYDXHttpClient) -> None:

async def get(self, symbol: str, params: DYDXCandlesGetParams) -> DYDXCandlesResponse | None:
"""
Call the endpoint to list the instruments.
Call the bars endpoint.
"""
url_path = f"/candles/perpetualMarkets/{symbol}"
raw = await self._method(self.method_type, params=params, url_path=url_path)
Expand Down
75 changes: 75 additions & 0 deletions nautilus_trader/adapters/dydx/endpoints/market/orderbook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# -------------------------------------------------------------------------------------------------
# Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
# https://nautechsystems.io
#
# Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -------------------------------------------------------------------------------------------------
"""
Define the orderbook snapshot endpoint.
"""


import msgspec

from nautilus_trader.adapters.dydx.common.enums import DYDXEndpointType
from nautilus_trader.adapters.dydx.endpoints.endpoint import DYDXHttpEndpoint
from nautilus_trader.adapters.dydx.http.client import DYDXHttpClient
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsOrderbookMessageSnapshotContents
from nautilus_trader.core.nautilus_pyo3 import HttpMethod


class DYDXOrderBookSnapshotEndpoint(DYDXHttpEndpoint):
"""
Define the order book snapshot endpoint.
"""

def __init__(self, client: DYDXHttpClient) -> None:
"""
Define the order book snapshot endpoint.

Parameters
----------
client : DYDXHttpClient
The HTTP client.

"""
url_path = "/orderbooks/perpetualMarket/"
super().__init__(
client=client,
url_path=url_path,
endpoint_type=DYDXEndpointType.NONE,
name="DYDXOrderBookSnapshotEndpoint",
)
self.method_type = HttpMethod.GET
self._decoder = msgspec.json.Decoder(DYDXWsOrderbookMessageSnapshotContents)

async def get(self, symbol: str) -> DYDXWsOrderbookMessageSnapshotContents | None:
"""
Call the endpoint to request an order book snapshot.

Parameters
----------
symbol : str
The ticker or symbol to request the order book snapshot for.

Returns
-------
DYDXWsOrderbookMessageSnapshotContents | None
The order book snapshot message.

"""
url_path = f"/orderbooks/perpetualMarket/{symbol}"
raw = await self._method(self.method_type, url_path=url_path)

if raw is not None:
return self._decoder.decode(raw)

return None
20 changes: 20 additions & 0 deletions nautilus_trader/adapters/dydx/http/market.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
from nautilus_trader.adapters.dydx.endpoints.market.instruments_info import DYDXListPerpetualMarketsEndpoint
from nautilus_trader.adapters.dydx.endpoints.market.instruments_info import DYDXListPerpetualMarketsResponse
from nautilus_trader.adapters.dydx.endpoints.market.instruments_info import ListPerpetualMarketsGetParams
from nautilus_trader.adapters.dydx.endpoints.market.orderbook import DYDXOrderBookSnapshotEndpoint
from nautilus_trader.adapters.dydx.http.client import DYDXHttpClient
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsOrderbookMessageSnapshotContents

# fmt: on
from nautilus_trader.common.component import LiveClock
Expand All @@ -53,6 +55,7 @@ def __init__(

self._endpoint_instruments = DYDXListPerpetualMarketsEndpoint(client)
self._endpoint_candles = DYDXCandlesEndpoint(client)
self._endpoint_orderbook = DYDXOrderBookSnapshotEndpoint(client)

async def fetch_instruments(
self,
Expand All @@ -66,6 +69,23 @@ async def fetch_instruments(
ListPerpetualMarketsGetParams(ticker=symbol, limit=limit),
)

async def get_orderbook(self, symbol: str) -> DYDXWsOrderbookMessageSnapshotContents | None:
"""
Request an orderbook snapshot.

Parameters
----------
symbol : str
The ticker or symbol to request the order book snapshot for.

Returns
-------
DYDXWsOrderbookMessageSnapshotContents | None
The order book snapshot message.

"""
return await self._endpoint_orderbook.get(symbol=symbol)

async def get_candles(
self,
symbol: str,
Expand Down
63 changes: 41 additions & 22 deletions nautilus_trader/adapters/dydx/schemas/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,20 +339,6 @@ class DYDXWsOrderbookMessageSnapshotContents(msgspec.Struct, forbid_unknown_fiel
bids: list[PriceLevel] | None = None
asks: list[PriceLevel] | None = None


class DYDXWsOrderbookSnapshotChannelData(msgspec.Struct, forbid_unknown_fields=True):
"""
Define the order book snapshot messages.
"""

type: str
connection_id: str
message_id: int
channel: str
id: str
contents: DYDXWsOrderbookMessageSnapshotContents
version: str | None = None

def parse_to_snapshot(
self,
instrument_id: InstrumentId,
Expand All @@ -375,16 +361,16 @@ def parse_to_snapshot(
)
deltas.append(clear)

if self.contents.bids is None:
self.contents.bids = []
if self.bids is None:
self.bids = []

if self.contents.asks is None:
self.contents.asks = []
if self.asks is None:
self.asks = []

bids_len = len(self.contents.bids)
asks_len = len(self.contents.asks)
bids_len = len(self.bids)
asks_len = len(self.asks)

for idx, bid in enumerate(self.contents.bids):
for idx, bid in enumerate(self.bids):
flags = 0
if idx == bids_len - 1 and asks_len == 0:
# F_LAST, 1 << 7
Expand All @@ -410,7 +396,7 @@ def parse_to_snapshot(

deltas.append(delta)

for idx, ask in enumerate(self.contents.asks):
for idx, ask in enumerate(self.asks):
flags = 0
if idx == asks_len - 1:
# F_LAST, 1 << 7
Expand All @@ -436,6 +422,39 @@ def parse_to_snapshot(
return OrderBookDeltas(instrument_id=instrument_id, deltas=deltas)


class DYDXWsOrderbookSnapshotChannelData(msgspec.Struct, forbid_unknown_fields=True):
"""
Define the order book snapshot messages.
"""

type: str
connection_id: str
message_id: int
channel: str
id: str
contents: DYDXWsOrderbookMessageSnapshotContents
version: str | None = None

def parse_to_snapshot(
self,
instrument_id: InstrumentId,
price_precision: int,
size_precision: int,
ts_event: int,
ts_init: int,
) -> OrderBookDeltas:
"""
Parse the order book message into OrderBookDeltas.
"""
return self.contents.parse_to_snapshot(
instrument_id,
price_precision,
size_precision,
ts_event,
ts_init,
)


class DYDXWsOrderbookBatchedData(msgspec.Struct, forbid_unknown_fields=True):
"""
Define the order book batched deltas message.
Expand Down
Loading
Loading