Skip to content

Commit 51ac55a

Browse files
committed
test: extend consumer stream tests
1 parent fe6006a commit 51ac55a

File tree

3 files changed

+109
-17
lines changed

3 files changed

+109
-17
lines changed

tests/client.rs

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use assert_matches::assert_matches;
22
use rskafka::{
33
client::{
4-
error::{Error as ClientError, ProtocolError, RequestError},
4+
error::{Error as ClientError, ProtocolError},
55
partition::{Compression, OffsetAt},
66
ClientBuilder,
77
},
@@ -10,9 +10,7 @@ use rskafka::{
1010
use std::{collections::BTreeMap, str::FromStr, sync::Arc, time::Duration};
1111

1212
mod test_helpers;
13-
use test_helpers::{now, random_topic_name, record};
14-
15-
use crate::test_helpers::maybe_start_logging;
13+
use test_helpers::{maybe_start_logging, now, random_topic_name, record};
1614

1715
#[tokio::test]
1816
async fn test_plain() {
@@ -449,18 +447,7 @@ async fn test_delete_records() {
449447
let offset_4 = offsets[0];
450448

451449
// delete from the middle of the 2nd batch
452-
match partition_client.delete_records(offset_3, 1_000).await {
453-
Ok(()) => {}
454-
// Redpanda does not support deletions (https://github.com/redpanda-data/redpanda/issues/1016), but we also
455-
// don't wanna skip it unconditionally
456-
Err(ClientError::Request(RequestError::NoVersionMatch { .. }))
457-
if std::env::var("TEST_DELETE_RECORDS").is_err() =>
458-
{
459-
println!("Skip test_delete_records");
460-
return;
461-
}
462-
Err(e) => panic!("Cannot delete: {e}"),
463-
}
450+
maybe_skip_delete!(partition_client, offset_3);
464451

465452
// fetching data before the record fails
466453
let err = partition_client

tests/consumer.rs

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,45 @@ async fn test_stream_consumer_start_at_0() {
6868
assert_stream_pending(&mut stream).await;
6969
}
7070

71+
#[tokio::test]
72+
async fn test_stream_consumer_start_at_1() {
73+
maybe_start_logging();
74+
75+
let connection = maybe_skip_kafka_integration!();
76+
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
77+
let controller_client = client.controller_client().await.unwrap();
78+
79+
let topic = random_topic_name();
80+
controller_client
81+
.create_topic(&topic, 1, 1, 5_000)
82+
.await
83+
.unwrap();
84+
85+
let record_1 = record(b"x");
86+
let record_2 = record(b"y");
87+
88+
let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());
89+
partition_client
90+
.produce(
91+
vec![record_1.clone(), record_2.clone()],
92+
Compression::NoCompression,
93+
)
94+
.await
95+
.unwrap();
96+
97+
let mut stream = StreamConsumerBuilder::new(Arc::clone(&partition_client), StartOffset::At(1))
98+
.with_max_wait_ms(50)
99+
.build();
100+
101+
// Skips first record
102+
let (record_and_offset, _watermark) =
103+
assert_ok(timeout(Duration::from_millis(100), stream.next()).await);
104+
assert_eq!(record_and_offset.record, record_2);
105+
106+
// No further records
107+
assert_stream_pending(&mut stream).await;
108+
}
109+
71110
#[tokio::test]
72111
async fn test_stream_consumer_offset_out_of_range() {
73112
maybe_start_logging();
@@ -179,13 +218,55 @@ async fn test_stream_consumer_start_at_earliest_empty() {
179218

180219
// Get second record
181220
let (record_and_offset, _) =
182-
assert_ok(timeout(Duration::from_millis(100), stream.next()).await);
221+
assert_ok(timeout(Duration::from_millis(200), stream.next()).await);
183222
assert_eq!(record_and_offset.record, record);
184223

185224
// No further records
186225
assert_stream_pending(&mut stream).await;
187226
}
188227

228+
#[tokio::test]
229+
async fn test_stream_consumer_start_at_earliest_after_deletion() {
230+
maybe_start_logging();
231+
232+
let connection = maybe_skip_kafka_integration!();
233+
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
234+
let controller_client = client.controller_client().await.unwrap();
235+
236+
let topic = random_topic_name();
237+
controller_client
238+
.create_topic(&topic, 1, 1, 5_000)
239+
.await
240+
.unwrap();
241+
242+
let record_1 = record(b"x");
243+
let record_2 = record(b"y");
244+
245+
let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());
246+
partition_client
247+
.produce(
248+
vec![record_1.clone(), record_2.clone()],
249+
Compression::NoCompression,
250+
)
251+
.await
252+
.unwrap();
253+
254+
maybe_skip_delete!(partition_client, 1);
255+
256+
let mut stream =
257+
StreamConsumerBuilder::new(Arc::clone(&partition_client), StartOffset::Earliest)
258+
.with_max_wait_ms(50)
259+
.build();
260+
261+
// First record skipped / deleted
262+
let (record_and_offset, _) =
263+
assert_ok(timeout(Duration::from_millis(100), stream.next()).await);
264+
assert_eq!(record_and_offset.record, record_2);
265+
266+
// No further records
267+
assert_stream_pending(&mut stream).await;
268+
}
269+
189270
#[tokio::test]
190271
async fn test_stream_consumer_start_at_latest() {
191272
maybe_start_logging();

tests/test_helpers.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use parking_lot::Once;
22
use rskafka::record::Record;
33
use std::collections::BTreeMap;
44
use time::OffsetDateTime;
5+
56
/// Get the testing Kafka connection string or return current scope.
67
///
78
/// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the
@@ -44,6 +45,29 @@ macro_rules! maybe_skip_kafka_integration {
4445
}};
4546
}
4647

48+
/// Performs delete operation using.
49+
///
50+
/// This is skipped (via `return`) if the broker returns `NoVersionMatch`, except when the `TEST_DELETE_RECORDS`
51+
/// environment variable is set.
52+
///
53+
/// This is helpful because Redpanda does not support deletes yet, see
54+
/// <https://github.com/redpanda-data/redpanda/issues/1016> but we also don not want to skip these test unconditionally.
55+
#[macro_export]
56+
macro_rules! maybe_skip_delete {
57+
($partition_client:ident, $offset:expr) => {
58+
match $partition_client.delete_records($offset, 1_000).await {
59+
Ok(()) => {}
60+
Err(rskafka::client::error::Error::Request(
61+
rskafka::client::error::RequestError::NoVersionMatch { .. },
62+
)) if std::env::var("TEST_DELETE_RECORDS").is_err() => {
63+
println!("Skip test_delete_records");
64+
return;
65+
}
66+
Err(e) => panic!("Cannot delete: {e}"),
67+
}
68+
};
69+
}
70+
4771
/// Generated random topic name for testing.
4872
pub fn random_topic_name() -> String {
4973
format!("test_topic_{}", uuid::Uuid::new_v4())

0 commit comments

Comments
 (0)