feat: allow StreamConsumer to start at "earliest"/"latest" offset #104
Conversation
Force-pushed from b8875e3 to 14e701b
Looks good to me, had some minor comments
```rust
// Remember used offset (might be overwritten if there was any data) so we don't refetch the
// earliest / latest offset for every try. Also fetching the latest offset might be racy otherwise,
// since we'll never be in a position where the latest one can actually be fetched.
*this.next_offset = Some(used_offset);
```
I don't understand this; I'm not sure under what circumstances Kafka would return no records, but if this did occur, wouldn't we just get stuck in an infinite loop?
So the issue here is the "latest" offset: if you have a new or empty partition, Kafka returns the start of that empty interval (e.g. 0 for an empty partition) if you ask it; also see
Lines 241 to 251 in 15f424f
```rust
assert_eq!(
    partition_client
        .get_offset(OffsetAt::Earliest)
        .await
        .unwrap(),
    0
);
assert_eq!(
    partition_client.get_offset(OffsetAt::Latest).await.unwrap(),
    0
);
```
Now if you don't remember that offset, then by the time you produce something to the partition, the "latest" offset (which is the offset of the latest record + 1) will also have increased, so you're always chasing the tail of the partition but never fetch any records. So when you first get an answer to "what's the earliest/latest offset?", you need to remember it (except when it turns out to be wrong due to "offset out of range").
When the partition stays empty, then sure, this loops forever and asks Kafka again and again for data. However, this is not a hot loop, since the fetch request has a timeout attached, so the broker will wait some milliseconds before telling us "there are no records here, just like the last X times you've asked me".
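The "resolve once, then cache" behavior described above can be sketched in isolation. This is a hypothetical illustration, not rskafka's actual code: `FakePartition` and `Consumer` are invented names standing in for the partition client and the stream consumer.

```rust
// Sketch of caching the resolved "earliest"/"latest" offset (assumed names,
// not rskafka's API). The point: resolve the symbolic offset at most once,
// so records produced after the first (empty) fetch are not skipped.
#[derive(Clone, Copy)]
enum StartOffset {
    Earliest,
    Latest,
}

struct FakePartition {
    records: Vec<&'static str>,
}

impl FakePartition {
    /// "latest" is the offset of the last record + 1; for an empty
    /// partition both earliest and latest are 0.
    fn get_offset(&self, at: StartOffset) -> i64 {
        match at {
            StartOffset::Earliest => 0,
            StartOffset::Latest => self.records.len() as i64,
        }
    }
}

struct Consumer {
    start: StartOffset,
    /// Cached offset; `None` until the first resolution.
    next_offset: Option<i64>,
}

impl Consumer {
    fn poll(&mut self, partition: &FakePartition) -> Vec<&'static str> {
        // Resolve "earliest"/"latest" only if nothing is cached yet.
        let used_offset = match self.next_offset {
            Some(o) => o,
            None => partition.get_offset(self.start),
        };
        // Remember the used offset so we don't re-resolve it on every try.
        self.next_offset = Some(used_offset);
        let records = partition.records[used_offset as usize..].to_vec();
        if !records.is_empty() {
            // Advance past the records we just returned.
            self.next_offset = Some(partition.records.len() as i64);
        }
        records
    }
}

fn main() {
    let mut partition = FakePartition { records: vec![] };
    let mut consumer = Consumer {
        start: StartOffset::Latest,
        next_offset: None,
    };

    // First poll against an empty partition: "latest" resolves to 0 and is cached.
    assert!(consumer.poll(&partition).is_empty());
    assert_eq!(consumer.next_offset, Some(0));

    // A record arrives *after* the offset was cached. Because we remembered
    // offset 0, we still see it instead of chasing the new "latest" (= 1).
    partition.records.push("hello");
    assert_eq!(consumer.poll(&partition), vec!["hello"]);
    println!("ok");
}
```

Without the cached `next_offset`, the second poll would re-resolve "latest" to 1 and return nothing, which is exactly the tail-chasing failure described above.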
```rust
stream: mpsc::Receiver<Record>,
next_err: Option<Error>,
buffer: Vec<Record>,
range: (i64, i64),
```
The interaction with the offset tracking in `buffer` is perhaps a little funky. Perhaps this might be easier to understand if these were tracked automatically, and we added a delete-records API... I dunno 😅
You mean you wanna mock a full Kafka broker? 😉
This is required to provide a race-free way to start at the earliest or latest offset: fetching the offset and then creating a stream might fail, because in the meantime the Kafka retention policy might already have removed the offsets in question.
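The race described here, and the recovery path, can be sketched as follows. All names (`Partition`, `consume_from_earliest`, `FetchError`) are invented for illustration; the idea is that the consumer resolves the offset inside its own loop and re-resolves on "offset out of range", instead of fetching the offset once up front and then failing.

```rust
// Sketch of handling the fetch-offset-vs-retention race (assumed names).
#[derive(Debug, PartialEq)]
enum FetchError {
    OffsetOutOfRange,
}

struct Partition {
    earliest: i64, // moves forward as retention deletes records
    latest: i64,   // offset of the last record + 1
}

impl Partition {
    fn fetch(&self, offset: i64) -> Result<Vec<i64>, FetchError> {
        if offset < self.earliest {
            return Err(FetchError::OffsetOutOfRange);
        }
        Ok((offset..self.latest).collect())
    }
}

/// Resolve "earliest" inside the consume loop; on "offset out of range",
/// drop the cached offset and re-resolve instead of failing.
fn consume_from_earliest(p: &Partition, cached: &mut Option<i64>) -> Vec<i64> {
    loop {
        let offset = cached.unwrap_or(p.earliest);
        match p.fetch(offset) {
            Ok(records) => {
                *cached = Some(offset);
                return records;
            }
            // The cached offset turned out to be wrong (retention ran in
            // between): forget it and ask for "earliest" again.
            Err(FetchError::OffsetOutOfRange) => *cached = None,
        }
    }
}

fn main() {
    let mut p = Partition { earliest: 0, latest: 3 };
    let mut cached = Some(0);

    // Retention deletes offsets 0 and 1 between "get offset" and "fetch".
    p.earliest = 2;

    // The consumer recovers by re-resolving instead of erroring out.
    assert_eq!(consume_from_earliest(&p, &mut cached), vec![2]);
    println!("ok");
}
```

A two-step "get offset, then open stream at that offset" design has no such recovery point, which is why the resolution has to live inside the stream.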
Co-authored-by: Raphael Taylor-Davies <[email protected]>
Force-pushed from 1dc9501 to 36e89b2
Force-pushed from 36e89b2 to 51ac55a