Skip to content

Contingent orders handling for order matching engine #2404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions crates/backtest/src/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,14 +573,12 @@ impl SimulatedExchange {
TradingCommand::BatchCancelOrders(ref command) => {
matching_engine.process_batch_cancel(command, account_id);
}
TradingCommand::QueryOrder(ref command) => {
matching_engine.process_query_order(command, account_id);
}
TradingCommand::SubmitOrderList(mut command) => {
for order in &mut command.order_list.orders {
matching_engine.process_order(order, account_id);
}
}
_ => {}
}
} else {
panic!("Matching engine should be initialized");
Expand Down
9 changes: 8 additions & 1 deletion crates/common/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,7 @@ impl Cache {
pub fn add_position_id(
&mut self,
position_id: &PositionId,
_venue: &Venue,
venue: &Venue,
client_order_id: &ClientOrderId,
strategy_id: &StrategyId,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -1261,6 +1261,13 @@ impl Cache {
.or_default()
.insert(*position_id);

// Index: Venue -> set[PositionId]
self.index
.venue_positions
.entry(*venue)
.or_default()
.insert(*position_id);

Ok(())
}

Expand Down
152 changes: 145 additions & 7 deletions crates/execution/src/matching_engine/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use ustr::Ustr;
use crate::{
matching_core::OrderMatchingCore,
matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
messages::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryOrder},
messages::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
models::{
fee::{FeeModel, FeeModelAny},
fill::FillModel,
Expand Down Expand Up @@ -766,10 +766,6 @@ impl OrderMatchingEngine {
}
}

pub fn process_query_order(&self, command: &QueryOrder, account_id: AccountId) {
todo!("implement process_query_order")
}

fn process_market_order(&mut self, order: &mut OrderAny) {
if order.time_in_force() == TimeInForce::AtTheOpen
|| order.time_in_force() == TimeInForce::AtTheClose
Expand Down Expand Up @@ -1531,7 +1527,123 @@ impl OrderMatchingEngine {
return;
}

todo!("Check for contingent orders")
if let Some(contingency_type) = order.contingency_type() {
match contingency_type {
ContingencyType::Oto => {
if let Some(linked_orders_ids) = order.linked_order_ids() {
for client_order_id in &linked_orders_ids {
let mut child_order = match self.cache.borrow().order(client_order_id) {
Some(child_order) => child_order.clone(),
None => panic!("Order {client_order_id} not found in cache"),
};

if child_order.is_closed() || child_order.is_active_local() {
continue;
}

// Check if we need to index position id
if let (None, Some(position_id)) =
(child_order.position_id(), order.position_id())
{
self.cache
.borrow_mut()
.add_position_id(
&position_id,
&self.venue,
client_order_id,
&child_order.strategy_id(),
)
.unwrap();
log::debug!(
"Added position id {} to cache for order {}",
position_id,
client_order_id
)
}

if (!child_order.is_open())
|| (matches!(child_order.status(), OrderStatus::PendingUpdate)
&& child_order
.previous_status()
.is_some_and(|s| matches!(s, OrderStatus::Submitted)))
{
let account_id = order.account_id().unwrap_or_else(|| {
*self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
panic!(
"Account ID not found for trader {}",
order.trader_id()
)
})
});
self.process_order(&mut child_order, account_id);
}
}
} else {
log::error!(
"OTO order {} does not have linked orders",
order.client_order_id()
);
}
}
ContingencyType::Oco => {
if let Some(linked_orders_ids) = order.linked_order_ids() {
for client_order_id in &linked_orders_ids {
let child_order = match self.cache.borrow().order(client_order_id) {
Some(child_order) => child_order.clone(),
None => panic!("Order {client_order_id} not found in cache"),
};

if child_order.is_closed() || child_order.is_active_local() {
continue;
}

self.cancel_order(&child_order, None);
}
} else {
log::error!(
"OCO order {} does not have linked orders",
order.client_order_id()
);
}
}
ContingencyType::Ouo => {
if let Some(linked_orders_ids) = order.linked_order_ids() {
for client_order_id in &linked_orders_ids {
let mut child_order = match self.cache.borrow().order(client_order_id) {
Some(child_order) => child_order.clone(),
None => panic!("Order {client_order_id} not found in cache"),
};

if child_order.is_active_local() {
continue;
}

if order.is_closed() && child_order.is_open() {
self.cancel_order(&child_order, None);
} else if !order.leaves_qty().is_zero()
&& order.leaves_qty() != child_order.leaves_qty()
{
let price = child_order.price();
let trigger_price = child_order.trigger_price();
self.update_order(
&mut child_order,
Some(order.leaves_qty()),
price,
trigger_price,
Some(false),
)
}
}
} else {
log::error!(
"OUO order {} does not have linked orders",
order.client_order_id()
);
}
}
_ => {}
}
}
}

fn update_limit_order(&mut self, order: &mut OrderAny, quantity: Quantity, price: Price) {
Expand Down Expand Up @@ -1942,7 +2054,33 @@ impl OrderMatchingEngine {
}

fn update_contingent_order(&mut self, order: &OrderAny) {
todo!("update_contingent_order")
log::debug!("Updating OUO orders from {}", order.client_order_id());
if let Some(linked_order_ids) = order.linked_order_ids() {
for client_order_id in &linked_order_ids {
let mut child_order = match self.cache.borrow().order(client_order_id) {
Some(order) => order.clone(),
None => panic!("Order {client_order_id} not found in cache."),
};

if child_order.is_active_local() {
continue;
}

if order.leaves_qty().is_zero() {
self.cancel_order(&child_order, None);
} else if child_order.leaves_qty() != order.leaves_qty() {
let price = child_order.price();
let trigger_price = child_order.trigger_price();
self.update_order(
&mut child_order,
Some(order.leaves_qty()),
price,
trigger_price,
Some(false),
)
}
}
}
}

fn cancel_contingent_orders(&mut self, order: &OrderAny) {
Expand Down
124 changes: 124 additions & 0 deletions crates/execution/src/matching_engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2726,3 +2726,127 @@ fn test_updating_of_trailing_stop_market_order_with_no_trigger_price_set(
assert_eq!(order_updated.client_order_id, client_order_id);
assert_eq!(order_updated.trigger_price.unwrap(), Price::from("1481.00"));
}

#[rstest]
fn test_updating_of_contingent_orders(
instrument_eth_usdt: InstrumentAny,
mut msgbus: MessageBus,
order_event_handler: ShareableMessageHandler,
account_id: AccountId,
) {
msgbus.register(
msgbus.switchboard.exec_engine_process,
order_event_handler.clone(),
);
let cache = Rc::new(RefCell::new(Cache::default()));
// Create order matching engine which supports contingent orders
let mut engine_config = OrderMatchingEngineConfig::default();
engine_config.support_contingent_orders = true;
let mut engine_l2 = get_order_matching_engine_l2(
instrument_eth_usdt.clone(),
Rc::new(RefCell::new(msgbus)),
Some(cache.clone()),
None,
Some(engine_config),
);

let orderbook_delta_sell = OrderBookDeltaTestBuilder::new(instrument_eth_usdt.id())
.book_action(BookAction::Add)
.book_order(BookOrder::new(
OrderSide::Sell,
Price::from("1500.00"),
Quantity::from("1.000"),
1,
))
.build();
engine_l2.process_order_book_delta(&orderbook_delta_sell);

// Create primary limit order and StopMarket OUO orders
// and link them together
let client_order_id_primary = ClientOrderId::from("O-19700101-000000-001-001-1");
let client_order_id_contingent = ClientOrderId::from("O-19700101-000000-001-001-2");
let mut primary_order = OrderTestBuilder::new(OrderType::Limit)
.instrument_id(instrument_eth_usdt.id())
.side(OrderSide::Buy)
.price(Price::from("1495.00"))
.quantity(Quantity::from("1.000"))
.client_order_id(client_order_id_primary)
.contingency_type(ContingencyType::Ouo)
.linked_order_ids(vec![client_order_id_contingent])
.submit(true)
.build();
let mut contingent_stop_market_order = OrderTestBuilder::new(OrderType::StopMarket)
.instrument_id(instrument_eth_usdt.id())
.side(OrderSide::Sell)
.trigger_price(Price::from("1500.00"))
.quantity(Quantity::from("1.000"))
.client_order_id(client_order_id_contingent)
.linked_order_ids(vec![client_order_id_primary])
.contingency_type(ContingencyType::Ouo)
.submit(true)
.build();

// Save orders to cache and process it by engine
cache
.borrow_mut()
.add_order(primary_order.clone(), None, None, false)
.unwrap();
cache
.borrow_mut()
.add_order(contingent_stop_market_order.clone(), None, None, false)
.unwrap();
engine_l2.process_order(&mut primary_order, account_id);

engine_l2.process_order(&mut contingent_stop_market_order, account_id);

// Modify primary order quantity to 2.000 which will trigger the contingent order
// update of the same quantity
let modify_order_command = ModifyOrder::new(
TraderId::from("TRADER-001"),
ClientId::from("CLIENT-001"),
StrategyId::from("STRATEGY-001"),
instrument_eth_usdt.id(),
client_order_id_primary,
VenueOrderId::from("V1"),
Some(Quantity::from("2.000")),
None,
None,
UUID4::new(),
UnixNanos::default(),
);
engine_l2.process_modify(&modify_order_command.unwrap(), account_id);

// Check that we have received following sequence of events
// 1. OrderAccepted for primary limit order
// 2. OrderAccepted for contingent stop market order
// 3. OrderUpdated for primary limit order with new quantity of 2.000
// 4. OrderUpdated for contingent stop market order with new quantity of 2.000
let saved_messages = get_order_event_handler_messages(order_event_handler);
assert_eq!(saved_messages.len(), 4);
let order_event_first = saved_messages.first().unwrap();
let order_accepted = match order_event_first {
OrderEventAny::Accepted(order_accepted) => order_accepted,
_ => panic!("Expected OrderAccepted event in first message"),
};
assert_eq!(order_accepted.client_order_id, client_order_id_primary);
let order_event_second = saved_messages.get(1).unwrap();
let order_accepted = match order_event_second {
OrderEventAny::Accepted(order_accepted) => order_accepted,
_ => panic!("Expected OrderAccepted event in second message"),
};
assert_eq!(order_accepted.client_order_id, client_order_id_contingent);
let order_event_third = saved_messages.get(2).unwrap();
let order_updated = match order_event_third {
OrderEventAny::Updated(order_updated) => order_updated,
_ => panic!("Expected OrderUpdated event in third message"),
};
assert_eq!(order_updated.client_order_id, client_order_id_primary);
assert_eq!(order_updated.quantity, Quantity::from("2.000"));
let order_event_fourth = saved_messages.get(3).unwrap();
let order_updated = match order_event_fourth {
OrderEventAny::Updated(order_updated) => order_updated,
_ => panic!("Expected OrderUpdated event in fourth message"),
};
assert_eq!(order_updated.client_order_id, client_order_id_contingent);
assert_eq!(order_updated.quantity, Quantity::from("2.000"));
}
15 changes: 15 additions & 0 deletions crates/model/src/orders/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,21 @@ impl OrderAny {
}
}

#[must_use]
pub fn previous_status(&self) -> Option<OrderStatus> {
match self {
Self::Limit(order) => order.previous_status,
Self::LimitIfTouched(order) => order.previous_status,
Self::Market(order) => order.previous_status,
Self::MarketIfTouched(order) => order.previous_status,
Self::MarketToLimit(order) => order.previous_status,
Self::StopLimit(order) => order.previous_status,
Self::StopMarket(order) => order.previous_status,
Self::TrailingStopLimit(order) => order.previous_status,
Self::TrailingStopMarket(order) => order.previous_status,
}
}

pub fn set_position_id(&mut self, position_id: Option<PositionId>) {
match self {
Self::Limit(order) => order.position_id = position_id,
Expand Down
Loading