Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ jobs:
- uses: Swatinem/rust-cache@v2
- run: rustup show active-toolchain -v
- run: cargo build --all-targets --all-features
- run: RUSTFLAGS='--cfg tokio_unstable' cargo build --all-targets --all-features

msrv:
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- next-header -->

## [Unreleased] - ReleaseDate
### Added
- core: integration with `tokio-console` if `cfg(tokio_unstable)` is set.
- core/task: expose unstable `task::spawn_blocking` and `task::Builder`.

## [0.2.0-alpha.20] - 2025-09-11
### Added
Expand Down
3 changes: 3 additions & 0 deletions elfo-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ unicycle = "0.10.2"
rmp-serde = { version = "1.1.0", optional = true }
humantime-serde = "1"

[target.'cfg(tokio_unstable)'.dependencies]
tokio = { workspace = true, features = ["tracing"] }

[dev-dependencies]
elfo-utils = { version = "0.2.7", path = "../elfo-utils", features = ["test-util"] }

Expand Down
1 change: 1 addition & 0 deletions elfo-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub mod signal;
pub mod stream;
#[cfg(feature = "unstable-stuck-detection")]
pub mod stuck_detection;
pub mod task;
pub mod time;
pub mod topology;
pub mod tracing;
Expand Down
10 changes: 8 additions & 2 deletions elfo-core/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,15 +410,21 @@ where

entry.insert(Object::new(addr, actor));

let scope = Scope::new(scope::trace_id(), addr, meta, self.scope_shared.clone())
let group_scope = self.scope_shared.clone();
let scope = Scope::new(scope::trace_id(), addr, meta.clone(), group_scope)
.with_telemetry(&system_config.telemetry);

#[cfg(feature = "unstable-stuck-detection")]
let fut = MeasurePoll::new(fut.instrument(span), self.rt_manager.stuck_detector());
#[cfg(not(feature = "unstable-stuck-detection"))]
let fut = MeasurePoll::new(fut.instrument(span));

rt.spawn(scope.within(fut));
// Finally, start the actor's task.
crate::task::Builder::new(scope)
.spawn_on(fut, &rt)
.expect("spawn an actor's task");

// Register the actor in the book to make it reachable.
let object = self.context.book().get_owned(addr).expect("just created");
Some(object)
}
Expand Down
120 changes: 120 additions & 0 deletions elfo-core/src/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#![doc(hidden)] // TODO: "partially" unstable for now

use std::{future::Future, io};

use tokio::{runtime::Handle, task::JoinHandle};

use crate::scope::{self, Scope};

/// A factory which is used to configure the properties of a new task.
///
/// This is a stable wrapper over [`tokio::task::Builder`].
///
/// Features:
/// * Spawning tasks inside a given scope (or the current scope by default). It
/// means that logs/metrics/dumps will be associated with specific actor, that
/// is especially useful for blocking tasks (e.g. for I/O operations).
/// * If compiled with the `tokio_unstable` feature, it will set the task name
/// in order to make it easier to identify tasks in `tokio-console`.
///
/// Note: this is an unstable API.
///
/// [`tokio::task::Builder`]: https://docs.rs/tokio/latest/tokio/task/struct.Builder.html
pub struct Builder {
scope: Scope,
}

impl Default for Builder {
fn default() -> Self {
Self::new(scope::expose())
}
}

// NOTE: all spawning methods should be marked with the `track_caller` attribute
// to show better locations in `tokio-console` listings.

impl Builder {
/// Creates a new task builder with the given scope instead of the current
/// one used by default in `Builder::default()`.
pub fn new(scope: Scope) -> Self {
Self { scope }
}

/// Spawns a task with this builder’s settings on the current runtime.
///
/// # Panics
///
/// Panics if not called from the context of a Tokio runtime.
#[track_caller]
pub fn spawn<F>(self, f: F) -> io::Result<JoinHandle<F::Output>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.spawn_on(f, &Handle::current())
}

