1
1
use crate :: {
2
2
backoff:: { Backoff , BackoffConfig } ,
3
- client:: error:: { Error , Result } ,
3
+ client:: error:: { Error , Result , ServerErrorContext } ,
4
4
connection:: { BrokerCache , BrokerConnection , BrokerConnector , MessengerTransport } ,
5
5
messenger:: RequestError ,
6
6
protocol:: {
@@ -26,6 +26,8 @@ use time::OffsetDateTime;
26
26
use tokio:: sync:: Mutex ;
27
27
use tracing:: { error, info} ;
28
28
29
+ use super :: error:: ServerErrorPayload ;
30
+
29
31
#[ derive( Debug , Clone , Copy , PartialEq , Eq ) ]
30
32
pub enum Compression {
31
33
NoCompression ,
@@ -224,10 +226,13 @@ impl PartitionClient {
224
226
225
227
if let Some ( e) = topic. error {
226
228
// TODO: Add retry logic
227
- return Err ( Error :: ServerError (
228
- e,
229
- format ! ( "error getting metadata for topic \" {}\" " , self . topic) ,
230
- ) ) ;
229
+ return Err ( Error :: ServerError {
230
+ protocol_error : e,
231
+ error_message : None ,
232
+ context : Some ( ServerErrorContext :: Topic ( self . topic . clone ( ) ) ) ,
233
+ payload : None ,
234
+ is_virtual : false ,
235
+ } ) ;
231
236
}
232
237
233
238
let partition = topic
@@ -243,23 +248,29 @@ impl PartitionClient {
243
248
244
249
if let Some ( e) = partition. error {
245
250
// TODO: Add retry logic
246
- return Err ( Error :: ServerError (
247
- e,
248
- format ! (
249
- "error getting metadata for partition {} in topic \" {}\" " ,
250
- self . partition, self . topic
251
- ) ,
252
- ) ) ;
251
+ return Err ( Error :: ServerError {
252
+ protocol_error : e,
253
+ error_message : None ,
254
+ context : Some ( ServerErrorContext :: Partition (
255
+ self . topic . clone ( ) ,
256
+ self . partition ,
257
+ ) ) ,
258
+ payload : None ,
259
+ is_virtual : false ,
260
+ } ) ;
253
261
}
254
262
255
263
if partition. leader_id . 0 == -1 {
256
- return Err ( Error :: ServerError (
257
- ProtocolError :: LeaderNotAvailable ,
258
- format ! (
259
- "Leader unknown for partition {} and topic \" {}\" " ,
260
- self . partition, self . topic
261
- ) ,
262
- ) ) ;
264
+ return Err ( Error :: ServerError {
265
+ protocol_error : ProtocolError :: LeaderNotAvailable ,
266
+ error_message : None ,
267
+ context : Some ( ServerErrorContext :: Partition (
268
+ self . topic . clone ( ) ,
269
+ self . partition ,
270
+ ) ) ,
271
+ payload : None ,
272
+ is_virtual : true ,
273
+ } ) ;
263
274
}
264
275
265
276
info ! (
@@ -310,13 +321,19 @@ impl BrokerCache for &PartitionClient {
310
321
if leader != leader_self {
311
322
// this might happen if the leader changed after we got the hint from a arbitrary broker and this specific
312
323
// metadata call.
313
- return Err ( Error :: ServerError (
314
- ProtocolError :: NotLeaderOrFollower ,
315
- format ! (
316
- "Broker {} which we determined as leader thinks there is another leader {}" ,
317
- leader, leader_self
318
- ) ,
319
- ) ) ;
324
+ return Err ( Error :: ServerError {
325
+ protocol_error : ProtocolError :: NotLeaderOrFollower ,
326
+ error_message : None ,
327
+ context : Some ( ServerErrorContext :: Partition (
328
+ self . topic . clone ( ) ,
329
+ self . partition ,
330
+ ) ) ,
331
+ payload : Some ( ServerErrorPayload :: LeaderForward {
332
+ broker : leader,
333
+ new_leader : leader_self,
334
+ } ) ,
335
+ is_virtual : true ,
336
+ } ) ;
320
337
}
321
338
322
339
* current_broker = Some ( Arc :: clone ( & broker) ) ;
@@ -365,10 +382,17 @@ where
365
382
match error {
366
383
Error :: Request ( RequestError :: Poisoned ( _) | RequestError :: IO ( _) )
367
384
| Error :: Connection ( _) => broker_cache. invalidate ( ) . await ,
368
- Error :: ServerError ( ProtocolError :: InvalidReplicationFactor , _) => { }
369
- Error :: ServerError ( ProtocolError :: LeaderNotAvailable , _) => { }
370
- Error :: ServerError ( ProtocolError :: OffsetNotAvailable , _) => { }
371
- Error :: ServerError ( ProtocolError :: NotLeaderOrFollower , _) => {
385
+ Error :: ServerError {
386
+ protocol_error :
387
+ ProtocolError :: InvalidReplicationFactor
388
+ | ProtocolError :: LeaderNotAvailable
389
+ | ProtocolError :: OffsetNotAvailable ,
390
+ ..
391
+ } => { }
392
+ Error :: ServerError {
393
+ protocol_error : ProtocolError :: NotLeaderOrFollower ,
394
+ ..
395
+ } => {
372
396
broker_cache. invalidate ( ) . await ;
373
397
}
374
398
_ => {
@@ -490,7 +514,13 @@ fn process_produce_response(
490
514
}
491
515
492
516
match response. error {
493
- Some ( e) => Err ( Error :: ServerError ( e, Default :: default ( ) ) ) ,
517
+ Some ( e) => Err ( Error :: ServerError {
518
+ protocol_error : e,
519
+ error_message : None ,
520
+ context : Some ( ServerErrorContext :: Partition ( topic. to_owned ( ) , partition) ) ,
521
+ payload : None ,
522
+ is_virtual : false ,
523
+ } ) ,
494
524
None => Ok ( ( 0 ..num_records)
495
525
. map ( |x| x + response. base_offset . 0 )
496
526
. collect ( ) ) ,
@@ -551,7 +581,16 @@ fn process_fetch_response(
551
581
}
552
582
553
583
if let Some ( err) = response_partition. error_code {
554
- return Err ( Error :: ServerError ( err, String :: new ( ) ) ) ;
584
+ return Err ( Error :: ServerError {
585
+ protocol_error : err,
586
+ error_message : None ,
587
+ context : Some ( ServerErrorContext :: Partition ( topic. to_owned ( ) , partition) ) ,
588
+ payload : Some ( ServerErrorPayload :: FetchState {
589
+ high_watermark : response_partition. high_watermark . 0 ,
590
+ last_stable_offset : response_partition. last_stable_offset . map ( |x| x. 0 ) ,
591
+ } ) ,
592
+ is_virtual : false ,
593
+ } ) ;
555
594
}
556
595
557
596
Ok ( response_partition)
@@ -657,7 +696,13 @@ fn process_list_offsets_response(
657
696
}
658
697
659
698
match response_partition. error_code {
660
- Some ( err) => Err ( Error :: ServerError ( err, String :: new ( ) ) ) ,
699
+ Some ( err) => Err ( Error :: ServerError {
700
+ protocol_error : err,
701
+ error_message : None ,
702
+ context : Some ( ServerErrorContext :: Partition ( topic. to_owned ( ) , partition) ) ,
703
+ payload : None ,
704
+ is_virtual : false ,
705
+ } ) ,
661
706
None => Ok ( response_partition) ,
662
707
}
663
708
}
@@ -737,7 +782,13 @@ fn process_delete_records_response(
737
782
}
738
783
739
784
match response_partition. error {
740
- Some ( err) => Err ( Error :: ServerError ( err, String :: new ( ) ) ) ,
785
+ Some ( err) => Err ( Error :: ServerError {
786
+ protocol_error : err,
787
+ error_message : None ,
788
+ context : Some ( ServerErrorContext :: Partition ( topic. to_owned ( ) , partition) ) ,
789
+ payload : None ,
790
+ is_virtual : false ,
791
+ } ) ,
741
792
None => Ok ( response_partition) ,
742
793
}
743
794
}
0 commit comments