
Commit 7c74d43

Merge pull request #100 from influxdata/crepererum/get_offset
feat: extend watermark API to get both low/high offset
2 parents 6c47b5b + 978f31f commit 7c74d43

File tree

3 files changed: +84, -25 lines


src/client/partition.rs

Lines changed: 42 additions & 18 deletions

```diff
@@ -43,6 +43,24 @@ impl Default for Compression {
     }
 }
 
+/// Which type of offset should be requested by [`PartitionClient::get_offset`].
+///
+/// # Timestamp-based Queries
+/// In theory the Kafka API would also support querying an offset based on a timestamp, but the behavior seems to be
+/// semi-defined, unintuitive (even within Apache Kafka) and inconsistent between Apache Kafka and Redpanda. So we
+/// decided to NOT expose this option.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum OffsetAt {
+    /// Earliest existing record.
+    ///
+    /// This is NOT the earliest produced record but the earliest record that is still kept, i.e. the offset might
+    /// change if records are pruned by Kafka (retention policy) or if they are deleted.
+    Earliest,
+
+    /// The latest existing record.
+    Latest,
+}
+
 /// Many operations must be performed on the leader for a partition
 ///
 /// Additionally a partition is the unit of concurrency within Kafka
@@ -110,7 +128,8 @@ impl PartitionClient {
     ///
     /// # Error Handling
     /// Fetching records outside the range known to the broker (marked by low and high watermark) will lead to a
-    /// [`ServerError`](Error::ServerError) with [`OffsetOutOfRange`](ProtocolError::OffsetOutOfRange).
+    /// [`ServerError`](Error::ServerError) with [`OffsetOutOfRange`](ProtocolError::OffsetOutOfRange). You may use
+    /// [`get_offset`](Self::get_offset) to determine the offset range.
     pub async fn fetch_records(
         &self,
         offset: i64,
@@ -142,22 +161,23 @@ impl PartitionClient {
         Ok((records, partition.high_watermark.0))
     }
 
-    /// Get high watermark for this partition.
-    pub async fn get_high_watermark(&self) -> Result<i64> {
-        let request = &build_list_offsets_request(self.partition, &self.topic);
+    /// Get offset for this partition.
+    ///
+    /// Note that the value returned by this method should be considered stale data, since:
+    ///
+    /// - **[`OffsetAt::Earliest`]:** Might change at any time due to the Kafka retention policy or by
+    ///   [deleting records](Self::delete_records).
+    /// - **[`OffsetAt::Latest`]:** Might change at any time by [producing records](Self::produce).
+    pub async fn get_offset(&self, at: OffsetAt) -> Result<i64> {
+        let request = &build_list_offsets_request(self.partition, &self.topic, at);
 
-        let partition = maybe_retry(
-            &self.backoff_config,
-            self,
-            "get_high_watermark",
-            || async move {
-                let response = self.get().await?.request(&request).await?;
-                process_list_offsets_response(self.partition, &self.topic, response)
-            },
-        )
+        let partition = maybe_retry(&self.backoff_config, self, "get_offset", || async move {
+            let response = self.get().await?.request(&request).await?;
+            process_list_offsets_response(self.partition, &self.topic, response)
+        })
         .await?;
 
-        extract_high_watermark(partition)
+        extract_offset(partition)
     }
 
     /// Delete records whose offset is smaller than the given offset.
@@ -585,16 +605,20 @@ fn extract_records(
     Ok(records)
 }
 
-fn build_list_offsets_request(partition: i32, topic: &str) -> ListOffsetsRequest {
+fn build_list_offsets_request(partition: i32, topic: &str, at: OffsetAt) -> ListOffsetsRequest {
+    let timestamp = match at {
+        OffsetAt::Earliest => -2,
+        OffsetAt::Latest => -1,
+    };
+
     ListOffsetsRequest {
         replica_id: NORMAL_CONSUMER,
         isolation_level: Some(IsolationLevel::ReadCommitted),
         topics: vec![ListOffsetsRequestTopic {
             name: String_(topic.to_owned()),
             partitions: vec![ListOffsetsRequestPartition {
                 partition_index: Int32(partition),
-                // latest offset
-                timestamp: Int64(-1),
+                timestamp: Int64(timestamp),
                 max_num_offsets: Some(Int32(1)),
             }],
         }],
@@ -636,7 +660,7 @@ fn process_list_offsets_response(
     }
 }
 
-fn extract_high_watermark(partition: ListOffsetsResponsePartition) -> Result<i64> {
+fn extract_offset(partition: ListOffsetsResponsePartition) -> Result<i64> {
     match (
         partition.old_style_offsets.as_ref(),
         partition.offset.as_ref(),
```
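For orientation, a minimal usage sketch of the new API (not part of the commit). It assumes a `PartitionClient` has already been constructed as in the integration tests further below, and it boxes the error type for brevity. The `-2`/`-1` values in `build_list_offsets_request` above are the standard Kafka `ListOffsets` timestamp sentinels for "earliest" and "latest".

```rust
use rskafka::client::partition::{OffsetAt, PartitionClient};

/// Query both ends of the retained offset range. Per the new docs, both
/// values are stale as soon as they are returned: `Earliest` can move
/// forward via retention or `delete_records`, `Latest` via `produce`.
async fn offset_range(client: &PartitionClient) -> Result<(i64, i64), Box<dyn std::error::Error>> {
    let earliest = client.get_offset(OffsetAt::Earliest).await?;
    let latest = client.get_offset(OffsetAt::Latest).await?;
    Ok((earliest, latest))
}
```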

src/protocol/messages/list_offsets.rs

Lines changed: 4 additions & 0 deletions

```diff
@@ -77,6 +77,8 @@ pub struct ListOffsetsRequestTopic {
     pub name: String_,
 
     /// Each partition in the request.
+    ///
+    /// Note: A partition may only appear once within the request.
     pub partitions: Vec<ListOffsetsRequestPartition>,
 }
 
@@ -120,6 +122,8 @@ pub struct ListOffsetsRequest {
     pub isolation_level: Option<IsolationLevel>,
 
     /// Each topic in the request.
+    ///
+    /// Note: A topic may only appear once within the request.
     pub topics: Vec<ListOffsetsRequestTopic>,
 }
 
```

tests/client.rs

Lines changed: 38 additions & 7 deletions

```diff
@@ -2,7 +2,7 @@ use assert_matches::assert_matches;
 use rskafka::{
     client::{
         error::{Error as ClientError, ProtocolError, RequestError},
-        partition::Compression,
+        partition::{Compression, OffsetAt},
         ClientBuilder,
     },
     record::{Record, RecordAndOffset},
@@ -216,7 +216,7 @@ async fn test_consume_offset_out_of_range() {
 }
 
 #[tokio::test]
-async fn test_get_high_watermark() {
+async fn test_get_offset() {
     maybe_start_logging();
 
     let connection = maybe_skip_kafka_integration!();
@@ -238,7 +238,17 @@
         .await
         .unwrap();
 
-    assert_eq!(partition_client.get_high_watermark().await.unwrap(), 0);
+    assert_eq!(
+        partition_client
+            .get_offset(OffsetAt::Earliest)
+            .await
+            .unwrap(),
+        0
+    );
+    assert_eq!(
+        partition_client.get_offset(OffsetAt::Latest).await.unwrap(),
+        0
+    );
 
     // add some data
     // use out-of-order timestamps to ensure our "latest offset" logic works
@@ -247,21 +257,29 @@
         timestamp: record_early.timestamp + time::Duration::SECOND,
         ..record_early.clone()
     };
-    partition_client
+    let offsets = partition_client
         .produce(vec![record_late.clone()], Compression::NoCompression)
         .await
         .unwrap();
+    assert_eq!(offsets[0], 0);
 
     let offsets = partition_client
        .produce(vec![record_early.clone()], Compression::NoCompression)
        .await
        .unwrap();
     assert_eq!(offsets.len(), 1);
-    let expected = offsets[0] + 1;
+    assert_eq!(offsets[0], 1);
 
     assert_eq!(
-        partition_client.get_high_watermark().await.unwrap(),
-        expected
+        partition_client
+            .get_offset(OffsetAt::Earliest)
+            .await
+            .unwrap(),
+        0
+    );
+    assert_eq!(
+        partition_client.get_offset(OffsetAt::Latest).await.unwrap(),
+        2
     );
 }
 
@@ -497,6 +515,19 @@ async fn test_delete_records() {
             },
         ],
     );
+
+    // offsets reflect deletion
+    assert_eq!(
+        partition_client
+            .get_offset(OffsetAt::Earliest)
+            .await
+            .unwrap(),
+        offset_3
+    );
+    assert_eq!(
+        partition_client.get_offset(OffsetAt::Latest).await.unwrap(),
+        offset_4 + 1
+    );
 }
 
 pub fn large_record() -> Record {
```
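The updated `fetch_records` documentation points to `get_offset` for determining a valid fetch range. Building on that hint, here is a hedged consumer-loop sketch; the fetch parameters are illustrative, and the `fetch_records(offset, bytes, max_wait_ms)` call shape follows rskafka's README example:

```rust
use rskafka::client::partition::{OffsetAt, PartitionClient};

/// Read everything currently retained in a partition, starting at the
/// earliest offset still known to the broker. A sketch only: a real consumer
/// would tune buffer sizes, retry empty fetches, and track progress durably.
async fn read_retained(client: &PartitionClient) -> Result<(), Box<dyn std::error::Error>> {
    let mut offset = client.get_offset(OffsetAt::Earliest).await?;
    let end = client.get_offset(OffsetAt::Latest).await?;

    while offset < end {
        // Request 1..1_000_000 bytes per fetch and wait at most 1s for data.
        let (records, _high_watermark) = client.fetch_records(offset, 1..1_000_000, 1_000).await?;
        if records.is_empty() {
            break; // nothing arrived within max_wait_ms; stop this sketch
        }
        for record_and_offset in &records {
            // Process `record_and_offset.record` here, then advance past it.
            offset = offset.max(record_and_offset.offset + 1);
        }
    }
    Ok(())
}
```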
