Skip to content

Commit b612f08

Browse files
committed
feat: support pre-warming PartitionClient
Adds a refresh_leader() method to force the PartitionClient through the discovery process. This will ensure a cached connection to the leader is (likely) already established by the time the first produce() call is issued, preventing the first produce() calls (multiple of which may be queued up on the discovery mutex) from experiencing excessively high latency.
1 parent f0eaedf commit b612f08

File tree

1 file changed

+11
-0
lines changed

1 file changed

+11
-0
lines changed

src/client/partition.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,17 @@ impl PartitionClient {
115115
self.partition
116116
}
117117

118+
/// Re-run the leader discovery process for this client, and establish a
119+
/// connection to the leader.
120+
pub async fn refresh_leader(&self) -> Result<()> {
121+
// Remove the current broker connection, if any.
122+
*self.current_broker.lock().await = None;
123+
// And acquire a new one, forcing discovery and establishing a cached
124+
// connection to the leader.
125+
self.get().await?;
126+
Ok(())
127+
}
128+
118129
/// Produce a batch of records to the partition
119130
pub async fn produce(
120131
&self,

0 commit comments

Comments
 (0)