Skip to content

Commit

Permalink
Optimize code structure(version 1.3).
Browse files Browse the repository at this point in the history
Signed-off-by: root <[email protected]>
  • Loading branch information
root authored and root committed Dec 19, 2022
1 parent b334a08 commit b6fbaae
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 151 deletions.
100 changes: 16 additions & 84 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,73 +341,10 @@ where
start.elapsed().as_micros()
);

let mut new_block_flags: Vec<bool> = Vec::with_capacity(length);
let mut block_sum = 0;
for (t, i) in ents_idx.iter().enumerate() {
if match t {
0 => true,
_ => ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap(),
} {
block_sum += 1;
new_block_flags.push(true);
} else {
new_block_flags.push(false);
}
}

let mut ctx = AioContext::new(block_sum);
for (seq, i) in ents_idx.iter().enumerate() {
if new_block_flags[seq] {
submit_read_request_to_file(
self.pipe_log.as_ref(),
seq,
&mut ctx,
i.entries.unwrap(),
)
.unwrap();
}
}
println!(
"[fetch_entries_to_aio] (stage2) time cost: {:?} us",
start.elapsed().as_micros()
);
self.pipe_log
.async_entry_read::<M>(&mut ents_idx, vec)
.unwrap();

let mut seq = 0;
let mut decode_buf = vec![];

for (t, i) in ents_idx.iter().enumerate() {
decode_buf = match t {
0 => {
ctx.single_wait(0).unwrap();
LogBatch::decode_entries_block(
&ctx.data(0),
ents_idx[0].entries.unwrap(),
ents_idx[0].compression_type,
)
.unwrap()
}
_ => match new_block_flags[t] {
true => {
seq += 1;
ctx.single_wait(seq).unwrap();
LogBatch::decode_entries_block(
&ctx.data(seq),
i.entries.unwrap(),
i.compression_type,
)
.unwrap()
}
false => decode_buf,
},
};
vec.push(
parse_from_bytes::<M>(
&mut decode_buf
[(i.entry_offset) as usize..(i.entry_offset + i.entry_len) as usize],
)
.unwrap(),
);
}
ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);
println!(
"[fetch_entries_to_aio] (end) time cost: {:?} us",
Expand Down Expand Up @@ -692,19 +629,6 @@ where
})
}

