From b6fbaae50da5961bf93fb11926418a0e22991c2b Mon Sep 17 00:00:00 2001 From: root Date: Mon, 19 Dec 2022 21:47:39 +0800 Subject: [PATCH] Optimize code structure(version 1.3). Signed-off-by: root --- src/engine.rs | 100 ++++-------------------- src/env/default.rs | 78 ++++++++++++------- src/env/mod.rs | 19 ++--- src/env/obfuscated.rs | 13 ++++ src/file_pipe_log/pipe.rs | 159 ++++++++++++++++++++++++++++++++------ src/pipe_log.rs | 19 +++-- 6 files changed, 237 insertions(+), 151 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index fc2c0f03..c9d515c0 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -341,73 +341,10 @@ where start.elapsed().as_micros() ); - let mut new_block_flags: Vec = Vec::with_capacity(length); - let mut block_sum = 0; - for (t, i) in ents_idx.iter().enumerate() { - if match t { - 0 => true, - _ => ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap(), - } { - block_sum += 1; - new_block_flags.push(true); - } else { - new_block_flags.push(false); - } - } - - let mut ctx = AioContext::new(block_sum); - for (seq, i) in ents_idx.iter().enumerate() { - if new_block_flags[seq] { - submit_read_request_to_file( - self.pipe_log.as_ref(), - seq, - &mut ctx, - i.entries.unwrap(), - ) - .unwrap(); - } - } - println!( - "[fetch_entries_to_aio] (stage2) time cost: {:?} us", - start.elapsed().as_micros() - ); + self.pipe_log + .async_entry_read::(&mut ents_idx, vec) + .unwrap(); - let mut seq = 0; - let mut decode_buf = vec![]; - - for (t, i) in ents_idx.iter().enumerate() { - decode_buf = match t { - 0 => { - ctx.single_wait(0).unwrap(); - LogBatch::decode_entries_block( - &ctx.data(0), - ents_idx[0].entries.unwrap(), - ents_idx[0].compression_type, - ) - .unwrap() - } - _ => match new_block_flags[t] { - true => { - seq += 1; - ctx.single_wait(seq).unwrap(); - LogBatch::decode_entries_block( - &ctx.data(seq), - i.entries.unwrap(), - i.compression_type, - ) - .unwrap() - } - false => decode_buf, - }, - }; - vec.push( - parse_from_bytes::( - &mut decode_buf - [(i.entry_offset) as usize..(i.entry_offset + i.entry_len) as usize], - ) - .unwrap(), - ); - } ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); println!( "[fetch_entries_to_aio] (end) time cost: {:?} us", @@ -692,19 +629,6 @@ where }) } -pub(crate) fn submit_read_request_to_file

