Skip to content
124 changes: 122 additions & 2 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,29 @@ where
memtable
.read()
.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
for i in ents_idx.iter() {
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);

let mut blocks: Vec<FileBlockHandle> = Vec::new();
let mut total_bytes = 0;
for (t, i) in ents_idx.iter().enumerate() {
if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) {
blocks.push(i.entries.unwrap());
total_bytes += i.entries.unwrap().len;
}
}

if blocks.len() > 5 && total_bytes > 1024 * 1024 {
//Async IO
let bytes = self.pipe_log.async_read_bytes(blocks)?;
parse_entries_from_bytes::<M>(bytes, &mut ents_idx, vec)?;
} else {
//Sync IO
for i in ents_idx.iter() {
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
}
}

ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);

return Ok(ents_idx.len());
}
Ok(0)
Expand Down Expand Up @@ -545,6 +564,38 @@ thread_local! {
static BLOCK_CACHE: BlockCache = BlockCache::new();
}

pub(crate) fn parse_entries_from_bytes<M: MessageExt>(
bytes: Vec<Vec<u8>>,
ents_idx: &mut [EntryIndex],
vec: &mut Vec<M::Entry>,
) -> Result<()> {
let mut seq = 0;
for idx in ents_idx {
BLOCK_CACHE.with(|cache| {
if cache.key.get() != idx.entries.unwrap() {
cache.insert(
idx.entries.unwrap(),
LogBatch::decode_entries_block(
&bytes[seq],
idx.entries.unwrap(),
idx.compression_type,
)
.unwrap(),
);
seq += 1;
}
let e = parse_from_bytes(
&cache.block.borrow()
[idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize],
)
.unwrap();
assert_eq!(M::index(&e), idx.index);
vec.push(e);
});
}
Ok(())
}

pub(crate) fn read_entry_from_file<M, P>(pipe_log: &P, idx: &EntryIndex) -> Result<M::Entry>
where
M: MessageExt,
Expand Down Expand Up @@ -759,6 +810,57 @@ mod tests {
}
}

