Skip to content

Commit 9f1e36e

Browse files
Merge pull request #163 from influxdata/crepererum/offset_in_error
feat: add request offset to failed fetch requests
2 parents bdd816e + 51658e4 commit 9f1e36e

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed

src/client/error.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,19 @@ pub enum RequestContext {
1212

1313
/// Error is specific to a partition (indexed via topic name and partition ID).
1414
Partition(String, i32),
15+
16+
/// Error is specific to a fetch request.
17+
#[non_exhaustive]
18+
Fetch {
19+
/// Topic name.
20+
topic_name: String,
21+
22+
/// Partition ID.
23+
partition_id: i32,
24+
25+
/// Offset used during the request.
26+
offset: i64,
27+
},
1528
}
1629

1730
/// Usable broker data for [`Error::ServerError`].

src/client/partition.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ impl PartitionClient {
154154

155155
let partition = maybe_retry(&self.backoff_config, self, "fetch_records", || async move {
156156
let response = self.get().await?.request(&request).await?;
157-
process_fetch_response(self.partition, &self.topic, response)
157+
process_fetch_response(self.partition, &self.topic, response, offset)
158158
})
159159
.await?;
160160

@@ -546,6 +546,7 @@ fn process_fetch_response(
546546
partition: i32,
547547
topic: &str,
548548
response: FetchResponse,
549+
request_offset: i64,
549550
) -> Result<FetchResponsePartition> {
550551
let response_topic = response
551552
.responses
@@ -575,7 +576,11 @@ fn process_fetch_response(
575576
return Err(Error::ServerError {
576577
protocol_error: err,
577578
error_message: None,
578-
request: RequestContext::Partition(topic.to_owned(), partition),
579+
request: RequestContext::Fetch {
580+
topic_name: topic.to_owned(),
581+
partition_id: partition,
582+
offset: request_offset,
583+
},
579584
response: Some(ServerErrorResponse::PartitionFetchState {
580585
high_watermark: response_partition.high_watermark.0,
581586
last_stable_offset: response_partition.last_stable_offset.map(|x| x.0),

0 commit comments

Comments
 (0)