
Conversation

domodwyer
Contributor

This is a big one that I couldn't break up into anything smaller - sorry!

Happy to talk through the changes if needed - if not, grab a cuppa! 🍵 Large portions of the change are comments; the actual LoC change is a small percentage.


The main aims of this PR are outlined below, in order of priority:

Make aggregating batches and writing already-full batches asynchronous w.r.t. each other

Currently this is not the case. A call to produce() with space in the aggregator goes like this:

  1. Acquires the lock covering the aggregator state
  2. Successfully adds the data to the aggregator
  3. Waits for the linger timeout
  4. Acquires the lock over the aggregator state again (same lock as 1.)
  5. Whilst holding the lock, flushes the batch.

Because the lock is held while the flush happens, all callers to produce() begin queuing up, even though their payloads could otherwise be buffered immediately. This means they needlessly incur the latency of the protocol overhead/serialisation/network I/O, only to then have to try to stuff as many payloads as possible into the buffer before the next 5ms linger timer blocks aggregation again.

In an environment where writing to Kafka incurs an RTT of ~50ms, we spend ~5ms aggregating payloads before blocking for ~50ms, repeated indefinitely.
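To make that flow concrete, here's a minimal sketch of steps 1-5 above. The names `Record`, `LINGER`, `flush`, and `produce_old` are illustrative stand-ins, not the real rskafka internals; the point is that the aggregator lock is re-acquired and then held across the network flush.

```rust
use std::time::Duration;
use tokio::sync::Mutex;

type Record = Vec<u8>;
const LINGER: Duration = Duration::from_millis(5);

// Stand-in for the real flush: in the environment described above this is a
// Kafka produce RPC costing ~50ms of network round-trip.
async fn flush(_batch: Vec<Record>) {
    tokio::time::sleep(Duration::from_millis(50)).await;
}

// Old flow: the aggregator lock is re-acquired and then held across the
// flush, so every concurrent produce() caller queues behind the network I/O.
async fn produce_old(state: &Mutex<Vec<Record>>, record: Record) {
    state.lock().await.push(record);   // 1 + 2: lock and buffer the payload
    tokio::time::sleep(LINGER).await;  // 3: wait out the linger period
    let mut agg = state.lock().await;  // 4: acquire the same lock again
    let batch = std::mem::take(&mut *agg);
    flush(batch).await;                // 5: flush while still holding `agg`
}
```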

While most Kafka writes are quick, we observe long tail latencies:

[image: produce() latency plot showing long-tail latencies]

After the changes, flushing happens asynchronously, without holding a lock over the aggregator state. This allows ~5ms of payloads to be batched up and dispatched while the next batch immediately begins building, giving near-continuous buffering. This should reduce the overall latency of calls to produce() and increase the throughput of messages to Kafka.

I've been careful to construct the buffering / flushing such that there is no need for an async mutex - given that the mutex is in the buffering hot path, replacing it with a non-async parking_lot mutex (which has far lower overhead) reduces the latency of calls that simply add the payload to an in-memory buffer. This should slightly increase the number of messages we can buffer in the ~5ms linger period.
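As a rough illustration of the shape of the hot path (the `BatchState` / `buffer_payload` names are hypothetical, not the actual rskafka types), buffering now only needs a short synchronous critical section:

```rust
use parking_lot::Mutex; // synchronous lock: no .await points inside it

// Hypothetical stand-in for the aggregator's in-memory buffer.
struct BatchState {
    buffer: Vec<Vec<u8>>,
}

// The critical section only appends to an in-memory buffer and never awaits,
// so a cheap non-async mutex is sufficient; the flush happens elsewhere,
// outside this lock.
fn buffer_payload(state: &Mutex<BatchState>, payload: Vec<u8>) {
    state.lock().buffer.push(payload);
}
```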

Flush result readers do not contend with buffering produce() callers

Previously all calls to produce() would leave the task waiting for either the linger to fire or the flush to complete:

tokio::select! {
    // Either the linger timer fires and this task proceeds to trigger a flush...
    _ = tokio::time::sleep(self.linger) => {},
    // ...or the flush result arrives first via the broadcast slot.
    r = result_slot.receive() => return extract(r, tag),
}

However, given the low default linger time (5ms) relative to the average RTT of a flush (especially under high load), it's probable that most, if not all, waiters will observe their linger timer expiring before the flush completes and the result is broadcast to the result_slot. I've observed many "Already flushed" logs in our production deployment, with no corresponding "Insufficient capacity in aggregator - flushing" to have caused the flush in the first place, indicating this happens quite often.

Once the linger timer fires, all the callers proceed to queue up to acquire the aggregator mutex again, unnecessarily serialising readers whose result is already available and adding further contention with produce() callers hoping to buffer their payloads in parallel.

After this PR, only one caller attempts to acquire the aggregator mutex to initiate a flush, and the remaining callers wait solely on their result broadcast.

Removing this lock contention allows for more produce() calls to buffer their payloads within the 5ms linger, increasing the message sizes over the wire.

Reading the broadcast result has also been made cheaper by reducing contention on the BroadcastOnce primitive when there are many readers, switching the exclusive mutex to a many-reader lock - readers no longer have to queue up to sequentially read (+ clone) the result, which should marginally reduce the per-call latency of produce() too.
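A minimal sketch of the idea follows (the real BroadcastOnce is more involved; `BroadcastSlot` below only shows why a read/write lock helps the many-reader case):

```rust
use std::sync::{Arc, RwLock};

// One writer publishes a value once; many readers clone it. A read/write lock
// lets readers take shared guards concurrently instead of queuing behind an
// exclusive mutex.
#[derive(Clone)]
struct BroadcastSlot<T: Clone> {
    slot: Arc<RwLock<Option<T>>>,
}

impl<T: Clone> BroadcastSlot<T> {
    fn broadcast(&self, value: T) {
        *self.slot.write().expect("lock poisoned") = Some(value);
    }

    fn peek(&self) -> Option<T> {
        // Readers only need a shared guard, so they do not serialise.
        self.slot.read().expect("lock poisoned").clone()
    }
}
```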

Useless Wakeups

In the above linger-expired-acquire-lock situation, the readers all .await on the timer, then .await on the aggregator mutex, only to then queue up on the mutex synchronising the peek() call. This causes many useless wakeups that immediately hit the next Pending await point, as well as unnecessary thread blocking while queuing on the non-async results mutex.

Reducing the number of awaits in this path should help reduce the (small) tokio overhead incurred when parking/waking tasks.

Behavioural Changes

None of these changes necessitate a breaking external API change 🎉

Back-pressure

Previously rskafka would allow only a single buffer flush to be in-flight. This (very!) effectively applied back-pressure to clients, slowing them down.

After this change, there is no back-pressure, and limiting the number of outstanding produce() calls is the responsibility of the caller. Apps can throw as much as they want at rskafka, and it'll do the best it can to keep up, with produce() latencies increasing as load increases.

It is, however, relatively simple to add back-pressure to rskafka using a semaphore if desired.
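For example, a caller could bound the number of in-flight calls with a tokio::sync::Semaphore. This is a sketch on the caller's side rather than inside rskafka, and `MyProducer` / `Record` are hypothetical stand-ins for the caller's own types:

```rust
use tokio::sync::Semaphore;

// Hypothetical stand-ins for the caller's producer wrapper and payload type.
struct MyProducer;
type Record = Vec<u8>;

impl MyProducer {
    async fn produce(&self, _record: Record) {
        // ...the real call would hand the record to rskafka here...
    }
}

// At most `sem`'s permit count of produce() calls are in flight at once; the
// rest wait on the semaphore, re-introducing back-pressure on the caller's side.
async fn produce_bounded(sem: &Semaphore, producer: &MyProducer, record: Record) {
    let _permit = sem.acquire().await.expect("semaphore closed");
    producer.produce(record).await;
    // The permit is released when `_permit` drops, admitting the next caller.
}
```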

Tokio

This requires the is_finished() method on the task JoinHandle, which was added in tokio 1.19 and required a minimum tokio version bump.
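For reference, the new API looks like this (a trivial sketch, not rskafka code):

```rust
#[tokio::main]
async fn main() {
    // `JoinHandle::is_finished()` (tokio >= 1.19) reports whether a spawned
    // task has completed, without awaiting (and thereby consuming) the handle.
    let handle = tokio::spawn(async { /* a background flush would run here */ });
    if handle.is_finished() {
        // The task already ran to completion (or panicked).
    }
}
```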


  • perf: async batch flushing & lock contention (9e9d848)

    Rewrites the management of the aggregation buffer to enable asynchronous 
    flushing of batched writes, while continuing to build up the next buffer of
    writes in parallel.
    
    A behavioural change of note: previously there was (synchronous) back-pressure
    applied when a batch was being flushed, and there could only ever be at most 1
    batch of writes outstanding. After this commit this back-pressure is removed
    as a side effect of async flushing, and there is no upper bound on the number
    of in-flight batch writes enforced. Back-pressure can easily be placed back
    into rskafka if it is deemed the responsibility of rskafka, rather than the
    caller, to limit the number of outstanding produce() calls.
    
    This commit also eliminates lock contention between the callers attempting to
    add to the batch, and those that are waiting for their write result, and also
    minimises the spurious task wakeups caused by all callers waiting on the
    linger timer, before (likely) moving on to wait on the async mutex, before
    finally being able to read their write result over the broadcast channel (also
    a mutex). Now one caller performs a linger dance, while all the others wait
    exclusively on their broadcast handle for the results, avoiding having to wait
    on the aggregation mutex.
    
  • perf: use RwLock for BroadcastOnce (1d5be60)

    Prior to this commit, all the readers (holders of BroadcastOnceReceiver) would
    contend with each other to read the value that has been broadcast. The
    BroadcastOnce type restricts usage to one writer, with many readers, making
    the case of >1 reader contending to read the result common by design.
    
    A RwLock eliminates the reader contention, preventing the readers from queuing
    up to read their value - it is now a pair of cheap atomic inc/sub to read the
    value, in parallel, for each reader.
    

@domodwyer domodwyer requested a review from crepererum August 25, 2022 14:42
@domodwyer domodwyer self-assigned this Aug 25, 2022
Collaborator

@crepererum crepererum left a comment

I like the split between "aggregate" and "send batch" that you perform here. I have a couple of questions and remarks.

Comment on lines +477 to +478
// Remove the batch, temporarily swapping it for a None until a new
// batch is built.
Collaborator

This is NOT panic-safe. If batch.background_flush (which calls aggregator.flush) panics, then the entire aggregator is poisoned forever¹. I think we should allow the aggregator to panic but error only the affected inputs, instead of all future operations.

Footnotes

  1. This BTW is the reason why the stdlib mutexes implement poisoning.

Contributor Author

That's right - I did consider panics here, but took the view that a panicking aggregator is an exceptional event (i.e. not a recoverable error) and should be propagated up.

We're effectively implementing poisoning here by having the next call take() a None and exploding ourselves - I think it's reasonable to expect panics to be exceptional / fatal, but open to ideas.
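Roughly, the behaviour being described is the following (illustrative names only, not the real BatchBuilder code):

```rust
// If a previous flush panicked after take()-ing the batch but before a new
// one was installed, the slot is still None, so the next caller panics too -
// effectively poisoning the aggregator, as described above.
struct BatchSlot {
    batch: Option<Vec<Vec<u8>>>,
}

impl BatchSlot {
    fn take_for_flush(&mut self) -> Vec<Vec<u8>> {
        self.batch
            .take()
            .expect("no batch present - a previous flush panicked")
    }
}
```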

// batch, verify this BatchBuilder is the batch it is intending to
// flush.
if let Some(token) = flusher_token {
if token != self.flush_clock {
Collaborator

Can this only happen when there are manual flushes involved or are there other cases as well?

Contributor Author

This block of code intentionally doesn't care why the tokens don't match!

But to answer properly, this can happen two ways:

return FlushResult::Ok(Self::new(self.aggregator), None);
}

let handle = tokio::spawn({
Collaborator

Why do we need yet another task here? Isn't there already one that drives the linger and the flushing? Or is this so we have also cancellation safety if there are manual flushes?

Contributor Author

This decouples the flushing I/O (in the task) from the instantiation of the new batch using the aggregator (in the return below, with the task handle). This lets all new produce() calls progress. The task handle is stored, and later used to wait for all batches to complete flushing when calling the manual BatchProducer::flush().

We need the buffer to be ready immediately to begin accruing payloads, but do not wish to wait for the flush to complete. Unfortunately having the aggregator internal to the client makes this dance tricky.
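A rough sketch of that shape (illustrative types and names; the real BatchBuilder / aggregator plumbing differs):

```rust
use tokio::task::JoinHandle;

type Batch = Vec<Vec<u8>>;

// Stand-in for the real flush I/O (the Kafka produce RPC).
async fn flush_batch(_batch: Batch) { /* network write happens here */ }

// The full batch is flushed on its own task; the caller immediately gets a
// fresh, empty batch to keep accruing payloads, plus the JoinHandle that a
// manual flush() can later await to ensure all background flushes finished.
fn begin_background_flush(full: Batch) -> (Batch, JoinHandle<()>) {
    let handle = tokio::spawn(flush_batch(full));
    (Batch::new(), handle)
}
```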

//! ```
use std::sync::Arc;
use std::time::Duration;

Collaborator

There is a flow-chart at the top of this file (GitHub doesn't let me comment there). That one only shows the user-visible data flow, and that's all good. From an rskafka developer PoV, I would welcome it if the interplay between the producer, the aggregator and the batches (which now contain their own background task) were illustrated somewhere.

Contributor Author

I'll knock one up this afternoon and PR it - I think it'll take me a bit of digging around (you're right, the interplay is complex!)

When dropping the ProducerInner (likely caused by dropping the producer
itself), terminate all background flush tasks to avoid them outliving
the producer itself.
@domodwyer domodwyer merged commit 6c887e5 into main Aug 29, 2022
@domodwyer domodwyer deleted the dom/async-flush branch August 29, 2022 12:18