Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/client/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ impl ControllerClient {
let response = broker
.request(request)
.await
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
.response;

maybe_throttle(response.throttle_time_ms)?;

Expand Down Expand Up @@ -122,7 +123,8 @@ impl ControllerClient {
let response = broker
.request(request)
.await
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
.response;

maybe_throttle(response.throttle_time_ms)?;

Expand Down
28 changes: 21 additions & 7 deletions src/client/partition.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::{
backoff::{Backoff, BackoffConfig, ErrorOrThrottle},
client::error::{Error, RequestContext, Result},
client::{
error::{Error, RequestContext, Result},
producer::ProduceResult,
},
connection::{
BrokerCache, BrokerCacheGeneration, BrokerConnection, BrokerConnector, MessengerTransport,
MetadataLookupMode,
Expand All @@ -15,6 +18,7 @@ use crate::{
ListOffsetsRequest, ListOffsetsRequestPartition, ListOffsetsRequestTopic,
ListOffsetsResponse, ListOffsetsResponsePartition, NORMAL_CONSUMER, ProduceRequest,
ProduceRequestPartitionData, ProduceRequestTopicData, ProduceResponse,
ResponseBodyWithMetadata,
},
primitives::*,
record::{Record as ProtocolRecord, *},
Expand Down Expand Up @@ -195,10 +199,10 @@ impl PartitionClient {
&self,
records: Vec<Record>,
compression: Compression,
) -> Result<Vec<i64>> {
) -> Result<ProduceResult> {
// skip request entirely if `records` is empty
if records.is_empty() {
return Ok(vec![]);
return Ok(ProduceResult::default());
}

let n = records.len() as i64;
Expand All @@ -214,13 +218,20 @@ impl PartitionClient {
.get()
.await
.map_err(|e| ErrorOrThrottle::Error((e, None)))?;
let response = broker
let ResponseBodyWithMetadata {
response,
encoded_request_size,
} = broker
.request(&request)
.await
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
maybe_throttle(response.throttle_time_ms)?;
process_produce_response(self.partition, &self.topic, n, response)
.map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
.map(|offsets| ProduceResult {
offsets,
encoded_request_size,
})
},
)
.await
Expand Down Expand Up @@ -256,7 +267,8 @@ impl PartitionClient {
let response = broker
.request(&request)
.await
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
.response;
maybe_throttle(response.throttle_time_ms)?;
process_fetch_response(self.partition, &self.topic, response, offset)
.map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
Expand Down Expand Up @@ -292,7 +304,8 @@ impl PartitionClient {
let response = broker
.request(&request)
.await
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
.response;
maybe_throttle(response.throttle_time_ms)?;
process_list_offsets_response(self.partition, &self.topic, response)
.map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
Expand Down Expand Up @@ -325,7 +338,8 @@ impl PartitionClient {
let response = broker
.request(&request)
.await
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?
.response;
maybe_throttle(Some(response.throttle_time_ms))?;
process_delete_records_response(&self.topic, self.partition, response)
.map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
Expand Down
24 changes: 20 additions & 4 deletions src/client/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,17 @@ impl BatchProducerBuilder {
}
}

/// The result of the [produce](ProducerClient::produce) call.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct ProduceResult {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub struct ProduceResult {
#[non_exhaustive]
pub struct ProduceResult {

makes it easier to extend this later w/o introducing breaking changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in a12992f

/// The offsets of the produced records.
pub offsets: Vec<i64>,

/// The size of the request encoded in bytes.
pub encoded_request_size: usize,
}

/// The [`ProducerClient`] provides an abstraction over a Kafka client than can
/// produce a record.
///
Expand All @@ -316,15 +327,15 @@ pub trait ProducerClient: std::fmt::Debug + Send + Sync {
&self,
records: Vec<Record>,
compression: Compression,
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>>;
) -> BoxFuture<'_, Result<ProduceResult, ClientError>>;
}

impl ProducerClient for PartitionClient {
fn produce(
&self,
records: Vec<Record>,
compression: Compression,
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>> {
) -> BoxFuture<'_, Result<ProduceResult, ClientError>> {
Box::pin(self.produce(records, compression))
}
}
Expand Down Expand Up @@ -703,7 +714,7 @@ mod tests {
&self,
records: Vec<Record>,
_compression: Compression,
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>> {
) -> BoxFuture<'_, Result<ProduceResult, ClientError>> {
Box::pin(async move {
tokio::time::sleep(self.delay).await;

Expand All @@ -727,7 +738,12 @@ mod tests {
.map(|x| (x + offset_base) as i64)
.collect();
batch_sizes.push(records.len());
Ok(offsets)
let record_size = records.iter().map(|r| r.approximate_size()).sum::<usize>();
Ok(ProduceResult {
offsets,
// Uses the approximate size of the records to estimate the size of the request.
encoded_request_size: record_size,
})
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/client/producer/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ where
async move {
let res = match client.produce(batch, compression).await {
Ok(status) => Ok(Arc::new(AggregatedStatus {
aggregated_status: status,
aggregated_status: status.offsets,
status_deagg,
})),
Err(e) => {
Expand Down
2 changes: 1 addition & 1 deletion src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ impl RequestHandler for MessengerTransport {
&self,
request_params: &MetadataRequest,
) -> Result<MetadataResponse, RequestError> {
self.request(request_params).await
self.request(request_params).await.map(|r| r.response)
}
}

Expand Down
27 changes: 19 additions & 8 deletions src/messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use tokio::{
};
use tracing::{debug, info, warn};

use crate::protocol::{messages::ApiVersionsRequest, traits::ReadType};
use crate::protocol::{
messages::{ApiVersionsRequest, ResponseBodyWithMetadata},
traits::ReadType,
};
use crate::{
backoff::ErrorOrThrottle,
protocol::{
Expand Down Expand Up @@ -314,7 +317,10 @@ where
self.version_ranges = ranges;
}

pub async fn request<R>(&self, msg: R) -> Result<R::ResponseBody, RequestError>
pub async fn request<R>(
&self,
msg: R,
) -> Result<ResponseBodyWithMetadata<R::ResponseBody>, RequestError>
where
R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
Expand All @@ -327,7 +333,7 @@ where
&self,
msg: R,
version_ranges: &HashMap<ApiKey, ApiVersionRange>,
) -> Result<R::ResponseBody, RequestError>
) -> Result<ResponseBodyWithMetadata<R::ResponseBody>, RequestError>
where
R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
Expand Down Expand Up @@ -371,6 +377,7 @@ where
.write_versioned(&mut buf, header_version)
.expect("Writing header to buffer should always work");
msg.write_versioned(&mut buf, body_api_version)?;
let encoded_size = buf.len();

let (tx, rx) = channel();

Expand Down Expand Up @@ -412,7 +419,10 @@ where
});
}

Ok(body)
Ok(ResponseBodyWithMetadata {
response: body,
encoded_request_size: encoded_size,
})
}

async fn send_message(&self, msg: Vec<u8>) -> Result<(), RequestError> {
Expand Down Expand Up @@ -468,7 +478,7 @@ where
.request_with_version_ranges(&body, &version_ranges)
.await
{
Ok(response) => {
Ok(ResponseBodyWithMetadata { response, .. }) => {
if let Err(ErrorOrThrottle::Throttle(throttle)) =
maybe_throttle::<SyncVersionsError>(response.throttle_time_ms)
{
Expand Down Expand Up @@ -560,7 +570,7 @@ where
auth_bytes: Vec<u8>,
) -> Result<SaslAuthenticateResponse, SaslError> {
let req = SaslAuthenticateRequest::new(auth_bytes);
let resp = self.request(req).await?;
let resp = self.request(req).await?.response;
if let Some(err) = resp.error_code {
if let Some(s) = resp.error_message.0 {
debug!("Sasl auth error message: {s}");
Expand All @@ -573,7 +583,7 @@ where

async fn sasl_handshake(&self, mechanism: &str) -> Result<SaslHandshakeResponse, SaslError> {
let req = SaslHandshakeRequest::new(mechanism);
let resp = self.request(req).await?;
let resp = self.request(req).await?.response;
if let Some(err) = resp.error_code {
return Err(SaslError::ApiError(err));
}
Expand Down Expand Up @@ -1184,7 +1194,8 @@ mod tests {
tagged_fields: Some(TaggedFields::default()),
})
.await
.unwrap();
.unwrap()
.response;
assert_eq!(actual, resp);
}

Expand Down
10 changes: 10 additions & 0 deletions src/protocol/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ impl<W: Write, T: WriteVersionedType<W>> WriteVersionedType<W> for &T {
}
}

/// A response body with metadata about the request & response.
#[derive(Debug)]
pub struct ResponseBodyWithMetadata<R> {
/// The response body.
pub response: R,

/// The size of the request encoded in bytes.
pub encoded_request_size: usize,
}

/// Specifies a request body.
pub trait RequestBody {
/// The response type that will follow when issuing this request.
Expand Down
Loading