Skip to content

Commit 0cf4127

Browse files
committed
fix: terminate consumer stream on OffsetOutOfRange
If we encounter `OffsetOutOfRange` this means that the start offset of the stream lies outside of the range that Kafka knows. In this case we should not hot-loop and try to poll new data forever, since this is very likely NOT intended. The only use case I could come up with is to start a stream at some "yet to be produced" offset, but I fail to come up with a practical reasoning behind this as well as a method to determine the start offset for such a use case. So most users are likely interested in subscribing to data that already exists within the system.
1 parent a1b8533 commit 0cf4127

File tree

3 files changed

+112
-3
lines changed

3 files changed

+112
-3
lines changed

src/client/consumer.rs

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ impl StreamConsumerBuilder {
111111
min_batch_size: self.min_batch_size,
112112
max_batch_size: self.max_batch_size,
113113
next_offset: self.start_offset,
114+
terminated: false,
114115
last_high_watermark: -1,
115116
buffer: Default::default(),
116117
fetch_fut: Fuse::terminated(),
@@ -122,6 +123,9 @@ type FetchResult = Result<(Vec<RecordAndOffset>, i64)>;
122123

123124
/// A trait wrapper to allow mocking
124125
trait FetchClient: std::fmt::Debug + Send + Sync {
126+
/// Fetch records.
127+
///
128+
/// Arguments are identical to [`PartitionClient::fetch_records`].
125129
fn fetch_records(
126130
&self,
127131
offset: i64,
@@ -142,6 +146,11 @@ impl FetchClient for PartitionClient {
142146
}
143147

144148
pin_project! {
149+
/// Stream consuming data from start offset.
150+
///
151+
/// # Error Handling
152+
/// If an error is returned by [`fetch_records`](`FetchClient::fetch_records`) then the stream will emit this error
153+
/// once and will terminate afterwards.
145154
pub struct StreamConsumer {
146155
client: Arc<dyn FetchClient>,
147156

@@ -153,6 +162,8 @@ pin_project! {
153162

154163
next_offset: i64,
155164

165+
terminated: bool,
166+
156167
last_high_watermark: i64,
157168

158169
buffer: VecDeque<RecordAndOffset>,
@@ -167,6 +178,9 @@ impl Stream for StreamConsumer {
167178
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
168179
let this = self.project();
169180
loop {
181+
if *this.terminated {
182+
return Poll::Ready(None);
183+
}
170184
if let Some(x) = this.buffer.pop_front() {
171185
return Poll::Ready(Some(Ok((x, *this.last_high_watermark))));
172186
}
@@ -203,7 +217,12 @@ impl Stream for StreamConsumer {
203217
}
204218
continue;
205219
}
206-
Err(e) => return Poll::Ready(Some(Err(e))),
220+
Err(e) => {
221+
*this.terminated = true;
222+
223+
// report error once
224+
return Poll::Ready(Some(Err(e)));
225+
}
207226
}
208227
}
209228
}
@@ -217,21 +236,26 @@ impl std::fmt::Debug for StreamConsumer {
217236
.field("max_batch_size", &self.max_batch_size)
218237
.field("max_wait_ms", &self.max_wait_ms)
219238
.field("next_offset", &self.next_offset)
239+
.field("terminated", &self.terminated)
220240
.field("last_high_watermark", &self.last_high_watermark)
221241
.field("buffer", &self.buffer)
222-
.finish()
242+
.finish_non_exhaustive()
223243
}
224244
}
225245

226246
#[cfg(test)]
227247
mod tests {
228248
use std::time::Duration;
229249

250+
use assert_matches::assert_matches;
230251
use futures::{pin_mut, StreamExt};
231252
use time::OffsetDateTime;
232253
use tokio::sync::{mpsc, Mutex};
233254

234-
use crate::record::Record;
255+
use crate::{
256+
client::error::{Error, ProtocolError},
257+
record::Record,
258+
};
235259

236260
use super::*;
237261

@@ -348,6 +372,36 @@ mod tests {
348372
}
349373
}
350374

375+
#[derive(Debug)]
376+
struct MockErrFetch {
377+
inner: Arc<Mutex<Option<Error>>>,
378+
}
379+
380+
impl MockErrFetch {
381+
fn new(e: Error) -> Self {
382+
Self {
383+
inner: Arc::new(Mutex::new(Some(e))),
384+
}
385+
}
386+
}
387+
388+
impl FetchClient for MockErrFetch {
389+
fn fetch_records(
390+
&self,
391+
_offset: i64,
392+
_bytes: Range<i32>,
393+
_max_wait_ms: i32,
394+
) -> BoxFuture<'_, Result<(Vec<RecordAndOffset>, i64)>> {
395+
let inner = Arc::clone(&self.inner);
396+
Box::pin(async move {
397+
match inner.lock().await.take() {
398+
Some(e) => Err(e),
399+
None => panic!("EOF"),
400+
}
401+
})
402+
}
403+
}
404+
351405
#[tokio::test]
352406
async fn test_consumer() {
353407
let record = Record {
@@ -471,4 +525,24 @@ mod tests {
471525
.unwrap()
472526
.unwrap();
473527
}
528+
529+
#[tokio::test]
530+
async fn test_consumer_terminate() {
531+
let e = Error::ServerError(
532+
ProtocolError::OffsetOutOfRange,
533+
String::from("offset out of range"),
534+
);
535+
let consumer = Arc::new(MockErrFetch::new(e));
536+
537+
let mut stream = StreamConsumerBuilder::new_with_client(consumer, 0).build();
538+
539+
let error = stream.next().await.expect("stream not empty").unwrap_err();
540+
assert_matches!(
541+
error,
542+
Error::ServerError(ProtocolError::OffsetOutOfRange, _)
543+
);
544+
545+
// stream ends
546+
assert!(stream.next().await.is_none());
547+
}
474548
}

src/client/partition.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ impl PartitionClient {
105105
/// Fetch `bytes` bytes of record data starting at sequence number `offset`
106106
///
107107
/// Returns the records, and the current high watermark.
108+
///
109+
///
110+
/// # Error Handling
111+
/// Fetching records outside the range known to the broker (marked by low and high watermark) will lead to a
112+
/// [`ServerError`](Error::ServerError) with [`OffsetOutOfRange`](ProtocolError::OffsetOutOfRange).
108113
pub async fn fetch_records(
109114
&self,
110115
offset: i64,

tests/consumer.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
use std::sync::Arc;
22
use std::time::Duration;
33

4+
use assert_matches::assert_matches;
45
use futures::{Stream, StreamExt};
56
use tokio::time::timeout;
67

78
use rskafka::client::{
89
consumer::{StreamConsumer, StreamConsumerBuilder},
10+
error::{Error, ProtocolError},
911
partition::Compression,
1012
ClientBuilder,
1113
};
@@ -71,3 +73,31 @@ async fn test_stream_consumer() {
7173
.await
7274
.expect_err("timeout");
7375
}
76+
77+
#[tokio::test]
78+
async fn test_stream_consumer_offset_out_of_range() {
79+
maybe_start_logging();
80+
81+
let connection = maybe_skip_kafka_integration!();
82+
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
83+
let controller_client = client.controller_client().await.unwrap();
84+
85+
let topic = random_topic_name();
86+
controller_client
87+
.create_topic(&topic, 1, 1, 5_000)
88+
.await
89+
.unwrap();
90+
91+
let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());
92+
93+
let mut stream = StreamConsumerBuilder::new(partition_client, 1).build();
94+
95+
let error = stream.next().await.expect("stream not empty").unwrap_err();
96+
assert_matches!(
97+
error,
98+
Error::ServerError(ProtocolError::OffsetOutOfRange, _)
99+
);
100+
101+
// stream ends
102+
assert!(stream.next().await.is_none());
103+
}

0 commit comments

Comments
 (0)