Skip to content

Commit 1a4e849

Browse files
committed
fix: heuristic to detect OffsetOutOfRange for redpanda
1 parent 0cf4127 commit 1a4e849

File tree

3 files changed

+75
-3
lines changed

3 files changed

+75
-3
lines changed

src/client/partition.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::ops::{ControlFlow, Deref, Range};
2323
use std::sync::Arc;
2424
use time::OffsetDateTime;
2525
use tokio::sync::Mutex;
26-
use tracing::{error, info};
26+
use tracing::{error, info, warn};
2727

2828
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2929
pub enum Compression {
@@ -124,6 +124,18 @@ impl PartitionClient {
124124
})
125125
.await?;
126126

127+
// Redpanda never sends OffsetOutOfRange even when it should. "Luckily" it does not support deletions so we can
128+
// implement a simple heuristic.
129+
if partition.high_watermark.0 < offset {
130+
warn!(
131+
"This message looks like Redpanda wants to report a OffsetOutOfRange but doesn't."
132+
);
133+
return Err(Error::ServerError(
134+
ProtocolError::OffsetOutOfRange,
135+
String::from("Offset out of range"),
136+
));
137+
}
138+
127139
let records = extract_records(partition.records.0)?;
128140

129141
Ok((records, partition.high_watermark.0))

tests/client.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use assert_matches::assert_matches;
12
use rskafka::{
23
client::{
34
error::{Error as ClientError, ProtocolError},
@@ -157,6 +158,63 @@ async fn test_produce_empty() {
157158
.unwrap();
158159
}
159160

161+
#[tokio::test]
162+
async fn test_consume_empty() {
163+
maybe_start_logging();
164+
165+
let connection = maybe_skip_kafka_integration!();
166+
let topic_name = random_topic_name();
167+
let n_partitions = 2;
168+
169+
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
170+
let controller_client = client.controller_client().await.unwrap();
171+
controller_client
172+
.create_topic(&topic_name, n_partitions, 1, 5_000)
173+
.await
174+
.unwrap();
175+
176+
let partition_client = client.partition_client(&topic_name, 1).await.unwrap();
177+
let (records, watermark) = partition_client
178+
.fetch_records(0, 1..10_000, 1_000)
179+
.await
180+
.unwrap();
181+
assert!(records.is_empty());
182+
assert_eq!(watermark, 0);
183+
}
184+
185+
#[tokio::test]
186+
async fn test_consume_offset_out_of_range() {
187+
maybe_start_logging();
188+
189+
let connection = maybe_skip_kafka_integration!();
190+
let topic_name = random_topic_name();
191+
let n_partitions = 2;
192+
193+
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
194+
let controller_client = client.controller_client().await.unwrap();
195+
controller_client
196+
.create_topic(&topic_name, n_partitions, 1, 5_000)
197+
.await
198+
.unwrap();
199+
200+
let partition_client = client.partition_client(&topic_name, 1).await.unwrap();
201+
let record = record();
202+
let offsets = partition_client
203+
.produce(vec![record], Compression::NoCompression)
204+
.await
205+
.unwrap();
206+
let offset = offsets[0];
207+
208+
let err = partition_client
209+
.fetch_records(offset + 2, 1..10_000, 1_000)
210+
.await
211+
.unwrap_err();
212+
assert_matches!(
213+
err,
214+
ClientError::ServerError(ProtocolError::OffsetOutOfRange, _)
215+
);
216+
}
217+
160218
#[tokio::test]
161219
async fn test_get_high_watermark() {
162220
maybe_start_logging();

tests/consumer.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ async fn test_stream_consumer() {
3737
.await
3838
.unwrap();
3939

40-
let mut stream = StreamConsumerBuilder::new(Arc::clone(&partition_client), 0).build();
40+
let mut stream = StreamConsumerBuilder::new(Arc::clone(&partition_client), 0)
41+
.with_max_wait_ms(50)
42+
.build();
4143

4244
let assert_ok =
4345
|r: Result<Option<<StreamConsumer as Stream>::Item>, tokio::time::error::Elapsed>| {
@@ -50,7 +52,7 @@ async fn test_stream_consumer() {
5052
assert_ok(timeout(Duration::from_millis(100), stream.next()).await);
5153

5254
// No further records
53-
timeout(Duration::from_millis(100), stream.next())
55+
timeout(Duration::from_millis(200), stream.next())
5456
.await
5557
.expect_err("timeout");
5658

0 commit comments

Comments
 (0)