Skip to content

Commit 1cca1b9

Browse files
committed
Add PNL updates subscriptions
1 parent de4d7b7 commit 1cca1b9

File tree

3 files changed

+91
-2
lines changed

3 files changed

+91
-2
lines changed

async_rithmic/client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ def __init__(
4949
self.on_historical_tick = Event()
5050
self.on_historical_time_bar = Event()
5151

52+
# PNL events
53+
self.on_instrument_pnl_update = Event()
54+
self.on_account_pnl_update = Event()
55+
5256
if "://" not in url:
5357
url = f"wss://{url}"
5458

async_rithmic/plants/pnl.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,16 @@ def _ib_id(self):
1818

1919
async def _login(self):
2020
await super()._login()
21-
await self._subscribe_to_position_updates()
2221

23-
async def _subscribe_to_position_updates(self):
22+
if len(self._subscriptions["pnl"]) > 0:
23+
await self.subscribe_to_pnl_updates()
24+
25+
async def subscribe_to_pnl_updates(self):
26+
"""
27+
Subscribe to instruments/accounts PNL updates
28+
"""
29+
self._subscriptions["pnl"].add(1)
30+
2431
for account in self._accounts:
2532
await self._send_request(
2633
template_id=400,
@@ -30,16 +37,52 @@ async def _subscribe_to_position_updates(self):
3037
request=pb.request_pnl_position_updates_pb2.RequestPnLPositionUpdates.Request.SUBSCRIBE
3138
)
3239

40+
async def unsubscribe_from_pnl_updates(self):
41+
"""
42+
Unsubscribe from instruments/accounts PNL updates
43+
"""
44+
self._subscriptions["pnl"].discard(1)
45+
46+
for account in self._accounts:
47+
await self._send_request(
48+
template_id=400,
49+
fcm_id=self._fcm_id,
50+
ib_id=self._ib_id,
51+
account_id=account.account_id,
52+
request=pb.request_pnl_position_updates_pb2.RequestPnLPositionUpdates.Request.UNSUBSCRIBE
53+
)
54+
3355
async def list_positions(self, **kwargs):
56+
"""
57+
Instrument PNL snapshots
58+
"""
3459
return await self._send_and_collect(
3560
template_id=402,
3661
expected_response=dict(template_id=450, is_snapshot=True),
3762
**kwargs
3863
)
3964

4065
async def list_account_summary(self, **kwargs):
66+
"""
67+
Account PNL snapshots
68+
"""
4169
return await self._send_and_collect(
4270
template_id=402,
4371
expected_response=dict(template_id=451, is_snapshot=True),
4472
**kwargs
4573
)
74+
75+
async def _process_response(self, response):
76+
if await super()._process_response(response):
77+
return True
78+
79+
if response.template_id == 450:
80+
# Instrument PNL position update
81+
await self.client.on_instrument_pnl_update.call_async(response)
82+
83+
elif response.template_id == 451:
84+
# Account PNL position update
85+
await self.client.on_account_pnl_update.call_async(response)
86+
87+
else:
88+
self.logger.warning(f"Unhandled inbound message with template_id={response.template_id}")

docs/pnl.rst

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,45 @@ Use `list_account_summary()` to retrieve the PNL snapshot of an account.
1313
accounts = await client.list_account_summary(account_id="1234")
1414
1515
The result is a list which contains a single object. See the `account_pnl_position_update.proto <https://github.com/rundef/async_rithmic/blob/main/async_rithmic/protocol_buffers/source/account_pnl_position_update.proto>`_ definition for field details.
16+
17+
18+
Streaming PNL updates
19+
---------------------
20+
21+
Here's an example that gets pnl updates in a streaming fashion:
22+
23+
.. code-block:: python
24+
25+
import asyncio
26+
from async_rithmic import RithmicClient
27+
28+
async def on_instrument_pnl_update(data):
29+
print("instrument_pnl_update", data)
30+
31+
async def on_account_pnl_update(data):
32+
print("account_pnl_update", data)
33+
34+
async def main():
35+
client = RithmicClient(
36+
user="",
37+
password="",
38+
system_name="Rithmic Test",
39+
app_name="my_test_app",
40+
app_version="1.0",
41+
url="rituz00100.rithmic.com:443"
42+
)
43+
await client.connect()
44+
45+
client.on_instrument_pnl_update += on_instrument_pnl_update
46+
client.on_account_pnl_update += on_account_pnl_update
47+
48+
await client.subscribe_to_pnl_updates()
49+
50+
# Wait 10 seconds, unsubscribe and disconnect
51+
await asyncio.sleep(10)
52+
await client.unsubscribe_from_pnl_updates()
53+
await client.disconnect()
54+
55+
asyncio.run(main())
56+
57+
See the `account_pnl_position_update.proto <https://github.com/rundef/async_rithmic/blob/main/async_rithmic/protocol_buffers/source/account_pnl_position_update.proto>`_ and `instrument_pnl_position_update.proto <https://github.com/rundef/async_rithmic/blob/main/async_rithmic/protocol_buffers/source/instrument_pnl_position_update.proto>`_ definitions for field details.

0 commit comments

Comments
 (0)