Skip to content

Commit e9b9f77

Browse files
committed
Fix consistent ordering of execution events
- Consolidated event publishing and snapshotting in ExecutionEngine - Consolidated account state updates in Portfolio and simplified subscriptions - Downstream components will now receive consistently ordered event types
1 parent 359b27d commit e9b9f77

File tree

11 files changed

+86
-73
lines changed

11 files changed

+86
-73
lines changed

RELEASES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ Released on TBD (UTC).
3737
- Upgraded `pyo3` crate to v0.24.2
3838

3939
### Fixes
40+
- Fixed consistent ordering of execution events (#2513), thanks for reporting @stastnypremysl
4041
- Fixed memory leak in `RetryManager` by simplifying the acquire-release pattern, avoiding the asynchronous context manager protocol that led to state sharing, thanks for reporting @DeevsDeevs
4142
- Fixed locked balance and initial margin calculations for reduce-only orders (#2505), thanks for reporting @stastnypremysl
4243
- Fixed purging order events from position (these needed to be purged prior to removing cache index entry), thanks @DeevsDeevs

nautilus_trader/execution/engine.pxd

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ from nautilus_trader.common.component cimport TimeEvent
1919
from nautilus_trader.common.generators cimport PositionIdGenerator
2020
from nautilus_trader.core.rust.model cimport OmsType
2121
from nautilus_trader.core.rust.model cimport OrderSide
22-
from nautilus_trader.execution.algorithm cimport ExecAlgorithm
2322
from nautilus_trader.execution.client cimport ExecutionClient
2423
from nautilus_trader.execution.messages cimport BatchCancelOrders
2524
from nautilus_trader.execution.messages cimport CancelAllOrders
@@ -31,6 +30,7 @@ from nautilus_trader.execution.messages cimport SubmitOrderList
3130
from nautilus_trader.execution.messages cimport TradingCommand
3231
from nautilus_trader.model.events.order cimport OrderEvent
3332
from nautilus_trader.model.events.order cimport OrderFilled
33+
from nautilus_trader.model.events.position cimport PositionEvent
3434
from nautilus_trader.model.identifiers cimport InstrumentId
3535
from nautilus_trader.model.identifiers cimport PositionId
3636
from nautilus_trader.model.identifiers cimport StrategyId
@@ -52,6 +52,7 @@ cdef class ExecutionEngine(Component):
5252
cdef readonly dict[StrategyId, OmsType] _oms_overrides
5353
cdef readonly dict[InstrumentId, StrategyId] _external_order_claims
5454
cdef readonly str snapshot_positions_timer_name
55+
cdef list[PositionEvent] _pending_position_events
5556

5657
cdef readonly bint debug
5758
"""If debug mode is active (will provide extra debug logging).\n\n:returns: `bool`"""

nautilus_trader/execution/engine.pyx

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ includes sending commands to, and receiving events from, the trading venue
2222
endpoints via its registered execution clients.
2323
2424
The engine employs a simple fan-in fan-out messaging pattern to execute
25-
`TradingCommand` messages, and process `AccountState` or `OrderEvent` type
26-
messages.
25+
`TradingCommand` messages and `OrderEvent` messages.
2726
2827
Alternative implementations can be written on top of the generic engine - which
2928
just need to override the `execute` and `process` methods.
@@ -48,7 +47,6 @@ from nautilus_trader.common.component cimport RECV
4847
from nautilus_trader.common.component cimport Clock
4948
from nautilus_trader.common.component cimport Component
5049
from nautilus_trader.common.component cimport LogColor
51-
from nautilus_trader.common.component cimport Logger
5250
from nautilus_trader.common.component cimport MessageBus
5351
from nautilus_trader.common.component cimport TimeEvent
5452
from nautilus_trader.common.generators cimport PositionIdGenerator
@@ -57,12 +55,9 @@ from nautilus_trader.core.fsm cimport InvalidStateTrigger
5755
from nautilus_trader.core.rust.core cimport secs_to_nanos
5856
from nautilus_trader.core.rust.model cimport ContingencyType
5957
from nautilus_trader.core.rust.model cimport OmsType
60-
from nautilus_trader.core.rust.model cimport OrderStatus
61-
from nautilus_trader.core.rust.model cimport OrderType
58+
from nautilus_trader.core.rust.model cimport OrderSide
6259
from nautilus_trader.core.rust.model cimport PositionSide
63-
from nautilus_trader.core.rust.model cimport TimeInForce
6460
from nautilus_trader.core.uuid cimport UUID4
65-
from nautilus_trader.execution.algorithm cimport ExecAlgorithm
6661
from nautilus_trader.execution.client cimport ExecutionClient
6762
from nautilus_trader.execution.messages cimport BatchCancelOrders
6863
from nautilus_trader.execution.messages cimport CancelAllOrders
@@ -91,11 +86,11 @@ from nautilus_trader.model.identifiers cimport PositionId
9186
from nautilus_trader.model.identifiers cimport StrategyId
9287
from nautilus_trader.model.identifiers cimport Venue
9388
from nautilus_trader.model.instruments.base cimport Instrument
94-
from nautilus_trader.model.instruments.currency_pair cimport CurrencyPair
9589
from nautilus_trader.model.objects cimport Money
9690
from nautilus_trader.model.objects cimport Price
9791
from nautilus_trader.model.objects cimport Quantity
9892
from nautilus_trader.model.orders.base cimport Order
93+
from nautilus_trader.trading.strategy cimport Strategy
9994

10095

10196
cdef class ExecutionEngine(Component):
@@ -151,6 +146,8 @@ cdef class ExecutionEngine(Component):
151146
clock=clock,
152147
)
153148

149+
self._pending_position_events = []
150+
154151
# Configuration
155152
self.debug: bool = config.debug
156153
self.manage_own_order_books = config.manage_own_order_books
@@ -1050,6 +1047,25 @@ cdef class ExecutionEngine(Component):
10501047
else:
10511048
self._apply_event_to_order(order, event)
10521049

1050+
# Publish events
1051+
self._msgbus.publish_c(
1052+
topic=f"events.order.{event.strategy_id}",
1053+
msg=event,
1054+
)
1055+
if self.snapshot_orders:
1056+
self._create_order_state_snapshot(order)
1057+
1058+
for pos_event in self._pending_position_events:
1059+
self._msgbus.publish_c(
1060+
topic=f"events.position.{pos_event.strategy_id}",
1061+
msg=pos_event,
1062+
)
1063+
if self.snapshot_positions:
1064+
position = self.cache.position(pos_event.position_id)
1065+
self._create_position_state_snapshot(position, open_only=isinstance(pos_event, PositionOpened))
1066+
1067+
self._pending_position_events.clear()
1068+
10531069
cpdef OmsType _determine_oms_type(self, OrderFilled fill):
10541070
cdef ExecutionClient client
10551071
# Check for strategy OMS override
@@ -1171,13 +1187,6 @@ cdef class ExecutionEngine(Component):
11711187

11721188
self._cache.update_order(order)
11731189

1174-
self._msgbus.publish_c(
1175-
topic=f"events.order.{event.strategy_id}",
1176-
msg=event,
1177-
)
1178-
if self.snapshot_orders:
1179-
self._create_order_state_snapshot(order)
1180-
11811190
cpdef void _handle_order_fill(self, Order order, OrderFilled fill, OmsType oms_type):
11821191
cdef Instrument instrument = self._cache.load_instrument(fill.instrument_id)
11831192
if instrument is None:
@@ -1222,8 +1231,6 @@ cdef class ExecutionEngine(Component):
12221231
if position is None:
12231232
position = Position(instrument, fill)
12241233
self._cache.add_position(position, oms_type)
1225-
if self.snapshot_positions:
1226-
self._create_position_state_snapshot(position, open_only=True)
12271234
else:
12281235
try:
12291236
# Always snapshot opening positions to handle NETTING OMS
@@ -1242,10 +1249,7 @@ cdef class ExecutionEngine(Component):
12421249
ts_init=self._clock.timestamp_ns(),
12431250
)
12441251

