Skip to content

Commit badfe9d

Browse files
committed
feat: Capture errors from connecting to broker
1 parent bd45435 commit badfe9d

File tree

2 files changed

+33
-5
lines changed

2 files changed

+33
-5
lines changed

src/connection.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use async_trait::async_trait;
22
use rand::prelude::*;
3+
use std::fmt::Display;
34
use std::ops::ControlFlow;
45
use std::sync::Arc;
56
use thiserror::Error;
@@ -53,6 +54,26 @@ pub enum Error {
5354

5455
pub type Result<T, E = Error> = std::result::Result<T, E>;
5556

57+
#[derive(Debug, Error)]
58+
pub struct MultiError(Vec<Box<dyn std::error::Error + Send + Sync>>);
59+
60+
impl Display for MultiError {
61+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62+
let mut needs_comma = false;
63+
if self.0.len() > 1 {
64+
write!(f, "Multiple errors occured: ")?;
65+
}
66+
for err in &self.0 {
67+
if needs_comma {
68+
write!(f, ", ")?;
69+
}
70+
needs_comma = true;
71+
write!(f, "{err}")?;
72+
}
73+
Ok(())
74+
}
75+
}
76+
5677
/// How to connect to a `Transport`
5778
#[async_trait]
5879
trait ConnectionHandler {
@@ -470,6 +491,7 @@ where
470491
let mut backoff = Backoff::new(backoff_config);
471492
backoff
472493
.retry_with_backoff("broker_connect", || async {
494+
let mut errors = Vec::<Box<dyn std::error::Error + Send + Sync>>::new();
473495
for broker in &brokers {
474496
let conn = broker
475497
.connect(
@@ -485,16 +507,14 @@ where
485507
Ok(transport) => transport,
486508
Err(e) => {
487509
warn!(%e, "Failed to connect to broker");
510+
errors.push(Box::new(e));
488511
continue;
489512
}
490513
};
491514

492515
return ControlFlow::Break(connection);
493516
}
494-
495-
let err = Box::<dyn std::error::Error + Send + Sync>::from(
496-
"Failed to connect to any broker, backing off".to_string(),
497-
);
517+
let err = Box::<dyn std::error::Error + Send + Sync>::from(MultiError(errors));
498518
let err: Arc<dyn std::error::Error + Send + Sync> = err.into();
499519
ControlFlow::Continue(ErrorOrThrottle::Error(err))
500520
})

tests/client.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -721,7 +721,15 @@ async fn test_client_backoff_terminates() {
721721

722722
match client_builder.build().await {
723723
Err(rskafka::client::error::Error::Connection(e)) => {
724-
assert_eq!(e.to_string(), "all retries failed: Retry exceeded deadline");
724+
// Error can be slightly different depending on the exact underlying error.
725+
assert!(
726+
e.to_string().starts_with(concat!(
727+
"all retries failed: Retry exceeded deadline. ",
728+
"Source: error connecting to broker \"localhost:9000\""
729+
)),
730+
"expected error to start with \"all retries failed...\", actual: {}",
731+
e
732+
);
725733
}
726734
_ => {
727735
unreachable!();

0 commit comments

Comments
 (0)