Skip to content

Refine margin balance report for dYdX #2154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 1 addition & 24 deletions nautilus_trader/adapters/dydx/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
from nautilus_trader.adapters.dydx.providers import DYDXInstrumentProvider
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsCandlesChannelData
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsCandlesSubscribedData
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsMarketChannelData
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsMarketSubscribedData
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsMessageGeneral
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsOrderbookBatchedData
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsOrderbookChannelData
Expand Down Expand Up @@ -131,8 +129,7 @@ def __init__(
self._decoder_ws_trade = msgspec.json.Decoder(DYDXWsTradeChannelData)
self._decoder_ws_kline = msgspec.json.Decoder(DYDXWsCandlesChannelData)
self._decoder_ws_kline_subscribed = msgspec.json.Decoder(DYDXWsCandlesSubscribedData)
self._decoder_ws_instruments = msgspec.json.Decoder(DYDXWsMarketChannelData)
self._decoder_ws_instruments_subscribed = msgspec.json.Decoder(DYDXWsMarketSubscribedData)

self._ws_client = DYDXWebsocketClient(
clock=clock,
handler=self._handle_ws_message,
Expand Down Expand Up @@ -242,8 +239,6 @@ def _handle_ws_message(self, raw: bytes) -> None:
("v4_trades", "subscribed"): self._handle_trade_subscribed,
("v4_candles", "channel_data"): self._handle_kline,
("v4_candles", "subscribed"): self._handle_kline_subscribed,
("v4_markets", "channel_data"): self._handle_markets,
("v4_markets", "subscribed"): self._handle_markets_subscribed,
}
try:
ws_message = self._decoder_ws_msg_general.decode(raw)
Expand Down Expand Up @@ -665,24 +660,6 @@ def _handle_kline_unsubscribed(self, msg: DYDXWsMessageGeneral) -> None:
if msg.id is not None:
self._topic_bar_type.pop(msg.id, None)

def _handle_markets(self, raw: bytes) -> None:
try:
msg: DYDXWsMarketChannelData = self._decoder_ws_instruments.decode(raw)

self._log.debug(f"{msg}")

except Exception as e:
self._log.error(f"Failed to parse market data: {raw.decode()} with error {e}")

def _handle_markets_subscribed(self, raw: bytes) -> None:
try:
msg: DYDXWsMarketSubscribedData = self._decoder_ws_instruments_subscribed.decode(raw)

self._log.debug(f"{msg}")

except Exception as e:
self._log.error(f"Failed to parse market channel data: {raw.decode()} with error {e}")

async def _subscribe_trade_ticks(
self,
instrument_id: InstrumentId,
Expand Down
47 changes: 45 additions & 2 deletions nautilus_trader/adapters/dydx/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,16 @@
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsBlockHeightChannelData
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsBlockHeightSubscribedData
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsFillSubaccountMessageContents
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsMarketChannelData
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsMarketSubscribedData
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsMessageGeneral
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsOrderSubaccountMessageContents
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsSubaccountsChannelData
from nautilus_trader.adapters.dydx.schemas.ws import DYDXWsSubaccountsSubscribed
from nautilus_trader.adapters.dydx.websocket.client import DYDXWebsocketClient
from nautilus_trader.cache.cache import Cache
from nautilus_trader.common.component import LiveClock
from nautilus_trader.common.component import Logger
from nautilus_trader.common.component import MessageBus
from nautilus_trader.common.enums import LogColor
from nautilus_trader.core.correctness import PyCondition
Expand Down Expand Up @@ -110,6 +113,7 @@ def __init__(self, cache: Cache) -> None:
Generate integer client order IDs.
"""
self._cache = cache
self._log: Logger = Logger(type(self).__name__)

def generate_client_order_id_int(self, client_order_id: ClientOrderId) -> int:
"""
Expand Down Expand Up @@ -143,6 +147,8 @@ def get_client_order_id_int(self, client_order_id: ClientOrderId) -> int | None:

if value is not None:
result = int.from_bytes(value, byteorder="big")
else:
self._log.error(f"ClientOrderId integer not found in cache for {client_order_id!r}")

return result

Expand All @@ -154,6 +160,8 @@ def get_client_order_id(self, client_order_id_int: int) -> ClientOrderId:

if value is not None:
return ClientOrderId(value.decode("utf-8"))
else:
self._log.error(f"ClientOrderId not found in cache for integer {client_order_id_int}")

return ClientOrderId(str(client_order_id_int))

Expand Down Expand Up @@ -263,11 +271,14 @@ def __init__(
DYDXWsBlockHeightSubscribedData,
)
self._decoder_ws_block_height_channel = msgspec.json.Decoder(DYDXWsBlockHeightChannelData)
self._decoder_ws_instruments = msgspec.json.Decoder(DYDXWsMarketChannelData)
self._decoder_ws_instruments_subscribed = msgspec.json.Decoder(DYDXWsMarketSubscribedData)

# Hot caches
self._order_builders: dict[InstrumentId, OrderBuilder] = {}
self._generate_order_status_retries: dict[ClientOrderId, int] = {}
self._block_height: int = 0
self._oracle_prices: dict[InstrumentId, Decimal] = {}

self._retry_manager_pool = RetryManagerPool(
pool_size=100,
Expand All @@ -288,11 +299,12 @@ async def _connect(self) -> None:
await self._ws_client.connect()

# Subscribe account updates
await self._ws_client.subscribe_markets()
await self._ws_client.subscribe_block_height()
await self._ws_client.subscribe_account_update(
wallet_address=self._wallet_address,
subaccount_number=self._subaccount,
)
await self._ws_client.subscribe_block_height()

self._block_height = await self._grpc_account.latest_block_height()

Expand All @@ -304,11 +316,13 @@ async def _connect(self) -> None:
)

async def _disconnect(self) -> None:
await self._ws_client.unsubscribe_markets()
await self._ws_client.unsubscribe_block_height()
await self._ws_client.unsubscribe_account_update(
wallet_address=self._wallet_address,
subaccount_number=self._subaccount,
)
await self._ws_client.unsubscribe_block_height()

await self._ws_client.disconnect()
await self._grpc_account.disconnect()

Expand Down Expand Up @@ -710,10 +724,14 @@ def _handle_ws_message(self, raw: bytes) -> None:
self._handle_block_height_channel_data(raw)
elif ws_message.channel == "v4_subaccounts" and ws_message.type == "channel_data":
self._handle_subaccounts_channel_data(raw)
elif ws_message.channel == "v4_markets" and ws_message.type == "channel_data":
self._handle_markets(raw)
elif ws_message.channel == "v4_block_height" and ws_message.type == "subscribed":
self._handle_block_height_subscribed(raw)
elif ws_message.channel == "v4_subaccounts" and ws_message.type == "subscribed":
self._handle_subaccounts_subscribed(raw)
elif ws_message.channel == "v4_markets" and ws_message.type == "subscribed":
self._handle_markets_subscribed(raw)
elif ws_message.type == "unsubscribed":
self._log.info(
f"Unsubscribed from channel {ws_message.channel} for {ws_message.id}",
Expand Down Expand Up @@ -749,6 +767,30 @@ def _handle_block_height_channel_data(self, raw: bytes) -> None:
f"Failed to parse block height channel message: {raw.decode()} with error {e}",
)

def _handle_markets(self, raw: bytes) -> None:
try:
msg: DYDXWsMarketChannelData = self._decoder_ws_instruments.decode(raw)

if msg.contents.oraclePrices is not None:
for symbol, oracle_price_market in msg.contents.oraclePrices.items():
instrument_id = DYDXSymbol(symbol).to_instrument_id()
self._oracle_prices[instrument_id] = Decimal(oracle_price_market.oraclePrice)

except Exception as e:
self._log.error(f"Failed to parse market data: {raw.decode()} with error {e}")

def _handle_markets_subscribed(self, raw: bytes) -> None:
try:
msg: DYDXWsMarketSubscribedData = self._decoder_ws_instruments_subscribed.decode(raw)

for symbol, oracle_price_market in msg.contents.markets.items():
if oracle_price_market.oraclePrice is not None:
instrument_id = DYDXSymbol(symbol).to_instrument_id()
self._oracle_prices[instrument_id] = Decimal(oracle_price_market.oraclePrice)

except Exception as e:
self._log.error(f"Failed to parse market channel data: {raw.decode()} with error {e}")

def _handle_subaccounts_subscribed(self, raw: bytes) -> None:
try:
msg: DYDXWsSubaccountsSubscribed = self._decoder_ws_msg_subaccounts_subscribed.decode(
Expand Down Expand Up @@ -780,6 +822,7 @@ def _handle_subaccounts_subscribed(self, raw: bytes) -> None:
margin_balance = perpetual_position.parse_margin_balance(
margin_init=instrument.margin_init,
margin_maint=instrument.margin_maint,
oracle_price=self._oracle_prices.get(instrument.id),
)

initial_margins[
Expand Down
14 changes: 11 additions & 3 deletions nautilus_trader/adapters/dydx/schemas/account/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,28 @@ def quote_currency(self) -> str:
currency = self.market.split("-")[1]
return CURRENCY_MAP.get(currency, currency)

def parse_margin_balance(self, margin_init: Decimal, margin_maint: Decimal) -> MarginBalance:
def parse_margin_balance(
self,
margin_init: Decimal,
margin_maint: Decimal,
oracle_price: Decimal | None = None,
) -> MarginBalance:
"""
Parse the position message into a margin balance report.
"""
currency = Currency.from_str(self.quote_currency())

if self.status == DYDXPerpetualPositionStatus.OPEN:
if oracle_price is None:
oracle_price = Decimal(self.entryPrice)

return MarginBalance(
initial=Money(
margin_init * abs(Decimal(self.size)) * Decimal(self.entryPrice),
margin_init * abs(Decimal(self.size)) * oracle_price,
currency,
),
maintenance=Money(
margin_maint * abs(Decimal(self.size)) * Decimal(self.entryPrice),
margin_maint * abs(Decimal(self.size)) * oracle_price,
currency,
),
)
Expand Down
Loading