src/client/consumer.rs (77 additions, 3 deletions)
@@ -111,6 +111,7 @@ impl StreamConsumerBuilder {
min_batch_size: self.min_batch_size,
max_batch_size: self.max_batch_size,
next_offset: self.start_offset,
terminated: false,
last_high_watermark: -1,
buffer: Default::default(),
fetch_fut: Fuse::terminated(),
@@ -122,6 +123,9 @@ type FetchResult = Result<(Vec<RecordAndOffset>, i64)>;

/// A trait wrapper to allow mocking
trait FetchClient: std::fmt::Debug + Send + Sync {
/// Fetch records.
///
/// Arguments are identical to [`PartitionClient::fetch_records`].
fn fetch_records(
&self,
offset: i64,
@@ -142,6 +146,11 @@ impl FetchClient for PartitionClient {
}

pin_project! {
/// Stream consuming data from the start offset.
///
/// # Error Handling
/// If an error is returned by [`fetch_records`](`FetchClient::fetch_records`), the stream will emit this error
/// once and terminate afterwards.
pub struct StreamConsumer {
client: Arc<dyn FetchClient>,

@@ -153,6 +162,8 @@ pin_project! {

next_offset: i64,

terminated: bool,

last_high_watermark: i64,

buffer: VecDeque<RecordAndOffset>,
@@ -167,6 +178,9 @@ impl Stream for StreamConsumer {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
loop {
if *this.terminated {
return Poll::Ready(None);
}
if let Some(x) = this.buffer.pop_front() {
return Poll::Ready(Some(Ok((x, *this.last_high_watermark))));
}
@@ -203,7 +217,12 @@ impl Stream for StreamConsumer {
}
continue;
}
Err(e) => return Poll::Ready(Some(Err(e))),
Err(e) => {
*this.terminated = true;

// report the error only once; later polls observe `terminated` and return `None`
return Poll::Ready(Some(Err(e)));
}
}
}
}
@@ -217,21 +236,26 @@ impl std::fmt::Debug for StreamConsumer {
.field("max_batch_size", &self.max_batch_size)
.field("max_wait_ms", &self.max_wait_ms)
.field("next_offset", &self.next_offset)
.field("terminated", &self.terminated)
.field("last_high_watermark", &self.last_high_watermark)
.field("buffer", &self.buffer)
.finish()
.finish_non_exhaustive()
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use assert_matches::assert_matches;
use futures::{pin_mut, StreamExt};
use time::OffsetDateTime;
use tokio::sync::{mpsc, Mutex};

use crate::record::Record;
use crate::{
client::error::{Error, ProtocolError},
record::Record,
};

use super::*;

@@ -348,6 +372,36 @@ mod tests {
}
}

#[derive(Debug)]
struct MockErrFetch {
inner: Arc<Mutex<Option<Error>>>,
}

impl MockErrFetch {
fn new(e: Error) -> Self {
Self {
inner: Arc::new(Mutex::new(Some(e))),
}
}
}

impl FetchClient for MockErrFetch {
fn fetch_records(
&self,
_offset: i64,
_bytes: Range<i32>,
_max_wait_ms: i32,
) -> BoxFuture<'_, Result<(Vec<RecordAndOffset>, i64)>> {
let inner = Arc::clone(&self.inner);
Box::pin(async move {
match inner.lock().await.take() {
Some(e) => Err(e),
None => panic!("EOF"),
}
})
}
}

#[tokio::test]
async fn test_consumer() {
let record = Record {
@@ -471,4 +525,24 @@ mod tests {
.unwrap()
.unwrap();
}

#[tokio::test]
async fn test_consumer_terminate() {
let e = Error::ServerError(
ProtocolError::OffsetOutOfRange,
String::from("offset out of range"),
);
let consumer = Arc::new(MockErrFetch::new(e));

let mut stream = StreamConsumerBuilder::new_with_client(consumer, 0).build();

let error = stream.next().await.expect("stream not empty").unwrap_err();
assert_matches!(
error,
Error::ServerError(ProtocolError::OffsetOutOfRange, _)
);

// stream ends
assert!(stream.next().await.is_none());
}
}
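The change is essentially a fuse: `poll_next` checks `terminated` before doing any other work, and the error arm flips the flag before yielding. Below is a minimal, self-contained sketch of that pattern; `OneShotError` and its simplified item/error types are illustrative stand-ins, not rskafka API.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::stream::Stream;

/// Illustrative stand-in for `StreamConsumer`: yields a stashed error
/// once, then answers every later poll with `Poll::Ready(None)`.
struct OneShotError {
    error: Option<String>, // simplified stand-in for rskafka's `Error`
    terminated: bool,
}

impl Stream for OneShotError {
    type Item = Result<i64, String>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Checked before any other work, exactly as in `StreamConsumer`.
        if self.terminated {
            return Poll::Ready(None);
        }
        // Flip the flag *before* yielding so the error is reported once.
        self.terminated = true;
        let e = self.error.take().expect("first poll");
        Poll::Ready(Some(Err(e)))
    }
}
```

Because the flag is checked at the top of the poll loop, a consumer that keeps polling after the error sees a clean end-of-stream instead of the same fetch failing over and over.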
src/client/partition.rs (5 additions)
@@ -105,6 +105,11 @@ impl PartitionClient {
/// Fetch `bytes` bytes of record data starting at sequence number `offset`
///
/// Returns the records and the current high watermark.
///
/// # Error Handling
/// Fetching records outside the range known to the broker (marked by the low and high watermark) will lead to a
/// [`ServerError`](Error::ServerError) with [`OffsetOutOfRange`](ProtocolError::OffsetOutOfRange).
pub async fn fetch_records(
&self,
offset: i64,
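For callers using `PartitionClient` directly, the newly documented failure mode can be matched explicitly. A hedged sketch, assuming an already-constructed `PartitionClient`; the helper name and the byte-range/wait values are hypothetical, while the `fetch_records` signature and error shape match the code in this PR.

```rust
use rskafka::client::error::{Error, ProtocolError};
use rskafka::client::partition::PartitionClient;

/// Hypothetical helper: fetch from `offset`, mapping the documented
/// out-of-range failure to a caller-visible decision point.
async fn fetch_checked(client: &PartitionClient, offset: i64) {
    match client.fetch_records(offset, 1..1_000_000, 1_000).await {
        Ok((records, high_watermark)) => {
            println!("{} records, high watermark {}", records.len(), high_watermark);
        }
        Err(Error::ServerError(ProtocolError::OffsetOutOfRange, msg)) => {
            // The requested offset lies outside the broker's low/high watermarks;
            // the caller can reset to a known-good offset or surface the error.
            eprintln!("offset out of range: {}", msg);
        }
        Err(e) => eprintln!("fetch failed: {}", e),
    }
}
```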
tests/consumer.rs (30 additions)
@@ -1,11 +1,13 @@
use std::sync::Arc;
use std::time::Duration;

use assert_matches::assert_matches;
use futures::{Stream, StreamExt};
use tokio::time::timeout;

use rskafka::client::{
consumer::{StreamConsumer, StreamConsumerBuilder},
error::{Error, ProtocolError},
partition::Compression,
ClientBuilder,
};
@@ -71,3 +73,31 @@ async fn test_stream_consumer() {
.await
.expect_err("timeout");
}

#[tokio::test]
async fn test_stream_consumer_offset_out_of_range() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
let controller_client = client.controller_client().await.unwrap();

let topic = random_topic_name();
controller_client
.create_topic(&topic, 1, 1, 5_000)
.await
.unwrap();

let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());

let mut stream = StreamConsumerBuilder::new(partition_client, 1).build();

let error = stream.next().await.expect("stream not empty").unwrap_err();
assert_matches!(
error,
Error::ServerError(ProtocolError::OffsetOutOfRange, _)
);

// stream ends
assert!(stream.next().await.is_none());
}
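Taken together, the unit test and this integration test pin down the contract a consumer can rely on: at most one `Err`, then `None`. A usage sketch under that contract; it assumes the same client setup as the test above, and that `RecordAndOffset` exposes its `offset` field as elsewhere in the crate.

```rust
use std::sync::Arc;

use futures::StreamExt;
use rskafka::client::consumer::StreamConsumerBuilder;
use rskafka::client::partition::PartitionClient;

/// Drains a partition from offset 0. With this PR, the `Err` arm runs at
/// most once and the `while let` then exits, instead of looping on the
/// same failing fetch.
async fn consume_all(partition_client: Arc<PartitionClient>) {
    let mut stream = StreamConsumerBuilder::new(partition_client, 0).build();
    while let Some(item) = stream.next().await {
        match item {
            Ok((record_and_offset, high_watermark)) => {
                println!(
                    "offset {} (high watermark {})",
                    record_and_offset.offset, high_watermark
                );
            }
            Err(e) => eprintln!("fetch failed: {}", e),
        }
    }
    // Reaching here after an error is guaranteed: the stream has terminated.
}
```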