Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,34 @@ impl SyncCallback {
}
}

#[derive(Clone)]
struct SyncCallbackWithOutput(Arc<dyn Fn() -> Option<Option<String>> + Send + Sync>);

impl Debug for SyncCallbackWithOutput {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("SyncCallbackWithOutput()")
}
}

impl PartialEq for SyncCallbackWithOutput {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.0, &other.0)
}
}

impl SyncCallbackWithOutput {
fn new(
f: impl Fn() -> Option<Option<String>> + Send + Sync + 'static,
) -> SyncCallbackWithOutput {
SyncCallbackWithOutput(Arc::new(f))
}

fn run(&self) -> Option<Option<String>> {
let callback = &self.0;
callback()
}
}

/// Supported tasks.
#[derive(Clone, Debug, PartialEq)]
enum Task {
Expand All @@ -281,6 +309,8 @@ enum Task {
Delay(u64),
/// Call callback function.
Callback(SyncCallback),
/// Call callback function, and pass output to the fail_point expression.
CallbackWithOutput(SyncCallbackWithOutput),
}

#[derive(Debug)]
Expand Down Expand Up @@ -324,6 +354,17 @@ impl Action {
}
}

fn from_callback_with_output(
f: impl Fn() -> Option<Option<String>> + Send + Sync + 'static,
) -> Action {
let task = Task::CallbackWithOutput(SyncCallbackWithOutput::new(f));
Action {
task,
freq: 1.0,
count: None,
}
}

fn get_task(&self) -> Option<Task> {
use rand::Rng;

Expand Down Expand Up @@ -508,6 +549,7 @@ impl FailPoint {
Task::Callback(f) => {
f.run();
}
Task::CallbackWithOutput(f) => return f.run(),
}
None
}
Expand Down Expand Up @@ -696,6 +738,28 @@ where
Ok(())
}

/// Configure the actions for a fail point at runtime.
///
/// Each fail point can be configured by a callback. Process will call this callback function
/// when it meet this fail-point.
/// Its output will be used as the expression parameter for the `fail_point!` macro.
///
/// Refer to the `test_callback_with_output` test for more information.
pub fn cfg_callback_with_output<S, F>(name: S, f: F) -> Result<(), String>
where
S: Into<String>,
F: Fn() -> Option<Option<String>> + Send + Sync + 'static,
{
let mut registry = REGISTRY.registry.write().unwrap();
let p = registry
.entry(name.into())
.or_insert_with(|| Arc::new(FailPoint::new()));
let action = Action::from_callback_with_output(f);
let actions = vec![action];
p.set_actions("callback_with_output", actions);
Ok(())
}

/// Remove a fail point.
///
/// If the fail point doesn't exist, nothing will happen.
Expand Down
64 changes: 64 additions & 0 deletions tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,70 @@ fn test_callback() {
assert_eq!(2, counter.load(Ordering::SeqCst));
}

#[test]
#[cfg_attr(not(feature = "failpoints"), ignore)]
fn test_callback_with_output() {
let return_42 = || {
fail_point!("cb_with_output", |output_from_callback: Option<String>| {
if let Some(output) = output_from_callback {
let as_i32 = output.parse().expect("invalid integer");
// return the number that was passed
as_i32
} else {
// No output was passed by the callback,
// return an arbitrary number
84
}
});
// default function behavior
42
};

let counter = Arc::new(AtomicUsize::new(0));
let counter2 = counter.clone();

// We haven't set up the callback yet,
// the function will behave as usual
assert_eq!(42, return_42());

// Configure the callback to return the counter,
// Only if counter can be divided by two
fail::cfg_callback_with_output("cb_with_output", move || {
let prev = counter2.fetch_add(1, Ordering::SeqCst);

if prev == 0 {
// First call, we decide to not return anything
return None;
}

if prev % 2 == 0 {
Some(Some(prev.to_string()))
} else {
Some(None)
}
})
.unwrap();

// Fist call via the callback,
// The callback must have passed `None`
// to the fail point,
// Thus not triggering it.
assert_eq!(42, return_42());
// Second call via the callback,
// which returned Some(None)
// We entered the "arbitrary number" branch
assert_eq!(84, return_42());
// Third call via the callback,
// which returned Some(Some("2".to_string()))
// We entered the "return passed number" branch
assert_eq!(2, return_42());

// The callback has always been called,
// it was responsible for determining the parameter
// passed to the failpoint
assert_eq!(3, counter.load(Ordering::SeqCst));
}

#[test]
#[cfg_attr(not(feature = "failpoints"), ignore)]
fn test_delay() {
Expand Down