Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions src/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@ impl Default for BackoffConfig {

type SourceError = Box<dyn std::error::Error + Send + Sync>;

// TODO: Currently, retrying can't fail, but there should be a global maximum timeout that
// causes an error if the total time retrying exceeds that amount.
// See https://github.com/influxdata/rskafka/issues/65
#[derive(Debug, thiserror::Error)]
#[allow(missing_copy_implementations)]
pub enum BackoffError {
#[error("Retry exceeded deadline")]
#[error("Retry exceeded deadline. Source: {source}")]
DeadlineExceded {
deadline: Duration,
source: SourceError,
Expand Down Expand Up @@ -103,9 +100,6 @@ impl Backoff {
}

/// Perform an async operation that retries with a backoff
// TODO: Currently, this can't fail, but there should be a global maximum timeout that
// causes an error if the total time retrying exceeds that amount.
// See https://github.com/influxdata/rskafka/issues/65
pub async fn retry_with_backoff<F, F1, B, E>(
&mut self,
request_name: &str,
Expand Down
28 changes: 24 additions & 4 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use rand::prelude::*;
use std::fmt::Display;
use std::ops::ControlFlow;
use std::sync::Arc;
use thiserror::Error;
Expand Down Expand Up @@ -53,6 +54,26 @@ pub enum Error {

pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug, Error)]
pub struct MultiError(Vec<Box<dyn std::error::Error + Send + Sync>>);

impl Display for MultiError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut needs_comma = false;
if self.0.len() > 1 {
write!(f, "Multiple errors occured: ")?;
}
for err in &self.0 {
if needs_comma {
write!(f, ", ")?;
}
needs_comma = true;
write!(f, "{err}")?;
}
Ok(())
}
}

/// How to connect to a `Transport`
#[async_trait]
trait ConnectionHandler {
Expand Down Expand Up @@ -470,6 +491,7 @@ where
let mut backoff = Backoff::new(backoff_config);
backoff
.retry_with_backoff("broker_connect", || async {
let mut errors = Vec::<Box<dyn std::error::Error + Send + Sync>>::new();
for broker in &brokers {
let conn = broker
.connect(
Expand All @@ -485,16 +507,14 @@ where
Ok(transport) => transport,
Err(e) => {
warn!(%e, "Failed to connect to broker");
errors.push(Box::new(e));
continue;
}
};

return ControlFlow::Break(connection);
}

let err = Box::<dyn std::error::Error + Send + Sync>::from(
"Failed to connect to any broker, backing off".to_string(),
);
let err = Box::<dyn std::error::Error + Send + Sync>::from(MultiError(errors));
let err: Arc<dyn std::error::Error + Send + Sync> = err.into();
ControlFlow::Continue(ErrorOrThrottle::Error(err))
})
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub mod client;

mod connection;

pub use connection::Error as ConnectionError;

#[cfg(feature = "unstable-fuzzing")]
pub mod messenger;
#[cfg(not(feature = "unstable-fuzzing"))]
Expand Down
10 changes: 9 additions & 1 deletion tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,15 @@ async fn test_client_backoff_terminates() {

match client_builder.build().await {
Err(rskafka::client::error::Error::Connection(e)) => {
assert_eq!(e.to_string(), "all retries failed: Retry exceeded deadline");
// Error can be slightly different depending on the exact underlying error.
assert!(
e.to_string().starts_with(concat!(
"all retries failed: Retry exceeded deadline. ",
"Source: error connecting to broker \"localhost:9000\""
)),
"expected error to start with \"all retries failed...\", actual: {}",
e
);
}
_ => {
unreachable!();
Expand Down