@@ -3485,7 +3485,6 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) {
3485
3485
break ;
3486
3486
3487
3487
case RD_KAFKA_OP_CONNECT :
3488
- rd_atomic32_sub (& rkb -> rkb_rk -> rk_scheduled_connections_cnt , 1 );
3489
3488
/* Sparse connections: connection requested, transition
3490
3489
* to TRY_CONNECT state to trigger new connection. */
3491
3490
if (rkb -> rkb_state == RD_KAFKA_BROKER_STATE_INIT ) {
@@ -5671,14 +5670,17 @@ static int rd_kafka_broker_filter_never_connected(rd_kafka_broker_t *rkb,
5671
5670
return rd_atomic32_get (& rkb -> rkb_c .connects );
5672
5671
}
5673
5672
5674
- /**
5675
- * @brief Filter out brokers that aren't connecting.
5676
- */
5677
- static int rd_kafka_broker_filter_not_connecting (rd_kafka_broker_t * rkb ,
5678
- void * opaque ) {
5679
- return rkb -> rkb_state <= RD_KAFKA_BROKER_STATE_DOWN ;
5680
- }
5673
+ static void rd_kafka_connect_any_timer_cb (rd_kafka_timers_t * rkts , void * arg ) {
5674
+ const char * reason = (const char * )arg ;
5675
+ rd_kafka_t * rk = rkts -> rkts_rk ;
5676
+ if (rd_kafka_terminating (rk ))
5677
+ return ;
5681
5678
5679
+ /* Acquire the read lock for `rd_kafka_connect_any` */
5680
+ rd_kafka_rdlock (rk );
5681
+ rd_kafka_connect_any (rk , reason );
5682
+ rd_kafka_rdunlock (rk );
5683
+ }
5682
5684
5683
5685
/**
5684
5686
* @brief Sparse connections:
@@ -5693,8 +5695,6 @@ static int rd_kafka_broker_filter_not_connecting(rd_kafka_broker_t *rkb,
5693
5695
void rd_kafka_connect_any (rd_kafka_t * rk , const char * reason ) {
5694
5696
rd_kafka_broker_t * rkb ;
5695
5697
rd_ts_t suppr ;
5696
- rd_bool_t any_connecting = rd_true ;
5697
- int scheduled_connections ;
5698
5698
5699
5699
/* Don't count connections to logical brokers since they serve
5700
5700
* a specific purpose (group coordinator) and their connections
@@ -5707,33 +5707,22 @@ void rd_kafka_connect_any(rd_kafka_t *rk, const char *reason) {
5707
5707
return ;
5708
5708
5709
5709
mtx_lock (& rk -> rk_suppress .sparse_connect_lock );
5710
- rkb = rd_kafka_broker_random (
5711
- rk , -1 /*any state*/ , rd_kafka_broker_filter_not_connecting , NULL );
5712
- if (rkb )
5713
- rd_kafka_broker_destroy (
5714
- rkb ); /* refcnt from ..broker_random() */
5715
- else
5716
- any_connecting = rd_false ;
5717
5710
5718
- scheduled_connections =
5719
- rd_atomic32_get (& rk -> rk_scheduled_connections_cnt );
5720
-
5721
- if (!any_connecting && scheduled_connections == 0 )
5722
- /* Skip interval */
5723
- rd_interval_reset (& rk -> rk_suppress .sparse_connect_random );
5724
5711
suppr = rd_interval (& rk -> rk_suppress .sparse_connect_random ,
5725
5712
rk -> rk_conf .sparse_connect_intvl * 1000 , 0 );
5713
+ mtx_unlock (& rk -> rk_suppress .sparse_connect_lock );
5726
5714
5727
5715
if (suppr <= 0 ) {
5728
5716
rd_kafka_dbg (rk , BROKER | RD_KAFKA_DBG_GENERIC , "CONNECT" ,
5729
5717
"Not selecting any broker for cluster connection: "
5730
- "still suppressed for %" PRId64
5731
- "ms, "
5732
- "any broker connecting: %s, "
5733
- "scheduled connections %d: %s" ,
5734
- - suppr / 1000 , RD_STR_ToF (any_connecting ),
5735
- scheduled_connections , reason );
5736
- goto done ;
5718
+ "still suppressed for %" PRId64 "ms: %s" ,
5719
+ - suppr / 1000 , reason );
5720
+ /* Retry after interval + 1ms has passed */
5721
+ rd_kafka_timer_start_oneshot (
5722
+ & rk -> rk_timers , & rk -> rk_suppress .sparse_connect_random_tmr ,
5723
+ rd_false /* don't restart */ , 1000LL - suppr ,
5724
+ rd_kafka_connect_any_timer_cb , (void * )reason );
5725
+ return ;
5737
5726
}
5738
5727
5739
5728
/* First pass: only match brokers never connected to,
@@ -5755,7 +5744,7 @@ void rd_kafka_connect_any(rd_kafka_t *rk, const char *reason) {
5755
5744
rd_kafka_dbg (rk , BROKER | RD_KAFKA_DBG_GENERIC , "CONNECT" ,
5756
5745
"Cluster connection already in progress: %s" ,
5757
5746
reason );
5758
- goto done ;
5747
+ return ;
5759
5748
}
5760
5749
5761
5750
rd_rkb_dbg (rkb , BROKER | RD_KAFKA_DBG_GENERIC , "CONNECT" ,
@@ -5766,8 +5755,6 @@ void rd_kafka_connect_any(rd_kafka_t *rk, const char *reason) {
5766
5755
rd_kafka_broker_schedule_connection (rkb );
5767
5756
5768
5757
rd_kafka_broker_destroy (rkb ); /* refcnt from ..broker_random() */
5769
- done :
5770
- mtx_unlock (& rk -> rk_suppress .sparse_connect_lock );
5771
5758
}
5772
5759
5773
5760
@@ -5957,11 +5944,9 @@ void rd_kafka_broker_active_toppar_del(rd_kafka_broker_t *rkb,
5957
5944
*/
5958
5945
void rd_kafka_broker_schedule_connection (rd_kafka_broker_t * rkb ) {
5959
5946
rd_kafka_op_t * rko ;
5960
- rd_atomic32_add (& rkb -> rkb_rk -> rk_scheduled_connections_cnt , 1 );
5961
5947
rko = rd_kafka_op_new (RD_KAFKA_OP_CONNECT );
5962
5948
rd_kafka_op_set_prio (rko , RD_KAFKA_PRIO_FLASH );
5963
- if (!rd_kafka_q_enq (rkb -> rkb_ops , rko ))
5964
- rd_atomic32_sub (& rkb -> rkb_rk -> rk_scheduled_connections_cnt , 1 );
5949
+ rd_kafka_q_enq (rkb -> rkb_ops , rko );
5965
5950
}
5966
5951
5967
5952
0 commit comments