@@ -4,24 +4,21 @@ use std::collections::VecDeque;
44use std:: fs:: File as StdFile ;
55use std:: path:: { Path , PathBuf } ;
66use std:: sync:: Arc ;
7- use std:: sync:: Mutex as SyncMutex ;
87
98use crossbeam:: utils:: CachePadded ;
109use fail:: fail_point;
11- use libc:: aiocb;
1210use log:: error;
1311use parking_lot:: { Mutex , MutexGuard , RwLock } ;
14- use protobuf:: { parse_from_bytes, Message } ;
1512
1613use crate :: config:: Config ;
17- use crate :: env:: { AioContext , AsyncContext , DefaultFileSystem , FileSystem } ;
14+ use crate :: env:: FileSystem ;
1815use crate :: event_listener:: EventListener ;
1916use crate :: memtable:: EntryIndex ;
2017use crate :: metrics:: * ;
2118use crate :: pipe_log:: {
2219 FileBlockHandle , FileId , FileSeq , LogFileContext , LogQueue , PipeLog , ReactiveBytes ,
2320} ;
24- use crate :: { perf_context, Error , LogBatch , MessageExt , Result } ;
21+ use crate :: { perf_context, Error , Result } ;
2522
2623use super :: format:: { build_recycled_file_name, FileNameExt , LogFileFormat } ;
2724use super :: log_file:: build_file_reader;
@@ -453,87 +450,7 @@ impl<F: FileSystem> PipeLog for DualPipes<F> {
453450 fn read_bytes ( & self , handle : FileBlockHandle ) -> Result < Vec < u8 > > {
454451 self . pipes [ handle. id . queue as usize ] . read_bytes ( handle)
455452 }
456- // #[inline]
457- // fn async_entry_read<M: Message + MessageExt<Entry = M>>(
458- // &self,
459- // ents_idx: &mut Vec<EntryIndex>,
460- // vec: &mut Vec<M::Entry>,
461- // ) -> Result<()> {
462- // let mut handles: Vec<FileBlockHandle> = vec![];
463- // for (t, i) in ents_idx.iter().enumerate() {
464- // if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap())
465- // { handles.push(i.entries.unwrap());
466- // }
467- // }
468-
469- // let mut ctx_append = self.pipes[LogQueue::Append as usize]
470- // .file_system
471- // .new_async_io_context(handles.len() as usize)
472- // .unwrap();
473- // let mut ctx_rewrite = self.pipes[LogQueue::Rewrite as usize]
474- // .file_system
475- // .new_async_io_context(handles.len() as usize)
476- // .unwrap();
477-
478- // for handle in handles.iter_mut() {
479- // match handle.id.queue {
480- // LogQueue::Append => {
481- // self.pipes[LogQueue::Append as usize].submit_read_req(
482- // handle,
483- // &mut ctx_append,
484- // );
485- // }
486- // LogQueue::Rewrite => {
487- // self.pipes[LogQueue::Rewrite as usize].submit_read_req(
488- // handle,
489- // &mut ctx_rewrite,
490- // );
491- // }
492- // }
493- // }
494-
495- // let mut decode_buf = vec![];
496- // let mut seq_append: i32 = -1;
497- // let mut seq_rewrite: i32 = -1;
498-
499- // for (t, i) in ents_idx.iter().enumerate() {
500- // decode_buf =
501- // match t == 0 || ents_idx[t - 1].entries.unwrap() !=
502- // ents_idx[t].entries.unwrap() { true => match
503- // ents_idx[t].entries.unwrap().id.queue {
504- // LogQueue::Append => { seq_append += 1;
505- // ctx_append.single_wait(seq_append as usize).unwrap();
506- // LogBatch::decode_entries_block(
507- // &ctx_append.data(seq_append as usize),
508- // i.entries.unwrap(),
509- // i.compression_type,
510- // )
511- // .unwrap()
512- // }
513- // LogQueue::Rewrite => {
514- // seq_rewrite += 1;
515- // ctx_rewrite.single_wait(seq_rewrite as
516- // usize).unwrap(); LogBatch::decode_entries_block(
517- // &ctx_rewrite.data(seq_rewrite as usize),
518- // i.entries.unwrap(),
519- // i.compression_type,
520- // )
521- // .unwrap()
522- // }
523- // },
524- // false => decode_buf,
525- // };
526-
527- // vec.push(
528- // parse_from_bytes::<M>(
529- // &mut decode_buf
530- // [(i.entry_offset) as usize..(i.entry_offset +
531- // i.entry_len) as usize], )
532- // .unwrap(),
533- // );
534- // }
535- // Ok(())
536- // }
453+
537454 #[ inline]
538455 fn async_read_bytes ( & self , ents_idx : & mut Vec < EntryIndex > ) -> Result < Vec < Vec < u8 > > > {
539456 let mut blocks: Vec < FileBlockHandle > = vec ! [ ] ;
@@ -542,12 +459,11 @@ impl<F: FileSystem> PipeLog for DualPipes<F> {
542459 blocks. push ( i. entries . unwrap ( ) ) ;
543460 }
544461 }
545- let mut res: Vec < Vec < u8 > > = vec ! [ ] ;
546462
547463 let fs = & self . pipes [ LogQueue :: Append as usize ] . file_system ;
548464 let mut ctx = fs. new_async_io_context ( blocks. len ( ) ) . unwrap ( ) ;
549465
550- for ( seq , block) in blocks. iter_mut ( ) . enumerate ( ) {
466+ for block in blocks. iter_mut ( ) {
551467 match block. id . queue {
552468 LogQueue :: Append => {
553469 self . pipes [ LogQueue :: Append as usize ] . async_read ( block, & mut ctx) ;
0 commit comments