Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

read entries using AIO #286

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
163 changes: 162 additions & 1 deletion src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,45 @@ where
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
}
ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);

return Ok(ents_idx.len());
}
Ok(0)
}

pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>(
pub fn fetch_entries_to_aio<M: MessageExt>(

&self,
region_id: u64,
begin: u64,
end: u64,
max_size: Option<usize>,
vec: &mut Vec<M::Entry>,
) -> Result<usize> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, based on the tests, you can automatically select single_read or multi_read and avoid creating two different engine methods, e.g. use aio when blocks.len() > 4 or something.

One issue though is that I'm not sure if aio syscall is portable enough. You might need to do some research on how to detect if aio is available (maybe take a look at how RocksDB did it).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Let me try.

let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
if let Some(memtable) = self.memtables.get(region_id) {
let length = (end - begin) as usize;
let mut ents_idx: Vec<EntryIndex> = Vec::with_capacity(length);
memtable
.read()
.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;

let mut blocks: Vec<FileBlockHandle> = Vec::new();
for (t, i) in ents_idx.iter().enumerate() {
if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) {
blocks.push(i.entries.unwrap());
}
}
let bytes = self.pipe_log.async_read_bytes(blocks)?;
parse_entries_from_bytes::<M>(bytes, &mut ents_idx, vec)?;

ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);

return Ok(ents_idx.len());
}

Ok(0)
}

