Skip to content

Commit 39d93c0

Browse files
committed
feat: add ProduceResult with request size metadata
Signed-off-by: WenyXu <[email protected]>
1 parent 1076b3c commit 39d93c0

File tree

5 files changed

+71
-15
lines changed

5 files changed

+71
-15
lines changed

src/client/partition.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use crate::{
22
backoff::{Backoff, BackoffConfig, ErrorOrThrottle},
3-
client::error::{Error, RequestContext, Result},
3+
client::{
4+
error::{Error, RequestContext, Result},
5+
producer::ProduceResult,
6+
},
47
connection::{
58
BrokerCache, BrokerCacheGeneration, BrokerConnection, BrokerConnector, MessengerTransport,
69
MetadataLookupMode,
@@ -15,6 +18,7 @@ use crate::{
1518
ListOffsetsRequest, ListOffsetsRequestPartition, ListOffsetsRequestTopic,
1619
ListOffsetsResponse, ListOffsetsResponsePartition, NORMAL_CONSUMER, ProduceRequest,
1720
ProduceRequestPartitionData, ProduceRequestTopicData, ProduceResponse,
21+
ResponseBodyWithMetadata,
1822
},
1923
primitives::*,
2024
record::{Record as ProtocolRecord, *},
@@ -195,10 +199,10 @@ impl PartitionClient {
195199
&self,
196200
records: Vec<Record>,
197201
compression: Compression,
198-
) -> Result<Vec<i64>> {
202+
) -> Result<ProduceResult> {
199203
// skip request entirely if `records` is empty
200204
if records.is_empty() {
201-
return Ok(vec![]);
205+
return Ok(ProduceResult::default());
202206
}
203207

204208
let n = records.len() as i64;
@@ -214,13 +218,20 @@ impl PartitionClient {
214218
.get()
215219
.await
216220
.map_err(|e| ErrorOrThrottle::Error((e, None)))?;
217-
let response = broker
218-
.request(&request)
221+
let ResponseBodyWithMetadata {
222+
response,
223+
encoded_request_size,
224+
} = broker
225+
.request_with_metadata(&request)
219226
.await
220227
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
221228
maybe_throttle(response.throttle_time_ms)?;
222229
process_produce_response(self.partition, &self.topic, n, response)
223230
.map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
231+
.map(|offsets| ProduceResult {
232+
offsets,
233+
encoded_request_size,
234+
})
224235
},
225236
)
226237
.await

src/client/producer.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@
207207
use std::sync::Arc;
208208
use std::time::Duration;
209209

210-
use futures::future::BoxFuture;
210+
use futures::{FutureExt, TryFutureExt, future::BoxFuture};
211211
use thiserror::Error;
212212
use tokio::task::JoinHandle;
213213
use tracing::*;
@@ -300,6 +300,16 @@ impl BatchProducerBuilder {
300300
}
301301
}
302302

303+
/// The result of the produce call.
304+
#[derive(Debug, Default)]
305+
pub struct ProduceResult {
306+
/// The offsets of the produced records.
307+
pub offsets: Vec<i64>,
308+
309+
/// The size of the request encoded in bytes.
310+
pub encoded_request_size: usize,
311+
}
312+
303313
/// The [`ProducerClient`] provides an abstraction over a Kafka client than can
304314
/// produce a record.
305315
///
@@ -316,15 +326,15 @@ pub trait ProducerClient: std::fmt::Debug + Send + Sync {
316326
&self,
317327
records: Vec<Record>,
318328
compression: Compression,
319-
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>>;
329+
) -> BoxFuture<'_, Result<ProduceResult, ClientError>>;
320330
}
321331