1245-
self._msgbus.publish_c(
1246-
topic=f"events.position.{event.strategy_id}",
1247-
msg=event,
1248-
)
1252+
self._pending_position_events.append(event)
12491253

12501254
return position
12511255

@@ -1258,8 +1262,6 @@ cdef class ExecutionEngine(Component):
12581262
return # Not re-raising to avoid crashing engine
12591263

12601264
self._cache.update_position(position)
1261-
if self.snapshot_positions:
1262-
self._create_position_state_snapshot(position, open_only=False)
12631265

12641266
cdef PositionEvent event
12651267
if position.is_closed_c():
@@ -1277,10 +1279,7 @@ cdef class ExecutionEngine(Component):
12771279
ts_init=self._clock.timestamp_ns(),
12781280
)
12791281

1280-
self._msgbus.publish_c(
1281-
topic=f"events.position.{event.strategy_id}",
1282-
msg=event,
1283-
)
1282+
self._pending_position_events.append(event)
12841283

12851284
cpdef bint _will_flip_position(self, Position position, OrderFilled fill):
12861285
return (

nautilus_trader/portfolio/portfolio.pxd

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
# limitations under the License.
1414
# -------------------------------------------------------------------------------------------------
1515

16+
from libc.stdint cimport uint64_t
17+
1618
from nautilus_trader.accounting.accounts.base cimport Account
1719
from nautilus_trader.accounting.manager cimport AccountsManager
1820
from nautilus_trader.cache.cache cimport Cache
@@ -25,6 +27,7 @@ from nautilus_trader.model.data cimport QuoteTick
2527
from nautilus_trader.model.events.account cimport AccountState
2628
from nautilus_trader.model.events.order cimport OrderEvent
2729
from nautilus_trader.model.events.position cimport PositionEvent
30+
from nautilus_trader.model.identifiers cimport AccountId
2831
from nautilus_trader.model.identifiers cimport InstrumentId
2932
from nautilus_trader.model.identifiers cimport PositionId
3033
from nautilus_trader.model.identifiers cimport Venue
@@ -74,6 +77,7 @@ cdef class Portfolio(PortfolioFacade):
7477

7578
# -- INTERNAL -------------------------------------------------------------------------------------
7679

80+
cdef AccountState _update_position(self, InstrumentId instrument_id, AccountId account_id, uint64_t ts_event)
7781
cdef object _net_position(self, InstrumentId instrument_id)
7882
cdef void _update_instrument_id(self, InstrumentId instrument_id)
7983
cdef void _update_net_position(self, InstrumentId instrument_id, list positions_open)

nautilus_trader/portfolio/portfolio.pyx

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ The portfolio can satisfy queries for account information, margin balances,
2424
total risk exposures and total net positions.
2525
"""
2626

27+
from libc.stdint cimport uint64_t
28+
2729
from collections import defaultdict
2830
from decimal import Decimal
2931

@@ -56,6 +58,7 @@ from nautilus_trader.model.events.order cimport OrderRejected
5658
from nautilus_trader.model.events.order cimport OrderUpdated
5759
from nautilus_trader.model.events.position cimport PositionEvent
5860
from nautilus_trader.model.functions cimport position_side_to_str
61+
from nautilus_trader.model.identifiers cimport AccountId
5962
from nautilus_trader.model.identifiers cimport InstrumentId
6063
from nautilus_trader.model.identifiers cimport PositionId
6164
from nautilus_trader.model.identifiers cimport Venue
@@ -167,7 +170,6 @@ cdef class Portfolio(PortfolioFacade):
167170

168171
# Required subscriptions
169172
self._msgbus.subscribe(topic="events.order.*", handler=self.update_order, priority=10)
170-
self._msgbus.subscribe(topic="events.position.*", handler=self.update_position, priority=10)
171173
self._msgbus.subscribe(topic="events.account.*", handler=self.update_account, priority=10)
172174

173175
if config.use_mark_prices:
@@ -526,6 +528,11 @@ cdef class Portfolio(PortfolioFacade):
526528
ts_event=event.ts_event,
527529
)
528530

531+
if isinstance(event, OrderFilled):
532+
maybe_account_state = self._update_position(event.instrument_id, event.account_id, event.ts_event)
533+
if maybe_account_state is not None:
534+
account_state = maybe_account_state
535+
529536
if account_state is None:
530537
self._log.debug(f"Added pending calculation for {instrument.id}")
531538
self._pending_calcs.add(instrument.id)
@@ -547,48 +554,49 @@ cdef class Portfolio(PortfolioFacade):
547554
The event to update with.
548555
549556
"""
550-
Condition.not_none(event, "event")
557+
self._update_position(event.instrument_id, event.account_id, event.ts_event)
551558

559+
cdef AccountState _update_position(self, InstrumentId instrument_id, AccountId account_id, uint64_t ts_event):
552560
cdef list positions_open = self._cache.positions_open(
553561
venue=None, # Faster query filtering
554-
instrument_id=event.instrument_id,
562+
instrument_id=instrument_id,
555563
)
556564
self._update_net_position(
557-
instrument_id=event.instrument_id,
565+
instrument_id=instrument_id,
558566
positions_open=positions_open,
559567
)
560568

561-
self._realized_pnls[event.instrument_id] = self._calculate_realized_pnl(
562-
instrument_id=event.instrument_id,
569+
self._realized_pnls[instrument_id] = self._calculate_realized_pnl(
570+
instrument_id=instrument_id,
563571
)
564-
self._unrealized_pnls[event.instrument_id] = self._calculate_unrealized_pnl(
565-
instrument_id=event.instrument_id,
572+
self._unrealized_pnls[instrument_id] = self._calculate_unrealized_pnl(
573+
instrument_id=instrument_id,
566574
)
567575

568-
cdef Account account = self._cache.account(event.account_id)
576+
cdef Account account = self._cache.account(account_id)
569577
if account is None:
570578
self._log.error(
571579
f"Cannot update position: "
572-
f"no account registered for {event.account_id}",
580+
f"no account registered for {account_id}",
573581
)
574582
return # No account registered
575583

576584
if account.type != AccountType.MARGIN or not account.calculate_account_state:
577585
return # Nothing to calculate
578586

579-
cdef Instrument instrument = self._cache.instrument(event.instrument_id)
587+
cdef Instrument instrument = self._cache.instrument(instrument_id)
580588
if instrument is None:
581589
self._log.error(
582590
f"Cannot update position: "
583-
f"no instrument found for {event.instrument_id}",
591+
f"no instrument found for {instrument_id}",
584592
)
585593
return # No instrument found
586594

587-
self._accounts.update_positions(
595+
return self._accounts.update_positions(
588596
account=account,
589597
instrument=instrument,
590598
positions_open=positions_open,
591-
ts_event=event.ts_event,
599+
ts_event=ts_event,
592600
)
593601

594602
def _reset(self) -> None:

0 commit comments

Comments
 (0)