Skip to content

Commit a12992f

Browse files
committed
chore: apply suggestions
Signed-off-by: WenyXu <[email protected]>
1 parent bc582e9 commit a12992f

File tree

7 files changed

+70
-23
lines changed

7 files changed

+70
-23
lines changed

src/client/controller.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ impl ControllerClient {
7171
let response = broker
7272
.request(request)
7373
.await
74-
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
74+
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
75+
.response;
7576

7677
maybe_throttle(response.throttle_time_ms)?;
7778

@@ -122,7 +123,8 @@ impl ControllerClient {
122123
let response = broker
123124
.request(request)
124125
.await
125-
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
126+
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
127+
.response;
126128

127129
maybe_throttle(response.throttle_time_ms)?;
128130

src/client/partition.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ impl PartitionClient {
222222
response,
223223
encoded_request_size,
224224
} = broker
225-
.request_with_metadata(&request)
225+
.request(&request)
226226
.await
227227
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
228228
maybe_throttle(response.throttle_time_ms)?;
@@ -267,7 +267,8 @@ impl PartitionClient {
267267
let response = broker
268268
.request(&request)
269269
.await
270-
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
270+
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
271+
.response;
271272
maybe_throttle(response.throttle_time_ms)?;
272273
process_fetch_response(self.partition, &self.topic, response, offset)
273274
.map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
@@ -303,7 +304,8 @@ impl PartitionClient {
303304
let response = broker
304305
.request(&request)
305306
.await
306-
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
307+
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
308+
.response;
307309
maybe_throttle(response.throttle_time_ms)?;
308310
process_list_offsets_response(self.partition, &self.topic, response)
309311
.map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
@@ -336,7 +338,8 @@ impl PartitionClient {
336338
let response = broker
337339
.request(&request)
338340
.await
339-
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
341+
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
342+
.response;
340343
maybe_throttle(Some(response.throttle_time_ms))?;
341344
process_delete_records_response(&self.topic, self.partition, response)
342345
.map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))

src/client/producer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,9 @@ impl BatchProducerBuilder {
300300
}
301301
}
302302

303-
/// The result of the produce call.
303+
/// The result of the [produce](ProducerClient::produce) call.
304304
#[derive(Debug, Default)]
305+
#[non_exhaustive]
305306
pub struct ProduceResult {
306307
/// The offsets of the produced records.
307308
pub offsets: Vec<i64>,

src/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ impl RequestHandler for MessengerTransport {
392392
&self,
393393
request_params: &MetadataRequest,
394394
) -> Result<MetadataResponse, RequestError> {
395-
self.request(request_params).await
395+
self.request(request_params).await.map(|r| r.response)
396396
}
397397
}
398398

src/messenger.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -317,17 +317,7 @@ where
317317
self.version_ranges = ranges;
318318
}
319319

320-
pub async fn request<R>(&self, msg: R) -> Result<R::ResponseBody, RequestError>
321-
where
322-
R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
323-
R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
324-
{
325-
self.request_with_version_ranges(msg, &self.version_ranges)
326-
.await
327-
.map(|body| body.response)
328-
}
329-
330-
pub async fn request_with_metadata<R>(
320+
pub async fn request<R>(
331321
&self,
332322
msg: R,
333323
) -> Result<ResponseBodyWithMetadata<R::ResponseBody>, RequestError>
@@ -580,7 +570,7 @@ where
580570
auth_bytes: Vec<u8>,
581571
) -> Result<SaslAuthenticateResponse, SaslError> {
582572
let req = SaslAuthenticateRequest::new(auth_bytes);
583-
let resp = self.request(req).await?;
573+
let resp = self.request(req).await?.response;
584574
if let Some(err) = resp.error_code {
585575
if let Some(s) = resp.error_message.0 {
586576
debug!("Sasl auth error message: {s}");
@@ -593,7 +583,7 @@ where
593583

594584
async fn sasl_handshake(&self, mechanism: &str) -> Result<SaslHandshakeResponse, SaslError> {
595585
let req = SaslHandshakeRequest::new(mechanism);
596-
let resp = self.request(req).await?;
586+
let resp = self.request(req).await?.response;
597587
if let Some(err) = resp.error_code {
598588
return Err(SaslError::ApiError(err));
599589
}
@@ -1204,7 +1194,8 @@ mod tests {
12041194
tagged_fields: Some(TaggedFields::default()),
12051195
})
12061196
.await
1207-
.unwrap();
1197+
.unwrap()
1198+
.response;
12081199
assert_eq!(actual, resp);
12091200
}
12101201

src/protocol/messages/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl<W: Write, T: WriteVersionedType<W>> WriteVersionedType<W> for &T {
8686
}
8787
}
8888

89-
/// A response body with metadata about the request.
89+
/// A response body with metadata about the request & response.
9090
#[derive(Debug)]
9191
pub struct ResponseBodyWithMetadata<R> {
9292
/// The response body.

tests/client.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use rskafka::{
66
ClientBuilder,
77
error::{Error as ClientError, ProtocolError, ServerErrorResponse},
88
partition::{Compression, OffsetAt, UnknownTopicHandling},
9+
producer::ProduceResult,
910
},
1011
record::{Record, RecordAndOffset},
1112
};
@@ -390,6 +391,55 @@ async fn test_consume_offset_out_of_range() {
390391
);
391392
}
392393

394+
#[tokio::test]
395+
async fn test_produce_metadata() {
396+
maybe_start_logging();
397+
398+
let test_cfg = maybe_skip_kafka_integration!();
399+
let topic_name = random_topic_name();
400+
let n_partitions = 1;
401+
402+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers.clone())
403+
.build()
404+
.await
405+
.unwrap();
406+
let controller_client = client.controller_client().unwrap();
407+
controller_client
408+
.create_topic(&topic_name, n_partitions, 1, 5_000)
409+
.await
410+
.unwrap();
411+
412+
let partition_client = client
413+
.partition_client(topic_name.clone(), 0, UnknownTopicHandling::Retry)
414+
.await
415+
.unwrap();
416+
417+
let record1 = record(b"");
418+
let ProduceResult {
419+
offsets,
420+
encoded_request_size,
421+
..
422+
} = partition_client
423+
.produce(vec![record1.clone()], Compression::NoCompression)
424+
.await
425+
.unwrap();
426+
assert_eq!(offsets[0], 0);
427+
assert_eq!(encoded_request_size, 177);
428+
429+
// Produce a record with repeated string
430+
let record2 = record(b"value value value value value value value");
431+
let ProduceResult {
432+
offsets,
433+
encoded_request_size,
434+
..
435+
} = partition_client
436+
.produce(vec![record2.clone()], Compression::NoCompression)
437+
.await
438+
.unwrap();
439+
assert_eq!(offsets[0], 1);
440+
assert_eq!(encoded_request_size, 219);
441+
}
442+
393443
#[tokio::test]
394444
async fn test_get_offset() {
395445
maybe_start_logging();

0 commit comments

Comments
 (0)