@@ -4,7 +4,7 @@ use std::collections::HashMap;
4
4
use std:: error:: Error ;
5
5
use std:: sync:: Arc ;
6
6
7
- use futures:: future:: { self , FutureExt } ;
7
+ use futures:: future;
8
8
use futures:: stream:: StreamExt ;
9
9
use maplit:: hashmap;
10
10
use rdkafka_sys:: RDKafkaErrorCode ;
@@ -491,7 +491,7 @@ async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
491
491
Ok ( ( ) )
492
492
}
493
493
494
- #[ tokio:: test]
494
+ #[ tokio:: test( flavor = "multi_thread" ) ]
495
495
async fn test_consume_partition_order ( ) {
496
496
let _r = env_logger:: try_init ( ) ;
497
497
@@ -545,8 +545,8 @@ async fn test_consume_partition_order() {
545
545
let partition1 = consumer. split_partition_queue ( & topic_name, 1 ) . unwrap ( ) ;
546
546
547
547
let mut i = 0 ;
548
- while i < 12 {
549
- if let Some ( m) = consumer. recv ( ) . now_or_never ( ) {
548
+ while i < 5 {
549
+ if let Ok ( m) = time :: timeout ( Duration :: from_millis ( 1000 ) , consumer. recv ( ) ) . await {
550
550
// retry on transient errors until we get a message
551
551
let m = match m {
552
552
Err ( KafkaError :: MessageConsumption (
@@ -564,9 +564,11 @@ async fn test_consume_partition_order() {
564
564
let partition: i32 = m. partition ( ) ;
565
565
assert ! ( partition == 0 || partition == 2 ) ;
566
566
i += 1 ;
567
+ } else {
568
+ panic ! ( "Timeout receiving message" ) ;
567
569
}
568
570
569
- if let Some ( m) = partition1. recv ( ) . now_or_never ( ) {
571
+ if let Ok ( m) = time :: timeout ( Duration :: from_millis ( 1000 ) , partition1. recv ( ) ) . await {
570
572
// retry on transient errors until we get a message
571
573
let m = match m {
572
574
Err ( KafkaError :: MessageConsumption (
@@ -583,6 +585,8 @@ async fn test_consume_partition_order() {
583
585
} ;
584
586
assert_eq ! ( m. partition( ) , 1 ) ;
585
587
i += 1 ;
588
+ } else {
589
+ panic ! ( "Timeout receiving message" ) ;
586
590
}
587
591
}
588
592
}
0 commit comments