Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/client/partition.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::{
backoff::{Backoff, BackoffConfig, ErrorOrThrottle},
client::error::{Error, RequestContext, Result},
client::{
error::{Error, RequestContext, Result},
producer::ProduceResult,
},
connection::{
BrokerCache, BrokerCacheGeneration, BrokerConnection, BrokerConnector, MessengerTransport,
MetadataLookupMode,
Expand All @@ -15,6 +18,7 @@ use crate::{
ListOffsetsRequest, ListOffsetsRequestPartition, ListOffsetsRequestTopic,
ListOffsetsResponse, ListOffsetsResponsePartition, NORMAL_CONSUMER, ProduceRequest,
ProduceRequestPartitionData, ProduceRequestTopicData, ProduceResponse,
ResponseBodyWithMetadata,
},
primitives::*,
record::{Record as ProtocolRecord, *},
Expand Down Expand Up @@ -195,10 +199,10 @@ impl PartitionClient {
&self,
records: Vec<Record>,
compression: Compression,
) -> Result<Vec<i64>> {
) -> Result<ProduceResult> {
// skip request entirely if `records` is empty
if records.is_empty() {
return Ok(vec![]);
return Ok(ProduceResult::default());
}

let n = records.len() as i64;
Expand All @@ -214,13 +218,20 @@ impl PartitionClient {
.get()
.await
.map_err(|e| ErrorOrThrottle::Error((e, None)))?;
let response = broker
.request(&request)
let ResponseBodyWithMetadata {
response,
encoded_request_size,
} = broker
.request_with_metadata(&request)
.await
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
maybe_throttle(response.throttle_time_ms)?;
process_produce_response(self.partition, &self.topic, n, response)
.map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
.map(|offsets| ProduceResult {
offsets,
encoded_request_size,
})
},
)
.await
Expand Down
23 changes: 19 additions & 4 deletions src/client/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,16 @@ impl BatchProducerBuilder {
}
}

/// The result of the produce call.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// The result of the produce call.
/// The result of the [produce](ProducerClient::produce) call.

This makes it clearer which API you're referencing here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in a12992f

#[derive(Debug, Default)]
pub struct ProduceResult {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub struct ProduceResult {
#[non_exhaustive]
pub struct ProduceResult {

makes it easier to extend this later w/o introducing breaking changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in a12992f

/// The offsets of the produced records.
pub offsets: Vec<i64>,

/// The size of the request encoded in bytes.
pub encoded_request_size: usize,
}

/// The [`ProducerClient`] provides an abstraction over a Kafka client than can
/// produce a record.
///
Expand All @@ -316,15 +326,15 @@ pub trait ProducerClient: std::fmt::Debug + Send + Sync {
&self,
records: Vec<Record>,
compression: Compression,
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>>;
) -> BoxFuture<'_, Result<ProduceResult, ClientError>>;
}

impl ProducerClient for PartitionClient {
fn produce(
&self,
records: Vec<Record>,
compression: Compression,
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>> {
) -> BoxFuture<'_, Result<ProduceResult, ClientError>> {
Box::pin(self.produce(records, compression))
}
}
Expand Down Expand Up @@ -703,7 +713,7 @@ mod tests {
&self,
records: Vec<Record>,
_compression: Compression,
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>> {
) -> BoxFuture<'_, Result<ProduceResult, ClientError>> {
Box::pin(async move {
tokio::time::sleep(self.delay).await;

Expand All @@ -727,7 +737,12 @@ mod tests {
.map(|x| (x + offset_base) as i64)
.collect();
batch_sizes.push(records.len());
Ok(offsets)
let record_size = records.iter().map(|r| r.approximate_size()).sum::<usize>();
Ok(ProduceResult {
offsets,
// Uses the approximate size of the records to estimate the size of the request.
encoded_request_size: record_size,
})
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/client/producer/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ where
async move {
let res = match client.produce(batch, compression).await {
Ok(status) => Ok(Arc::new(AggregatedStatus {
aggregated_status: status,
aggregated_status: status.offsets,
status_deagg,
})),
Err(e) => {
Expand Down
28 changes: 24 additions & 4 deletions src/messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use tokio::{
};
use tracing::{debug, info, warn};

use crate::protocol::{messages::ApiVersionsRequest, traits::ReadType};
use crate::protocol::{
messages::{ApiVersionsRequest, ResponseBodyWithMetadata},
traits::ReadType,
};
use crate::{
backoff::ErrorOrThrottle,
protocol::{
Expand Down Expand Up @@ -315,6 +318,19 @@ where
}

pub async fn request<R>(&self, msg: R) -> Result<R::ResponseBody, RequestError>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this method is technically public, I doubt that anybody really uses it. So I think request should just return the metadata as well, in the same way that request_with_version_ranges does. This keeps the API kinda lean and avoids having all sorts of permutations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved in a12992f

where
R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
{
self.request_with_version_ranges(msg, &self.version_ranges)
.await
.map(|body| body.response)
}

pub async fn request_with_metadata<R>(
&self,
msg: R,
) -> Result<ResponseBodyWithMetadata<R::ResponseBody>, RequestError>
where
R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
Expand All @@ -327,7 +343,7 @@ where
&self,
msg: R,
version_ranges: &HashMap<ApiKey, ApiVersionRange>,
) -> Result<R::ResponseBody, RequestError>
) -> Result<ResponseBodyWithMetadata<R::ResponseBody>, RequestError>
where
R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
Expand Down Expand Up @@ -371,6 +387,7 @@ where
.write_versioned(&mut buf, header_version)
.expect("Writing header to buffer should always work");
msg.write_versioned(&mut buf, body_api_version)?;
let encoded_size = buf.len();

let (tx, rx) = channel();

Expand Down Expand Up @@ -412,7 +429,10 @@ where
});
}

Ok(body)
Ok(ResponseBodyWithMetadata {
response: body,
encoded_request_size: encoded_size,
})
}

async fn send_message(&self, msg: Vec<u8>) -> Result<(), RequestError> {
Expand Down Expand Up @@ -468,7 +488,7 @@ where
.request_with_version_ranges(&body, &version_ranges)
.await
{
Ok(response) => {
Ok(ResponseBodyWithMetadata { response, .. }) => {
if let Err(ErrorOrThrottle::Throttle(throttle)) =
maybe_throttle::<SyncVersionsError>(response.throttle_time_ms)
{
Expand Down
10 changes: 10 additions & 0 deletions src/protocol/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ impl<W: Write, T: WriteVersionedType<W>> WriteVersionedType<W> for &T {
}
}

/// A response body with metadata about the request.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// A response body with metadata about the request.
/// A response body with metadata about the request & response.

I think we can make this a bit more generic for future additions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in a12992f

#[derive(Debug)]
pub struct ResponseBodyWithMetadata<R> {
/// The response body.
pub response: R,

/// The size of the request encoded in bytes.
pub encoded_request_size: usize,
}

/// Specifies a request body.
pub trait RequestBody {
/// The response type that will follow when issuing this request.
Expand Down
21 changes: 14 additions & 7 deletions tests/client.rs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the end2end test should assert this new field as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in b1b5c99

Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ async fn test_consume_offset_out_of_range() {
let offsets = partition_client
.produce(vec![record], Compression::NoCompression)
.await
.unwrap();
.unwrap()
.offsets;
let offset = offsets[0];

let err = partition_client
Expand Down Expand Up @@ -434,13 +435,15 @@ async fn test_get_offset() {
let offsets = partition_client
.produce(vec![record_late.clone()], Compression::NoCompression)
.await
.unwrap();
.unwrap()
.offsets;
assert_eq!(offsets[0], 0);

let offsets = partition_client
.produce(vec![record_early.clone()], Compression::NoCompression)
.await
.unwrap();
.unwrap()
.offsets;
assert_eq!(offsets.len(), 1);
assert_eq!(offsets[0], 1);

Expand Down Expand Up @@ -570,7 +573,8 @@ async fn test_consume_midbatch() {
Compression::NoCompression,
)
.await
.unwrap();
.unwrap()
.offsets;
let _offset_1 = offsets[0];
let offset_2 = offsets[1];

Expand Down Expand Up @@ -623,7 +627,8 @@ async fn test_delete_records() {
let offsets = partition_client
.produce(vec![record_1.clone()], Compression::NoCompression)
.await
.unwrap();
.unwrap()
.offsets;
let offset_1 = offsets[0];

let offsets = partition_client
Expand All @@ -632,14 +637,16 @@ async fn test_delete_records() {
Compression::NoCompression,
)
.await
.unwrap();
.unwrap()
.offsets;
let offset_2 = offsets[0];
let offset_3 = offsets[1];

let offsets = partition_client
.produce(vec![record_4.clone()], Compression::NoCompression)
.await
.unwrap();
.unwrap()
.offsets;
let offset_4 = offsets[0];

// delete from the middle of the 2nd batch
Expand Down
1 change: 1 addition & 0 deletions tests/produce_consume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ async fn produce_rskafka(
.produce(records, compression)
.await
.unwrap()
.offsets
}

async fn consume_java(
Expand Down