@@ -41,11 +41,12 @@ use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
+use std::time::Duration;
 
 use futures::future::{BoxFuture, Fuse, FusedFuture, FutureExt};
 use futures::Stream;
 use pin_project_lite::pin_project;
-use tracing::trace;
+use tracing::{trace, warn};
 
 use crate::{
     client::{
@@ -139,6 +140,7 @@ impl StreamConsumerBuilder {
             min_batch_size: self.min_batch_size,
             max_batch_size: self.max_batch_size,
             next_offset: None,
+            next_backoff: None,
             start_offset: self.start_offset,
             terminated: false,
             last_high_watermark: -1,
@@ -208,6 +210,8 @@ pin_project! {
 
         next_offset: Option<i64>,
 
+        next_backoff: Option<Duration>,
+
         terminated: bool,
 
         last_high_watermark: i64,
@@ -236,16 +240,21 @@ impl Stream for StreamConsumer {
                 let start_offset = *this.start_offset;
                 let bytes = (*this.min_batch_size)..(*this.max_batch_size);
                 let max_wait_ms = *this.max_wait_ms;
+                let next_backoff = std::mem::take(this.next_backoff);
                 let client = Arc::clone(this.client);
 
                 trace!(?start_offset, ?next_offset, "Fetching records at offset");
 
                 *this.fetch_fut = FutureExt::fuse(Box::pin(async move {
+                    if let Some(backoff) = next_backoff {
+                        tokio::time::sleep(backoff).await;
+                    }
+
                     let offset = match next_offset {
                         Some(x) => x,
                         None => match start_offset {
                             StartOffset::Earliest => client.get_offset(OffsetAt::Earliest).await?,
-                            StartOffset::Latest => dbg!(client.get_offset(OffsetAt::Latest).await?),
+                            StartOffset::Latest => client.get_offset(OffsetAt::Latest).await?,
                             StartOffset::At(x) => x,
                         },
                     };
@@ -296,6 +305,19 @@ impl Stream for StreamConsumer {
                 ) => {
                     // wipe offset and try again
                     *this.next_offset = None;
+
+                    // This will only happen if retention / deletions happen after we've asked for the earliest/latest
+                    // offset and our "fetch" request. This should be a rather rare event, but if something is horribly
+                    // wrong in our cluster (e.g. some actor is spamming "delete" requests) then let's at least back off
+                    // a bit.
+                    let backoff_secs = 1;
+                    warn!(
+                        start_offset=?this.start_offset,
+                        backoff_secs,
+                        "Records are gone between ListOffsets and Fetch, backing off a bit",
+                    );
+                    *this.next_backoff = Some(Duration::from_secs(backoff_secs));
+
                     continue;
                 }
                 // if we have an offset, terminate the stream
@@ -661,8 +683,9 @@ mod tests {
 
         let unwrap = |e: Result<Option<Result<_, _>>, _>| e.unwrap().unwrap().unwrap();
 
+        // need a solid timeout here because we have simulated an error that caused a backoff
         let (record_and_offset, high_watermark) =
-            unwrap(tokio::time::timeout(Duration::from_micros(10), stream.next()).await);
+            unwrap(tokio::time::timeout(Duration::from_secs(2), stream.next()).await);
 
         assert_eq!(record_and_offset.offset, 2);
         assert_eq!(high_watermark, 2);
@@ -704,8 +727,9 @@ mod tests {
 
         let unwrap = |e: Result<Option<Result<_, _>>, _>| e.unwrap().unwrap().unwrap();
 
+        // need a solid timeout here because we have simulated an error that caused a backoff
         let (record_and_offset, high_watermark) =
-            unwrap(tokio::time::timeout(Duration::from_micros(10), stream.next()).await);
+            unwrap(tokio::time::timeout(Duration::from_secs(2), stream.next()).await);
 
         assert_eq!(record_and_offset.offset, 2);
         assert_eq!(high_watermark, 2);
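
For illustration only, outside the diff: a minimal, self-contained sketch of the take-and-sleep backoff pattern this change introduces. The Fetcher type, its methods, and the error strings below are hypothetical stand-ins rather than the crate's API; only the Option<Duration> + std::mem::take + tokio::time::sleep flow mirrors the change above.

// Illustrative sketch only: Fetcher and try_fetch are made up for this example;
// the real change lives in the StreamConsumer shown in the diff.
use std::time::Duration;

struct Fetcher {
    next_offset: Option<i64>,
    next_backoff: Option<Duration>,
}

impl Fetcher {
    async fn fetch_once(&mut self) -> Result<Vec<u8>, &'static str> {
        // Consume the pending backoff so it delays exactly one attempt,
        // mirroring std::mem::take(this.next_backoff) in the diff.
        if let Some(backoff) = std::mem::take(&mut self.next_backoff) {
            tokio::time::sleep(backoff).await;
        }

        match self.try_fetch().await {
            Ok(records) => Ok(records),
            Err("offset out of range") => {
                // Wipe the offset and arm a one-second backoff before the
                // next attempt, as in the OffsetOutOfRange arm above.
                self.next_offset = None;
                self.next_backoff = Some(Duration::from_secs(1));
                Err("retry later")
            }
            Err(e) => Err(e),
        }
    }

    async fn try_fetch(&self) -> Result<Vec<u8>, &'static str> {
        // Stand-in for the broker fetch: pretend retention deleted
        // everything below offset 10.
        match self.next_offset {
            Some(x) if x < 10 => Err("offset out of range"),
            _ => Ok(vec![]),
        }
    }
}

#[tokio::main]
async fn main() {
    let mut fetcher = Fetcher {
        next_offset: Some(0),
        next_backoff: None,
    };
    // The first call hits the simulated "offset out of range" error and arms
    // the backoff; the second call sleeps for one second, then succeeds.
    assert!(fetcher.fetch_once().await.is_err());
    assert!(fetcher.fetch_once().await.is_ok());
}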