@@ -31,7 +31,7 @@ use std::{
31
31
use tokio:: sync:: Mutex ;
32
32
use tracing:: { error, info} ;
33
33
34
- use super :: error:: ServerErrorResponse ;
34
+ use super :: { error:: ServerErrorResponse , metadata_cache :: MetadataCacheGeneration } ;
35
35
36
36
/// How strongly a [`PartitionClient`] is bound to a partition.
37
37
///
@@ -99,6 +99,13 @@ pub enum OffsetAt {
99
99
Latest ,
100
100
}
101
101
102
+ #[ derive( Debug , Default ) ]
103
+ struct CurrentBroker {
104
+ broker : Option < BrokerConnection > ,
105
+ gen_leader_from_arbitrary : Option < MetadataCacheGeneration > ,
106
+ gen_leader_from_self : Option < MetadataCacheGeneration > ,
107
+ }
108
+
102
109
/// Many operations must be performed on the leader for a partition
103
110
///
104
111
/// Additionally a partition is the unit of concurrency within Kafka
@@ -117,7 +124,7 @@ pub struct PartitionClient {
117
124
backoff_config : BackoffConfig ,
118
125
119
126
/// Current broker connection if any
120
- current_broker : Mutex < Option < BrokerConnection > > ,
127
+ current_broker : Mutex < CurrentBroker > ,
121
128
122
129
unknown_topic_handling : UnknownTopicHandling ,
123
130
}
@@ -140,7 +147,7 @@ impl PartitionClient {
140
147
partition,
141
148
brokers : Arc :: clone ( & brokers) ,
142
149
backoff_config : Default :: default ( ) ,
143
- current_broker : Mutex :: new ( None ) ,
150
+ current_broker : Mutex :: new ( CurrentBroker :: default ( ) ) ,
144
151
unknown_topic_handling,
145
152
} ;
146
153
@@ -312,8 +319,11 @@ impl PartitionClient {
312
319
}
313
320
314
321
/// Retrieve the broker ID of the partition leader
315
- async fn get_leader ( & self , metadata_mode : MetadataLookupMode ) -> Result < i32 > {
316
- let metadata = self
322
+ async fn get_leader (
323
+ & self ,
324
+ metadata_mode : MetadataLookupMode ,
325
+ ) -> Result < ( i32 , Option < MetadataCacheGeneration > ) > {
326
+ let ( metadata, gen) = self
317
327
. brokers
318
328
. request_metadata ( metadata_mode, Some ( vec ! [ self . topic. clone( ) ] ) )
319
329
. await ?;
@@ -379,7 +389,7 @@ impl PartitionClient {
379
389
leader=partition. leader_id. 0 ,
380
390
"Detected leader" ,
381
391
) ;
382
- Ok ( partition. leader_id . 0 )
392
+ Ok ( ( partition. leader_id . 0 , gen ) )
383
393
}
384
394
}
385
395
@@ -391,7 +401,7 @@ impl BrokerCache for &PartitionClient {
391
401
392
402
async fn get ( & self ) -> Result < Arc < Self :: R > > {
393
403
let mut current_broker = self . current_broker . lock ( ) . await ;
394
- if let Some ( broker) = & * current_broker {
404
+ if let Some ( broker) = & current_broker. broker {
395
405
return Ok ( Arc :: clone ( broker) ) ;
396
406
}
397
407
@@ -410,22 +420,29 @@ impl BrokerCache for &PartitionClient {
410
420
// * A subsequent query is performed against the leader that does not use the cached entry - this validates
411
421
// the correctness of the cached entry.
412
422
//
413
- let leader = self . get_leader ( MetadataLookupMode :: CachedArbitrary ) . await ?;
423
+ let ( leader, gen_leader_from_arbitrary) =
424
+ self . get_leader ( MetadataLookupMode :: CachedArbitrary ) . await ?;
414
425
let broker = match self . brokers . connect ( leader) . await {
415
426
Ok ( Some ( c) ) => Ok ( c) ,
416
427
Ok ( None ) => {
417
- self . brokers . invalidate_metadata_cache (
418
- "partition client: broker that is leader is unknown" ,
419
- ) ;
428
+ if let Some ( gen) = gen_leader_from_arbitrary {
429
+ self . brokers . invalidate_metadata_cache (
430
+ "partition client: broker that is leader is unknown" ,
431
+ gen,
432
+ ) ;
433
+ }
420
434
Err ( Error :: InvalidResponse ( format ! (
421
435
"Partition leader {} not found in metadata response" ,
422
436
leader
423
437
) ) )
424
438
}
425
439
Err ( e) => {
426
- self . brokers . invalidate_metadata_cache (
427
- "partition client: error connecting to partition leader" ,
428
- ) ;
440
+ if let Some ( gen) = gen_leader_from_arbitrary {
441
+ self . brokers . invalidate_metadata_cache (
442
+ "partition client: error connecting to partition leader" ,
443
+ gen,
444
+ ) ;
445
+ }
429
446
Err ( e. into ( ) )
430
447
}
431
448
} ?;
@@ -441,14 +458,17 @@ impl BrokerCache for &PartitionClient {
441
458
//
442
459
// Because this check requires the most up-to-date metadata from a specific broker, do not accept a cached
443
460
// metadata response.
444
- let leader_self = self
461
+ let ( leader_self, gen_leader_from_self ) = self
445
462
. get_leader ( MetadataLookupMode :: SpecificBroker ( Arc :: clone ( & broker) ) )
446
463
. await ?;
447
464
if leader != leader_self {
448
- // The cached metadata identified an incorrect leader - it is stale and should be refreshed.
449
- self . brokers . invalidate_metadata_cache (
450
- "partition client: broker that should be leader does treat itself as a leader" ,
451
- ) ;
465
+ if let Some ( gen) = gen_leader_from_self {
466
+ // The cached metadata identified an incorrect leader - it is stale and should be refreshed.
467
+ self . brokers . invalidate_metadata_cache (
468
+ "partition client: broker that should be leader does treat itself as a leader" ,
469
+ gen,
470
+ ) ;
471
+ }
452
472
453
473
// this might happen if the leader changed after we got the hint from a arbitrary broker and this specific
454
474
// metadata call.
@@ -464,7 +484,11 @@ impl BrokerCache for &PartitionClient {
464
484
} ) ;
465
485
}
466
486
467
- * current_broker = Some ( Arc :: clone ( & broker) ) ;
487
+ * current_broker = CurrentBroker {
488
+ broker : Some ( Arc :: clone ( & broker) ) ,
489
+ gen_leader_from_arbitrary,
490
+ gen_leader_from_self,
491
+ } ;
468
492
469
493
info ! (
470
494
topic=%self . topic,
@@ -482,8 +506,17 @@ impl BrokerCache for &PartitionClient {
482
506
reason,
483
507
"Invaliding cached leader" ,
484
508
) ;
485
- self . brokers . invalidate_metadata_cache ( reason) ;
486
- * self . current_broker . lock ( ) . await = None
509
+
510
+ let mut current_broker = self . current_broker . lock ( ) . await ;
511
+
512
+ if let Some ( gen) = current_broker. gen_leader_from_arbitrary {
513
+ self . brokers . invalidate_metadata_cache ( reason, gen) ;
514
+ }
515
+ if let Some ( gen) = current_broker. gen_leader_from_self {
516
+ self . brokers . invalidate_metadata_cache ( reason, gen) ;
517
+ }
518
+
519
+ current_broker. broker = None
487
520
}
488
521
}
489
522
0 commit comments