Skip to content

Commit 570b98e

Browse files
committed
fix: heuristic to detect OffsetOutOfRange for redpanda
1 parent 0cf4127 commit 570b98e

File tree

3 files changed

+41
-3
lines changed

3 files changed

+41
-3
lines changed

src/client/partition.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::ops::{ControlFlow, Deref, Range};
2323
use std::sync::Arc;
2424
use time::OffsetDateTime;
2525
use tokio::sync::Mutex;
26-
use tracing::{error, info};
26+
use tracing::{error, info, warn};
2727

2828
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2929
pub enum Compression {
@@ -124,6 +124,18 @@ impl PartitionClient {
124124
})
125125
.await?;
126126

127+
// Redpanda never sends OffsetOutOfRange even when it should. "Luckily" it does not support deletions so we can
128+
// implement a simple heuristic.
129+
if partition.high_watermark.0 < offset {
130+
warn!(
131+
"This message looks like Redpanda wants to report a OffsetOutOfRange but doesn't."
132+
);
133+
return Err(Error::ServerError(
134+
ProtocolError::OffsetOutOfRange,
135+
String::from("Offset out of range"),
136+
));
137+
}
138+
127139
let records = extract_records(partition.records.0)?;
128140

129141
Ok((records, partition.high_watermark.0))

tests/client.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,30 @@ async fn test_produce_empty() {
157157
.unwrap();
158158
}
159159

160+
#[tokio::test]
161+
async fn test_consume_empty() {
162+
maybe_start_logging();
163+
164+
let connection = maybe_skip_kafka_integration!();
165+
let topic_name = random_topic_name();
166+
let n_partitions = 2;
167+
168+
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
169+
let controller_client = client.controller_client().await.unwrap();
170+
controller_client
171+
.create_topic(&topic_name, n_partitions, 1, 5_000)
172+
.await
173+
.unwrap();
174+
175+
let partition_client = client.partition_client(&topic_name, 1).await.unwrap();
176+
let (records, watermark) = partition_client
177+
.fetch_records(0, 1..10_000, 1_000)
178+
.await
179+
.unwrap();
180+
assert!(records.is_empty());
181+
assert_eq!(watermark, 0);
182+
}
183+
160184
#[tokio::test]
161185
async fn test_get_high_watermark() {
162186
maybe_start_logging();

tests/consumer.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ async fn test_stream_consumer() {
3737
.await
3838
.unwrap();
3939

40-
let mut stream = StreamConsumerBuilder::new(Arc::clone(&partition_client), 0).build();
40+
let mut stream = StreamConsumerBuilder::new(Arc::clone(&partition_client), 0)
41+
.with_max_wait_ms(50)
42+
.build();
4143

4244
let assert_ok =
4345
|r: Result<Option<<StreamConsumer as Stream>::Item>, tokio::time::error::Elapsed>| {
@@ -50,7 +52,7 @@ async fn test_stream_consumer() {
5052
assert_ok(timeout(Duration::from_millis(100), stream.next()).await);
5153

5254
// No further records
53-
timeout(Duration::from_millis(100), stream.next())
55+
timeout(Duration::from_millis(200), stream.next())
5456
.await
5557
.expect_err("timeout");
5658

0 commit comments

Comments
 (0)