Skip to content

Commit cb9195a

Browse files
authored
Merge pull request #165 from influxdata/dom/pre-warm
feat: support pre-warming PartitionClient
2 parents 4f13b67 + e060f60 commit cb9195a

File tree

7 files changed

+62
-32
lines changed

7 files changed

+62
-32
lines changed

benches/throughput.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ async fn setup_rskafka(connection: Vec<String>) -> PartitionClient {
448448
.await
449449
.unwrap();
450450

451-
client.partition_client(topic_name, 0).unwrap()
451+
client.partition_client(topic_name, 0).await.unwrap()
452452
}
453453

454454
static LOG_SETUP: Once = Once::new();

src/client/mod.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,16 +116,12 @@ impl Client {
116116
}
117117

118118
/// Returns a client for performing operations on a specific partition
119-
pub fn partition_client(
119+
pub async fn partition_client(
120120
&self,
121121
topic: impl Into<String> + Send,
122122
partition: i32,
123123
) -> Result<PartitionClient> {
124-
Ok(PartitionClient::new(
125-
topic.into(),
126-
partition,
127-
Arc::clone(&self.brokers),
128-
))
124+
PartitionClient::new(topic.into(), partition, Arc::clone(&self.brokers)).await
129125
}
130126

131127
/// Returns a list of topics in the cluster

src/client/partition.rs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ use crate::{
2222
validation::ExactlyOne,
2323
};
2424
use async_trait::async_trait;
25-
use std::ops::{ControlFlow, Deref, Range};
26-
use std::sync::Arc;
25+
use std::{
26+
ops::{ControlFlow, Deref, Range},
27+
sync::Arc,
28+
};
2729
use time::OffsetDateTime;
2830
use tokio::sync::Mutex;
2931
use tracing::{error, info};
@@ -95,14 +97,35 @@ impl std::fmt::Debug for PartitionClient {
9597
}
9698

9799
impl PartitionClient {
98-
pub(super) fn new(topic: String, partition: i32, brokers: Arc<BrokerConnector>) -> Self {
99-
Self {
100+
pub(super) async fn new(
101+
topic: String,
102+
partition: i32,
103+
brokers: Arc<BrokerConnector>,
104+
) -> Result<Self> {
105+
let p = Self {
100106
topic,
101107
partition,
102-
brokers,
108+
brokers: Arc::clone(&brokers),
103109
backoff_config: Default::default(),
104110
current_broker: Mutex::new(None),
105-
}
111+
};
112+
113+
// Force discover and establish a cached connection to the leader
114+
let scope = &p;
115+
maybe_retry(
116+
&Default::default(),
117+
&*brokers,
118+
"leader_detection",
119+
|| async move {
120+
scope
121+
.get_leader(MetadataLookupMode::CachedArbitrary)
122+
.await?;
123+
Ok(())
124+
},
125+
)
126+
.await?;
127+
128+
Ok(p)
106129
}
107130

108131
/// Topic

tests/client.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,10 @@ async fn test_partition_client() {
8888
.await
8989
.unwrap();
9090

91-
let partition_client = client.partition_client(topic_name.clone(), 0).unwrap();
91+
let partition_client = client
92+
.partition_client(topic_name.clone(), 0)
93+
.await
94+
.unwrap();
9295
assert_eq!(partition_client.topic(), &topic_name);
9396
assert_eq!(partition_client.partition(), 0);
9497
}
@@ -162,7 +165,7 @@ async fn test_socks5() {
162165
.await
163166
.unwrap();
164167

165-
let partition_client = client.partition_client(topic_name, 0).unwrap();
168+
let partition_client = client.partition_client(topic_name, 0).await.unwrap();
166169

167170
let record = record(b"");
168171
partition_client
@@ -194,7 +197,7 @@ async fn test_produce_empty() {
194197
.await
195198
.unwrap();
196199

197-
let partition_client = client.partition_client(&topic_name, 1).unwrap();
200+
let partition_client = client.partition_client(&topic_name, 1).await.unwrap();
198201
partition_client
199202
.produce(vec![], Compression::NoCompression)
200203
.await
@@ -216,7 +219,7 @@ async fn test_consume_empty() {
216219
.await
217220
.unwrap();
218221

219-
let partition_client = client.partition_client(&topic_name, 1).unwrap();
222+
let partition_client = client.partition_client(&topic_name, 1).await.unwrap();
220223
let (records, watermark) = partition_client
221224
.fetch_records(0, 1..10_000, 1_000)
222225
.await
@@ -240,7 +243,7 @@ async fn test_consume_offset_out_of_range() {
240243
.await
241244
.unwrap();
242245

243-
let partition_client = client.partition_client(&topic_name, 1).unwrap();
246+
let partition_client = client.partition_client(&topic_name, 1).await.unwrap();
244247
let record = record(b"");
245248
let offsets = partition_client
246249
.produce(vec![record], Compression::NoCompression)
@@ -279,7 +282,10 @@ async fn test_get_offset() {
279282
.await
280283
.unwrap();
281284

282-
let partition_client = client.partition_client(topic_name.clone(), 0).unwrap();
285+
let partition_client = client
286+
.partition_client(topic_name.clone(), 0)
287+
.await
288+
.unwrap();
283289

284290
assert_eq!(
285291
partition_client
@@ -340,7 +346,7 @@ async fn test_produce_consume_size_cutoff() {
340346
.await
341347
.unwrap();
342348

343-
let partition_client = Arc::new(client.partition_client(&topic_name, 0).unwrap());
349+
let partition_client = Arc::new(client.partition_client(&topic_name, 0).await.unwrap());
344350

345351
let record_1 = large_record();
346352
let record_2 = large_record();
@@ -413,7 +419,7 @@ async fn test_consume_midbatch() {
413419
.await
414420
.unwrap();
415421

416-
let partition_client = client.partition_client(&topic_name, 0).unwrap();
422+
let partition_client = client.partition_client(&topic_name, 0).await.unwrap();
417423

418424
// produce two records into a single batch
419425
let record_1 = record(b"x");
@@ -458,7 +464,7 @@ async fn test_delete_records() {
458464
.await
459465
.unwrap();
460466

461-
let partition_client = client.partition_client(&topic_name, 0).unwrap();
467+
let partition_client = client.partition_client(&topic_name, 0).await.unwrap();
462468

463469
// produce the following record batches:
464470
// - record_1

tests/consumer.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ async fn test_stream_consumer_start_at_0() {
3434

3535
let record = record(b"x");
3636

37-
let partition_client = Arc::new(client.partition_client(&topic, 0).unwrap());
37+
let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());
3838
partition_client
3939
.produce(vec![record.clone()], Compression::NoCompression)
4040
.await
@@ -85,7 +85,7 @@ async fn test_stream_consumer_start_at_1() {
8585
let record_1 = record(b"x");
8686
let record_2 = record(b"y");
8787

88-
let partition_client = Arc::new(client.partition_client(&topic, 0).unwrap());
88+
let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());
8989
partition_client
9090
.produce(
9191
vec![record_1.clone(), record_2.clone()],
@@ -121,7 +121,7 @@ async fn test_stream_consumer_offset_out_of_range() {
121121
.await
122122
.unwrap();
123123

124-
let partition_client = Arc::new(client.partition_client(&topic, 0).unwrap());
124+
let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());
125125

126126
let mut stream = StreamConsumerBuilder::new(partition_client, StartOffset::At(1)).build();
127127

@@ -155,7 +155,7 @@ async fn test_stream_consumer_start_at_earliest() {
155155
let record_1 = record(b"x");
156156
let record_2 = record(b"y");
157157

158-
let partition_client = Arc::new(client.partition_client(&topic, 0).unwrap());
158+
let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());
159159
partition_client
160160
.produce(vec![record_1.clone()], Compression::NoCompression)
161161
.await
@@ -204,7 +204,7 @@ async fn test_stream_consumer_start_at_earliest_empty() {
204204

205205
let record = record(b"x");
206206

207-
let partition_client = Arc::new(client.partition_client(&topic, 0).unwrap());
207+
let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());
208208

209209
let mut stream =
210210
StreamConsumerBuilder::new(Arc::clone(&partition_client), StartOffset::Earliest)
@@ -245,7 +245,7 @@ async fn test_stream_consumer_start_at_earliest_after_deletion() {
245245
let record_1 = record(b"x");
246246
let record_2 = record(b"y");
247247

248-
let partition_client = Arc::new(client.partition_client(&topic, 0).unwrap());
248+
let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());
249249
partition_client
250250
.produce(
251251
vec![record_1.clone(), record_2.clone()],
@@ -287,7 +287,7 @@ async fn test_stream_consumer_start_at_latest() {
287287
let record_1 = record(b"x");
288288
let record_2 = record(b"y");
289289

290-
let partition_client = Arc::new(client.partition_client(&topic, 0).unwrap());
290+
let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());
291291
partition_client
292292
.produce(vec![record_1.clone()], Compression::NoCompression)
293293
.await
@@ -330,7 +330,7 @@ async fn test_stream_consumer_start_at_latest_empty() {
330330

331331
let record = record(b"x");
332332

333-
let partition_client = Arc::new(client.partition_client(&topic, 0).unwrap());
333+
let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());
334334

335335
let mut stream = StreamConsumerBuilder::new(Arc::clone(&partition_client), StartOffset::Latest)
336336
.with_max_wait_ms(50)

tests/produce_consume.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,12 @@ async fn assert_produce_consume<F1, G1, F2, G2>(
258258
.create_topic(&topic_name, n_partitions, 1, 5_000)
259259
.await
260260
.unwrap();
261-
let partition_client = Arc::new(client.partition_client(topic_name.clone(), 1).unwrap());
261+
let partition_client = Arc::new(
262+
client
263+
.partition_client(topic_name.clone(), 1)
264+
.await
265+
.unwrap(),
266+
);
262267

263268
// timestamps for records. We'll reorder the messages though to ts2, ts1, ts3
264269
let ts1 = now();

tests/producer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ async fn test_batch_producer() {
2525

2626
let record = record(b"");
2727

28-
let partition_client = Arc::new(client.partition_client(&topic, 0).unwrap());
28+
let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());
2929
let producer = BatchProducerBuilder::new(partition_client)
3030
.with_linger(Duration::from_secs(5))
3131
.build(RecordAggregator::new(record.approximate_size() * 2 + 1));

0 commit comments

Comments
 (0)