@@ -10,22 +10,28 @@ use rskafka::{
10
10
use std:: { collections:: BTreeMap , str:: FromStr , sync:: Arc , time:: Duration } ;
11
11
12
12
mod test_helpers;
13
- use test_helpers:: { maybe_start_logging, now, random_topic_name, record} ;
13
+ use test_helpers:: { maybe_start_logging, now, random_topic_name, record, TEST_TIMEOUT } ;
14
14
15
15
#[ tokio:: test]
16
16
async fn test_plain ( ) {
17
17
maybe_start_logging ( ) ;
18
18
19
- let connection = maybe_skip_kafka_integration ! ( ) ;
20
- ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
19
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
20
+ ClientBuilder :: new ( test_cfg. bootstrap_brokers )
21
+ . build ( )
22
+ . await
23
+ . unwrap ( ) ;
21
24
}
22
25
23
26
#[ tokio:: test]
24
27
async fn test_topic_crud ( ) {
25
28
maybe_start_logging ( ) ;
26
29
27
- let connection = maybe_skip_kafka_integration ! ( ) ;
28
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
30
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
31
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
32
+ . build ( )
33
+ . await
34
+ . unwrap ( ) ;
29
35
let controller_client = client. controller_client ( ) . unwrap ( ) ;
30
36
let topics = client. list_topics ( ) . await . unwrap ( ) ;
31
37
@@ -46,7 +52,7 @@ async fn test_topic_crud() {
46
52
. unwrap ( ) ;
47
53
48
54
// might take a while to converge
49
- tokio:: time:: timeout ( Duration :: from_millis ( 1_000 ) , async {
55
+ tokio:: time:: timeout ( TEST_TIMEOUT , async {
50
56
loop {
51
57
let topics = client. list_topics ( ) . await . unwrap ( ) ;
52
58
let topic = topics. iter ( ) . find ( |t| t. name == new_topic) ;
@@ -77,10 +83,13 @@ async fn test_topic_crud() {
77
83
async fn test_partition_client ( ) {
78
84
maybe_start_logging ( ) ;
79
85
80
- let connection = maybe_skip_kafka_integration ! ( ) ;
86
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
81
87
let topic_name = random_topic_name ( ) ;
82
88
83
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
89
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
90
+ . build ( )
91
+ . await
92
+ . unwrap ( ) ;
84
93
85
94
let controller_client = client. controller_client ( ) . unwrap ( ) ;
86
95
controller_client
@@ -100,13 +109,17 @@ async fn test_partition_client() {
100
109
async fn test_non_existing_partition ( ) {
101
110
maybe_start_logging ( ) ;
102
111
103
- let connection = maybe_skip_kafka_integration ! ( ) ;
112
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
104
113
let topic_name = random_topic_name ( ) ;
105
114
106
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
115
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
116
+ . build ( )
117
+ . await
118
+ . unwrap ( ) ;
107
119
108
120
// do NOT create the topic
109
121
122
+ // short timeout, should just check that we will never finish
110
123
tokio:: time:: timeout ( Duration :: from_millis ( 100 ) , async {
111
124
client
112
125
. partition_client ( topic_name. clone ( ) , 0 , UnknownTopicHandling :: Retry )
@@ -167,8 +180,8 @@ async fn test_tls() {
167
180
. with_single_cert ( vec ! [ producer_root] , private_key)
168
181
. unwrap ( ) ;
169
182
170
- let connection = maybe_skip_kafka_integration ! ( ) ;
171
- ClientBuilder :: new ( connection )
183
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
184
+ ClientBuilder :: new ( test_cfg . bootstrap_brokers )
172
185
. tls_config ( Arc :: new ( config) )
173
186
. build ( )
174
187
. await
@@ -180,14 +193,11 @@ async fn test_tls() {
180
193
async fn test_socks5 ( ) {
181
194
maybe_start_logging ( ) ;
182
195
183
- // e.g. "my-connection-kafka-bootstrap:9092"
184
- let connection = maybe_skip_kafka_integration ! ( ) ;
185
- // e.g. "localhost:1080"
186
- let proxy = maybe_skip_SOCKS_PROXY ! ( ) ;
196
+ let test_cfg = maybe_skip_kafka_integration ! ( socks5) ;
187
197
let topic_name = random_topic_name ( ) ;
188
198
189
- let client = ClientBuilder :: new ( connection )
190
- . socks5_proxy ( proxy )
199
+ let client = ClientBuilder :: new ( test_cfg . bootstrap_brokers )
200
+ . socks5_proxy ( test_cfg . socks5_proxy . unwrap ( ) )
191
201
. build ( )
192
202
. await
193
203
. unwrap ( ) ;
@@ -222,11 +232,14 @@ async fn test_socks5() {
222
232
async fn test_produce_empty ( ) {
223
233
maybe_start_logging ( ) ;
224
234
225
- let connection = maybe_skip_kafka_integration ! ( ) ;
235
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
226
236
let topic_name = random_topic_name ( ) ;
227
237
let n_partitions = 2 ;
228
238
229
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
239
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
240
+ . build ( )
241
+ . await
242
+ . unwrap ( ) ;
230
243
let controller_client = client. controller_client ( ) . unwrap ( ) ;
231
244
controller_client
232
245
. create_topic ( & topic_name, n_partitions, 1 , 5_000 )
@@ -247,11 +260,14 @@ async fn test_produce_empty() {
247
260
async fn test_consume_empty ( ) {
248
261
maybe_start_logging ( ) ;
249
262
250
- let connection = maybe_skip_kafka_integration ! ( ) ;
263
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
251
264
let topic_name = random_topic_name ( ) ;
252
265
let n_partitions = 2 ;
253
266
254
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
267
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
268
+ . build ( )
269
+ . await
270
+ . unwrap ( ) ;
255
271
let controller_client = client. controller_client ( ) . unwrap ( ) ;
256
272
controller_client
257
273
. create_topic ( & topic_name, n_partitions, 1 , 5_000 )
@@ -274,11 +290,14 @@ async fn test_consume_empty() {
274
290
async fn test_consume_offset_out_of_range ( ) {
275
291
maybe_start_logging ( ) ;
276
292
277
- let connection = maybe_skip_kafka_integration ! ( ) ;
293
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
278
294
let topic_name = random_topic_name ( ) ;
279
295
let n_partitions = 2 ;
280
296
281
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
297
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
298
+ . build ( )
299
+ . await
300
+ . unwrap ( ) ;
282
301
let controller_client = client. controller_client ( ) . unwrap ( ) ;
283
302
controller_client
284
303
. create_topic ( & topic_name, n_partitions, 1 , 5_000 )
@@ -314,11 +333,11 @@ async fn test_consume_offset_out_of_range() {
314
333
async fn test_get_offset ( ) {
315
334
maybe_start_logging ( ) ;
316
335
317
- let connection = maybe_skip_kafka_integration ! ( ) ;
336
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
318
337
let topic_name = random_topic_name ( ) ;
319
338
let n_partitions = 1 ;
320
339
321
- let client = ClientBuilder :: new ( connection . clone ( ) )
340
+ let client = ClientBuilder :: new ( test_cfg . bootstrap_brokers . clone ( ) )
322
341
. build ( )
323
342
. await
324
343
. unwrap ( ) ;
@@ -382,10 +401,13 @@ async fn test_get_offset() {
382
401
async fn test_produce_consume_size_cutoff ( ) {
383
402
maybe_start_logging ( ) ;
384
403
385
- let connection = maybe_skip_kafka_integration ! ( ) ;
404
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
386
405
let topic_name = random_topic_name ( ) ;
387
406
388
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
407
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
408
+ . build ( )
409
+ . await
410
+ . unwrap ( ) ;
389
411
let controller_client = client. controller_client ( ) . unwrap ( ) ;
390
412
controller_client
391
413
. create_topic ( & topic_name, 1 , 1 , 5_000 )
@@ -460,10 +482,13 @@ async fn test_produce_consume_size_cutoff() {
460
482
async fn test_consume_midbatch ( ) {
461
483
maybe_start_logging ( ) ;
462
484
463
- let connection = maybe_skip_kafka_integration ! ( ) ;
485
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
464
486
let topic_name = random_topic_name ( ) ;
465
487
466
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
488
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
489
+ . build ( )
490
+ . await
491
+ . unwrap ( ) ;
467
492
let controller_client = client. controller_client ( ) . unwrap ( ) ;
468
493
controller_client
469
494
. create_topic ( & topic_name, 1 , 1 , 5_000 )
@@ -508,10 +533,13 @@ async fn test_consume_midbatch() {
508
533
async fn test_delete_records ( ) {
509
534
maybe_start_logging ( ) ;
510
535
511
- let connection = maybe_skip_kafka_integration ! ( ) ;
536
+ let test_cfg = maybe_skip_kafka_integration ! ( delete ) ;
512
537
let topic_name = random_topic_name ( ) ;
513
538
514
- let client = ClientBuilder :: new ( connection) . build ( ) . await . unwrap ( ) ;
539
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
540
+ . build ( )
541
+ . await
542
+ . unwrap ( ) ;
515
543
let controller_client = client. controller_client ( ) . unwrap ( ) ;
516
544
controller_client
517
545
. create_topic ( & topic_name, 1 , 1 , 5_000 )
@@ -555,7 +583,10 @@ async fn test_delete_records() {
555
583
let offset_4 = offsets[ 0 ] ;
556
584
557
585
// delete from the middle of the 2nd batch
558
- maybe_skip_delete ! ( partition_client, offset_3) ;
586
+ partition_client
587
+ . delete_records ( offset_3, 1_000 )
588
+ . await
589
+ . unwrap ( ) ;
559
590
560
591
// fetching data before the record fails
561
592
let err = partition_client
0 commit comments