pub fn first_index(&self, region_id: u64) -> Option<u64> {
if let Some(memtable) = self.memtables.get(region_id) {
return memtable.read().first_index();
Expand Down Expand Up @@ -544,7 +578,32 @@ impl BlockCache {
// Per-thread `BlockCache`: `thread_local!` gives every thread its own instance,
// built with `BlockCache::new()` the first time that thread touches it.
thread_local! {
static BLOCK_CACHE: BlockCache = BlockCache::new();
}

/// Decodes the entries described by `ents_idx` out of the raw blocks in `bytes` and
/// appends them to `vec`, in index order.
///
/// `bytes` must hold one raw block for every run of consecutive indexes that share the
/// same `entries` handle, in the order those runs appear in `ents_idx`.
///
/// # Errors
///
/// Propagates any error returned by `LogBatch::decode_entries_block` or
/// `parse_from_bytes`.
///
/// # Panics
///
/// Panics if an index carries no `entries` handle, if `bytes` holds fewer blocks than
/// `ents_idx` refers to, or if an entry's byte range falls outside its decoded block.
pub(crate) fn parse_entries_from_bytes<M: MessageExt>(
    bytes: Vec<Vec<u8>>,
    ents_idx: &mut [EntryIndex],
    vec: &mut Vec<M::Entry>,
) -> Result<()> {
    // `bytes` is owned, so the raw blocks are consumed one by one instead of being copied.
    let mut raw_blocks = bytes.into_iter();
    let mut current_handle = None;
    let mut decoded = None;
    for idx in ents_idx.iter() {
        let handle = idx.entries.unwrap();
        if current_handle != Some(handle) {
            // First entry of a new block: take the next raw block and decode it once.
            // Every following entry with the same handle reuses the decoded bytes.
            // NOTE(review): this uses the compression type of the block's first entry;
            // presumably all entries sharing a handle agree on it - confirm.
            let raw = raw_blocks
                .next()
                .expect("fewer raw blocks than distinct entry blocks");
            decoded = Some(LogBatch::decode_entries_block(
                &raw,
                handle,
                idx.compression_type,
            )?);
            current_handle = Some(handle);
        }
        let block = decoded
            .as_ref()
            .expect("a block is always decoded before its first entry is parsed");
        let start = idx.entry_offset as usize;
        let end = (idx.entry_offset + idx.entry_len) as usize;
        vec.push(parse_from_bytes(&block[start..end])?);
    }
    Ok(())
}
pub(crate) fn read_entry_from_file<M, P>(pipe_log: &P, idx: &EntryIndex) -> Result<M::Entry>
where
M: MessageExt,
Expand Down Expand Up @@ -683,6 +742,41 @@ mod tests {
reader(e.index, entry_index.entries.unwrap().id.queue, &e.data);
}
}
fn scan_entries_aio<FR: Fn(u64, LogQueue, &[u8])>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a newline between functions.

&self,
rid: u64,
start: u64,
end: u64,
reader: FR,
) {
let mut entries = Vec::new();
self.fetch_entries_to_aio::<Entry>(
rid,
self.first_index(rid).unwrap(),
self.last_index(rid).unwrap() + 1,
None,
&mut entries,
)
.unwrap();
assert_eq!(entries.len(), (end - start) as usize);
assert_eq!(entries.first().unwrap().index, start);
assert_eq!(
entries.last().unwrap().index,
self.decode_last_index(rid).unwrap()
);
assert_eq!(entries.last().unwrap().index + 1, end);
for e in entries.iter() {
let entry_index = self
.memtables
.get(rid)
.unwrap()
.read()
.get_entry(e.index)
.unwrap();
assert_eq!(&self.get_entry::<Entry>(rid, e.index).unwrap().unwrap(), e);
reader(e.index, entry_index.entries.unwrap().id.queue, &e.data);
}
}

fn file_count(&self, queue: Option<LogQueue>) -> usize {
if let Some(queue) = queue {
Expand Down Expand Up @@ -759,6 +853,55 @@ mod tests {
}
}

/// Appends two entries to each of the regions 10..20 and reads them back through
/// `scan_entries_aio`, both before and after reopening the engine. The whole scenario
/// runs once with 10-byte payloads and once with 5120-byte payloads.
#[test]
fn test_async_get_entry() {
    let normal_batch_size = 10;
    let compressed_batch_size = 5120;
    for &entry_size in &[normal_batch_size, compressed_batch_size] {
        let dir = tempfile::Builder::new()
            .prefix("test_get_entry")
            .tempdir()
            .unwrap();
        let cfg = Config {
            dir: dir.path().to_str().unwrap().to_owned(),
            // A 1-byte target file size is passed so that the engine works with the
            // smallest possible file size target.
            target_file_size: ReadableSize(1),
            ..Default::default()
        };

        // `cfg` is not needed afterwards, so it is moved rather than cloned.
        let engine = RaftLogEngine::open_with_file_system(
            cfg,
            Arc::new(ObfuscatedFileSystem::default()),
        )
        .unwrap();
        assert_eq!(engine.path(), dir.path().to_str().unwrap());

        let data = vec![b'x'; entry_size];
        // The loop value serves as both the region id and the first entry index.
        for id in 10..20 {
            engine.append(id, id, id + 2, Some(&data));
        }
        for id in 10..20 {
            engine.scan_entries_aio(id, id, id + 2, |_, q, d| {
                assert_eq!(q, LogQueue::Append);
                assert_eq!(d, &data);
            });
        }

        // Recover the engine.
        let engine = engine.reopen();
        for id in 10..20 {
            engine.scan_entries_aio(id, id, id + 2, |_, q, d| {
                assert_eq!(q, LogQueue::Append);
                assert_eq!(d, &data);
            });
        }
    }
}

#[test]
fn test_clean_raft_group() {
fn run_steps(steps: &[Option<(u64, u64)>]) {
Expand Down Expand Up @@ -1994,6 +2137,20 @@ mod tests {
type Handle = <ObfuscatedFileSystem as FileSystem>::Handle;
type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;
type AsyncIoContext = <ObfuscatedFileSystem as FileSystem>::AsyncIoContext;

/// Forwards the asynchronous read request for `block` on `handle` to the wrapped
/// file system (`self.inner`) without any additional bookkeeping.
fn async_read(
&self,
ctx: &mut Self::AsyncIoContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> std::io::Result<()> {
self.inner.async_read(ctx, handle, block)
}

/// Hands `ctx` to the wrapped file system (`self.inner`) and returns whatever buffers
/// or error it produces.
fn async_finish(&self, ctx: Self::AsyncIoContext) -> std::io::Result<Vec<Vec<u8>>> {
self.inner.async_finish(ctx)
}

fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
let handle = self.inner.create(&path)?;
Expand Down Expand Up @@ -2056,6 +2213,10 @@ mod tests {
/// Creates a writer for `h` by delegating to the wrapped file system (`self.inner`).
fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> {
self.inner.new_writer(h)
}

/// Creates a fresh asynchronous I/O context by delegating to the wrapped file system
/// (`self.inner`).
fn new_async_io_context(&self) -> std::io::Result<Self::AsyncIoContext> {
self.inner.new_async_io_context()
}
}

#[test]
Expand Down
48 changes: 48 additions & 0 deletions src/env/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@
use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
use std::os::unix::io::RawFd;
use std::path::Path;
use std::pin::Pin;
use std::slice;
use std::sync::Arc;

use fail::fail_point;
use log::error;
use nix::errno::Errno;
use nix::fcntl::{self, OFlag};
use nix::sys::aio::{aio_suspend, Aio, AioRead};
use nix::sys::signal::SigevNotify;
use nix::sys::stat::Mode;
use nix::sys::uio::{pread, pwrite};
use nix::unistd::{close, ftruncate, lseek, Whence};
use nix::NixPath;

use crate::env::{FileSystem, Handle, WriteExt};
use crate::pipe_log::FileBlockHandle;

fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error {
let kind = std::io::Error::from(e).kind();
Expand Down Expand Up @@ -256,13 +261,52 @@ impl WriteExt for LogFile {
self.inner.allocate(offset, size)
}
}
#[derive(Default)]
pub struct AioContext {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use this name outside this file. Just like type Handle = <DefaultFileSystem as FileSystem>::Handle;, you can use the same syntax to reference aio context of base file system without needing to expose this struct.

aio_vec: Vec<Pin<Box<AioRead<'static>>>>,
buf_vec: Vec<Vec<u8>>,
}

/// Unit struct whose `FileSystem` implementation uses `LogFd` as the file handle,
/// `LogFile` as both reader and writer, and `AioContext` for asynchronous reads.
pub struct DefaultFileSystem;

impl FileSystem for DefaultFileSystem {
type Handle = LogFd;
type Reader = LogFile;
type Writer = LogFile;
type AsyncIoContext = AioContext;

/// Allocates a zeroed buffer of `block.len` bytes, builds an `AioRead` request that
/// fills it from `handle` at `block.offset`, and submits the request.
///
/// The request and its buffer are recorded in `ctx` only after `submit` succeeds, so
/// `ctx.aio_vec` and `ctx.buf_vec` always describe the same requests at the same
/// positions.
///
/// # Errors
///
/// Returns the error reported by `submit`, converted to `std::io::Error`. In that case
/// `ctx` is left untouched.
fn async_read(
    &self,
    ctx: &mut Self::AsyncIoContext,
    handle: Arc<Self::Handle>,
    block: &FileBlockHandle,
) -> IoResult<()> {
    let mut buf = vec![0_u8; block.len];

    // SAFETY: the pointer and length describe exactly `buf`'s initialized heap
    // allocation. Moving the `Vec` into `ctx.buf_vec` below moves only the `Vec`
    // header, not that allocation, so the slice stays valid for as long as the
    // buffer is kept in `ctx`. `AioContext` declares `aio_vec` before `buf_vec`,
    // so on drop the requests are released before the buffers they point into.
    // NOTE(review): nothing prevents other code from touching `ctx.buf_vec` while
    // the request is in flight; this relies on `async_finish` being the only reader.
    let target = unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr(), block.len) };

    // NOTE(review): only the raw descriptor `handle.0` is stored in the request and
    // `handle` itself is dropped when this function returns; presumably the caller
    // keeps the file open until `async_finish` - confirm.
    let mut aior = Box::pin(AioRead::new(
        handle.0,
        block.offset as i64,
        target,
        0,
        SigevNotify::SigevNone,
    ));
    // On failure `?` returns here; `aior` is dropped before `buf` and `ctx` is unchanged.
    aior.as_mut().submit()?;

    ctx.aio_vec.push(aior);
    ctx.buf_vec.push(buf);

    Ok(())
}
fn async_finish(&self, mut ctx: Self::AsyncIoContext) -> IoResult<Vec<Vec<u8>>> {
for seq in 0..ctx.aio_vec.len() {
let buf_len = ctx.buf_vec[seq].len();
aio_suspend(&[&*ctx.aio_vec[seq]], None)?;
assert_eq!(ctx.aio_vec[seq].as_mut().aio_return()?, buf_len);
}
let res = ctx.buf_vec.to_owned();
Ok(res)
}

fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
LogFd::create(path.as_ref())
Expand All @@ -288,4 +332,8 @@ impl FileSystem for DefaultFileSystem {
/// Wraps `handle` in a new `LogFile` to act as the writer. This implementation always
/// returns `Ok`.
fn new_writer(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Writer> {
Ok(LogFile::new(handle))
}

/// Creates a new `AioContext` through its `Default` implementation. This
/// implementation always returns `Ok`.
fn new_async_io_context(&self) -> IoResult<Self::AsyncIoContext> {
Ok(AioContext::default())
}
}
12 changes: 12 additions & 0 deletions src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@ mod obfuscated;
pub use default::DefaultFileSystem;
pub use obfuscated::ObfuscatedFileSystem;

use crate::pipe_log::FileBlockHandle;
/// FileSystem
pub trait FileSystem: Send + Sync {
type Handle: Send + Sync + Handle;
type Reader: Seek + Read + Send;
type Writer: Seek + Write + Send + WriteExt;
type AsyncIoContext;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename this to MultiReadContext.


fn async_read(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since all the async happens inside the implementation, we can rename this to something like RocksDB's MultiGet, i.e. multi_read.

&self,
ctx: &mut Self::AsyncIoContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> Result<()>;
fn async_finish(&self, ctx: Self::AsyncIoContext) -> Result<Vec<Vec<u8>>>;

fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;

Expand Down Expand Up @@ -55,6 +65,8 @@ pub trait FileSystem: Send + Sync {
fn new_reader(&self, handle: Arc<Self::Handle>) -> Result<Self::Reader>;

fn new_writer(&self, handle: Arc<Self::Handle>) -> Result<Self::Writer>;

fn new_async_io_context(&self) -> Result<Self::AsyncIoContext>;
}

pub trait Handle {
Expand Down
27 changes: 27 additions & 0 deletions src/env/obfuscated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::Arc;

use crate::env::{DefaultFileSystem, FileSystem, WriteExt};

use crate::pipe_log::FileBlockHandle;
/// Newtype wrapping the reader type of `DefaultFileSystem`.
pub struct ObfuscatedReader(<DefaultFileSystem as FileSystem>::Reader);

impl Read for ObfuscatedReader {
Expand Down Expand Up @@ -89,6 +90,28 @@ impl FileSystem for ObfuscatedFileSystem {
type Handle = <DefaultFileSystem as FileSystem>::Handle;
type Reader = ObfuscatedReader;
type Writer = ObfuscatedWriter;
type AsyncIoContext = <DefaultFileSystem as FileSystem>::AsyncIoContext;

/// Submits the read of `block` through the wrapped file system (`self.inner`)
/// unchanged; no byte transformation happens at submission time.
fn async_read(
&self,
ctx: &mut Self::AsyncIoContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> IoResult<()> {
self.inner.async_read(ctx, handle, block)
}

/// Completes the reads recorded in `ctx` through the wrapped file system
/// (`self.inner`), then shifts every returned byte down by one (wrapping) before
/// handing the buffers back.
///
/// # Errors
///
/// Returns the error reported by the wrapped file system's `async_finish`.
fn async_finish(&self, ctx: Self::AsyncIoContext) -> IoResult<Vec<Vec<u8>>> {
    // Propagate I/O failures to the caller instead of panicking on them.
    let mut blocks = self.inner.async_finish(ctx)?;

    // Apply the byte shift to every byte of every buffer, in place.
    for byte in blocks.iter_mut().flatten() {
        *byte = byte.wrapping_sub(1);
    }
    Ok(blocks)
}

fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
let r = self.inner.create(path);
Expand Down Expand Up @@ -127,4 +150,8 @@ impl FileSystem for ObfuscatedFileSystem {
/// Creates a writer for `handle` on the wrapped file system (`self.inner`) and wraps
/// it in `ObfuscatedWriter`; an error from the inner call is returned as-is.
fn new_writer(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Writer> {
Ok(ObfuscatedWriter(self.inner.new_writer(handle)?))
}

/// Creates a fresh asynchronous I/O context by delegating to the wrapped file system
/// (`self.inner`).
fn new_async_io_context(&self) -> IoResult<Self::AsyncIoContext> {
self.inner.new_async_io_context()
}
}
2 changes: 1 addition & 1 deletion src/file_pipe_log/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl<F: FileSystem> LogFileReader<F> {
}

/// Reads the block described by `handle` into a freshly allocated buffer.
///
/// The buffer is zero-filled to `handle.len` bytes, filled by `read_to` starting at
/// `handle.offset`, and then truncated to the byte count `read_to` reports, so the
/// returned vector may be shorter than `handle.len`.
pub fn read(&mut self, handle: FileBlockHandle) -> Result<Vec<u8>> {
// NOTE(review): the two consecutive `let mut buf` lines below look like the removed
// and added sides of this diff hunk shown together; only the second binding is used.
let mut buf = vec![0; handle.len as usize];
let mut buf = vec![0; handle.len];
let size = self.read_to(handle.offset, &mut buf)?;
buf.truncate(size);
Ok(buf)
Expand Down
Loading