// Spawns a task with this builder’s settings on the provided runtime handle.
#[track_caller]
pub fn spawn_on<F>(self, f: F, handle: &Handle) -> io::Result<JoinHandle<F::Output>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
#[cfg(not(tokio_unstable))]
return Ok(handle.spawn(self.scope.within(f)));

#[cfg(tokio_unstable)]
tokio::task::Builder::new()
.name(&self.scope.meta().to_string())
.spawn_on(self.scope.within(f), handle)
}

/// Spawns blocking code on the current runtime’s blocking threadpool.
///
/// # Panics
///
/// Panics if not called from the context of a Tokio runtime.
#[track_caller]
pub fn spawn_blocking<F, R>(self, f: F) -> io::Result<JoinHandle<R>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.spawn_blocking_on(f, &Handle::current())
}

/// Spawns blocking code on the provided runtime’s blocking threadpool.
#[track_caller]
pub fn spawn_blocking_on<F, R>(self, f: F, handle: &Handle) -> io::Result<JoinHandle<R>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
#[cfg(not(tokio_unstable))]
return Ok(handle.spawn_blocking(|| self.scope.sync_within(f)));

#[cfg(tokio_unstable)]
tokio::task::Builder::new()
.name(&self.scope.meta().to_string())
.spawn_blocking_on(|| self.scope.sync_within(f), handle)
}
}

/// Spawns blocking code on the current runtime’s blocking threadpool.
///
/// See `Builder` for more details.
///
/// # Panics
///
/// Panics if not called from the context of a Tokio runtime.
#[track_caller]
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
Builder::default()
.spawn_blocking(f)
.expect("spawn a blocking task")
}
4 changes: 1 addition & 3 deletions elfo-dumper/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::{
use eyre::{Result, WrapErr};
use fxhash::FxHashSet;
use parking_lot::Mutex;
use tokio::task;
use tracing::{error, info};

use elfo_core::{
Expand Down Expand Up @@ -196,8 +195,7 @@ impl Dumper {
};

// Run the background task and wait until it's completed.
let scope = scope::expose();
match task::spawn_blocking(|| scope.sync_within(background)).await {
match elfo_core::task::spawn_blocking(background).await {
Ok(Ok(state)) => {
serializer = state.0;
rule_set = state.1;
Expand Down
3 changes: 2 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ rust-version.workspace = true
elfo = { path = "../elfo", features = ["full"] }
elfo-telemeter = { path = "../elfo-telemeter" } # for `AllocatorStats`

tokio = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full", "tracing"] }
console-subscriber = "0.4.1"
metrics.workspace = true
toml.workspace = true
derive_more.workspace = true
Expand Down
7 changes: 4 additions & 3 deletions examples/usage/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,16 @@ fn topology() -> elfo::Topology {
// * `RUST_LOG=info,[{actor_group=aggregators}]`
//
// However, it's more useful to control logging in the config file.
let logger = elfo::batteries::logger::init();
console_subscriber::init();
// let logger = elfo::batteries::logger::init();
// Setup up telemetry (based on the `metrics` crate).
let telemeter = elfo::batteries::telemeter::init();

// Define actor groups.
let producers = topology.local("producers");
let aggregators = topology.local("aggregators");
let reporters = topology.local("reporters");
let loggers = topology.local("system.loggers");
// let loggers = topology.local("system.loggers");
let telemeters = topology.local("system.telemeters");
let dumpers = topology.local("system.dumpers");
let pingers = topology.local("system.pingers");
Expand All @@ -295,7 +296,7 @@ fn topology() -> elfo::Topology {
producers.mount(producer::new());
aggregators.mount(aggregator::new());
reporters.mount(reporter::new());
loggers.mount(logger);
// loggers.mount(logger);
telemeters.mount(telemeter);
dumpers.mount(elfo::batteries::dumper::new());
pingers.mount(elfo::batteries::pinger::new(&topology));
Expand Down