@@ -2,9 +2,12 @@ use std::sync::Arc;
2
2
use std:: time:: Duration ;
3
3
4
4
use assert_matches:: assert_matches;
5
+ use chrono:: { DateTime , Timelike , Utc } ;
5
6
use futures:: { Stream , StreamExt } ;
6
7
use tokio:: time:: timeout;
7
8
9
+ use rskafka:: client:: partition:: OffsetAt ;
10
+ use rskafka:: record:: Record ;
8
11
use rskafka:: {
9
12
client:: {
10
13
consumer:: { StartOffset , StreamConsumer , StreamConsumerBuilder } ,
@@ -415,6 +418,68 @@ async fn test_stream_consumer_start_at_latest_empty() {
415
418
assert_stream_pending ( & mut stream) . await ;
416
419
}
417
420
421
+ #[ tokio:: test]
422
+ async fn test_stream_consumer_start_timestamp_based_offset ( ) {
423
+ maybe_start_logging ( ) ;
424
+
425
+ let test_cfg = maybe_skip_kafka_integration ! ( ) ;
426
+ let client = ClientBuilder :: new ( test_cfg. bootstrap_brokers )
427
+ . build ( )
428
+ . await
429
+ . unwrap ( ) ;
430
+ let controller_client = client. controller_client ( ) . unwrap ( ) ;
431
+
432
+ let topic = random_topic_name ( ) ;
433
+ controller_client
434
+ . create_topic ( & topic, 1 , 1 , 5_000 )
435
+ . await
436
+ . unwrap ( ) ;
437
+
438
+ fn now ( ) -> DateTime < Utc > {
439
+ let now = std:: time:: SystemTime :: now ( )
440
+ . duration_since ( std:: time:: UNIX_EPOCH )
441
+ . expect ( "system time before Unix epoch" ) ;
442
+ DateTime :: from_timestamp ( now. as_secs ( ) as i64 , now. subsec_nanos ( ) ) . unwrap ( )
443
+ }
444
+
445
+ let partition_client = Arc :: new (
446
+ client
447
+ . partition_client ( & topic, 0 , UnknownTopicHandling :: Retry )
448
+ . await
449
+ . unwrap ( ) ,
450
+ ) ;
451
+ let record_1 = record_with_timestamp_milliseconds ( b"x" , now ( ) ) ;
452
+ partition_client
453
+ . produce ( vec ! [ record_1. clone( ) ] , Compression :: NoCompression )
454
+ . await
455
+ . unwrap ( ) ;
456
+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
457
+ let ts = now ( ) ;
458
+
459
+ let record_2 = record_with_timestamp_milliseconds ( b"y" , now ( ) ) ;
460
+ partition_client
461
+ . produce ( vec ! [ record_2. clone( ) ] , Compression :: NoCompression )
462
+ . await
463
+ . unwrap ( ) ;
464
+
465
+ let offset = partition_client
466
+ . get_offset ( OffsetAt :: Timestamp ( ts) )
467
+ . await
468
+ . unwrap ( ) ;
469
+ assert_eq ! ( offset, 1 ) ;
470
+ let mut stream =
471
+ StreamConsumerBuilder :: new ( Arc :: clone ( & partition_client) , StartOffset :: At ( offset) )
472
+ . with_max_wait_ms ( 50 )
473
+ . build ( ) ;
474
+
475
+ // Get record
476
+ let ( record_and_offset, _) = assert_ok ( timeout ( TEST_TIMEOUT , stream. next ( ) ) . await ) ;
477
+ assert_eq ! ( record_and_offset. record, record_2) ;
478
+
479
+ // No further records
480
+ assert_stream_pending ( & mut stream) . await ;
481
+ }
482
+
418
483
fn assert_ok (
419
484
r : Result < Option < <StreamConsumer as Stream >:: Item > , tokio:: time:: error:: Elapsed > ,
420
485
) -> ( RecordAndOffset , i64 ) {
@@ -436,3 +501,14 @@ where
436
501
_ = tokio:: time:: sleep( Duration :: from_secs( 1 ) ) => { } ,
437
502
} ;
438
503
}
504
+
505
+ fn record_with_timestamp_milliseconds ( key : & [ u8 ] , timestamp : DateTime < Utc > ) -> Record {
506
+ Record {
507
+ key : Some ( key. to_vec ( ) ) ,
508
+ value : Some ( b"hello kafka" . to_vec ( ) ) ,
509
+ headers : std:: collections:: BTreeMap :: from ( [ ( "foo" . to_owned ( ) , b"bar" . to_vec ( ) ) ] ) ,
510
+ timestamp : timestamp
511
+ . with_nanosecond ( timestamp. nanosecond ( ) / 1_000_000 * 1_000_000 )
512
+ . unwrap ( ) ,
513
+ }
514
+ }
0 commit comments