Skip to content

Commit 321c040

Browse files
committed
Avoid topic pollution by prefixing with test name.
1 parent e347f9a commit 321c040

File tree

8 files changed

+49
-43
lines changed

8 files changed

+49
-43
lines changed

tests/test_admin.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ fn create_admin_client() -> AdminClient<DefaultClientContext> {
3232

3333
async fn create_consumer_group(consumer_group_name: &str) {
3434
let admin_client = create_admin_client();
35-
let topic_name = &rand_test_topic();
35+
let topic_name = &rand_test_topic(consumer_group_name);
3636
let consumer: BaseConsumer = create_config()
3737
.set("group.id", consumer_group_name.clone())
3838
.create()
@@ -124,8 +124,8 @@ async fn test_topics() {
124124
// Verify that topics are created as specified, and that they can later
125125
// be deleted.
126126
{
127-
let name1 = rand_test_topic();
128-
let name2 = rand_test_topic();
127+
let name1 = rand_test_topic("test_topics");
128+
let name2 = rand_test_topic("test_topics");
129129

130130
// Test both the builder API and the literal construction.
131131
let topic1 =
@@ -254,7 +254,7 @@ async fn test_topics() {
254254
// Verify that incorrect replication configurations are ignored when
255255
// creating partitions.
256256
{
257-
let name = rand_test_topic();
257+
let name = rand_test_topic("test_topics");
258258
let topic = NewTopic::new(&name, 1, TopicReplication::Fixed(1));
259259

260260
let res = admin_client
@@ -291,7 +291,7 @@ async fn test_topics() {
291291

292292
// Verify that deleting a non-existent topic fails.
293293
{
294-
let name = rand_test_topic();
294+
let name = rand_test_topic("test_topics");
295295
let res = admin_client
296296
.delete_topics(&[&name], &opts)
297297
.await
@@ -305,8 +305,8 @@ async fn test_topics() {
305305
// Verify that mixed-success operations properly report the successful and
306306
// failing operators.
307307
{
308-
let name1 = rand_test_topic();
309-
let name2 = rand_test_topic();
308+
let name1 = rand_test_topic("test_topics");
309+
let name2 = rand_test_topic("test_topics");
310310

311311
let topic1 = NewTopic::new(&name1, 1, TopicReplication::Fixed(1));
312312
let topic2 = NewTopic::new(&name2, 1, TopicReplication::Fixed(1));

tests/test_high_consumers.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ async fn test_produce_consume_base() {
7070
let _r = env_logger::try_init();
7171

7272
let start_time = current_time_millis();
73-
let topic_name = rand_test_topic();
73+
let topic_name = rand_test_topic("test_produce_consume_base");
7474
let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
7575
let consumer = create_stream_consumer(&rand_test_group(), None);
7676
consumer.subscribe(&[topic_name.as_str()]).unwrap();
@@ -105,7 +105,7 @@ async fn test_produce_consume_base() {
105105
async fn test_produce_consume_base_concurrent() {
106106
let _r = env_logger::try_init();
107107

108-
let topic_name = rand_test_topic();
108+
let topic_name = rand_test_topic("test_produce_consume_base_concurrent");
109109
populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
110110

111111
let consumer = Arc::new(create_stream_consumer(&rand_test_group(), None));
@@ -135,7 +135,7 @@ async fn test_produce_consume_base_concurrent() {
135135
async fn test_produce_consume_base_assign() {
136136
let _r = env_logger::try_init();
137137

138-
let topic_name = rand_test_topic();
138+
let topic_name = rand_test_topic("test_produce_consume_base_assign");
139139
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
140140
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
141141
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
@@ -170,7 +170,7 @@ async fn test_produce_consume_base_assign() {
170170
async fn test_produce_consume_base_unassign() {
171171
let _r = env_logger::try_init();
172172

173-
let topic_name = rand_test_topic();
173+
let topic_name = rand_test_topic("test_produce_consume_base_unassign");
174174
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
175175
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
176176
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
@@ -195,7 +195,7 @@ async fn test_produce_consume_base_unassign() {
195195
async fn test_produce_consume_base_incremental_assign_and_unassign() {
196196
let _r = env_logger::try_init();
197197

198-
let topic_name = rand_test_topic();
198+
let topic_name = rand_test_topic("test_produce_consume_base_incremental_assign_and_unassign");
199199
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
200200
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
201201
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
@@ -236,7 +236,7 @@ async fn test_produce_consume_base_incremental_assign_and_unassign() {
236236
async fn test_produce_consume_with_timestamp() {
237237
let _r = env_logger::try_init();
238238

239-
let topic_name = rand_test_topic();
239+
let topic_name = rand_test_topic("test_produce_consume_with_timestamp");
240240
let message_map =
241241
populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), Some(1111)).await;
242242
let consumer = create_stream_consumer(&rand_test_group(), None);
@@ -277,7 +277,7 @@ async fn test_produce_consume_with_timestamp() {
277277
async fn test_consumer_commit_message() {
278278
let _r = env_logger::try_init();
279279

280-
let topic_name = rand_test_topic();
280+
let topic_name = rand_test_topic("test_consumer_commit_message");
281281
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
282282
populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await;
283283
populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await;
@@ -355,7 +355,7 @@ async fn test_consumer_commit_message() {
355355
async fn test_consumer_store_offset_commit() {
356356
let _r = env_logger::try_init();
357357

358-
let topic_name = rand_test_topic();
358+
let topic_name = rand_test_topic("test_consumer_store_offset_commit");
359359
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
360360
populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await;
361361
populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await;
@@ -440,7 +440,7 @@ async fn test_consumer_store_offset_commit() {
440440
async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
441441
let _ = env_logger::try_init();
442442

443-
let topic_name = rand_test_topic();
443+
let topic_name = rand_test_topic("test_consumer_commit_metadata");
444444
let group_name = rand_test_group();
445445
populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;
446446

@@ -495,7 +495,7 @@ async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
495495
async fn test_consume_partition_order() {
496496
let _r = env_logger::try_init();
497497

498-
let topic_name = rand_test_topic();
498+
let topic_name = rand_test_topic("test_consume_partition_order");
499499
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(0), None).await;
500500
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(1), None).await;
501501
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(2), None).await;

tests/test_high_producers.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ fn future_producer(config_overrides: HashMap<&str, &str>) -> FutureProducer<Defa
3030
#[tokio::test]
3131
async fn test_future_producer_send() {
3232
let producer = future_producer(HashMap::new());
33-
let topic_name = rand_test_topic();
33+
let topic_name = rand_test_topic("test_future_producer_send");
3434

3535
let results: FuturesUnordered<_> = (0..10)
3636
.map(|_| {
@@ -60,7 +60,7 @@ async fn test_future_producer_send_full() {
6060
config.insert("message.timeout.ms", "5000");
6161
config.insert("queue.buffering.max.messages", "1");
6262
let producer = &future_producer(config);
63-
let topic_name = &rand_test_topic();
63+
let topic_name = &rand_test_topic("test_future_producer_send_full");
6464

6565
// Fill up the queue.
6666
producer

tests/test_low_consumers.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ fn create_base_consumer(
3131
async fn test_produce_consume_seek() {
3232
let _r = env_logger::try_init();
3333

34-
let topic_name = rand_test_topic();
34+
let topic_name = rand_test_topic("test_produce_consume_seek");
3535
populate_topic(&topic_name, 5, &value_fn, &key_fn, Some(0), None).await;
3636
let consumer = create_base_consumer(&rand_test_group(), None);
3737
consumer.subscribe(&[topic_name.as_str()]).unwrap();
@@ -96,7 +96,7 @@ async fn test_produce_consume_seek() {
9696
async fn test_produce_consume_seek_partitions() {
9797
let _r = env_logger::try_init();
9898

99-
let topic_name = rand_test_topic();
99+
let topic_name = rand_test_topic("test_produce_consume_seek_partitions");
100100
populate_topic(&topic_name, 30, &value_fn, &key_fn, None, None).await;
101101

102102
let consumer = create_base_consumer(&rand_test_group(), None);
@@ -158,7 +158,7 @@ async fn test_produce_consume_iter() {
158158
let _r = env_logger::try_init();
159159

160160
let start_time = current_time_millis();
161-
let topic_name = rand_test_topic();
161+
let topic_name = rand_test_topic("test_produce_consume_iter");
162162
let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
163163
let consumer = create_base_consumer(&rand_test_group(), None);
164164
consumer.subscribe(&[topic_name.as_str()]).unwrap();
@@ -196,7 +196,7 @@ async fn test_pause_resume_consumer_iter() {
196196

197197
let _r = env_logger::try_init();
198198

199-
let topic_name = rand_test_topic();
199+
let topic_name = rand_test_topic("test_pause_resume_consumer_iter");
200200
populate_topic(
201201
&topic_name,
202202
MESSAGE_COUNT,
@@ -237,7 +237,7 @@ async fn test_pause_resume_consumer_iter() {
237237
async fn test_consume_partition_order() {
238238
let _r = env_logger::try_init();
239239

240-
let topic_name = rand_test_topic();
240+
let topic_name = rand_test_topic("test_consume_partition_order");
241241
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(0), None).await;
242242
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(1), None).await;
243243
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(2), None).await;
@@ -357,7 +357,7 @@ async fn test_consume_partition_order() {
357357
async fn test_produce_consume_message_queue_nonempty_callback() {
358358
let _r = env_logger::try_init();
359359

360-
let topic_name = rand_test_topic();
360+
let topic_name = rand_test_topic("test_produce_consume_message_queue_nonempty_callback");
361361

362362
create_topic(&topic_name, 1).await;
363363

tests/test_low_producers.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ where
191191
#[test]
192192
fn test_base_producer_queue_full() {
193193
let producer = base_producer(hashmap! { "queue.buffering.max.messages" => "10" });
194-
let topic_name = rand_test_topic();
194+
let topic_name = rand_test_topic("test_base_producer_queue_full");
195195

196196
let results = (0..30)
197197
.map(|id| {
@@ -235,7 +235,7 @@ fn test_base_producer_timeout() {
235235
"bootstrap.servers" => "1.2.3.4"
236236
},
237237
);
238-
let topic_name = rand_test_topic();
238+
let topic_name = rand_test_topic("test_base_producer_timeout");
239239

240240
let results_count = (0..10)
241241
.map(|id| {
@@ -346,7 +346,7 @@ fn test_base_producer_headers() {
346346
ids: ids_set.clone(),
347347
};
348348
let producer = base_producer_with_context(context, HashMap::new());
349-
let topic_name = rand_test_topic();
349+
let topic_name = rand_test_topic("test_base_producer_headers");
350350

351351
let results_count = (0..10)
352352
.map(|id| {
@@ -387,7 +387,7 @@ fn test_base_producer_headers() {
387387
fn test_threaded_producer_send() {
388388
let context = CollectingContext::new();
389389
let producer = threaded_producer_with_context(context.clone(), HashMap::new());
390-
let topic_name = rand_test_topic();
390+
let topic_name = rand_test_topic("test_threaded_producer_send");
391391

392392
let results_count = (0..10)
393393
.map(|id| {
@@ -431,7 +431,7 @@ fn test_base_producer_opaque_arc() -> Result<(), Box<dyn Error>> {
431431
let shared_count = Arc::new(Mutex::new(0));
432432
let context = OpaqueArcContext {};
433433
let producer = base_producer_with_context(context, HashMap::new());
434-
let topic_name = rand_test_topic();
434+
let topic_name = rand_test_topic("test_base_producer_opaque_arc");
435435

436436
let results_count = (0..10)
437437
.map(|_| {
@@ -482,7 +482,13 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() {
482482
let producer = base_producer_with_context(context.clone(), config_overrides);
483483

484484
producer
485-
.send(BaseRecord::<(), str, usize>::with_opaque_to(&rand_test_topic(), 0).payload(""))
485+
.send(
486+
BaseRecord::<(), str, usize>::with_opaque_to(
487+
&rand_test_topic("test_register_custom_partitioner_linger_non_zero_key_null"),
488+
0,
489+
)
490+
.payload(""),
491+
)
486492
.unwrap();
487493
producer.flush(Duration::from_secs(10)).unwrap();
488494

@@ -499,7 +505,7 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() {
499505
fn test_custom_partitioner_base_producer() {
500506
let context = CollectingContext::new_with_custom_partitioner(FixedPartitioner::new(2));
501507
let producer = base_producer_with_context(context.clone(), HashMap::new());
502-
let topic_name = rand_test_topic();
508+
let topic_name = rand_test_topic("test_custom_partitioner_base_producer");
503509

504510
let results_count = (0..10)
505511
.map(|id| {
@@ -527,7 +533,7 @@ fn test_custom_partitioner_base_producer() {
527533
fn test_custom_partitioner_threaded_producer() {
528534
let context = CollectingContext::new_with_custom_partitioner(FixedPartitioner::new(2));
529535
let producer = threaded_producer_with_context(context.clone(), HashMap::new());
530-
let topic_name = rand_test_topic();
536+
let topic_name = rand_test_topic("test_custom_partitioner_threaded_producer");
531537

532538
let results_count = (0..10)
533539
.map(|id| {

tests/test_metadata.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ fn create_consumer(group_id: &str) -> StreamConsumer {
3131
async fn test_metadata() {
3232
let _r = env_logger::try_init();
3333

34-
let topic_name = rand_test_topic();
34+
let topic_name = rand_test_topic("test_metadata");
3535
populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await;
3636
populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await;
3737
populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None).await;
@@ -92,7 +92,7 @@ async fn test_metadata() {
9292
async fn test_subscription() {
9393
let _r = env_logger::try_init();
9494

95-
let topic_name = rand_test_topic();
95+
let topic_name = rand_test_topic("test_subscription");
9696
populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;
9797
let consumer = create_consumer(&rand_test_group());
9898
consumer.subscribe(&[topic_name.as_str()]).unwrap();
@@ -109,7 +109,7 @@ async fn test_subscription() {
109109
async fn test_group_membership() {
110110
let _r = env_logger::try_init();
111111

112-
let topic_name = rand_test_topic();
112+
let topic_name = rand_test_topic("test_group_membership");
113113
let group_name = rand_test_group();
114114
populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await;
115115
populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await;

tests/test_transactions.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ fn count_records(topic: &str, iso: IsolationLevel) -> Result<usize, KafkaError>
6464

6565
#[tokio::test]
6666
async fn test_transaction_abort() -> Result<(), Box<dyn Error>> {
67-
let consume_topic = rand_test_topic();
68-
let produce_topic = rand_test_topic();
67+
let consume_topic = rand_test_topic("test_transaction_abort");
68+
let produce_topic = rand_test_topic("test_transaction_abort");
6969

7070
populate_topic(&consume_topic, 30, &value_fn, &key_fn, Some(0), None).await;
7171

@@ -132,8 +132,8 @@ async fn test_transaction_abort() -> Result<(), Box<dyn Error>> {
132132

133133
#[tokio::test]
134134
async fn test_transaction_commit() -> Result<(), Box<dyn Error>> {
135-
let consume_topic = rand_test_topic();
136-
let produce_topic = rand_test_topic();
135+
let consume_topic = rand_test_topic("test_transaction_commit");
136+
let produce_topic = rand_test_topic("test_transaction_commit");
137137

138138
populate_topic(&consume_topic, 30, &value_fn, &key_fn, Some(0), None).await;
139139

tests/utils.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ use rdkafka::producer::{FutureProducer, FutureRecord};
1717
use rdkafka::statistics::Statistics;
1818
use rdkafka::TopicPartitionList;
1919

20-
pub fn rand_test_topic() -> String {
20+
pub fn rand_test_topic(test_name: &str) -> String {
2121
let id = rand::thread_rng()
2222
.gen_ascii_chars()
2323
.take(10)
2424
.collect::<String>();
25-
format!("__test_{}", id)
25+
format!("__{}_{}", test_name, id)
2626
}
2727

2828
pub fn rand_test_group() -> String {
@@ -170,7 +170,7 @@ mod tests {
170170

171171
#[tokio::test]
172172
async fn test_populate_topic() {
173-
let topic_name = rand_test_topic();
173+
let topic_name = rand_test_topic("test_populate_topic");
174174
let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None).await;
175175

176176
let total_messages = message_map

0 commit comments

Comments
 (0)