Skip to content

Commit a5a3fbf

Browse files
committed
feat: extend ServerError with helpful data
Closes #148.
1 parent 67d3ce8 commit a5a3fbf

File tree

7 files changed

+212
-62
lines changed

7 files changed

+212
-62
lines changed

src/client/consumer.rs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,10 @@ impl Stream for StreamConsumer {
300300
}
301301
// if we don't have an offset, try again because fetching the offset is racy
302302
(
303-
Err(Error::ServerError(ProtocolError::OffsetOutOfRange, _)),
303+
Err(Error::ServerError {
304+
protocol_error: ProtocolError::OffsetOutOfRange,
305+
..
306+
}),
304307
StartOffset::Earliest | StartOffset::Latest,
305308
) => {
306309
// wipe offset and try again
@@ -627,10 +630,13 @@ mod tests {
627630

628631
#[tokio::test]
629632
async fn test_consumer_terminate() {
630-
let e = Error::ServerError(
631-
ProtocolError::OffsetOutOfRange,
632-
String::from("offset out of range"),
633-
);
633+
let e = Error::ServerError {
634+
protocol_error: ProtocolError::OffsetOutOfRange,
635+
error_message: None,
636+
context: None,
637+
payload: None,
638+
is_virtual: true,
639+
};
634640
let (_sender, receiver) = mpsc::channel(10);
635641
let consumer = Arc::new(MockFetch::new(receiver, Some(e), (0, 1_000)));
636642

@@ -640,7 +646,10 @@ mod tests {
640646
let error = stream.next().await.expect("stream not empty").unwrap_err();
641647
assert_matches!(
642648
error,
643-
Error::ServerError(ProtocolError::OffsetOutOfRange, _)
649+
Error::ServerError {
650+
protocol_error: ProtocolError::OffsetOutOfRange,
651+
..
652+
}
644653
);
645654

646655
// stream ends
@@ -657,10 +666,13 @@ mod tests {
657666
};
658667

659668
// Simulate an error on first fetch to encourage an offset update
660-
let e = Error::ServerError(
661-
ProtocolError::OffsetOutOfRange,
662-
String::from("offset out of range"),
663-
);
669+
let e = Error::ServerError {
670+
protocol_error: ProtocolError::OffsetOutOfRange,
671+
error_message: None,
672+
context: None,
673+
payload: None,
674+
is_virtual: true,
675+
};
664676

665677
let (sender, receiver) = mpsc::channel(10);
666678
let consumer = Arc::new(MockFetch::new(receiver, Some(e), (2, 1_000)));
@@ -701,10 +713,13 @@ mod tests {
701713
};
702714

703715
// Simulate an error on first fetch to encourage an offset update
704-
let e = Error::ServerError(
705-
ProtocolError::OffsetOutOfRange,
706-
String::from("offset out of range"),
707-
);
716+
let e = Error::ServerError {
717+
protocol_error: ProtocolError::OffsetOutOfRange,
718+
error_message: None,
719+
context: None,
720+
payload: None,
721+
is_virtual: true,
722+
};
708723

709724
let (sender, receiver) = mpsc::channel(10);
710725
let consumer = Arc::new(MockFetch::new(receiver, Some(e), (0, 2)));

src/client/controller.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ use crate::{
1212
protocol::{
1313
error::Error as ProtocolError,
1414
messages::{CreateTopicRequest, CreateTopicsRequest},
15-
primitives::{Int16, Int32, NullableString, String_},
15+
primitives::{Int16, Int32, String_},
1616
},
1717
validation::ExactlyOne,
1818
};
1919

20+
use super::error::ServerErrorContext;
21+
2022
#[derive(Debug)]
2123
pub struct ControllerClient {
2224
brokers: Arc<BrokerConnector>,
@@ -69,10 +71,13 @@ impl ControllerClient {
6971

7072
match topic.error {
7173
None => Ok(()),
72-
Some(protocol_error) => match topic.error_message {
73-
Some(NullableString(Some(msg))) => Err(Error::ServerError(protocol_error, msg)),
74-
_ => Err(Error::ServerError(protocol_error, Default::default())),
75-
},
74+
Some(protocol_error) => Err(Error::ServerError {
75+
protocol_error,
76+
error_message: topic.error_message.and_then(|s| s.0),
77+
context: Some(ServerErrorContext::Topic(topic.name.0)),
78+
payload: None,
79+
is_virtual: false,
80+
}),
7681
}
7782
})
7883
.await
@@ -151,7 +156,10 @@ where
151156
| Error::Connection(_) => broker_cache.invalidate().await,
152157

153158
// our broker is actually not the controller
154-
Error::ServerError(ProtocolError::NotController, _) => {
159+
Error::ServerError {
160+
protocol_error: ProtocolError::NotController,
161+
..
162+
} => {
155163
broker_cache.invalidate().await;
156164
}
157165

src/client/error.rs

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,31 @@ use thiserror::Error;
33
pub use crate::messenger::RequestError;
44
pub use crate::protocol::error::Error as ProtocolError;
55

6+
/// Context for [`Error::ServerError`].
7+
#[derive(Debug)]
8+
#[non_exhaustive]
9+
pub enum ServerErrorContext {
10+
Topic(String),
11+
Partition(String, i32),
12+
}
13+
14+
/// Payload for [`Error::ServerError`].
15+
///
16+
/// This is data that the server sent and that is still usable despite the error.
17+
#[derive(Debug)]
18+
#[allow(missing_copy_implementations)] // wanna extend this later
19+
#[non_exhaustive]
20+
pub enum ServerErrorPayload {
21+
LeaderForward {
22+
broker: i32,
23+
new_leader: i32,
24+
},
25+
FetchState {
26+
high_watermark: i64,
27+
last_stable_offset: Option<i64>,
28+
},
29+
}
30+
631
#[derive(Error, Debug)]
732
#[non_exhaustive]
833
pub enum Error {
@@ -15,8 +40,30 @@ pub enum Error {
1540
#[error("Invalid response: {0}")]
1641
InvalidResponse(String),
1742

18-
#[error("Server error {0:?} with message \"{1}\"")]
19-
ServerError(ProtocolError, String),
43+
#[error(
44+
"Server error {} with message \"{}\", context: {:?}, payload: {:?}, virtual: {}",
45+
protocol_error,
46+
string_or_na(error_message),
47+
context,
48+
payload,
49+
is_virtual
50+
)]
51+
ServerError {
52+
/// Protocol-level error message.
53+
protocol_error: ProtocolError,
54+
55+
/// Server message provided by the broker, if any.
56+
error_message: Option<String>,
57+
58+
/// Additional context that we can tell the user about.
59+
context: Option<ServerErrorContext>,
60+
61+
/// Additional payload that we can provide the user.
62+
payload: Option<ServerErrorPayload>,
63+
64+
/// Flags if the error was generated by the client to simulate some server behavior or workaround a bug.
65+
is_virtual: bool,
66+
},
2067

2168
#[error("All retries failed: {0}")]
2269
RetryFailed(#[from] crate::backoff::BackoffError),
@@ -35,3 +82,11 @@ impl Error {
3582
}
3683

3784
pub type Result<T, E = Error> = std::result::Result<T, E>;
85+
86+
/// Simple formatting function the replaces `None` with `"n/a"`.
87+
fn string_or_na(s: &Option<String>) -> &str {
88+
match s {
89+
Some(s) => s.as_str(),
90+
None => "n/a",
91+
}
92+
}

src/client/partition.rs

Lines changed: 85 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
backoff::{Backoff, BackoffConfig},
3-
client::error::{Error, Result},
3+
client::error::{Error, Result, ServerErrorContext},
44
connection::{BrokerCache, BrokerConnection, BrokerConnector, MessengerTransport},
55
messenger::RequestError,
66
protocol::{
@@ -26,6 +26,8 @@ use time::OffsetDateTime;
2626
use tokio::sync::Mutex;
2727
use tracing::{error, info};
2828

29+
use super::error::ServerErrorPayload;
30+
2931
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3032
pub enum Compression {
3133
NoCompression,
@@ -224,10 +226,13 @@ impl PartitionClient {
224226

225227
if let Some(e) = topic.error {
226228
// TODO: Add retry logic
227-
return Err(Error::ServerError(
228-
e,
229-
format!("error getting metadata for topic \"{}\"", self.topic),
230-
));
229+
return Err(Error::ServerError {
230+
protocol_error: e,
231+
error_message: None,
232+
context: Some(ServerErrorContext::Topic(self.topic.clone())),
233+
payload: None,
234+
is_virtual: false,
235+
});
231236
}
232237

233238
let partition = topic
@@ -243,23 +248,29 @@ impl PartitionClient {
243248

244249
if let Some(e) = partition.error {
245250
// TODO: Add retry logic
246-
return Err(Error::ServerError(
247-
e,
248-
format!(
249-
"error getting metadata for partition {} in topic \"{}\"",
250-
self.partition, self.topic
251-
),
252-
));
251+
return Err(Error::ServerError {
252+
protocol_error: e,
253+
error_message: None,
254+
context: Some(ServerErrorContext::Partition(
255+
self.topic.clone(),
256+
self.partition,
257+
)),
258+
payload: None,
259+
is_virtual: false,
260+
});
253261
}
254262

255263
if partition.leader_id.0 == -1 {
256-
return Err(Error::ServerError(
257-
ProtocolError::LeaderNotAvailable,
258-
format!(
259-
"Leader unknown for partition {} and topic \"{}\"",
260-
self.partition, self.topic
261-
),
262-
));
264+
return Err(Error::ServerError {
265+
protocol_error: ProtocolError::LeaderNotAvailable,
266+
error_message: None,
267+
context: Some(ServerErrorContext::Partition(
268+
self.topic.clone(),
269+
self.partition,
270+
)),
271+
payload: None,
272+
is_virtual: true,
273+
});
263274
}
264275

265276
info!(
@@ -310,13 +321,19 @@ impl BrokerCache for &PartitionClient {
310321
if leader != leader_self {
311322
// this might happen if the leader changed after we got the hint from a arbitrary broker and this specific
312323
// metadata call.
313-
return Err(Error::ServerError(
314-
ProtocolError::NotLeaderOrFollower,
315-
format!(
316-
"Broker {} which we determined as leader thinks there is another leader {}",
317-
leader, leader_self
318-
),
319-
));
324+
return Err(Error::ServerError {
325+
protocol_error: ProtocolError::NotLeaderOrFollower,
326+
error_message: None,
327+
context: Some(ServerErrorContext::Partition(
328+
self.topic.clone(),
329+
self.partition,
330+
)),
331+
payload: Some(ServerErrorPayload::LeaderForward {
332+
broker: leader,
333+
new_leader: leader_self,
334+
}),
335+
is_virtual: true,
336+
});
320337
}
321338

322339
*current_broker = Some(Arc::clone(&broker));
@@ -365,10 +382,17 @@ where
365382
match error {
366383
Error::Request(RequestError::Poisoned(_) | RequestError::IO(_))
367384
| Error::Connection(_) => broker_cache.invalidate().await,
368-
Error::ServerError(ProtocolError::InvalidReplicationFactor, _) => {}
369-
Error::ServerError(ProtocolError::LeaderNotAvailable, _) => {}
370-
Error::ServerError(ProtocolError::OffsetNotAvailable, _) => {}
371-
Error::ServerError(ProtocolError::NotLeaderOrFollower, _) => {
385+
Error::ServerError {
386+
protocol_error:
387+
ProtocolError::InvalidReplicationFactor
388+
| ProtocolError::LeaderNotAvailable
389+
| ProtocolError::OffsetNotAvailable,
390+
..
391+
} => {}
392+
Error::ServerError {
393+
protocol_error: ProtocolError::NotLeaderOrFollower,
394+
..
395+
} => {
372396
broker_cache.invalidate().await;
373397
}
374398
_ => {
@@ -490,7 +514,13 @@ fn process_produce_response(
490514
}
491515

492516
match response.error {
493-
Some(e) => Err(Error::ServerError(e, Default::default())),
517+
Some(e) => Err(Error::ServerError {
518+
protocol_error: e,
519+
error_message: None,
520+
context: Some(ServerErrorContext::Partition(topic.to_owned(), partition)),
521+
payload: None,
522+
is_virtual: false,
523+
}),
494524
None => Ok((0..num_records)
495525
.map(|x| x + response.base_offset.0)
496526
.collect()),
@@ -551,7 +581,16 @@ fn process_fetch_response(
551581
}
552582

553583
if let Some(err) = response_partition.error_code {
554-
return Err(Error::ServerError(err, String::new()));
584+
return Err(Error::ServerError {
585+
protocol_error: err,
586+
error_message: None,
587+
context: Some(ServerErrorContext::Partition(topic.to_owned(), partition)),
588+
payload: Some(ServerErrorPayload::FetchState {
589+
high_watermark: response_partition.high_watermark.0,
590+
last_stable_offset: response_partition.last_stable_offset.map(|x| x.0),
591+
}),
592+
is_virtual: false,
593+
});
555594
}
556595

557596
Ok(response_partition)
@@ -657,7 +696,13 @@ fn process_list_offsets_response(
657696
}
658697

659698
match response_partition.error_code {
660-
Some(err) => Err(Error::ServerError(err, String::new())),
699+
Some(err) => Err(Error::ServerError {
700+
protocol_error: err,
701+
error_message: None,
702+
context: Some(ServerErrorContext::Partition(topic.to_owned(), partition)),
703+
payload: None,
704+
is_virtual: false,
705+
}),
661706
None => Ok(response_partition),
662707
}
663708
}
@@ -737,7 +782,13 @@ fn process_delete_records_response(
737782
}
738783

739784
match response_partition.error {
740-
Some(err) => Err(Error::ServerError(err, String::new())),
785+
Some(err) => Err(Error::ServerError {
786+
protocol_error: err,
787+
error_message: None,
788+
context: Some(ServerErrorContext::Partition(topic.to_owned(), partition)),
789+
payload: None,
790+
is_virtual: false,
791+
}),
741792
None => Ok(response_partition),
742793
}
743794
}

0 commit comments

Comments
 (0)