Skip to content

Commit e060f60

Browse files
committed
refactor: retry failed leader discovery
Adds a retry loop (w/ backoff) over the leader discover when initialising a PartitionClient.
1 parent 387a96d commit e060f60

File tree

2 files changed

+30
-20
lines changed

2 files changed

+30
-20
lines changed

src/client/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,7 @@ impl Client {
121121
topic: impl Into<String> + Send,
122122
partition: i32,
123123
) -> Result<PartitionClient> {
124-
let client = PartitionClient::new(topic.into(), partition, Arc::clone(&self.brokers));
125-
client.refresh_leader().await?;
126-
Ok(client)
124+
PartitionClient::new(topic.into(), partition, Arc::clone(&self.brokers)).await
127125
}
128126

129127
/// Returns a list of topics in the cluster

src/client/partition.rs

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ use crate::{
2222
validation::ExactlyOne,
2323
};
2424
use async_trait::async_trait;
25-
use std::ops::{ControlFlow, Deref, Range};
26-
use std::sync::Arc;
25+
use std::{
26+
ops::{ControlFlow, Deref, Range},
27+
sync::Arc,
28+
};
2729
use time::OffsetDateTime;
2830
use tokio::sync::Mutex;
2931
use tracing::{error, info};
@@ -95,14 +97,35 @@ impl std::fmt::Debug for PartitionClient {
9597
}
9698

9799
impl PartitionClient {
98-
pub(super) fn new(topic: String, partition: i32, brokers: Arc<BrokerConnector>) -> Self {
99-
Self {
100+
pub(super) async fn new(
101+
topic: String,
102+
partition: i32,
103+
brokers: Arc<BrokerConnector>,
104+
) -> Result<Self> {
105+
let p = Self {
100106
topic,
101107
partition,
102-
brokers,
108+
brokers: Arc::clone(&brokers),
103109
backoff_config: Default::default(),
104110
current_broker: Mutex::new(None),
105-
}
111+
};
112+
113+
// Force discover and establish a cached connection to the leader
114+
let scope = &p;
115+
maybe_retry(
116+
&Default::default(),
117+
&*brokers,
118+
"leader_detection",
119+
|| async move {
120+
scope
121+
.get_leader(MetadataLookupMode::CachedArbitrary)
122+
.await?;
123+
Ok(())
124+
},
125+
)
126+
.await?;
127+
128+
Ok(p)
106129
}
107130

108131
/// Topic
@@ -115,17 +138,6 @@ impl PartitionClient {
115138
self.partition
116139
}
117140

118-
/// Re-run the leader discovery process for this client, and establish a
119-
/// connection to the leader.
120-
pub(crate) async fn refresh_leader(&self) -> Result<()> {
121-
// Remove the current broker connection, if any.
122-
*self.current_broker.lock().await = None;
123-
// And acquire a new one, forcing discovery and establishing a cached
124-
// connection to the leader.
125-
self.get().await?;
126-
Ok(())
127-
}
128-
129141
/// Produce a batch of records to the partition
130142
pub async fn produce(
131143
&self,

0 commit comments

Comments
 (0)