Skip to content

Commit 69b1865

Browse files
authored
Fix deadlock in rd_kafka_reset_any_broker_down_reported (#5156)
* Fix deadlock in `rd_kafka_reset_any_broker_down_reported` by releasing and reacquiring the broker lock, to respect lock ordering and avoid deadlocks.
* Moved resetting the "broker down reported" field to after setting the broker state, for consistency with the rest of the counters when unlocking.
1 parent af1cd3b commit 69b1865

File tree

1 file changed

+15
-6
lines changed

1 file changed

+15
-6
lines changed

src/rdkafka_broker.c

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -346,8 +346,9 @@ rd_bool_t rd_kafka_broker_ApiVersion_at_least_no_lock(rd_kafka_broker_t *rkb,
346346
* @locality broker thread
347347
*/
348348
void rd_kafka_broker_set_state(rd_kafka_broker_t *rkb, int state) {
349-
rd_bool_t trigger_monitors = rd_false,
350-
skip_broker_down = rkb->rkb_c.skip_broker_down;
349+
rd_bool_t trigger_monitors = rd_false,
350+
reset_any_broker_down_reported = rd_false,
351+
skip_broker_down = rkb->rkb_c.skip_broker_down;
351352

352353
if ((int)rkb->rkb_state == state)
353354
return;
@@ -397,6 +398,9 @@ void rd_kafka_broker_set_state(rd_kafka_broker_t *rkb, int state) {
397398
!rd_kafka_broker_state_is_up(rkb->rkb_state)) {
398399
/* ~Up -> Up */
399400
if (!RD_KAFKA_BROKER_IS_LOGICAL(rkb)) {
401+
rd_atomic32_add(&rkb->rkb_rk->rk_broker_up_cnt,
402+
1);
403+
400404
/* If at least one broker connects we reset
401405
* the down counter to try again with rest of
402406
* brokers. Otherwise, a single broker
@@ -405,10 +409,7 @@ void rd_kafka_broker_set_state(rd_kafka_broker_t *rkb, int state) {
405409
* of brokers was restored but they didn't
406410
* attempt a re-connection because of sparse
407411
* broker connections. */
408-
rd_atomic32_add(&rkb->rkb_rk->rk_broker_up_cnt,
409-
1);
410-
rd_kafka_reset_any_broker_down_reported(
411-
rkb->rkb_rk);
412+
reset_any_broker_down_reported = rd_true;
412413
}
413414

414415
trigger_monitors = rd_true;
@@ -436,6 +437,14 @@ void rd_kafka_broker_set_state(rd_kafka_broker_t *rkb, int state) {
436437
rkb->rkb_state = state;
437438
rkb->rkb_ts_state = rd_clock();
438439

440+
if (reset_any_broker_down_reported) {
441+
rd_kafka_broker_unlock(rkb);
442+
/* Releases rkb->rkb_lock to respect
443+
* lock ordering and avoid deadlocks */
444+
rd_kafka_reset_any_broker_down_reported(rkb->rkb_rk);
445+
rd_kafka_broker_lock(rkb);
446+
}
447+
439448
if (trigger_monitors)
440449
rd_kafka_broker_trigger_monitors(rkb);
441450

0 commit comments

Comments
 (0)