src/client/partition.rs (13 additions & 3 deletions)

@@ -137,7 +137,7 @@ impl PartitionClient {
             ));
         }
 
-        let records = extract_records(partition.records.0)?;
+        let records = extract_records(partition.records.0, offset)?;
 
         Ok((records, partition.high_watermark.0))
     }
@@ -535,7 +535,10 @@ fn process_fetch_response(
     Ok(response_partition)
 }
 
-fn extract_records(partition_records: Vec<RecordBatch>) -> Result<Vec<RecordAndOffset>> {
+fn extract_records(
+    partition_records: Vec<RecordBatch>,
+    request_offset: i64,
+) -> Result<Vec<RecordAndOffset>> {
     let mut records = vec![];
 
     for batch in partition_records {
@@ -547,6 +550,13 @@ fn extract_records(partition_records: Vec<RecordBatch>) -> Result<Vec<RecordAndOffset>> {
             records.reserve(protocol_records.len());
 
             for record in protocol_records {
+                let offset = batch.base_offset + record.offset_delta as i64;
+                if offset < request_offset {
+                    // Kafka does not split record batches on the server side, so we need to do this filtering on
+                    // the client side
+                    continue;
+                }
+
                 let timestamp = OffsetDateTime::from_unix_timestamp_nanos(
                     (batch.first_timestamp + record.timestamp_delta) as i128 * 1_000_000,
                 )
@@ -565,7 +575,7 @@ fn extract_records(partition_records: Vec<RecordBatch>) -> Result<Vec<RecordAndOffset>> {
                             .collect(),
                         timestamp,
                     },
-                    offset: batch.base_offset + record.offset_delta as i64,
+                    offset,
                 })
             }
         }
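To see the new behaviour in isolation: the sketch below reimplements just the filtering rule that `extract_records` now applies. The `SketchBatch`/`SketchRecord` types are simplified stand-ins invented for illustration, not rskafka's actual protocol types.

```rust
// Minimal sketch of the client-side offset filtering above.
// NOTE: SketchBatch/SketchRecord are illustrative stand-ins, not the real
// protocol types used by rskafka.

struct SketchRecord {
    offset_delta: i32,
}

struct SketchBatch {
    base_offset: i64,
    records: Vec<SketchRecord>,
}

/// Kafka hands back whole record batches, so a fetch that starts in the
/// middle of a batch also receives the records *before* the requested
/// offset; those must be dropped on the client side.
fn visible_offsets(batches: &[SketchBatch], request_offset: i64) -> Vec<i64> {
    let mut offsets = Vec::new();
    for batch in batches {
        for record in &batch.records {
            let offset = batch.base_offset + record.offset_delta as i64;
            if offset < request_offset {
                continue; // served by the broker, but not requested
            }
            offsets.push(offset);
        }
    }
    offsets
}

fn main() {
    // One batch holding the records at offsets 10 and 11.
    let batch = SketchBatch {
        base_offset: 10,
        records: vec![
            SketchRecord { offset_delta: 0 },
            SketchRecord { offset_delta: 1 },
        ],
    };
    // Fetching from offset 11 lands mid-batch: offset 10 is filtered out.
    assert_eq!(visible_offsets(&[batch], 11), vec![11]);
}
```

With this change, `fetch_records(offset, ...)` only surfaces records at or past `offset`, even when `offset` points into the middle of a batch; `test_consume_midbatch` below asserts exactly that against a real broker.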
tests/client.rs (51 additions & 5 deletions)

@@ -338,6 +338,55 @@ async fn test_produce_consume_size_cutoff() {
     assert!(is_kafka ^ is_redpanda);
 }
 
+#[tokio::test]
+async fn test_consume_midbatch() {
+    maybe_start_logging();
+
+    let connection = maybe_skip_kafka_integration!();
+    let topic_name = random_topic_name();
+
+    let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+    let controller_client = client.controller_client().await.unwrap();
+    controller_client
+        .create_topic(&topic_name, 1, 1, 5_000)
+        .await
+        .unwrap();
+
+    let partition_client = client.partition_client(&topic_name, 0).await.unwrap();
+
+    // produce two records into a single batch
+    let record_1 = record();
+    let record_2 = Record {
+        value: Some(b"x".to_vec()),
+        timestamp: now(),
+        ..record_1.clone()
+    };
+
+    let offsets = partition_client
+        .produce(
+            vec![record_1.clone(), record_2.clone()],
+            Compression::NoCompression,
+        )
+        .await
+        .unwrap();
+    let _offset_1 = offsets[0];
+    let offset_2 = offsets[1];
+
+    // when fetching from the middle of the record batch, the server will return both records but we should filter out
+    // the first one on the client side
+    let (records, _watermark) = partition_client
+        .fetch_records(offset_2, 1..10_000, 1_000)
+        .await
+        .unwrap();
+    assert_eq!(
+        records,
+        vec![RecordAndOffset {
+            record: record_2,
+            offset: offset_2
+        },],
+    );
+}
+
 #[tokio::test]
 async fn test_delete_records() {
     maybe_start_logging();
@@ -429,18 +478,15 @@ async fn test_delete_records() {
         ClientError::ServerError(ProtocolError::OffsetOutOfRange, _)
     );
 
-    // fetching untouched records still works, however the middle record batch is NOT half-deleted and still contains record_2
+    // fetching untouched records still works; however, the middle record batch is NOT half-deleted and still contains
+    // record_2. `fetch_records` should filter it out on the client side, though.
    let (records, _watermark) = partition_client
         .fetch_records(offset_3, 1..10_000, 1_000)
         .await
         .unwrap();
     assert_eq!(
         records,
         vec![
-            RecordAndOffset {
-                record: record_2,
-                offset: offset_2
-            },
             RecordAndOffset {
                 record: record_3,
                 offset: offset_3