
Commit b2ee79a

feat: add timestamp-query feature to allow timestamp offset query

1 parent 8eb8efc commit b2ee79a

3 files changed (+78 −6 lines)

src/client/consumer.rs
Lines changed: 3 additions & 0 deletions

@@ -502,6 +502,9 @@ mod tests {
                 match at {
                     OffsetAt::Earliest => Ok(inner.lock().await.range.0),
                     OffsetAt::Latest => Ok(inner.lock().await.range.1),
+                    OffsetAt::Timestamp(_) => {
+                        unreachable!("timestamp based offset is tested in e2e test")
+                    }
                 }
             })

src/client/partition.rs
Lines changed: 9 additions & 6 deletions

@@ -23,7 +23,7 @@ use crate::{
     throttle::maybe_throttle,
     validation::ExactlyOne,
 };
-use chrono::{LocalResult, TimeZone, Utc};
+use chrono::{DateTime, LocalResult, TimeZone, Utc};
 use std::{
     ops::{ControlFlow, Deref, Range},
     sync::Arc,

@@ -82,11 +82,6 @@ pub enum Compression {
 }
 
 /// Which type of offset should be requested by [`PartitionClient::get_offset`].
-///
-/// # Timestamp-based Queries
-/// In theory the Kafka API would also support querying an offset based on a timestamp, but the behavior seems to be
-/// semi-defined, unintuitive (even within Apache Kafka) and inconsistent between Apache Kafka and Redpanda. So we
-/// decided to NOT expose this option.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum OffsetAt {
     /// Earliest existing record.

@@ -97,6 +92,13 @@ pub enum OffsetAt {
 
     /// The latest existing record.
     Latest,
+
+    /// Timestamp.
+    ///
+    /// The Kafka API supports querying an offset based on a timestamp, but the behavior seems to be semi-defined,
+    /// unintuitive (even within Apache Kafka) and inconsistent between Apache Kafka and Redpanda. It generally
+    /// offers timestamp-based querying with millisecond precision.
+    Timestamp(DateTime<Utc>),
 }
 
 #[derive(Debug)]

@@ -902,6 +904,7 @@ fn build_list_offsets_request(partition: i32, topic: &str, at: OffsetAt) -> ListOffsetsRequest {
     let timestamp = match at {
         OffsetAt::Earliest => -2,
         OffsetAt::Latest => -1,
+        OffsetAt::Timestamp(ts) => ts.timestamp_millis(),
     };
 
     ListOffsetsRequest {

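For orientation, a minimal usage sketch of the new variant (not part of the commit; the broker address "localhost:9093" and topic "my_topic" are placeholders). As the last hunk shows, Earliest and Latest map to the Kafka ListOffsets sentinel timestamps -2 and -1, while a Timestamp query is sent as a non-negative millisecond value:

use chrono::{TimeZone, Utc};
use rskafka::client::{
    partition::{OffsetAt, UnknownTopicHandling},
    ClientBuilder,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder broker address and topic; substitute your own.
    let client = ClientBuilder::new(vec!["localhost:9093".to_owned()])
        .build()
        .await?;
    let partition_client = client
        .partition_client("my_topic", 0, UnknownTopicHandling::Retry)
        .await?;

    // Resolve the offset of the first record at or after the given timestamp.
    // Sub-millisecond precision is lost: the query is sent as milliseconds.
    let ts = Utc.timestamp_millis_opt(1_700_000_000_000).unwrap();
    let offset = partition_client
        .get_offset(OffsetAt::Timestamp(ts))
        .await?;
    println!("first offset at/after {ts}: {offset}");
    Ok(())
}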
tests/consumer.rs
Lines changed: 66 additions & 0 deletions

@@ -2,9 +2,12 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use assert_matches::assert_matches;
+use chrono::{DateTime, TimeZone, Utc};
 use futures::{Stream, StreamExt};
 use tokio::time::timeout;
 
+use rskafka::client::partition::OffsetAt;
+use rskafka::record::Record;
 use rskafka::{
     client::{
         consumer::{StartOffset, StreamConsumer, StreamConsumerBuilder},

@@ -415,6 +418,60 @@ async fn test_stream_consumer_start_at_latest_empty() {
     assert_stream_pending(&mut stream).await;
 }
 
+#[tokio::test]
+async fn test_stream_consumer_start_timestamp_based_offset() {
+    maybe_start_logging();
+
+    let test_cfg = maybe_skip_kafka_integration!();
+    let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
+        .build()
+        .await
+        .unwrap();
+    let controller_client = client.controller_client().unwrap();
+
+    let topic = random_topic_name();
+    controller_client
+        .create_topic(&topic, 1, 1, 5_000)
+        .await
+        .unwrap();
+
+    let partition_client = Arc::new(
+        client
+            .partition_client(&topic, 0, UnknownTopicHandling::Retry)
+            .await
+            .unwrap(),
+    );
+    let ts = Utc.timestamp_millis_opt(1337).unwrap();
+    let record_1 = record_with_timestamp_milliseconds(b"x", ts);
+    let record_2 = record_with_timestamp_milliseconds(b"y", ts + Duration::from_millis(100));
+    partition_client
+        .produce(vec![record_1.clone()], Compression::NoCompression)
+        .await
+        .unwrap();
+
+    partition_client
+        .produce(vec![record_2.clone()], Compression::NoCompression)
+        .await
+        .unwrap();
+
+    let offset = partition_client
+        .get_offset(OffsetAt::Timestamp(ts + Duration::from_millis(100)))
+        .await
+        .unwrap();
+    assert_eq!(offset, 1);
+    let mut stream =
+        StreamConsumerBuilder::new(Arc::clone(&partition_client), StartOffset::At(offset))
+            .with_max_wait_ms(50)
+            .build();
+
+    // Get record
+    let (record_and_offset, _) = assert_ok(timeout(TEST_TIMEOUT, stream.next()).await);
+    assert_eq!(record_and_offset.record, record_2);
+
+    // No further records
+    assert_stream_pending(&mut stream).await;
+}
+
 fn assert_ok(
     r: Result<Option<<StreamConsumer as Stream>::Item>, tokio::time::error::Elapsed>,
 ) -> (RecordAndOffset, i64) {

@@ -436,3 +493,12 @@ where
         _ = tokio::time::sleep(Duration::from_secs(1)) => {},
     };
 }
+
+fn record_with_timestamp_milliseconds(key: &[u8], timestamp: DateTime<Utc>) -> Record {
+    Record {
+        key: Some(key.to_vec()),
+        value: Some(b"hello kafka".to_vec()),
+        headers: std::collections::BTreeMap::from([("foo".to_owned(), b"bar".to_vec())]),
+        timestamp,
+    }
+}
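The integration test doubles as usage documentation: the query for ts + 100 ms resolves to offset 1, i.e. the first record whose timestamp is at or after the requested one. A sketch of a typical application (not part of the commit; assumes a PartitionClient set up as in the test): start a consumer from "one hour ago" instead of from a stored offset.

use std::sync::Arc;

use chrono::{Duration, Utc};
use rskafka::client::{
    consumer::{StartOffset, StreamConsumer, StreamConsumerBuilder},
    partition::{OffsetAt, PartitionClient},
};

// Sketch: resume consumption at the first record that is at most one hour old.
async fn consume_from_one_hour_ago(
    partition_client: Arc<PartitionClient>,
) -> Result<StreamConsumer, Box<dyn std::error::Error>> {
    let one_hour_ago = Utc::now() - Duration::hours(1);
    let offset = partition_client
        .get_offset(OffsetAt::Timestamp(one_hour_ago))
        .await?;
    Ok(
        StreamConsumerBuilder::new(Arc::clone(&partition_client), StartOffset::At(offset))
            .with_max_wait_ms(50)
            .build(),
    )
}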