322332
impl ProducerClient for PartitionClient {
323333
fn produce(
324334
&self,
325335
records: Vec<Record>,
326336
compression: Compression,
327-
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>> {
337+
) -> BoxFuture<'_, Result<ProduceResult, ClientError>> {
328338
Box::pin(self.produce(records, compression))
329339
}
330340
}
@@ -703,7 +713,7 @@ mod tests {
703713
&self,
704714
records: Vec<Record>,
705715
_compression: Compression,
706-
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>> {
716+
) -> BoxFuture<'_, Result<ProduceResult, ClientError>> {
707717
Box::pin(async move {
708718
tokio::time::sleep(self.delay).await;
709719

@@ -727,7 +737,12 @@ mod tests {
727737
.map(|x| (x + offset_base) as i64)
728738
.collect();
729739
batch_sizes.push(records.len());
730-
Ok(offsets)
740+
let record_size = records.iter().map(|r| r.approximate_size()).sum::<usize>();
741+
Ok(ProduceResult {
742+
offsets,
743+
// Uses the approximate size of the records to estimate the size of the request.
744+
encoded_request_size: record_size,
745+
})
731746
})
732747
}
733748
}

src/client/producer/batch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ where
158158
async move {
159159
let res = match client.produce(batch, compression).await {
160160
Ok(status) => Ok(Arc::new(AggregatedStatus {
161-
aggregated_status: status,
161+
aggregated_status: status.offsets,
162162
status_deagg,
163163
})),
164164
Err(e) => {

src/messenger.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ use tokio::{
2727
};
2828
use tracing::{debug, info, warn};
2929

30-
use crate::protocol::{messages::ApiVersionsRequest, traits::ReadType};
30+
use crate::protocol::{
31+
messages::{ApiVersionsRequest, ResponseBodyWithMetadata},
32+
traits::ReadType,
33+
};
3134
use crate::{
3235
backoff::ErrorOrThrottle,
3336
protocol::{
@@ -315,6 +318,19 @@ where
315318
}
316319

317320
pub async fn request<R>(&self, msg: R) -> Result<R::ResponseBody, RequestError>
321+
where
322+
R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
323+
R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
324+
{
325+
self.request_with_version_ranges(msg, &self.version_ranges)
326+
.await
327+
.map(|body| body.response)
328+
}
329+
330+
pub async fn request_with_metadata<R>(
331+
&self,
332+
msg: R,
333+
) -> Result<ResponseBodyWithMetadata<R::ResponseBody>, RequestError>
318334
where
319335
R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
320336
R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
@@ -327,7 +343,7 @@ where
327343
&self,
328344
msg: R,
329345
version_ranges: &HashMap<ApiKey, ApiVersionRange>,
330-
) -> Result<R::ResponseBody, RequestError>
346+
) -> Result<ResponseBodyWithMetadata<R::ResponseBody>, RequestError>
331347
where
332348
R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
333349
R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
@@ -371,6 +387,7 @@ where
371387
.write_versioned(&mut buf, header_version)
372388
.expect("Writing header to buffer should always work");
373389
msg.write_versioned(&mut buf, body_api_version)?;
390+
let encoded_size = buf.len();
374391

375392
let (tx, rx) = channel();
376393

@@ -412,7 +429,10 @@ where
412429
});
413430
}
414431

415-
Ok(body)
432+
Ok(ResponseBodyWithMetadata {
433+
response: body,
434+
encoded_request_size: encoded_size,
435+
})
416436
}
417437

418438
async fn send_message(&self, msg: Vec<u8>) -> Result<(), RequestError> {
@@ -468,7 +488,7 @@ where
468488
.request_with_version_ranges(&body, &version_ranges)
469489
.await
470490
{
471-
Ok(response) => {
491+
Ok(ResponseBodyWithMetadata { response, .. }) => {
472492
if let Err(ErrorOrThrottle::Throttle(throttle)) =
473493
maybe_throttle::<SyncVersionsError>(response.throttle_time_ms)
474494
{

src/protocol/messages/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,16 @@ impl<W: Write, T: WriteVersionedType<W>> WriteVersionedType<W> for &T {
8686
}
8787
}
8888

89+
/// A response body with metadata about the request.
90+
#[derive(Debug)]
91+
pub struct ResponseBodyWithMetadata<R> {
92+
/// The response body.
93+
pub response: R,
94+
95+
/// The size of the request encoded in bytes.
96+
pub encoded_request_size: usize,
97+
}
98+
8999
/// Specifies a request body.
90100
pub trait RequestBody {
91101
/// The response type that will follow when issuing this request.

0 commit comments

Comments
 (0)