( - pipe_log: &P, - seq: usize, - ctx: &mut AioContext, - handle: FileBlockHandle, -) -> Result<()> -where - P: PipeLog, -{ - pipe_log.read_bytes_aio(seq, ctx, handle).unwrap(); - Ok(()) -} - #[cfg(test)] mod tests { use super::*; @@ -910,7 +834,7 @@ mod tests { #[test] fn test_async_read() { - let normal_batch_size = 8192; + let normal_batch_size = 4096; let compressed_batch_size = 5120; for &entry_size in &[normal_batch_size] { if entry_size == normal_batch_size { @@ -2162,11 +2086,15 @@ mod tests { type Handle = ::Handle; type Reader = ::Reader; type Writer = ::Writer; + type AsyncIoContext = AioContext; - // fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> std::io::Result<()> - // { // todo!() - // Ok(()) - // } + fn new_async_reader( + &self, + handle: Arc, + ctx: &mut Self::AsyncIoContext, + ) -> std::io::Result<()> { + ctx.new_reader(handle) + } fn create>(&self, path: P) -> std::io::Result { let handle = self.inner.create(&path)?; @@ -2226,6 +2154,10 @@ mod tests { fn new_writer(&self, h: Arc) -> std::io::Result { self.inner.new_writer(h) } + + fn new_async_io_context(&self, block_sum: usize) -> std::io::Result { + todo!() + } } #[test] diff --git a/src/env/default.rs b/src/env/default.rs index 4380672d..c6eb0760 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -102,14 +102,13 @@ impl LogFd { Ok(readed) } - pub fn read_aio(&self, seq: usize, ctx: &mut AioContext, offset: u64) { + pub fn read_aio(&self, aior: &mut aiocb, len: usize, pbuf: *mut u8, offset: u64) { unsafe { - let aior = &mut ctx.aio_vec[seq]; aior.aio_fildes = self.0; aior.aio_reqprio = 0; aior.aio_sigevent = SigEvent::new(SigevNotify::SigevNone).sigevent(); - aior.aio_nbytes = ctx.buf_vec[seq].len() as usize; - aior.aio_buf = ctx.buf_vec[seq].as_mut_ptr() as *mut c_void; + aior.aio_nbytes = len; + aior.aio_buf = pbuf as *mut c_void; aior.aio_lio_opcode = libc::LIO_READ; aior.aio_offset = offset as off_t; libc::aio_read(aior); @@ -277,25 +276,52 @@ impl WriteExt for LogFile { } pub struct AioContext { + inner: Option>, + offset: u64, + index: usize, aio_vec: Vec, pub(crate) buf_vec: Vec>, } impl AioContext { pub fn new(block_sum: usize) -> Self { - let mut vec = vec![]; + let mut aio_vec = vec![]; + let mut buf_vec = vec![]; unsafe { for i in 0..block_sum { - vec.push(mem::zeroed::()); + aio_vec.push(mem::zeroed::()); } } Self { - aio_vec: vec, - buf_vec: vec![], + inner: None, + offset: 0, + index: 0, + aio_vec, + buf_vec, } } + + pub fn new_reader(&mut self, fd: Arc) -> IoResult<()> { + self.inner = Some(fd); + Ok(()) + } } impl AsyncContext for AioContext { + fn wait(&mut self) -> IoResult { + let mut total = 0; + for seq in 0..self.aio_vec.len() { + match self.single_wait(seq) { + Ok(len) => total += 1, + Err(e) => return Err(e), + } + } + Ok(total as usize) + } + + fn data(&self, seq: usize) -> Vec { + self.buf_vec[seq].to_vec() + } + fn single_wait(&mut self, seq: usize) -> IoResult { let buf_len = self.buf_vec[seq].len(); unsafe { @@ -312,19 +338,19 @@ impl AsyncContext for AioContext { } } - fn wait(&mut self) -> IoResult { - let mut total = 0; - for seq in 0..self.aio_vec.len() { - match self.single_wait(seq) { - Ok(len) => total += 1, - Err(e) => return Err(e), - } - } - Ok(total as usize) - } + fn submit_read_req(&mut self, buf: Vec, offset: u64) -> IoResult<()> { + let seq = self.index; + self.index += 1; + self.buf_vec.push(buf); - fn data(&self, seq: usize) -> Vec { - self.buf_vec[seq].to_vec() + self.inner.as_ref().unwrap().read_aio( + &mut self.aio_vec[seq], + self.buf_vec[seq].len(), + self.buf_vec[seq].as_mut_ptr(), + offset, + ); + + Ok(()) } } @@ -334,16 +360,14 @@ impl FileSystem for DefaultFileSystem { type Handle = LogFd; type Reader = LogFile; type Writer = LogFile; + type AsyncIoContext = AioContext; - fn read_aio( + fn new_async_reader( &self, handle: Arc, - seq: usize, - ctx: &mut AioContext, - offset: u64, + ctx: &mut Self::AsyncIoContext, ) -> IoResult<()> { - handle.read_aio(seq, ctx, offset); - Ok(()) + ctx.new_reader(handle) } fn create>(&self, path: P) -> IoResult { @@ -370,7 +394,7 @@ impl FileSystem for DefaultFileSystem { Ok(LogFile::new(handle)) } - fn new_async_context(&self, block_sum: usize) -> IoResult { + fn new_async_io_context(&self, block_sum: usize) -> IoResult { Ok(AioContext::new(block_sum)) } } diff --git a/src/env/mod.rs b/src/env/mod.rs index f60385af..838d7968 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -17,16 +17,13 @@ pub trait FileSystem: Send + Sync { type Handle: Send + Sync + Handle; type Reader: Seek + Read + Send; type Writer: Seek + Write + Send + WriteExt; + type AsyncIoContext: AsyncContext; - fn read_aio( + fn new_async_reader( &self, handle: Arc, - seq: usize, - ctx: &mut AioContext, - offset: u64, - ) -> Result<()> { - Ok(()) - } + ctx: &mut Self::AsyncIoContext, + ) -> Result<()>; fn create>(&self, path: P) -> Result; @@ -62,9 +59,7 @@ pub trait FileSystem: Send + Sync { fn new_writer(&self, handle: Arc) -> Result; - fn new_async_context(&self, block_sum: usize) -> Result { - Ok(AioContext::new(block_sum)) - } + fn new_async_io_context(&self, block_sum: usize) -> Result; } pub trait Handle { @@ -84,6 +79,8 @@ pub trait WriteExt { pub trait AsyncContext { fn wait(&mut self) -> Result; - fn single_wait(&mut self, seq: usize) -> Result; fn data(&self, seq: usize) -> Vec; + fn single_wait(&mut self, seq: usize) -> Result; + + fn submit_read_req(&mut self, buf: Vec, offset: u64) -> Result<()>; } diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index 2b7c0c62..30b5eec3 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -91,6 +91,15 @@ impl FileSystem for ObfuscatedFileSystem { type Handle = ::Handle; type Reader = ObfuscatedReader; type Writer = ObfuscatedWriter; + type AsyncIoContext = AioContext; + + fn new_async_reader( + &self, + handle: Arc, + ctx: &mut Self::AsyncIoContext, + ) -> IoResult<()> { + ctx.new_reader(handle) + } fn create>(&self, path: P) -> IoResult { let r = self.inner.create(path); @@ -129,4 +138,8 @@ impl FileSystem for ObfuscatedFileSystem { fn new_writer(&self, handle: Arc) -> IoResult { Ok(ObfuscatedWriter(self.inner.new_writer(handle)?)) } + + fn new_async_io_context(&self, block_sum: usize) -> IoResult { + Ok(AioContext::new(block_sum)) + } } diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index c5c9db72..34a8c3fb 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -11,15 +11,17 @@ use fail::fail_point; use libc::aiocb; use log::error; use parking_lot::{Mutex, MutexGuard, RwLock}; +use protobuf::{parse_from_bytes, Message}; use crate::config::Config; -use crate::env::{AioContext, DefaultFileSystem, FileSystem}; +use crate::env::{AioContext, AsyncContext, DefaultFileSystem, FileSystem}; use crate::event_listener::EventListener; +use crate::memtable::EntryIndex; use crate::metrics::*; use crate::pipe_log::{ FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog, ReactiveBytes, }; -use crate::{perf_context, Error, Result}; +use crate::{perf_context, Error, LogBatch, MessageExt, Result}; use super::format::{FileNameExt, LogFileFormat}; use super::log_file::{build_file_reader, build_file_writer, LogFileWriter}; @@ -350,22 +352,15 @@ impl SinglePipe { reader.read(handle) } - fn read_bytes_aio( + fn submit_read_req( &self, - seq: usize, - ctx: &mut AioContext, - handle: FileBlockHandle, - ) -> Result<()> { - unsafe { - let fd = self.get_fd(handle.id.seq)?; - let mut buf = vec![0 as u8; handle.len]; - ctx.buf_vec.push(buf); - self.file_system - .as_ref() - .read_aio(fd, seq, ctx, handle.offset) - .unwrap(); - return Ok(()); - } + handle: &mut FileBlockHandle, + ctx: &mut F::AsyncIoContext, + ) { + let fd = self.get_fd(handle.id.seq).unwrap(); + let mut buf = vec![0 as u8; handle.len]; + self.file_system.as_ref().new_async_reader(fd, ctx).unwrap(); + ctx.submit_read_req(buf, handle.offset).unwrap(); } fn append(&self, bytes: &mut T) -> Result { @@ -533,19 +528,135 @@ impl PipeLog for DualPipes { fn read_bytes(&self, handle: FileBlockHandle) -> Result> { self.pipes[handle.id.queue as usize].read_bytes(handle) } - #[inline] - fn read_bytes_aio( + fn async_entry_read>( &self, - seq: usize, - ctx: &mut AioContext, - handle: FileBlockHandle, + ents_idx: &mut Vec, + vec: &mut Vec, ) -> Result<()> { - self.pipes[handle.id.queue as usize] - .read_bytes_aio(seq, ctx, handle) + let mut handles: Vec = vec![]; + for (t, i) in ents_idx.iter().enumerate() { + if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) { + handles.push(i.entries.unwrap()); + } + } + + let mut ctx_append = self.pipes[LogQueue::Append as usize] + .file_system + .new_async_io_context(handles.len() as usize) .unwrap(); + let mut ctx_rewrite = self.pipes[LogQueue::Rewrite as usize] + .file_system + .new_async_io_context(handles.len() as usize) + .unwrap(); + + for handle in handles.iter_mut() { + match handle.id.queue { + LogQueue::Append => { + self.pipes[LogQueue::Append as usize].submit_read_req( + handle, + &mut ctx_append, + ); + } + LogQueue::Rewrite => { + self.pipes[LogQueue::Rewrite as usize].submit_read_req( + handle, + &mut ctx_rewrite, + ); + } + } + } + + let mut decode_buf = vec![]; + let mut seq_append: i32 = -1; + let mut seq_rewrite: i32 = -1; + + for (t, i) in ents_idx.iter().enumerate() { + decode_buf = + match t == 0 || ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap() { + true => match ents_idx[t].entries.unwrap().id.queue { + LogQueue::Append => { + seq_append += 1; + ctx_append.single_wait(seq_append as usize).unwrap(); + LogBatch::decode_entries_block( + &ctx_append.data(seq_append as usize), + i.entries.unwrap(), + i.compression_type, + ) + .unwrap() + } + LogQueue::Rewrite => { + seq_rewrite += 1; + ctx_rewrite.single_wait(seq_rewrite as usize).unwrap(); + LogBatch::decode_entries_block( + &ctx_rewrite.data(seq_rewrite as usize), + i.entries.unwrap(), + i.compression_type, + ) + .unwrap() + } + }, + false => decode_buf, + }; + + vec.push( + parse_from_bytes::( + &mut decode_buf + [(i.entry_offset) as usize..(i.entry_offset + i.entry_len) as usize], + ) + .unwrap(), + ); + } Ok(()) } + #[inline] + fn async_read_bytes(&self, handles: &mut Vec) -> Result>> { + let mut res: Vec> = vec![]; + + let mut ctx_append = self.pipes[LogQueue::Append as usize] + .file_system + .new_async_io_context(handles.len() as usize) + .unwrap(); + let mut ctx_rewrite = self.pipes[LogQueue::Rewrite as usize] + .file_system + .new_async_io_context(handles.len() as usize) + .unwrap(); + + for (seq, handle) in handles.iter_mut().enumerate() { + match handle.id.queue { + LogQueue::Append => { + self.pipes[LogQueue::Append as usize].submit_read_req( + handle, + &mut ctx_append, + ); + } + LogQueue::Rewrite => { + self.pipes[LogQueue::Rewrite as usize].submit_read_req( + handle, + &mut ctx_rewrite, + ); + } + } + } + let mut seq_append = 0; + let mut seq_rewrite = 0; + for handle in handles.iter_mut(){ + match handle.id.queue { + LogQueue::Append => { + ctx_append.single_wait(seq_append); + res.push(ctx_append.data(seq_append)); + seq_append += 1; + } + LogQueue::Rewrite => { + ctx_rewrite.single_wait(seq_rewrite); + res.push(ctx_rewrite.data(seq_rewrite)); + seq_rewrite += 1; + } + } + } + + Ok(res) + } #[inline] fn append( diff --git a/src/pipe_log.rs b/src/pipe_log.rs index dc06554f..c85e667e 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -6,14 +6,16 @@ use std::cmp::Ordering; use std::fmt::{self, Display}; use crate::env::AioContext; +use crate::memtable::EntryIndex; use fail::fail_point; use libc::aiocb; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::ToPrimitive; +use protobuf::Message; use serde_repr::{Deserialize_repr, Serialize_repr}; use strum::EnumIter; -use crate::Result; +use crate::{MessageExt, Result}; /// The type of log queue. #[repr(u8)] @@ -174,12 +176,19 @@ pub trait PipeLog: Sized { /// Reads some bytes from the specified position. fn read_bytes(&self, handle: FileBlockHandle) -> Result>; - fn read_bytes_aio( + /// Read entries from pipe logs using 'Async IO'. + fn async_entry_read>( &self, - seq: usize, - ctx: &mut AioContext, - handle: FileBlockHandle, + ents_idx: &mut Vec, + vec: &mut Vec, ) -> Result<()>; + + /// Reads bytes from multi blocks using 'Async IO'. + fn async_read_bytes( + &self, + handles: &mut Vec, + ) -> Result>>; + /// Appends some bytes to the specified log queue. Returns file position of /// the written bytes. fn append(