Skip to content

Commit f4e50d6

Browse files
committed
Capture errors from connecting to broker
1 parent 1eba7e3 commit f4e50d6

File tree

2 files changed

+17
-4
lines changed

2 files changed

+17
-4
lines changed

src/connection.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@ where
470470
let mut backoff = Backoff::new(backoff_config);
471471
backoff
472472
.retry_with_backoff("broker_connect", || async {
473+
let mut errors = Vec::new();
473474
for broker in &brokers {
474475
let conn = broker
475476
.connect(
@@ -485,16 +486,24 @@ where
485486
Ok(transport) => transport,
486487
Err(e) => {
487488
warn!(%e, "Failed to connect to broker");
489+
errors.push(e);
488490
continue;
489491
}
490492
};
491493

492494
return ControlFlow::Break(connection);
493495
}
494496

495-
let err = Box::<dyn std::error::Error + Send + Sync>::from(
496-
"Failed to connect to any broker, backing off".to_string(),
497-
);
497+
// errors should always contain at least 1 element here.
498+
let errors_string = errors
499+
.into_iter()
500+
.map(|e| e.to_string())
501+
.collect::<Vec<String>>()
502+
.join(", ");
503+
504+
let err = Box::<dyn std::error::Error + Send + Sync>::from(format!(
505+
"Failed to connect to any broker, backing off. Errors: {errors_string}"
506+
));
498507
let err: Arc<dyn std::error::Error + Send + Sync> = err.into();
499508
ControlFlow::Continue(ErrorOrThrottle::Error(err))
500509
})

tests/client.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -721,7 +721,11 @@ async fn test_client_backoff_terminates() {
721721

722722
match client_builder.build().await {
723723
Err(rskafka::client::error::Error::Connection(e)) => {
724-
assert_eq!(e.to_string(), "all retries failed: Retry exceeded deadline");
724+
assert_eq!(e.to_string(), concat!(
725+
"all retries failed: Retry exceeded deadline. ",
726+
"Source: Failed to connect to any broker, backing off. ",
727+
"Errors: error connecting to broker \"localhost:9000\": IO Error: Connection refused (os error 111)"
728+
));
725729
}
726730
_ => {
727731
unreachable!();

0 commit comments

Comments
 (0)