Skip to content

Actor framework and MessageBus v3 #2402

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Mar 6, 2025
8 changes: 4 additions & 4 deletions crates/backtest/src/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ mod tests {
use nautilus_common::{
cache::Cache,
msgbus::{
MessageBus,
MessageBus, register,
stubs::{get_message_saving_handler, get_saved_messages},
},
};
Expand Down Expand Up @@ -1042,10 +1042,10 @@ mod tests {
#[rstest]
fn test_accounting() {
let account_type = AccountType::Margin;
let mut msgbus = MessageBus::default();
let msgbus = MessageBus::default().register_message_bus();
let mut cache = Cache::default();
let handler = get_message_saving_handler::<AccountState>(None);
msgbus.register(Ustr::from("Portfolio.update_account"), handler.clone());
register(Ustr::from("Portfolio.update_account"), handler.clone());
let margin_account = MarginAccount::new(
AccountState::new(
AccountId::from("SIM-001"),
Expand Down Expand Up @@ -1074,7 +1074,7 @@ mod tests {
Venue::new("SIM"),
account_type,
BookType::L2_MBP,
Some(Rc::new(RefCell::new(msgbus))),
Some(msgbus.clone()),
Some(Rc::new(RefCell::new(cache))),
);
exchange.initialize_account();
Expand Down
62 changes: 61 additions & 1 deletion crates/common/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,72 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{
any::Any,
cell::{RefCell, UnsafeCell},
collections::HashMap,
rc::Rc,
sync::OnceLock,
};

use nautilus_core::UUID4;

use crate::messages::data::DataResponse;

/// TODO: deprecate for `MessageHandler` trait which has all the relevant functions
pub trait Actor {
pub trait Actor: Any {
fn handle(&self, resp: DataResponse); // TODO: Draft
fn id(&self) -> UUID4;
fn as_any(&self) -> &dyn Any;
}

pub struct ActorRegistry {
actors: RefCell<HashMap<UUID4, Rc<UnsafeCell<dyn Actor>>>>,
}

impl Default for ActorRegistry {
fn default() -> Self {
Self::new()
}
}

impl ActorRegistry {
pub fn new() -> Self {
Self {
actors: RefCell::new(HashMap::new()),
}
}

pub fn insert(&self, id: UUID4, actor: Rc<UnsafeCell<dyn Actor>>) {
self.actors.borrow_mut().insert(id, actor);
}

pub fn get(&self, id: &UUID4) -> Option<Rc<UnsafeCell<dyn Actor>>> {
self.actors.borrow().get(id).cloned()
}
}

// SAFETY: Actor registry is not meant to be passed between threads
unsafe impl Sync for ActorRegistry {}
unsafe impl Send for ActorRegistry {}

static ACTOR_REGISTRY: OnceLock<ActorRegistry> = OnceLock::new();

pub fn get_actor_registry() -> &'static ActorRegistry {
ACTOR_REGISTRY.get_or_init(ActorRegistry::new)
}

pub fn register_actor(actor: Rc<UnsafeCell<dyn Actor>>) {
let actor_id = unsafe { &mut *actor.get() }.id();
get_actor_registry().insert(actor_id, actor);
}

pub fn get_actor(id: &UUID4) -> Option<Rc<UnsafeCell<dyn Actor>>> {
get_actor_registry().get(id)
}

#[allow(clippy::mut_from_ref)]
pub fn get_actor_unchecked<T: Actor>(id: &UUID4) -> &mut T {
let actor = get_actor(id).unwrap_or_else(|| panic!("Actor for {} not found", id));
unsafe { &mut *(actor.get() as *mut _ as *mut T) }
}
15 changes: 15 additions & 0 deletions crates/common/src/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ pub trait Clock {
fn next_time_ns(&self, name: &str) -> UnixNanos;
fn cancel_timer(&mut self, name: &str);
fn cancel_timers(&mut self);

fn reset(&mut self);
}

/// A static test clock.
Expand Down Expand Up @@ -374,6 +376,13 @@ impl Clock for TestClock {
}
self.timers = BTreeMap::new();
}

fn reset(&mut self) {
self.time = AtomicTime::new(false, UnixNanos::default());
self.timers = BTreeMap::new();
self.heap = BinaryHeap::new();
self.callbacks = HashMap::new();
}
}

/// A real-time clock which uses system time.
Expand Down Expand Up @@ -630,6 +639,12 @@ impl Clock for LiveClock {
}
self.timers.clear();
}

fn reset(&mut self) {
self.timers = HashMap::new();
self.heap = Arc::new(Mutex::new(BinaryHeap::new()));
self.callbacks = HashMap::new();
}
}

// Helper struct to stream events from the heap
Expand Down
1 change: 0 additions & 1 deletion crates/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
//! - `ffi`: Enables the C foreign function interface (FFI) from `cbindgen`.
//! - `python`: Enables Python bindings from `pyo3`.
//! - `stubs`: Enables type stubs for use in testing scenarios.

#![warn(rustc::all)]
#![deny(nonstandard_style)]
#![deny(rustdoc::broken_intra_doc_links)]
Expand Down
Loading
Loading