Skip to content

Commit 27797c4

Browse files
committed
refactor: retry failed leader discovery
Adds a retry loop (w/ backoff) over the leader discover when initialising a PartitionClient.
1 parent 387a96d commit 27797c4

File tree

2 files changed

+30
-19
lines changed

2 files changed

+30
-19
lines changed

src/client/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,7 @@ impl Client {
121121
topic: impl Into<String> + Send,
122122
partition: i32,
123123
) -> Result<PartitionClient> {
124-
let client = PartitionClient::new(topic.into(), partition, Arc::clone(&self.brokers));
125-
client.refresh_leader().await?;
126-
Ok(client)
124+
PartitionClient::new(topic.into(), partition, Arc::clone(&self.brokers)).await
127125
}
128126

129127
/// Returns a list of topics in the cluster

src/client/partition.rs

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@ use crate::{
2222
validation::ExactlyOne,
2323
};
2424
use async_trait::async_trait;
25-
use std::ops::{ControlFlow, Deref, Range};
2625
use std::sync::Arc;
26+
use std::{
27+
ops::{ControlFlow, Deref, Range},
28+
sync::Arc,
29+
};
2730
use time::OffsetDateTime;
2831
use tokio::sync::Mutex;
2932
use tracing::{error, info};
@@ -95,14 +98,35 @@ impl std::fmt::Debug for PartitionClient {
9598
}
9699

97100
impl PartitionClient {
98-
pub(super) fn new(topic: String, partition: i32, brokers: Arc<BrokerConnector>) -> Self {
99-
Self {
101+
pub(super) async fn new(
102+
topic: String,
103+
partition: i32,
104+
brokers: Arc<BrokerConnector>,
105+
) -> Result<Self> {
106+
let p = Self {
100107
topic,
101108
partition,
102-
brokers,
109+
brokers: Arc::clone(&brokers),
103110
backoff_config: Default::default(),
104111
current_broker: Mutex::new(None),
105-
}
112+
};
113+
114+
// Force discover and establish a cached connection to the leader
115+
let scope = &p;
116+
maybe_retry(
117+
&Default::default(),
118+
&*brokers,
119+
"leader_detection",
120+
|| async move {
121+
scope
122+
.get_leader(MetadataLookupMode::CachedArbitrary)
123+
.await?;
124+
Ok(())
125+
},
126+
)
127+
.await?;
128+
129+
Ok(p)
106130
}
107131

108132
/// Topic
@@ -115,17 +139,6 @@ impl PartitionClient {
115139
self.partition
116140
}
117141

118-
/// Re-run the leader discovery process for this client, and establish a
119-
/// connection to the leader.
120-
pub(crate) async fn refresh_leader(&self) -> Result<()> {
121-
// Remove the current broker connection, if any.
122-
*self.current_broker.lock().await = None;
123-
// And acquire a new one, forcing discovery and establishing a cached
124-
// connection to the leader.
125-
self.get().await?;
126-
Ok(())
127-
}
128-
129142
/// Produce a batch of records to the partition
130143
pub async fn produce(
131144
&self,

0 commit comments

Comments
 (0)