Skip to content

Commit 0f9cd4e

Browse files
Merge pull request #96 from influxdata/crepererum/consumer_stream_offset_out_of_range
fix: terminate consumer stream on `OffsetOutOfRange`
2 parents a1b8533 + 1a4e849 commit 0f9cd4e

File tree

4 files changed

+187
-6
lines changed

4 files changed

+187
-6
lines changed

src/client/consumer.rs

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ impl StreamConsumerBuilder {
111111
min_batch_size: self.min_batch_size,
112112
max_batch_size: self.max_batch_size,
113113
next_offset: self.start_offset,
114+
terminated: false,
114115
last_high_watermark: -1,
115116
buffer: Default::default(),
116117
fetch_fut: Fuse::terminated(),
@@ -122,6 +123,9 @@ type FetchResult = Result<(Vec<RecordAndOffset>, i64)>;
122123

123124
/// A trait wrapper to allow mocking
124125
trait FetchClient: std::fmt::Debug + Send + Sync {
126+
/// Fetch records.
127+
///
128+
/// Arguments are identical to [`PartitionClient::fetch_records`].
125129
fn fetch_records(
126130
&self,
127131
offset: i64,
@@ -142,6 +146,11 @@ impl FetchClient for PartitionClient {
142146
}
143147

144148
pin_project! {
149+
/// Stream consuming data from start offset.
150+
///
151+
/// # Error Handling
152+
/// If an error is returned by [`fetch_records`](`FetchClient::fetch_records`) then the stream will emit this error
153+
/// once and will terminate afterwards.
145154
pub struct StreamConsumer {
146155
client: Arc<dyn FetchClient>,
147156

@@ -153,6 +162,8 @@ pin_project! {
153162

154163
next_offset: i64,
155164

165+
terminated: bool,
166+
156167
last_high_watermark: i64,
157168

158169
buffer: VecDeque<RecordAndOffset>,
@@ -167,6 +178,9 @@ impl Stream for StreamConsumer {
167178
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
168179
let this = self.project();
169180
loop {
181+
if *this.terminated {
182+
return Poll::Ready(None);
183+
}
170184
if let Some(x) = this.buffer.pop_front() {
171185
return Poll::Ready(Some(Ok((x, *this.last_high_watermark))));
172186
}
@@ -203,7 +217,12 @@ impl Stream for StreamConsumer {
203217
}
204218
continue;
205219
}
206-
Err(e) => return Poll::Ready(Some(Err(e))),
220+
Err(e) => {
221+
*this.terminated = true;
222+
223+
// report error once
224+
return Poll::Ready(Some(Err(e)));
225+
}
207226
}
208227
}
209228
}
@@ -217,21 +236,26 @@ impl std::fmt::Debug for StreamConsumer {
217236
.field("max_batch_size", &self.max_batch_size)
218237
.field("max_wait_ms", &self.max_wait_ms)
219238
.field("next_offset", &self.next_offset)
239+
.field("terminated", &self.terminated)
220240
.field("last_high_watermark", &self.last_high_watermark)
221241
.field("buffer", &self.buffer)
222-
.finish()
242+
.finish_non_exhaustive()
223243
}
224244
}
225245

226246
#[cfg(test)]
227247
mod tests {
228248
use std::time::Duration;
229249

250+
use assert_matches::assert_matches;
230251
use futures::{pin_mut, StreamExt};
231252
use time::OffsetDateTime;
232253
use tokio::sync::{mpsc, Mutex};
233254

234-
use crate::record::Record;
255+
use crate::{
256+
client::error::{Error, ProtocolError},
257+
record::Record,
258+
};
235259

236260
use super::*;
237261

@@ -348,6 +372,36 @@ mod tests {
348372
}
349373
}
350374

375+
#[derive(Debug)]
376+
struct MockErrFetch {
377+
inner: Arc<Mutex<Option<Error>>>,
378+
}
379+
380+
impl MockErrFetch {
381+
fn new(e: Error) -> Self {
382+
Self {
383+
inner: Arc::new(Mutex::new(Some(e))),
384+
}
385+
}
386+
}
387+
388+
impl FetchClient for MockErrFetch {
389+
fn fetch_records(
390+
&self,
391+
_offset: i64,
392+
_bytes: Range<i32>,
393+
_max_wait_ms: i32,
394+
) -> BoxFuture<'_, Result<(Vec<RecordAndOffset>, i64)>> {
395+
let inner = Arc::clone(&self.inner);
396+
Box::pin(async move {
397+
match inner.lock().await.take() {
398+
Some(e) => Err(e),
399+
None => panic!("EOF"),
400+
}
401+
})
402+
}
403+
}
404+
351405
#[tokio::test]
352406
async fn test_consumer() {
353407
let record = Record {
@@ -471,4 +525,24 @@ mod tests {
471525
.unwrap()
472526
.unwrap();
473527
}
528+
529+
#[tokio::test]
530+
async fn test_consumer_terminate() {
531+
let e = Error::ServerError(
532+
ProtocolError::OffsetOutOfRange,
533+
String::from("offset out of range"),
534+
);
535+
let consumer = Arc::new(MockErrFetch::new(e));
536+
537+
let mut stream = StreamConsumerBuilder::new_with_client(consumer, 0).build();
538+
539+
let error = stream.next().await.expect("stream not empty").unwrap_err();
540+
assert_matches!(
541+
error,
542+
Error::ServerError(ProtocolError::OffsetOutOfRange, _)
543+
);
544+
545+
// stream ends
546+
assert!(stream.next().await.is_none());
547+
}
474548
}

src/client/partition.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::ops::{ControlFlow, Deref, Range};
2323
use std::sync::Arc;
2424
use time::OffsetDateTime;
2525
use tokio::sync::Mutex;
26-
use tracing::{error, info};
26+
use tracing::{error, info, warn};
2727

2828
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2929
pub enum Compression {
@@ -105,6 +105,11 @@ impl PartitionClient {
105105
/// Fetch `bytes` bytes of record data starting at sequence number `offset`
106106
///
107107
/// Returns the records, and the current high watermark.
108+
///
109+
///
110+
/// # Error Handling
111+
/// Fetching records outside the range known the to broker (marked by low and high watermark) will lead to a
112+
/// [`ServerError`](Error::ServerError) with [`OffsetOutOfRange`](ProtocolError::OffsetOutOfRange).
108113
pub async fn fetch_records(
109114
&self,
110115
offset: i64,
@@ -119,6 +124,18 @@ impl PartitionClient {
119124
})
120125
.await?;
121126

127+
// Redpanda never sends OffsetOutOfRange even when it should. "Luckily" it does not support deletions so we can
128+
// implement a simple heuristic.
129+
if partition.high_watermark.0 < offset {
130+
warn!(
131+
"This message looks like Redpanda wants to report a OffsetOutOfRange but doesn't."
132+
);
133+
return Err(Error::ServerError(
134+
ProtocolError::OffsetOutOfRange,
135+
String::from("Offset out of range"),
136+
));
137+
}
138+
122139
let records = extract_records(partition.records.0)?;
123140

124141
Ok((records, partition.high_watermark.0))

tests/client.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use assert_matches::assert_matches;
12
use rskafka::{
23
client::{
34
error::{Error as ClientError, ProtocolError},
@@ -157,6 +158,63 @@ async fn test_produce_empty() {
157158
.unwrap();
158159
}
159160

161+
#[tokio::test]
162+
async fn test_consume_empty() {
163+
maybe_start_logging();
164+
165+
let connection = maybe_skip_kafka_integration!();
166+
let topic_name = random_topic_name();
167+
let n_partitions = 2;
168+
169+
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
170+
let controller_client = client.controller_client().await.unwrap();
171+
controller_client
172+
.create_topic(&topic_name, n_partitions, 1, 5_000)
173+
.await
174+
.unwrap();
175+
176+
let partition_client = client.partition_client(&topic_name, 1).await.unwrap();
177+
let (records, watermark) = partition_client
178+
.fetch_records(0, 1..10_000, 1_000)
179+
.await
180+
.unwrap();
181+
assert!(records.is_empty());
182+
assert_eq!(watermark, 0);
183+
}
184+
185+
#[tokio::test]
186+
async fn test_consume_offset_out_of_range() {
187+
maybe_start_logging();
188+
189+
let connection = maybe_skip_kafka_integration!();
190+
let topic_name = random_topic_name();
191+
let n_partitions = 2;
192+
193+
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
194+
let controller_client = client.controller_client().await.unwrap();
195+
controller_client
196+
.create_topic(&topic_name, n_partitions, 1, 5_000)
197+
.await
198+
.unwrap();
199+
200+
let partition_client = client.partition_client(&topic_name, 1).await.unwrap();
201+
let record = record();
202+
let offsets = partition_client
203+
.produce(vec![record], Compression::NoCompression)
204+
.await
205+
.unwrap();
206+
let offset = offsets[0];
207+
208+
let err = partition_client
209+
.fetch_records(offset + 2, 1..10_000, 1_000)
210+
.await
211+
.unwrap_err();
212+
assert_matches!(
213+
err,
214+
ClientError::ServerError(ProtocolError::OffsetOutOfRange, _)
215+
);
216+
}
217+
160218
#[tokio::test]
161219
async fn test_get_high_watermark() {
162220
maybe_start_logging();

tests/consumer.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
use std::sync::Arc;
22
use std::time::Duration;
33

4+
use assert_matches::assert_matches;
45
use futures::{Stream, StreamExt};
56
use tokio::time::timeout;
67

78
use rskafka::client::{
89
consumer::{StreamConsumer, StreamConsumerBuilder},
10+
error::{Error, ProtocolError},
911
partition::Compression,
1012
ClientBuilder,
1113
};
@@ -35,7 +37,9 @@ async fn test_stream_consumer() {
3537
.await
3638
.unwrap();
3739

38-
let mut stream = StreamConsumerBuilder::new(Arc::clone(&partition_client), 0).build();
40+
let mut stream = StreamConsumerBuilder::new(Arc::clone(&partition_client), 0)
41+
.with_max_wait_ms(50)
42+
.build();
3943

4044
let assert_ok =
4145
|r: Result<Option<<StreamConsumer as Stream>::Item>, tokio::time::error::Elapsed>| {
@@ -48,7 +52,7 @@ async fn test_stream_consumer() {
4852
assert_ok(timeout(Duration::from_millis(100), stream.next()).await);
4953

5054
// No further records
51-
timeout(Duration::from_millis(100), stream.next())
55+
timeout(Duration::from_millis(200), stream.next())
5256
.await
5357
.expect_err("timeout");
5458

@@ -71,3 +75,31 @@ async fn test_stream_consumer() {
7175
.await
7276
.expect_err("timeout");
7377
}
78+
79+
#[tokio::test]
80+
async fn test_stream_consumer_offset_out_of_range() {
81+
maybe_start_logging();
82+
83+
let connection = maybe_skip_kafka_integration!();
84+
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
85+
let controller_client = client.controller_client().await.unwrap();
86+
87+
let topic = random_topic_name();
88+
controller_client
89+
.create_topic(&topic, 1, 1, 5_000)
90+
.await
91+
.unwrap();
92+
93+
let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());
94+
95+
let mut stream = StreamConsumerBuilder::new(partition_client, 1).build();
96+
97+
let error = stream.next().await.expect("stream not empty").unwrap_err();
98+
assert_matches!(
99+
error,
100+
Error::ServerError(ProtocolError::OffsetOutOfRange, _)
101+
);
102+
103+
// stream ends
104+
assert!(stream.next().await.is_none());
105+
}

0 commit comments

Comments
 (0)