pub(crate) fn submit_read_request_to_file<P>(
pipe_log: &P,
seq: usize,
ctx: &mut AioContext,
handle: FileBlockHandle,
) -> Result<()>
where
P: PipeLog,
{
pipe_log.read_bytes_aio(seq, ctx, handle).unwrap();
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -910,7 +834,7 @@ mod tests {

#[test]
fn test_async_read() {
let normal_batch_size = 8192;
let normal_batch_size = 4096;
let compressed_batch_size = 5120;
for &entry_size in &[normal_batch_size] {
if entry_size == normal_batch_size {
Expand Down Expand Up @@ -2162,11 +2086,15 @@ mod tests {
type Handle = <ObfuscatedFileSystem as FileSystem>::Handle;
type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;
type AsyncIoContext = AioContext;

// fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> std::io::Result<()>
// { // todo!()
// Ok(())
// }
fn new_async_reader(
&self,
handle: Arc<Self::Handle>,
ctx: &mut Self::AsyncIoContext,
) -> std::io::Result<()> {
ctx.new_reader(handle)
}

fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
let handle = self.inner.create(&path)?;
Expand Down Expand Up @@ -2226,6 +2154,10 @@ mod tests {
fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> {
self.inner.new_writer(h)
}

fn new_async_io_context(&self, block_sum: usize) -> std::io::Result<Self::AsyncIoContext> {
todo!()
}
}

#[test]
Expand Down
78 changes: 51 additions & 27 deletions src/env/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,13 @@ impl LogFd {
Ok(readed)
}

pub fn read_aio(&self, seq: usize, ctx: &mut AioContext, offset: u64) {
pub fn read_aio(&self, aior: &mut aiocb, len: usize, pbuf: *mut u8, offset: u64) {
unsafe {
let aior = &mut ctx.aio_vec[seq];
aior.aio_fildes = self.0;
aior.aio_reqprio = 0;
aior.aio_sigevent = SigEvent::new(SigevNotify::SigevNone).sigevent();
aior.aio_nbytes = ctx.buf_vec[seq].len() as usize;
aior.aio_buf = ctx.buf_vec[seq].as_mut_ptr() as *mut c_void;
aior.aio_nbytes = len;
aior.aio_buf = pbuf as *mut c_void;
aior.aio_lio_opcode = libc::LIO_READ;
aior.aio_offset = offset as off_t;
libc::aio_read(aior);
Expand Down Expand Up @@ -277,25 +276,52 @@ impl WriteExt for LogFile {
}

pub struct AioContext {
inner: Option<Arc<LogFd>>,
offset: u64,
index: usize,
aio_vec: Vec<aiocb>,
pub(crate) buf_vec: Vec<Vec<u8>>,
}
impl AioContext {
pub fn new(block_sum: usize) -> Self {
let mut vec = vec![];
let mut aio_vec = vec![];
let mut buf_vec = vec![];
unsafe {
for i in 0..block_sum {
vec.push(mem::zeroed::<libc::aiocb>());
aio_vec.push(mem::zeroed::<libc::aiocb>());
}
}
Self {
aio_vec: vec,
buf_vec: vec![],
inner: None,
offset: 0,
index: 0,
aio_vec,
buf_vec,
}
}

pub fn new_reader(&mut self, fd: Arc<LogFd>) -> IoResult<()> {
self.inner = Some(fd);
Ok(())
}
}

impl AsyncContext for AioContext {
fn wait(&mut self) -> IoResult<usize> {
let mut total = 0;
for seq in 0..self.aio_vec.len() {
match self.single_wait(seq) {
Ok(len) => total += 1,
Err(e) => return Err(e),
}
}
Ok(total as usize)
}

fn data(&self, seq: usize) -> Vec<u8> {
self.buf_vec[seq].to_vec()
}

fn single_wait(&mut self, seq: usize) -> IoResult<usize> {
let buf_len = self.buf_vec[seq].len();
unsafe {
Expand All @@ -312,19 +338,19 @@ impl AsyncContext for AioContext {
}
}

fn wait(&mut self) -> IoResult<usize> {
let mut total = 0;
for seq in 0..self.aio_vec.len() {
match self.single_wait(seq) {
Ok(len) => total += 1,
Err(e) => return Err(e),
}
}
Ok(total as usize)
}
fn submit_read_req(&mut self, buf: Vec<u8>, offset: u64) -> IoResult<()> {
let seq = self.index;
self.index += 1;
self.buf_vec.push(buf);

fn data(&self, seq: usize) -> Vec<u8> {
self.buf_vec[seq].to_vec()
self.inner.as_ref().unwrap().read_aio(
&mut self.aio_vec[seq],
self.buf_vec[seq].len(),
self.buf_vec[seq].as_mut_ptr(),
offset,
);

Ok(())
}
}

Expand All @@ -334,16 +360,14 @@ impl FileSystem for DefaultFileSystem {
type Handle = LogFd;
type Reader = LogFile;
type Writer = LogFile;
type AsyncIoContext = AioContext;

fn read_aio(
fn new_async_reader(
&self,
handle: Arc<Self::Handle>,
seq: usize,
ctx: &mut AioContext,
offset: u64,
ctx: &mut Self::AsyncIoContext,
) -> IoResult<()> {
handle.read_aio(seq, ctx, offset);
Ok(())
ctx.new_reader(handle)
}

fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
Expand All @@ -370,7 +394,7 @@ impl FileSystem for DefaultFileSystem {
Ok(LogFile::new(handle))
}

fn new_async_context(&self, block_sum: usize) -> IoResult<AioContext> {
fn new_async_io_context(&self, block_sum: usize) -> IoResult<Self::AsyncIoContext> {
Ok(AioContext::new(block_sum))
}
}
19 changes: 8 additions & 11 deletions src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@ pub trait FileSystem: Send + Sync {
type Handle: Send + Sync + Handle;
type Reader: Seek + Read + Send;
type Writer: Seek + Write + Send + WriteExt;
type AsyncIoContext: AsyncContext;

fn read_aio(
fn new_async_reader(
&self,
handle: Arc<Self::Handle>,
seq: usize,
ctx: &mut AioContext,
offset: u64,
) -> Result<()> {
Ok(())
}
ctx: &mut Self::AsyncIoContext,
) -> Result<()>;

fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;

Expand Down Expand Up @@ -62,9 +59,7 @@ pub trait FileSystem: Send + Sync {

fn new_writer(&self, handle: Arc<Self::Handle>) -> Result<Self::Writer>;

fn new_async_context(&self, block_sum: usize) -> Result<AioContext> {
Ok(AioContext::new(block_sum))
}
fn new_async_io_context(&self, block_sum: usize) -> Result<Self::AsyncIoContext>;
}

pub trait Handle {
Expand All @@ -84,6 +79,8 @@ pub trait WriteExt {

pub trait AsyncContext {
fn wait(&mut self) -> Result<usize>;
fn single_wait(&mut self, seq: usize) -> Result<usize>;
fn data(&self, seq: usize) -> Vec<u8>;
fn single_wait(&mut self, seq: usize) -> Result<usize>;

fn submit_read_req(&mut self, buf: Vec<u8>, offset: u64) -> Result<()>;
}
13 changes: 13 additions & 0 deletions src/env/obfuscated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ impl FileSystem for ObfuscatedFileSystem {
type Handle = <DefaultFileSystem as FileSystem>::Handle;
type Reader = ObfuscatedReader;
type Writer = ObfuscatedWriter;
type AsyncIoContext = AioContext;

fn new_async_reader(
&self,
handle: Arc<Self::Handle>,
ctx: &mut Self::AsyncIoContext,
) -> IoResult<()> {
ctx.new_reader(handle)
}

fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
let r = self.inner.create(path);
Expand Down Expand Up @@ -129,4 +138,8 @@ impl FileSystem for ObfuscatedFileSystem {
fn new_writer(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Writer> {
Ok(ObfuscatedWriter(self.inner.new_writer(handle)?))
}

fn new_async_io_context(&self, block_sum: usize) -> IoResult<Self::AsyncIoContext> {
Ok(AioContext::new(block_sum))
}
}
Loading

0 comments on commit b6fbaae

Please sign in to comment.