feat: producer cancellation and panic safety #113
Conversation
chore: update to rust 1.59
```rust
) -> Result<OwnedMutexGuard<ProducerInner<A>>> {
    debug!(?client, "Flushing batch producer");

    // Spawn a task to provide cancellation safety
```
I recommend looking at the diff without whitespace https://github.com/influxdata/rskafka/pull/113/files?w=1
```diff
  thiserror = "1.0"
  time = "0.3"
- tokio = { version = "1.14", default-features = false, features = ["io-util", "net", "rt", "sync", "time"] }
+ tokio = { version = "1.14", default-features = false, features = ["io-util", "net", "rt", "sync", "time", "macros"] }
```
This is needed for the tokio::select! macro.
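For anyone unfamiliar with the feature flags: tokio::select! is gated behind tokio's "macros" feature. A minimal sketch of the kind of select-based wait this enables (illustrative only; wait_for_flush and its arguments are invented for this example, not rskafka code):

```rust
use std::time::Duration;
use tokio::sync::Notify;

// Hypothetical helper: wait for either an explicit flush signal or the linger
// deadline, whichever fires first. `tokio::select!` needs the "macros" feature.
async fn wait_for_flush(notify: &Notify, linger: Duration) {
    tokio::select! {
        _ = notify.notified() => { /* someone requested an immediate flush */ }
        _ = tokio::time::sleep(linger) => { /* linger deadline elapsed */ }
    }
}
```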
```rust
/// - Receivers can be created with [`BroadcastOnce::receiver`]
/// - The value can be produced with [`BroadcastOnce::broadcast`]
#[derive(Debug)]
pub struct BroadcastOnce<T> {
```
As described in #111, Shared::peek has some quirks when used with interior-mutable futures. These can be worked around, but in the interest of keeping things simple I just switched to using Notify and a shared Option.
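For readers following along, a rough sketch of that shape, assuming the broadcast value is Clone; the name BroadcastOnceSketch and all details are illustrative rather than the actual rskafka implementation:

```rust
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;

// Hypothetical one-shot broadcast built on Notify plus a shared Option.
#[derive(Debug, Clone)]
struct BroadcastOnceSketch<T> {
    slot: Arc<Mutex<Option<T>>>,
    notify: Arc<Notify>,
}

impl<T: Clone> BroadcastOnceSketch<T> {
    fn new() -> Self {
        Self {
            slot: Arc::new(Mutex::new(None)),
            notify: Arc::new(Notify::new()),
        }
    }

    /// Store the value and wake every waiting receiver.
    fn broadcast(&self, value: T) {
        *self.slot.lock().unwrap() = Some(value);
        self.notify.notify_waiters();
    }

    /// Wait until a value has been broadcast, then return a clone of it.
    async fn receive(&self) -> T {
        loop {
            // Create the Notified future *before* checking the slot so a
            // broadcast racing with this check cannot be missed.
            let notified = self.notify.notified();
            if let Some(v) = self.slot.lock().unwrap().clone() {
                return v;
            }
            notified.await;
        }
    }
}
```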
```rust
// implementation will still signal the result slot preventing flushing twice
let slot = std::mem::take(&mut inner.result_slot);

let (output, status_deagg) = match inner.aggregator.flush() {
```
We still have an issue if the aggregator panics and leaves itself in an undefined state, but that's something for a future PR.
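Purely as an illustration of what such a future change might look like (none of this is in the current PR, and the names are invented), one option would be to catch the unwind and poison the aggregator so later callers fail fast instead of touching undefined state:

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

// Hypothetical wrapper: if the wrapped flush panics, drop the aggregator and
// refuse further use rather than operating on undefined internal state.
struct PoisonOnPanic<A> {
    aggregator: Option<A>, // `None` once a flush has panicked
}

impl<A> PoisonOnPanic<A> {
    fn flush_with<T>(&mut self, flush: impl FnOnce(&mut A) -> T) -> Result<T, &'static str> {
        let aggregator = self.aggregator.as_mut().ok_or("aggregator poisoned")?;
        match catch_unwind(AssertUnwindSafe(|| flush(aggregator))) {
            Ok(out) => Ok(out),
            Err(_) => {
                // The aggregator's state is now undefined; discard it.
                self.aggregator = None;
                Err("aggregator panicked during flush")
            }
        }
    }
}
```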
```diff
      compression: Compression,
-) {
-    trace!("Flushing batch producer");
+) -> Result<OwnedMutexGuard<ProducerInner<A>>> {
```
This dance of passing a lock in and out of the future is a bit awkward, but it is necessary for the correctness of the too-large-record detection: it needs to flush the aggregator and then try to push the record again, without allowing anything else to push a new record in the intervening time.
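A simplified sketch of the pattern (the names and the capacity check are illustrative, not the actual rskafka code): flush takes the owned guard and hands it back, so the caller keeps exclusive access across the flush and can retry the push immediately:

```rust
use std::sync::Arc;
use tokio::sync::{Mutex, OwnedMutexGuard};

// Illustrative producer state guarded by the mutex.
struct Inner {
    batch: Vec<Vec<u8>>,
    capacity: usize,
}

// Flush while holding the guard and return the same guard to the caller.
async fn flush(mut guard: OwnedMutexGuard<Inner>) -> OwnedMutexGuard<Inner> {
    let _batch = std::mem::take(&mut guard.batch);
    // ... produce `_batch` to the broker here ...
    guard
}

async fn produce(inner: Arc<Mutex<Inner>>, record: Vec<u8>) -> Result<(), &'static str> {
    let mut guard = inner.lock_owned().await;
    if guard.batch.len() >= guard.capacity {
        // Flush, but keep holding the lock so nothing else can sneak a record
        // in before the retry below.
        guard = flush(guard).await;
        if guard.batch.len() >= guard.capacity {
            return Err("record does not fit even in an empty batch");
        }
    }
    guard.batch.push(record);
    Ok(())
}
```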
Only some small nitpicks. Good job! 💪
```rust
/// The returned future is cancellation safe in that it won't leave the [`BatchProducer`]
/// in an inconsistent state, however, the provided data may or may not be produced
```
So if you have multiple produce calls sharing one aggregated write, does canceling one of the produce calls have a side effect on the other calls? I THINK (from reading the code and our call yesterday) the answer is "no", but I wanted to double-check.
Correct, cancellation should no longer be able to have side effects on the other calls.
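A hedged sketch of why (heavily simplified, not the actual implementation): the shared flush runs in a spawned task, so a caller whose future is dropped only abandons its own receiver, while the flush and the other waiters carry on:

```rust
use std::time::Duration;
use tokio::sync::oneshot;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Stand-ins for two `produce` calls waiting on the same aggregated flush.
    let (tx_a, rx_a) = oneshot::channel();
    let (tx_b, rx_b) = oneshot::channel();

    // The flush itself runs in a spawned task, so it is not tied to the
    // lifetime of any single caller's future.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await; // fake broker round-trip
        let _ = tx_a.send("offset 1");
        let _ = tx_b.send("offset 2");
    });

    // Caller A times out, so its future (here just the receiver) is dropped...
    let cancelled = tokio::time::timeout(Duration::from_millis(1), rx_a).await;
    assert!(cancelled.is_err());

    // ...but caller B still receives its result from the shared flush.
    assert_eq!(rx_b.await.unwrap(), "offset 2");
}
```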
Co-authored-by: Marco Neumann <[email protected]>
Builds on and is intended to replace #112. This will also replace #111, as it reworks the BroadcastOnce to no longer use Shared, since its peek implementation is just unhelpful.

As reported in https://github.com/influxdata/influxdb_iox/issues/3805 we've been observing occasional panics in production. I eventually tracked this down to a lack of cancellation safety within produce; in particular, if the write request times out, the future will be cancelled (dropped), causing it to drop the lock and leave the aggregator in an inconsistent state.
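To make the failure mode concrete, here is a minimal, self-contained illustration of the hazard (not rskafka code): a future that mutates shared state across an .await and is then cancelled by a timeout leaves that state half-updated:

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let batch = Arc::new(Mutex::new(Vec::new()));

    let produce = {
        let batch = Arc::clone(&batch);
        async move {
            let mut guard = batch.lock().await;
            guard.push("record pulled out of the aggregator");
            // If the caller's timeout fires during this await, the future is
            // dropped right here: the lock is released, but the state change
            // above is neither completed nor rolled back.
            tokio::time::sleep(Duration::from_millis(50)).await;
            guard.clear(); // the "completion" step that never runs on cancellation
        }
    };

    // The write request times out, cancelling (dropping) `produce` mid-flight.
    let _ = tokio::time::timeout(Duration::from_millis(1), produce).await;

    // The shared state is now inconsistent: a record was taken but never finished.
    assert_eq!(batch.lock().await.len(), 1);
}
```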