@@ -2,9 +2,12 @@ use std::sync::Arc;
2
2
use std:: time:: Duration ;
3
3
4
4
use assert_matches:: assert_matches;
5
+ use chrono:: { DateTime , TimeZone , Utc } ;
5
6
use futures:: { Stream , StreamExt } ;
6
7
use tokio:: time:: timeout;
7
8
9
+ use rskafka:: client:: partition:: OffsetAt ;
10
+ use rskafka:: record:: Record ;
8
11
use rskafka:: {
9
12
client:: {
10
13
consumer:: { StartOffset , StreamConsumer , StreamConsumerBuilder } ,
@@ -415,6 +418,60 @@ async fn test_stream_consumer_start_at_latest_empty() {
415
418
assert_stream_pending ( & mut stream) . await ;
416
419
}
417
420
421
+ #[ tokio:: test]
422
+ async fn test_stream_consumer_start_timestamp_based_offset ( ) {
423
+ maybe_start_logging ( ) ;
424
+
425
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
426
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
427
+ . build ( )
428
+ . await
429
+ . unwrap ( ) ;
430
+ let controller_client = client. controller_client ( ) . unwrap ( ) ;
431
+
432
+ let topic = random_topic_name ( ) ;
433
+ controller_client
434
+ . create_topic ( & topic, 1 , 1 , 5_000 )
435
+ . await
436
+ . unwrap ( ) ;
437
+
438
+ let partition_client = Arc :: new (
439
+ client
440
+ . partition_client ( & topic, 0 , UnknownTopicHandling :: Retry )
441
+ . await
442
+ . unwrap ( ) ,
443
+ ) ;
444
+ let ts = Utc . timestamp_millis_opt ( 1337 ) . unwrap ( ) ;
445
+ let record_1 = record_with_timestamp_milliseconds ( b"x" , ts) ;
446
+ let record_2 = record_with_timestamp_milliseconds ( b"y" , ts + Duration :: from_millis ( 100 ) ) ;
447
+ partition_client
448
+ . produce ( vec ! [ record_1. clone( ) ] , Compression :: NoCompression )
449
+ . await
450
+ . unwrap ( ) ;
451
+
452
+ partition_client
453
+ . produce ( vec ! [ record_2. clone( ) ] , Compression :: NoCompression )
454
+ . await
455
+ . unwrap ( ) ;
456
+
457
+ let offset = partition_client
458
+ . get_offset ( OffsetAt :: Timestamp ( ts + Duration :: from_millis ( 100 ) ) )
459
+ . await
460
+ . unwrap ( ) ;
461
+ assert_eq ! ( offset, 1 ) ;
462
+ let mut stream =
463
+ StreamConsumerBuilder :: new ( Arc :: clone ( & partition_client) , StartOffset :: At ( offset) )
464
+ . with_max_wait_ms ( 50 )
465
+ . build ( ) ;
466
+
467
+ // Get record
468
+ let ( record_and_offset, _) = assert_ok ( timeout ( TEST_TIMEOUT , stream. next ( ) ) . await ) ;
469
+ assert_eq ! ( record_and_offset. record, record_2) ;
470
+
471
+ // No further records
472
+ assert_stream_pending ( & mut stream) . await ;
473
+ }
474
+
418
475
fn assert_ok (
419
476
r : Result < Option < <StreamConsumer as Stream >:: Item > , tokio:: time:: error:: Elapsed > ,
420
477
) -> ( RecordAndOffset , i64 ) {
@@ -436,3 +493,12 @@ where
436
493
_ = tokio:: time:: sleep( Duration :: from_secs( 1 ) ) => { } ,
437
494
} ;
438
495
}
496
+
497
+ fn record_with_timestamp_milliseconds ( key : & [ u8 ] , timestamp : DateTime < Utc > ) -> Record {
498
+ Record {
499
+ key : Some ( key. to_vec ( ) ) ,
500
+ value : Some ( b"hello kafka" . to_vec ( ) ) ,
501
+ headers : std:: collections:: BTreeMap :: from ( [ ( "foo" . to_owned ( ) , b"bar" . to_vec ( ) ) ] ) ,
502
+ timestamp,
503
+ }
504
+ }
0 commit comments