@@ -50,7 +50,6 @@ use std::time::Duration;
50
50
51
51
use futures:: future:: { BoxFuture , Fuse , FusedFuture , FutureExt } ;
52
52
use futures:: Stream ;
53
- use pin_project_lite:: pin_project;
54
53
use tracing:: { debug, trace, warn} ;
55
54
56
55
use crate :: {
@@ -196,61 +195,58 @@ impl FetchClient for PartitionClient {
196
195
}
197
196
}
198
197
199
- pin_project ! {
200
- /// Stream consuming data from start offset.
201
- ///
202
- /// # Error Handling
203
- /// If an error is returned by [`fetch_records`](`FetchClient::fetch_records`) then the stream will emit this error
204
- /// once and will terminate afterwards.
205
- pub struct StreamConsumer {
206
- client: Arc <dyn FetchClient >,
198
+ /// Stream consuming data from start offset.
199
+ ///
200
+ /// # Error Handling
201
+ /// If an error is returned by [`fetch_records`](`PartitionClient::fetch_records`) then the stream will emit this error
202
+ /// once and will terminate afterwards.
203
+ pub struct StreamConsumer {
204
+ client : Arc < dyn FetchClient > ,
207
205
208
- min_batch_size: i32 ,
206
+ min_batch_size : i32 ,
209
207
210
- max_batch_size: i32 ,
208
+ max_batch_size : i32 ,
211
209
212
- max_wait_ms: i32 ,
210
+ max_wait_ms : i32 ,
213
211
214
- start_offset: StartOffset ,
212
+ start_offset : StartOffset ,
215
213
216
- next_offset: Option <i64 >,
214
+ next_offset : Option < i64 > ,
217
215
218
- next_backoff: Option <Duration >,
216
+ next_backoff : Option < Duration > ,
219
217
220
- terminated: bool ,
218
+ terminated : bool ,
221
219
222
- last_high_watermark: i64 ,
220
+ last_high_watermark : i64 ,
223
221
224
- buffer: VecDeque <RecordAndOffset >,
222
+ buffer : VecDeque < RecordAndOffset > ,
225
223
226
- fetch_fut: Fuse <BoxFuture <' static , FetchResult >>,
227
- }
224
+ fetch_fut : Fuse < BoxFuture < ' static , FetchResult > > ,
228
225
}
229
226
230
227
impl Stream for StreamConsumer {
231
228
type Item = Result < ( RecordAndOffset , i64 ) > ;
232
229
233
- fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
234
- let this = self . project ( ) ;
230
+ fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
235
231
loop {
236
- if * this . terminated {
232
+ if self . terminated {
237
233
return Poll :: Ready ( None ) ;
238
234
}
239
- if let Some ( x) = this . buffer . pop_front ( ) {
240
- return Poll :: Ready ( Some ( Ok ( ( x, * this . last_high_watermark ) ) ) ) ;
235
+ if let Some ( x) = self . buffer . pop_front ( ) {
236
+ return Poll :: Ready ( Some ( Ok ( ( x, self . last_high_watermark ) ) ) ) ;
241
237
}
242
238
243
- if this . fetch_fut . is_terminated ( ) {
244
- let next_offset = * this . next_offset ;
245
- let start_offset = * this . start_offset ;
246
- let bytes = ( * this . min_batch_size ) ..( * this . max_batch_size ) ;
247
- let max_wait_ms = * this . max_wait_ms ;
248
- let next_backoff = std:: mem:: take ( this . next_backoff ) ;
249
- let client = Arc :: clone ( this . client ) ;
239
+ if self . fetch_fut . is_terminated ( ) {
240
+ let next_offset = self . next_offset ;
241
+ let start_offset = self . start_offset ;
242
+ let bytes = ( self . min_batch_size ) ..( self . max_batch_size ) ;
243
+ let max_wait_ms = self . max_wait_ms ;
244
+ let next_backoff = std:: mem:: take ( & mut self . next_backoff ) ;
245
+ let client = Arc :: clone ( & self . client ) ;
250
246
251
247
trace ! ( ?start_offset, ?next_offset, "Fetching records at offset" ) ;
252
248
253
- * this . fetch_fut = FutureExt :: fuse ( Box :: pin ( async move {
249
+ self . fetch_fut = FutureExt :: fuse ( Box :: pin ( async move {
254
250
if let Some ( backoff) = next_backoff {
255
251
tokio:: time:: sleep ( backoff) . await ;
256
252
}
@@ -282,9 +278,9 @@ impl Stream for StreamConsumer {
282
278
} ) ) ;
283
279
}
284
280
285
- let data: FetchResult = futures:: ready!( this . fetch_fut. poll_unpin( cx) ) ;
281
+ let data: FetchResult = futures:: ready!( self . fetch_fut. poll_unpin( cx) ) ;
286
282
287
- match ( data, * this . start_offset ) {
283
+ match ( data, self . start_offset ) {
288
284
( Ok ( inner) , _) => {
289
285
let FetchResultOk {
290
286
mut records_and_offsets,
@@ -300,14 +296,14 @@ impl Stream for StreamConsumer {
300
296
// Remember used offset (might be overwritten if there was any data) so we don't refetch the
301
297
// earliest / latest offset for every try. Also fetching the latest offset might be racy otherwise,
302
298
// since we'll never be in a position where the latest one can actually be fetched.
303
- * this . next_offset = Some ( used_offset) ;
299
+ self . next_offset = Some ( used_offset) ;
304
300
305
301
// Sort records by offset in case they aren't in order
306
302
records_and_offsets. sort_by_key ( |x| x. offset ) ;
307
- * this . last_high_watermark = watermark;
303
+ self . last_high_watermark = watermark;
308
304
if let Some ( x) = records_and_offsets. last ( ) {
309
- * this . next_offset = Some ( x. offset + 1 ) ;
310
- this . buffer . extend ( records_and_offsets)
305
+ self . next_offset = Some ( x. offset + 1 ) ;
306
+ self . buffer . extend ( records_and_offsets)
311
307
}
312
308
continue ;
313
309
}
@@ -320,25 +316,25 @@ impl Stream for StreamConsumer {
320
316
StartOffset :: Earliest | StartOffset :: Latest ,
321
317
) => {
322
318
// wipe offset and try again
323
- * this . next_offset = None ;
319
+ self . next_offset = None ;
324
320
325
321
// This will only happen if retention / deletions happen after we've asked for the earliest/latest
326
322
// offset and our "fetch" request. This should be a rather rare event, but if something is horrible
327
323
// wrong in our cluster (e.g. some actor is spamming "delete" requests) then let's at least backoff
328
324
// a bit.
329
325
let backoff_secs = 1 ;
330
326
warn ! (
331
- start_offset=?this . start_offset,
327
+ start_offset=?self . start_offset,
332
328
backoff_secs,
333
329
"Records are gone between ListOffsets and Fetch, backoff a bit" ,
334
330
) ;
335
- * this . next_backoff = Some ( Duration :: from_secs ( backoff_secs) ) ;
331
+ self . next_backoff = Some ( Duration :: from_secs ( backoff_secs) ) ;
336
332
337
333
continue ;
338
334
}
339
335
// if we have an offset, terminate the stream
340
336
( Err ( e) , _) => {
341
- * this . terminated = true ;
337
+ self . terminated = true ;
342
338
343
339
// report error once
344
340
return Poll :: Ready ( Some ( Err ( e) ) ) ;
0 commit comments