53
53
from nautilus_trader .adapters .dydx .schemas .ws import DYDXWsBlockHeightChannelData
54
54
from nautilus_trader .adapters .dydx .schemas .ws import DYDXWsBlockHeightSubscribedData
55
55
from nautilus_trader .adapters .dydx .schemas .ws import DYDXWsFillSubaccountMessageContents
56
+ from nautilus_trader .adapters .dydx .schemas .ws import DYDXWsMarketChannelData
57
+ from nautilus_trader .adapters .dydx .schemas .ws import DYDXWsMarketSubscribedData
56
58
from nautilus_trader .adapters .dydx .schemas .ws import DYDXWsMessageGeneral
57
59
from nautilus_trader .adapters .dydx .schemas .ws import DYDXWsOrderSubaccountMessageContents
58
60
from nautilus_trader .adapters .dydx .schemas .ws import DYDXWsSubaccountsChannelData
59
61
from nautilus_trader .adapters .dydx .schemas .ws import DYDXWsSubaccountsSubscribed
60
62
from nautilus_trader .adapters .dydx .websocket .client import DYDXWebsocketClient
61
63
from nautilus_trader .cache .cache import Cache
62
64
from nautilus_trader .common .component import LiveClock
65
+ from nautilus_trader .common .component import Logger
63
66
from nautilus_trader .common .component import MessageBus
64
67
from nautilus_trader .common .enums import LogColor
65
68
from nautilus_trader .core .correctness import PyCondition
@@ -110,6 +113,7 @@ def __init__(self, cache: Cache) -> None:
110
113
Generate integer client order IDs.
111
114
"""
112
115
self ._cache = cache
116
+ self ._log : Logger = Logger (type (self ).__name__ )
113
117
114
118
def generate_client_order_id_int (self , client_order_id : ClientOrderId ) -> int :
115
119
"""
@@ -143,6 +147,8 @@ def get_client_order_id_int(self, client_order_id: ClientOrderId) -> int | None:
143
147
144
148
if value is not None :
145
149
result = int .from_bytes (value , byteorder = "big" )
150
+ else :
151
+ self ._log .error (f"ClientOrderId integer not found in cache for { client_order_id !r} " )
146
152
147
153
return result
148
154
@@ -154,6 +160,8 @@ def get_client_order_id(self, client_order_id_int: int) -> ClientOrderId:
154
160
155
161
if value is not None :
156
162
return ClientOrderId (value .decode ("utf-8" ))
163
+ else :
164
+ self ._log .error (f"ClientOrderId not found in cache for integer { client_order_id_int } " )
157
165
158
166
return ClientOrderId (str (client_order_id_int ))
159
167
@@ -263,11 +271,14 @@ def __init__(
263
271
DYDXWsBlockHeightSubscribedData ,
264
272
)
265
273
self ._decoder_ws_block_height_channel = msgspec .json .Decoder (DYDXWsBlockHeightChannelData )
274
+ self ._decoder_ws_instruments = msgspec .json .Decoder (DYDXWsMarketChannelData )
275
+ self ._decoder_ws_instruments_subscribed = msgspec .json .Decoder (DYDXWsMarketSubscribedData )
266
276
267
277
# Hot caches
268
278
self ._order_builders : dict [InstrumentId , OrderBuilder ] = {}
269
279
self ._generate_order_status_retries : dict [ClientOrderId , int ] = {}
270
280
self ._block_height : int = 0
281
+ self ._oracle_prices : dict [InstrumentId , Decimal ] = {}
271
282
272
283
self ._retry_manager_pool = RetryManagerPool (
273
284
pool_size = 100 ,
@@ -288,11 +299,12 @@ async def _connect(self) -> None:
288
299
await self ._ws_client .connect ()
289
300
290
301
# Subscribe account updates
302
+ await self ._ws_client .subscribe_markets ()
303
+ await self ._ws_client .subscribe_block_height ()
291
304
await self ._ws_client .subscribe_account_update (
292
305
wallet_address = self ._wallet_address ,
293
306
subaccount_number = self ._subaccount ,
294
307
)
295
- await self ._ws_client .subscribe_block_height ()
296
308
297
309
self ._block_height = await self ._grpc_account .latest_block_height ()
298
310
@@ -304,11 +316,13 @@ async def _connect(self) -> None:
304
316
)
305
317
306
318
async def _disconnect (self ) -> None :
319
+ await self ._ws_client .unsubscribe_markets ()
320
+ await self ._ws_client .unsubscribe_block_height ()
307
321
await self ._ws_client .unsubscribe_account_update (
308
322
wallet_address = self ._wallet_address ,
309
323
subaccount_number = self ._subaccount ,
310
324
)
311
- await self . _ws_client . unsubscribe_block_height ()
325
+
312
326
await self ._ws_client .disconnect ()
313
327
await self ._grpc_account .disconnect ()
314
328
@@ -710,10 +724,14 @@ def _handle_ws_message(self, raw: bytes) -> None:
710
724
self ._handle_block_height_channel_data (raw )
711
725
elif ws_message .channel == "v4_subaccounts" and ws_message .type == "channel_data" :
712
726
self ._handle_subaccounts_channel_data (raw )
727
+ elif ws_message .channel == "v4_markets" and ws_message .type == "channel_data" :
728
+ self ._handle_markets (raw )
713
729
elif ws_message .channel == "v4_block_height" and ws_message .type == "subscribed" :
714
730
self ._handle_block_height_subscribed (raw )
715
731
elif ws_message .channel == "v4_subaccounts" and ws_message .type == "subscribed" :
716
732
self ._handle_subaccounts_subscribed (raw )
733
+ elif ws_message .channel == "v4_markets" and ws_message .type == "subscribed" :
734
+ self ._handle_markets_subscribed (raw )
717
735
elif ws_message .type == "unsubscribed" :
718
736
self ._log .info (
719
737
f"Unsubscribed from channel { ws_message .channel } for { ws_message .id } " ,
@@ -749,6 +767,30 @@ def _handle_block_height_channel_data(self, raw: bytes) -> None:
749
767
f"Failed to parse block height channel message: { raw .decode ()} with error { e } " ,
750
768
)
751
769
770
+ def _handle_markets (self , raw : bytes ) -> None :
771
+ try :
772
+ msg : DYDXWsMarketChannelData = self ._decoder_ws_instruments .decode (raw )
773
+
774
+ if msg .contents .oraclePrices is not None :
775
+ for symbol , oracle_price_market in msg .contents .oraclePrices .items ():
776
+ instrument_id = DYDXSymbol (symbol ).to_instrument_id ()
777
+ self ._oracle_prices [instrument_id ] = Decimal (oracle_price_market .oraclePrice )
778
+
779
+ except Exception as e :
780
+ self ._log .error (f"Failed to parse market data: { raw .decode ()} with error { e } " )
781
+
782
+ def _handle_markets_subscribed (self , raw : bytes ) -> None :
783
+ try :
784
+ msg : DYDXWsMarketSubscribedData = self ._decoder_ws_instruments_subscribed .decode (raw )
785
+
786
+ for symbol , oracle_price_market in msg .contents .markets .items ():
787
+ if oracle_price_market .oraclePrice is not None :
788
+ instrument_id = DYDXSymbol (symbol ).to_instrument_id ()
789
+ self ._oracle_prices [instrument_id ] = Decimal (oracle_price_market .oraclePrice )
790
+
791
+ except Exception as e :
792
+ self ._log .error (f"Failed to parse market channel data: { raw .decode ()} with error { e } " )
793
+
752
794
def _handle_subaccounts_subscribed (self , raw : bytes ) -> None :
753
795
try :
754
796
msg : DYDXWsSubaccountsSubscribed = self ._decoder_ws_msg_subaccounts_subscribed .decode (
@@ -780,6 +822,7 @@ def _handle_subaccounts_subscribed(self, raw: bytes) -> None:
780
822
margin_balance = perpetual_position .parse_margin_balance (
781
823
margin_init = instrument .margin_init ,
782
824
margin_maint = instrument .margin_maint ,
825
+ oracle_price = self ._oracle_prices .get (instrument .id ),
783
826
)
784
827
785
828
initial_margins [
0 commit comments