Skip to content

Commit b4d7f8e

Browse files
committed
Rate limit IO-based queue wakeups to linger.ms (#2509)
1 parent a12b909 commit b4d7f8e

File tree

3 files changed

+24
-9
lines changed

3 files changed

+24
-9
lines changed

src/rdkafka_partition.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
666666
rd_kafka_toppar_unlock(rktp);
667667

668668
if (wakeup_q) {
669-
rd_kafka_q_yield(wakeup_q);
669+
rd_kafka_q_yield(wakeup_q, rd_true/*rate-limit*/);
670670
rd_kafka_q_destroy(wakeup_q);
671671
}
672672
}

src/rdkafka_queue.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ int rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq,
256256

257257
if (!dstq->rkq_fwdq && !srcq->rkq_fwdq) {
258258
if (cnt > 0 && dstq->rkq_qlen == 0)
259-
rd_kafka_q_io_event(dstq);
259+
rd_kafka_q_io_event(dstq, rd_false/*no rate-limiting*/);
260260

261261
/* Optimization, if 'cnt' is equal/larger than all
262262
* items of 'srcq' we can move the entire queue. */
@@ -730,6 +730,8 @@ void rd_kafka_q_io_event_enable (rd_kafka_q_t *rkq, int fd,
730730
qio->fd = fd;
731731
qio->size = size;
732732
qio->payload = (void *)(qio+1);
733+
qio->ts_rate = rkq->rkq_rk->rk_conf.buffering_max_us;
734+
qio->ts_last = 0;
733735
qio->event_cb = NULL;
734736
qio->event_cb_opaque = NULL;
735737
memcpy(qio->payload, payload, size);

src/rdkafka_queue.h

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ struct rd_kafka_q_io {
9090
int fd;
9191
void *payload;
9292
size_t size;
93+
rd_ts_t ts_rate; /**< How often the IO wakeup may be performed (us) */
94+
rd_ts_t ts_last; /**< Last IO wakeup */
9395
/* For callback-based signalling */
9496
void (*event_cb) (rd_kafka_t *rk, void *opaque);
9597
void *event_cb_opaque;
@@ -284,10 +286,12 @@ static RD_INLINE RD_UNUSED int rd_kafka_q_is_fwded (rd_kafka_q_t *rkq) {
284286
/**
285287
* @brief Trigger an IO event for this queue.
286288
*
289+
* @param rate_limit if true, rate limit IO-based wakeups.
290+
*
287291
* @remark Queue MUST be locked
288292
*/
289293
static RD_INLINE RD_UNUSED
290-
void rd_kafka_q_io_event (rd_kafka_q_t *rkq) {
294+
void rd_kafka_q_io_event (rd_kafka_q_t *rkq, rd_bool_t rate_limit) {
291295

292296
if (likely(!rkq->rkq_qio))
293297
return;
@@ -297,6 +301,15 @@ void rd_kafka_q_io_event (rd_kafka_q_t *rkq) {
297301
return;
298302
}
299303

304+
305+
if (rate_limit) {
306+
rd_ts_t now = rd_clock();
307+
if (likely(rkq->rkq_qio->ts_last + rkq->rkq_qio->ts_rate > now))
308+
return;
309+
310+
rkq->rkq_qio->ts_last = now;
311+
}
312+
300313
/* Ignore errors, not much to do anyway. */
301314
if (rd_write(rkq->rkq_qio->fd, rkq->rkq_qio->payload,
302315
(int)rkq->rkq_qio->size) == -1)
@@ -320,7 +333,7 @@ int rd_kafka_op_cmp_prio (const void *_a, const void *_b) {
320333
* @brief Wake up waiters without enqueuing an op.
321334
*/
322335
static RD_INLINE RD_UNUSED void
323-
rd_kafka_q_yield (rd_kafka_q_t *rkq) {
336+
rd_kafka_q_yield (rd_kafka_q_t *rkq, rd_bool_t rate_limit) {
324337
rd_kafka_q_t *fwdq;
325338

326339
mtx_lock(&rkq->rkq_lock);
@@ -337,12 +350,12 @@ rd_kafka_q_yield (rd_kafka_q_t *rkq) {
337350
rkq->rkq_flags |= RD_KAFKA_Q_F_YIELD;
338351
cnd_signal(&rkq->rkq_cond);
339352
if (rkq->rkq_qlen == 0)
340-
rd_kafka_q_io_event(rkq);
353+
rd_kafka_q_io_event(rkq, rate_limit);
341354

342355
mtx_unlock(&rkq->rkq_lock);
343356
} else {
344357
mtx_unlock(&rkq->rkq_lock);
345-
rd_kafka_q_yield(fwdq);
358+
rd_kafka_q_yield(fwdq, rate_limit);
346359
rd_kafka_q_destroy(fwdq);
347360
}
348361

@@ -413,7 +426,7 @@ int rd_kafka_q_enq1 (rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
413426
rd_kafka_q_enq0(rkq, rko, at_head);
414427
cnd_signal(&rkq->rkq_cond);
415428
if (rkq->rkq_qlen == 1)
416-
rd_kafka_q_io_event(rkq);
429+
rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/);
417430

418431
if (do_lock)
419432
mtx_unlock(&rkq->rkq_lock);
@@ -518,7 +531,7 @@ int rd_kafka_q_concat0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) {
518531

519532
TAILQ_CONCAT(&rkq->rkq_q, &srcq->rkq_q, rko_link);
520533
if (rkq->rkq_qlen == 0)
521-
rd_kafka_q_io_event(rkq);
534+
rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/);
522535
rkq->rkq_qlen += srcq->rkq_qlen;
523536
rkq->rkq_qsize += srcq->rkq_qsize;
524537
cnd_signal(&rkq->rkq_cond);
@@ -559,7 +572,7 @@ void rd_kafka_q_prepend0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq,
559572
/* Move srcq to rkq */
560573
TAILQ_MOVE(&rkq->rkq_q, &srcq->rkq_q, rko_link);
561574
if (rkq->rkq_qlen == 0 && srcq->rkq_qlen > 0)
562-
rd_kafka_q_io_event(rkq);
575+
rd_kafka_q_io_event(rkq, rd_false/*no rate-limiting*/);
563576
rkq->rkq_qlen += srcq->rkq_qlen;
564577
rkq->rkq_qsize += srcq->rkq_qsize;
565578

0 commit comments

Comments (0)