Skip to content

Commit 0a6fd18

Browse files
committed
Don't let active logical broker connections hinder new connects
The logical broker connections (such as to the group coordinator) are reserved for specific use and shall not be reused for things like Metadata requests. The code to lookup a usable broker to send Metadata requests understood this, but the code to select a broker to create a new connection to if no usable connections are found (sparse connnections) did not understand this and already thought there was an active connection (the logical broker connection) and refused to set up a new one. This could lead to consumers stalling consumption if it was fetching from a single broker and that broker went down, no new connection to the cluster would be made to refresh metadata. (cherry picked from commit 3c7f14c)
1 parent 27e9186 commit 0a6fd18

File tree

3 files changed

+25
-7
lines changed

3 files changed

+25
-7
lines changed

src/rdkafka_broker.c

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -317,11 +317,19 @@ void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state) {
317317

318318
if (rkb->rkb_source != RD_KAFKA_INTERNAL) {
319319
if (rd_kafka_broker_state_is_up(state) &&
320-
!rd_kafka_broker_state_is_up(rkb->rkb_state))
320+
!rd_kafka_broker_state_is_up(rkb->rkb_state)) {
321321
rd_atomic32_add(&rkb->rkb_rk->rk_broker_up_cnt, 1);
322-
else if (rd_kafka_broker_state_is_up(rkb->rkb_state) &&
323-
!rd_kafka_broker_state_is_up(state))
322+
if (RD_KAFKA_BROKER_IS_LOGICAL(rkb))
323+
rd_atomic32_add(&rkb->rkb_rk->
324+
rk_logical_broker_up_cnt, 1);
325+
326+
} else if (rd_kafka_broker_state_is_up(rkb->rkb_state) &&
327+
!rd_kafka_broker_state_is_up(state)) {
324328
rd_atomic32_sub(&rkb->rkb_rk->rk_broker_up_cnt, 1);
329+
if (RD_KAFKA_BROKER_IS_LOGICAL(rkb))
330+
rd_atomic32_sub(&rkb->rkb_rk->
331+
rk_logical_broker_up_cnt, 1);
332+
}
325333
}
326334

327335
rkb->rkb_state = state;
@@ -5234,7 +5242,12 @@ void rd_kafka_connect_any (rd_kafka_t *rk, const char *reason) {
52345242
rd_kafka_broker_t *rkb;
52355243
rd_ts_t suppr;
52365244

5237-
if (rd_atomic32_get(&rk->rk_broker_up_cnt) > 0 ||
5245+
/* Don't count connections to logical brokers since they serve
5246+
* a specific purpose (group coordinator) and their connections
5247+
* should not be reused for other purposes.
5248+
* rd_kafka_broker_random() will not return LOGICAL brokers. */
5249+
if (rd_atomic32_get(&rk->rk_broker_up_cnt) -
5250+
rd_atomic32_get(&rk->rk_logical_broker_up_cnt) > 0 ||
52385251
rd_atomic32_get(&rk->rk_broker_cnt) == 0)
52395252
return;
52405253

src/rdkafka_idempotence.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ int rd_kafka_idemp_request_pid (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
143143
} else {
144144
/* Increase passed broker's refcount so we don't
145145
* have to check if rkb should be destroyed or not below
146-
* (broker_any_usable() returns a new reference). */
146+
* (broker_any() returns a new reference). */
147147
rd_kafka_broker_keep(rkb);
148148
}
149149

src/rdkafka_int.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,13 @@ struct rd_kafka_s {
163163
TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers;
164164
rd_list_t rk_broker_by_id; /* Fast id lookups. */
165165
rd_atomic32_t rk_broker_cnt;
166-
rd_atomic32_t rk_broker_up_cnt; /**< Number of brokers
167-
* in state >= UP */
166+
/**< Number of brokers in state >= UP */
167+
rd_atomic32_t rk_broker_up_cnt;
168+
/**< Number of logical brokers in state >= UP, this is a sub-set
169+
* of rk_broker_up_cnt. */
170+
rd_atomic32_t rk_logical_broker_up_cnt;
171+
/**< Number of brokers that are down, only includes brokers
172+
* that have had at least one connection attempt. */
168173
rd_atomic32_t rk_broker_down_cnt;
169174
/**< Logical brokers currently without an address.
170175
* Used for calculating ERR__ALL_BROKERS_DOWN. */

0 commit comments

Comments
 (0)