#[test]
fn test_multi_read_entry() {
let sync_batch_size = 1024;
let async_batch_size = 1024 * 1024;
for &entry_size in &[sync_batch_size, async_batch_size] {
let dir = tempfile::Builder::new()
.prefix("test_multi_read_entry")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(1),
..Default::default()
};

let engine = RaftLogEngine::open_with_file_system(
cfg.clone(),
Arc::new(ObfuscatedFileSystem::default()),
)
.unwrap();
assert_eq!(engine.path(), dir.path().to_str().unwrap());
let data = vec![b'x'; entry_size];

for i in 0..10 {
for rid in 10..20 {
let index = i + rid;
engine.append(rid, index, index + 1, Some(&data));
}
}
for i in 10..20 {
let rid = i;
let index = i;
engine.scan_entries(rid, index, index + 10, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
}

// Recover the engine.
let engine = engine.reopen();
for i in 10..20 {
let rid = i;
let index = i;
engine.scan_entries(rid, index, index + 10, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
}
}
}

#[test]
fn test_clean_raft_group() {
fn run_steps(steps: &[Option<(u64, u64)>]) {
Expand Down Expand Up @@ -1994,6 +2096,20 @@ mod tests {
type Handle = <ObfuscatedFileSystem as FileSystem>::Handle;
type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;
type MultiReadContext = <ObfuscatedFileSystem as FileSystem>::MultiReadContext;

fn multi_read(
&self,
ctx: &mut Self::MultiReadContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> std::io::Result<()> {
self.inner.multi_read(ctx, handle, block)
}

fn async_finish(&self, ctx: Self::MultiReadContext) -> std::io::Result<Vec<Vec<u8>>> {
self.inner.async_finish(ctx)
}

fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
let handle = self.inner.create(&path)?;
Expand Down Expand Up @@ -2056,6 +2172,10 @@ mod tests {
fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> {
self.inner.new_writer(h)
}

fn new_async_io_context(&self) -> std::io::Result<Self::MultiReadContext> {
self.inner.new_async_io_context()
}
}

#[test]
Expand Down
49 changes: 49 additions & 0 deletions src/env/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@
use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
use std::os::unix::io::RawFd;
use std::path::Path;
use std::pin::Pin;
use std::slice;
use std::sync::Arc;

use fail::fail_point;
use log::error;
use nix::errno::Errno;
use nix::fcntl::{self, OFlag};
use nix::sys::aio::{aio_suspend, Aio, AioRead};
use nix::sys::signal::SigevNotify;
use nix::sys::stat::Mode;
use nix::sys::uio::{pread, pwrite};
use nix::unistd::{close, ftruncate, lseek, Whence};
use nix::NixPath;

use crate::env::{FileSystem, Handle, WriteExt};
use crate::pipe_log::FileBlockHandle;

fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error {
let kind = std::io::Error::from(e).kind();
Expand Down Expand Up @@ -256,13 +261,53 @@ impl WriteExt for LogFile {
self.inner.allocate(offset, size)
}
}
#[derive(Default)]
pub struct AioContext {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use this name outside this file. Just like type Handle = <DefaultFileSystem as FileSystem>::Handle;, you can use the same syntax to reference aio context of base file system without needing to expose this struct.

aio_vec: Vec<Pin<Box<AioRead<'static>>>>,
buf_vec: Vec<Vec<u8>>,
}

pub struct DefaultFileSystem;

impl FileSystem for DefaultFileSystem {
type Handle = LogFd;
type Reader = LogFile;
type Writer = LogFile;
type MultiReadContext = AioContext;

fn multi_read(
&self,
ctx: &mut Self::MultiReadContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> IoResult<()> {
let buf = vec![0_u8; block.len];
ctx.buf_vec.push(buf);

let mut aior = Box::pin(AioRead::new(
handle.0,
block.offset as i64,
unsafe {
slice::from_raw_parts_mut(ctx.buf_vec.last_mut().unwrap().as_mut_ptr(), block.len)
},
0,
SigevNotify::SigevNone,
));
aior.as_mut().submit()?;
ctx.aio_vec.push(aior);

Ok(())
}

fn async_finish(&self, mut ctx: Self::MultiReadContext) -> IoResult<Vec<Vec<u8>>> {
for seq in 0..ctx.aio_vec.len() {
let buf_len = ctx.buf_vec[seq].len();
aio_suspend(&[&*ctx.aio_vec[seq]], None)?;
assert_eq!(ctx.aio_vec[seq].as_mut().aio_return()?, buf_len);
}
let res = ctx.buf_vec.to_owned();
Ok(res)
}

fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
LogFd::create(path.as_ref())
Expand All @@ -288,4 +333,8 @@ impl FileSystem for DefaultFileSystem {
fn new_writer(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Writer> {
Ok(LogFile::new(handle))
}

fn new_async_io_context(&self) -> IoResult<Self::MultiReadContext> {
Ok(AioContext::default())
}
}
12 changes: 12 additions & 0 deletions src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@ mod obfuscated;
pub use default::DefaultFileSystem;
pub use obfuscated::ObfuscatedFileSystem;

use crate::pipe_log::FileBlockHandle;
/// FileSystem
pub trait FileSystem: Send + Sync {
type Handle: Send + Sync + Handle;
type Reader: Seek + Read + Send;
type Writer: Seek + Write + Send + WriteExt;
type MultiReadContext;

fn multi_read(
&self,
ctx: &mut Self::MultiReadContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> Result<()>;
fn async_finish(&self, ctx: Self::MultiReadContext) -> Result<Vec<Vec<u8>>>;

fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;

Expand Down Expand Up @@ -55,6 +65,8 @@ pub trait FileSystem: Send + Sync {
fn new_reader(&self, handle: Arc<Self::Handle>) -> Result<Self::Reader>;

fn new_writer(&self, handle: Arc<Self::Handle>) -> Result<Self::Writer>;

fn new_async_io_context(&self) -> Result<Self::MultiReadContext>;
}

pub trait Handle {
Expand Down
27 changes: 27 additions & 0 deletions src/env/obfuscated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::Arc;

use crate::env::{DefaultFileSystem, FileSystem, WriteExt};

use crate::pipe_log::FileBlockHandle;
pub struct ObfuscatedReader(<DefaultFileSystem as FileSystem>::Reader);

impl Read for ObfuscatedReader {
Expand Down Expand Up @@ -89,6 +90,28 @@ impl FileSystem for ObfuscatedFileSystem {
type Handle = <DefaultFileSystem as FileSystem>::Handle;
type Reader = ObfuscatedReader;
type Writer = ObfuscatedWriter;
type MultiReadContext = <DefaultFileSystem as FileSystem>::MultiReadContext;

fn multi_read(
&self,
ctx: &mut Self::MultiReadContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> IoResult<()> {
self.inner.multi_read(ctx, handle, block)
}

fn async_finish(&self, ctx: Self::MultiReadContext) -> IoResult<Vec<Vec<u8>>> {
let mut base = self.inner.async_finish(ctx).unwrap();

for v in base.iter_mut() {
for c in v.iter_mut() {
// do obfuscation.
*c = c.wrapping_sub(1);
}
}
Ok(base)
}

fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
let r = self.inner.create(path);
Expand Down Expand Up @@ -127,4 +150,8 @@ impl FileSystem for ObfuscatedFileSystem {
fn new_writer(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Writer> {
Ok(ObfuscatedWriter(self.inner.new_writer(handle)?))
}

fn new_async_io_context(&self) -> IoResult<Self::MultiReadContext> {
self.inner.new_async_io_context()
}
}
2 changes: 1 addition & 1 deletion src/file_pipe_log/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl<F: FileSystem> LogFileReader<F> {
}

pub fn read(&mut self, handle: FileBlockHandle) -> Result<Vec<u8>> {
let mut buf = vec![0; handle.len as usize];
let mut buf = vec![0; handle.len];
let size = self.read_to(handle.offset, &mut buf)?;
buf.truncate(size);
Ok(buf)
Expand Down
21 changes: 21 additions & 0 deletions src/file_pipe_log/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use parking_lot::{Mutex, MutexGuard, RwLock};
use crate::config::Config;
use crate::env::FileSystem;
use crate::event_listener::EventListener;
use crate::memtable::EntryIndex;
use crate::metrics::*;
use crate::pipe_log::{
FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog, ReactiveBytes,
Expand Down Expand Up @@ -254,6 +255,15 @@ impl<F: FileSystem> SinglePipe<F> {
reader.read(handle)
}

fn async_read(&self, ctx: &mut F::MultiReadContext, blocks: Vec<FileBlockHandle>) {
for block in blocks.iter() {
let fd = self.get_fd(block.id.seq).unwrap();
self.file_system
.multi_read(ctx, fd, block)
.expect("Async read failed.");
}
}

fn append<T: ReactiveBytes + ?Sized>(&self, bytes: &mut T) -> Result<FileBlockHandle> {
fail_point!("file_pipe_log::append");
let mut writable_file = self.writable_file.lock();
Expand Down Expand Up @@ -444,6 +454,17 @@ impl<F: FileSystem> PipeLog for DualPipes<F> {
self.pipes[handle.id.queue as usize].read_bytes(handle)
}

#[inline]
fn async_read_bytes(&self, blocks: Vec<FileBlockHandle>) -> Result<Vec<Vec<u8>>> {
let fs = &self.pipes[LogQueue::Append as usize].file_system;
let mut ctx = fs.new_async_io_context()?;

self.pipes[LogQueue::Append as usize].async_read(&mut ctx, blocks);
let res = fs.async_finish(ctx)?;

Ok(res)
}

#[inline]
fn append<T: ReactiveBytes + ?Sized>(
&self,
Expand Down
4 changes: 4 additions & 0 deletions src/pipe_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::cmp::Ordering;
use std::fmt::{self, Display};

use crate::memtable::EntryIndex;
use fail::fail_point;
use num_derive::{FromPrimitive, ToPrimitive};
use num_traits::ToPrimitive;
Expand Down Expand Up @@ -172,6 +173,9 @@ pub trait PipeLog: Sized {
/// Reads some bytes from the specified position.
fn read_bytes(&self, handle: FileBlockHandle) -> Result<Vec<u8>>;

/// Reads bytes from multi blocks using 'Async IO'.
fn async_read_bytes(&self, blocks: Vec<FileBlockHandle>) -> Result<Vec<Vec<u8>>>;

/// Appends some bytes to the specified log queue. Returns file position of
/// the written bytes.
fn append<T: ReactiveBytes + ?Sized>(
Expand Down