Skip to content

Commit cffa616

Browse files
committed
fix: filter out records that we've not requested
1 parent 24aac8b commit cffa616

File tree

2 files changed: +64 −8 lines changed

src/client/partition.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ impl PartitionClient {
137137
));
138138
}
139139

140-
let records = extract_records(partition.records.0)?;
140+
let records = extract_records(partition.records.0, offset)?;
141141

142142
Ok((records, partition.high_watermark.0))
143143
}
@@ -535,7 +535,10 @@ fn process_fetch_response(
535535
Ok(response_partition)
536536
}
537537

538-
fn extract_records(partition_records: Vec<RecordBatch>) -> Result<Vec<RecordAndOffset>> {
538+
fn extract_records(
539+
partition_records: Vec<RecordBatch>,
540+
request_offset: i64,
541+
) -> Result<Vec<RecordAndOffset>> {
539542
let mut records = vec![];
540543

541544
for batch in partition_records {
@@ -547,6 +550,13 @@ fn extract_records(partition_records: Vec<RecordBatch>) -> Result<Vec<RecordAndO
547550
records.reserve(protocol_records.len());
548551

549552
for record in protocol_records {
553+
let offset = batch.base_offset + record.offset_delta as i64;
554+
if offset < request_offset {
555+
// Kafka does not split record batches on the server side, so we need to do this filtering on
556+
// the client side
557+
continue;
558+
}
559+
550560
let timestamp = OffsetDateTime::from_unix_timestamp_nanos(
551561
(batch.first_timestamp + record.timestamp_delta) as i128 * 1_000_000,
552562
)
@@ -565,7 +575,7 @@ fn extract_records(partition_records: Vec<RecordBatch>) -> Result<Vec<RecordAndO
565575
.collect(),
566576
timestamp,
567577
},
568-
offset: batch.base_offset + record.offset_delta as i64,
578+
offset,
569579
})
570580
}
571581
}

tests/client.rs

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,55 @@ async fn test_produce_consume_size_cutoff() {
338338
assert!(is_kafka ^ is_redpanda);
339339
}
340340

341+
#[tokio::test]
342+
async fn test_consume_midbatch() {
343+
maybe_start_logging();
344+
345+
let connection = maybe_skip_kafka_integration!();
346+
let topic_name = random_topic_name();
347+
348+
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
349+
let controller_client = client.controller_client().await.unwrap();
350+
controller_client
351+
.create_topic(&topic_name, 1, 1, 5_000)
352+
.await
353+
.unwrap();
354+
355+
let partition_client = client.partition_client(&topic_name, 0).await.unwrap();
356+
357+
// produce two records into a single batch
358+
let record_1 = record();
359+
let record_2 = Record {
360+
value: Some(b"x".to_vec()),
361+
timestamp: now(),
362+
..record_1.clone()
363+
};
364+
365+
let offsets = partition_client
366+
.produce(
367+
vec![record_1.clone(), record_2.clone()],
368+
Compression::NoCompression,
369+
)
370+
.await
371+
.unwrap();
372+
let _offset_1 = offsets[0];
373+
let offset_2 = offsets[1];
374+
375+
// when fetching from the middle of the record batch, the server will return both records but we should filter out
376+
// the first one on the client side
377+
let (records, _watermark) = partition_client
378+
.fetch_records(offset_2, 1..10_000, 1_000)
379+
.await
380+
.unwrap();
381+
assert_eq!(
382+
records,
383+
vec![RecordAndOffset {
384+
record: record_2,
385+
offset: offset_2
386+
},],
387+
);
388+
}
389+
341390
#[tokio::test]
342391
async fn test_delete_records() {
343392
maybe_start_logging();
@@ -429,18 +478,15 @@ async fn test_delete_records() {
429478
ClientError::ServerError(ProtocolError::OffsetOutOfRange, _)
430479
);
431480

432-
// fetching untouched records still works, however the middle record batch is NOT half-deleted and still contains record_2
481+
// fetching untouched records still works, however the middle record batch is NOT half-deleted and still contains
482+
// record_2. `fetch_records` should filter it out, though.
433483
let (records, _watermark) = partition_client
434484
.fetch_records(offset_3, 1..10_000, 1_000)
435485
.await
436486
.unwrap();
437487
assert_eq!(
438488
records,
439489
vec![
440-
RecordAndOffset {
441-
record: record_2,
442-
offset: offset_2
443-
},
444490
RecordAndOffset {
445491
record: record_3,
446492
offset: offset_3

0 commit comments

Comments (0)