From 2d16b5815d00b66cf857baa3e56b7af4a1375147 Mon Sep 17 00:00:00 2001 From: root <1019495690@qq.com> Date: Sun, 26 Feb 2023 02:29:15 +0800 Subject: [PATCH] Code structure optimization (rough) --- src/engine.rs | 54 ++++++++-- src/env/default.rs | 31 ++++-- src/env/mod.rs | 8 +- src/env/obfuscated.rs | 48 ++++++++- src/file_pipe_log/pipe.rs | 218 +++++++++++++++++--------------------- src/pipe_log.rs | 12 +-- 6 files changed, 213 insertions(+), 158 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index c9d515c0..ef5f28f5 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -341,9 +341,8 @@ where start.elapsed().as_micros() ); - self.pipe_log - .async_entry_read::(&mut ents_idx, vec) - .unwrap(); + let bytes = self.pipe_log.async_read_bytes(&mut ents_idx).unwrap(); + parse_entries_from_bytes::(bytes, &mut ents_idx, vec); ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); println!( @@ -582,7 +581,36 @@ impl BlockCache { thread_local! { static BLOCK_CACHE: BlockCache = BlockCache::new(); } - +pub(crate) fn parse_entries_from_bytes( + bytes: Vec>, + ents_idx: &mut Vec, + vec: &mut Vec, +) { + let mut decode_buf = vec![]; + let mut seq: i32 = -1; + for (t, idx) in ents_idx.iter().enumerate() { + decode_buf = + match t == 0 || ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap() { + true => { + seq += 1; + bytes[seq as usize].to_vec() + } + false => decode_buf, + }; + vec.push( + parse_from_bytes( + &LogBatch::decode_entries_block( + &decode_buf, + idx.entries.unwrap(), + idx.compression_type, + ) + .unwrap() + [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], + ) + .unwrap(), + ); + } +} pub(crate) fn read_entry_from_file(pipe_log: &P, idx: &EntryIndex) -> Result where M: MessageExt, @@ -834,7 +862,7 @@ mod tests { #[test] fn test_async_read() { - let normal_batch_size = 4096; + let normal_batch_size = 10; let compressed_batch_size = 5120; for &entry_size in &[normal_batch_size] { if entry_size == normal_batch_size { @@ -2086,14 +2114,20 @@ mod tests { type Handle = ::Handle; type Reader = ::Reader; type Writer = ::Writer; - type AsyncIoContext = AioContext; + type AsyncIoContext = ::AsyncIoContext; - fn new_async_reader( + fn async_read( &self, - handle: Arc, ctx: &mut Self::AsyncIoContext, + handle: Arc, + buf: Vec, + block: &mut FileBlockHandle, ) -> std::io::Result<()> { - ctx.new_reader(handle) + todo!() + } + + fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> std::io::Result>> { + todo!() } fn create>(&self, path: P) -> std::io::Result { @@ -2156,7 +2190,7 @@ mod tests { } fn new_async_io_context(&self, block_sum: usize) -> std::io::Result { - todo!() + self.inner.new_async_io_context(block_sum) } } diff --git a/src/env/default.rs b/src/env/default.rs index c6eb0760..d7821c69 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -1,7 +1,7 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. use std::ffi::c_void; -use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; +use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write,Error, ErrorKind}; use std::os::unix::io::RawFd; use std::path::Path; use std::sync::{Arc, Mutex}; @@ -19,7 +19,9 @@ use nix::unistd::{close, ftruncate, lseek, Whence}; use nix::NixPath; use crate::env::{AsyncContext, FileSystem, Handle, WriteExt}; -use crate::Error; +use crate::pipe_log::FileBlockHandle; + +const MAX_ASYNC_READ_TRY_TIME:usize = 10; fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { let kind = std::io::Error::from(e).kind(); @@ -300,9 +302,8 @@ impl AioContext { } } - pub fn new_reader(&mut self, fd: Arc) -> IoResult<()> { + pub fn set_fd(&mut self, fd: Arc) { self.inner = Some(fd); - Ok(()) } } @@ -324,8 +325,9 @@ impl AsyncContext for AioContext { fn single_wait(&mut self, seq: usize) -> IoResult { let buf_len = self.buf_vec[seq].len(); + unsafe { - loop { + for _ in 0..MAX_ASYNC_READ_TRY_TIME{ libc::aio_suspend( vec![&mut self.aio_vec[seq]].as_ptr() as *const *const aiocb, 1 as i32, @@ -336,6 +338,7 @@ impl AsyncContext for AioContext { } } } + Err(Error::new(ErrorKind::Other, "Async IO panic.")) } fn submit_read_req(&mut self, buf: Vec, offset: u64) -> IoResult<()> { @@ -362,12 +365,24 @@ impl FileSystem for DefaultFileSystem { type Writer = LogFile; type AsyncIoContext = AioContext; - fn new_async_reader( + fn async_read( &self, - handle: Arc, ctx: &mut Self::AsyncIoContext, + handle: Arc, + buf: Vec, + block: &mut FileBlockHandle, ) -> IoResult<()> { - ctx.new_reader(handle) + ctx.set_fd(handle); + ctx.submit_read_req(buf, block.offset) + } + + fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> IoResult>> { + let mut res = vec![]; + for seq in 0..ctx.index { + ctx.single_wait(seq); + res.push(ctx.data(seq).to_vec()); + } + Ok(res) } fn create>(&self, path: P) -> IoResult { diff --git a/src/env/mod.rs b/src/env/mod.rs index 838d7968..e7ffca76 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -12,6 +12,7 @@ pub use default::AioContext; pub use default::DefaultFileSystem; pub use obfuscated::ObfuscatedFileSystem; +use crate::pipe_log::FileBlockHandle; /// FileSystem pub trait FileSystem: Send + Sync { type Handle: Send + Sync + Handle; @@ -19,11 +20,14 @@ pub trait FileSystem: Send + Sync { type Writer: Seek + Write + Send + WriteExt; type AsyncIoContext: AsyncContext; - fn new_async_reader( + fn async_read( &self, - handle: Arc, ctx: &mut Self::AsyncIoContext, + handle: Arc, + buf: Vec, + block: &mut FileBlockHandle, ) -> Result<()>; + fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> Result>>; fn create>(&self, path: P) -> Result; diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index 30b5eec3..ebc3bcfe 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -9,6 +9,8 @@ use std::sync::{Arc, Mutex}; use crate::env::default::AioContext; use crate::env::{DefaultFileSystem, FileSystem, WriteExt}; +use super::AsyncContext; +use crate::pipe_log::FileBlockHandle; pub struct ObfuscatedReader(::Reader); impl Read for ObfuscatedReader { @@ -86,19 +88,53 @@ impl ObfuscatedFileSystem { self.files.load(Ordering::Relaxed) } } +pub struct ObfuscatedContext(::AsyncIoContext); +impl AsyncContext for ObfuscatedContext { + fn wait(&mut self) -> IoResult { + self.0.wait() + } + + fn data(&self, seq: usize) -> Vec { + self.0.data(seq) + } + + fn single_wait(&mut self, seq: usize) -> IoResult { + self.0.single_wait(seq) + } + + fn submit_read_req(&mut self, buf: Vec, offset: u64) -> IoResult<()> { + self.0.submit_read_req(buf, offset) + } +} impl FileSystem for ObfuscatedFileSystem { type Handle = ::Handle; type Reader = ObfuscatedReader; type Writer = ObfuscatedWriter; - type AsyncIoContext = AioContext; + type AsyncIoContext = ObfuscatedContext; - fn new_async_reader( + fn async_read( &self, - handle: Arc, ctx: &mut Self::AsyncIoContext, + handle: Arc, + buf: Vec, + block: &mut FileBlockHandle, ) -> IoResult<()> { - ctx.new_reader(handle) + self.inner.async_read(&mut ctx.0, handle, buf, block) + } + + fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> IoResult>> { + let base = self.inner.async_finish(&mut ctx.0).unwrap(); + let mut res = vec![]; + for v in base { + let mut temp = vec![]; + //do obfuscation. + for c in v { + temp.push(c.wrapping_sub(1)); + } + res.push(temp); + } + Ok(res) } fn create>(&self, path: P) -> IoResult { @@ -140,6 +176,8 @@ impl FileSystem for ObfuscatedFileSystem { } fn new_async_io_context(&self, block_sum: usize) -> IoResult { - Ok(AioContext::new(block_sum)) + Ok(ObfuscatedContext( + self.inner.new_async_io_context(block_sum)?, + )) } } diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 34a8c3fb..7cef2fa6 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -352,15 +352,11 @@ impl SinglePipe { reader.read(handle) } - fn submit_read_req( - &self, - handle: &mut FileBlockHandle, - ctx: &mut F::AsyncIoContext, - ) { - let fd = self.get_fd(handle.id.seq).unwrap(); - let mut buf = vec![0 as u8; handle.len]; - self.file_system.as_ref().new_async_reader(fd, ctx).unwrap(); - ctx.submit_read_req(buf, handle.offset).unwrap(); + fn async_read(&self, block: &mut FileBlockHandle, ctx: &mut F::AsyncIoContext) { + let fd = self.get_fd(block.id.seq).unwrap(); + let buf = vec![0 as u8; block.len]; + + self.file_system.async_read(ctx, fd, buf, block).unwrap(); } fn append(&self, bytes: &mut T) -> Result { @@ -528,133 +524,111 @@ impl PipeLog for DualPipes { fn read_bytes(&self, handle: FileBlockHandle) -> Result> { self.pipes[handle.id.queue as usize].read_bytes(handle) } + // #[inline] + // fn async_entry_read>( + // &self, + // ents_idx: &mut Vec, + // vec: &mut Vec, + // ) -> Result<()> { + // let mut handles: Vec = vec![]; + // for (t, i) in ents_idx.iter().enumerate() { + // if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) + // { handles.push(i.entries.unwrap()); + // } + // } + + // let mut ctx_append = self.pipes[LogQueue::Append as usize] + // .file_system + // .new_async_io_context(handles.len() as usize) + // .unwrap(); + // let mut ctx_rewrite = self.pipes[LogQueue::Rewrite as usize] + // .file_system + // .new_async_io_context(handles.len() as usize) + // .unwrap(); + + // for handle in handles.iter_mut() { + // match handle.id.queue { + // LogQueue::Append => { + // self.pipes[LogQueue::Append as usize].submit_read_req( + // handle, + // &mut ctx_append, + // ); + // } + // LogQueue::Rewrite => { + // self.pipes[LogQueue::Rewrite as usize].submit_read_req( + // handle, + // &mut ctx_rewrite, + // ); + // } + // } + // } + + // let mut decode_buf = vec![]; + // let mut seq_append: i32 = -1; + // let mut seq_rewrite: i32 = -1; + + // for (t, i) in ents_idx.iter().enumerate() { + // decode_buf = + // match t == 0 || ents_idx[t - 1].entries.unwrap() != + // ents_idx[t].entries.unwrap() { true => match + // ents_idx[t].entries.unwrap().id.queue { + // LogQueue::Append => { seq_append += 1; + // ctx_append.single_wait(seq_append as usize).unwrap(); + // LogBatch::decode_entries_block( + // &ctx_append.data(seq_append as usize), + // i.entries.unwrap(), + // i.compression_type, + // ) + // .unwrap() + // } + // LogQueue::Rewrite => { + // seq_rewrite += 1; + // ctx_rewrite.single_wait(seq_rewrite as + // usize).unwrap(); LogBatch::decode_entries_block( + // &ctx_rewrite.data(seq_rewrite as usize), + // i.entries.unwrap(), + // i.compression_type, + // ) + // .unwrap() + // } + // }, + // false => decode_buf, + // }; + + // vec.push( + // parse_from_bytes::( + // &mut decode_buf + // [(i.entry_offset) as usize..(i.entry_offset + + // i.entry_len) as usize], ) + // .unwrap(), + // ); + // } + // Ok(()) + // } #[inline] - fn async_entry_read>( - &self, - ents_idx: &mut Vec, - vec: &mut Vec, - ) -> Result<()> { - let mut handles: Vec = vec![]; + fn async_read_bytes(&self, ents_idx: &mut Vec) -> Result>> { + let mut blocks: Vec = vec![]; for (t, i) in ents_idx.iter().enumerate() { if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) { - handles.push(i.entries.unwrap()); - } - } - - let mut ctx_append = self.pipes[LogQueue::Append as usize] - .file_system - .new_async_io_context(handles.len() as usize) - .unwrap(); - let mut ctx_rewrite = self.pipes[LogQueue::Rewrite as usize] - .file_system - .new_async_io_context(handles.len() as usize) - .unwrap(); - - for handle in handles.iter_mut() { - match handle.id.queue { - LogQueue::Append => { - self.pipes[LogQueue::Append as usize].submit_read_req( - handle, - &mut ctx_append, - ); - } - LogQueue::Rewrite => { - self.pipes[LogQueue::Rewrite as usize].submit_read_req( - handle, - &mut ctx_rewrite, - ); - } + blocks.push(i.entries.unwrap()); } } - - let mut decode_buf = vec![]; - let mut seq_append: i32 = -1; - let mut seq_rewrite: i32 = -1; - - for (t, i) in ents_idx.iter().enumerate() { - decode_buf = - match t == 0 || ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap() { - true => match ents_idx[t].entries.unwrap().id.queue { - LogQueue::Append => { - seq_append += 1; - ctx_append.single_wait(seq_append as usize).unwrap(); - LogBatch::decode_entries_block( - &ctx_append.data(seq_append as usize), - i.entries.unwrap(), - i.compression_type, - ) - .unwrap() - } - LogQueue::Rewrite => { - seq_rewrite += 1; - ctx_rewrite.single_wait(seq_rewrite as usize).unwrap(); - LogBatch::decode_entries_block( - &ctx_rewrite.data(seq_rewrite as usize), - i.entries.unwrap(), - i.compression_type, - ) - .unwrap() - } - }, - false => decode_buf, - }; - - vec.push( - parse_from_bytes::( - &mut decode_buf - [(i.entry_offset) as usize..(i.entry_offset + i.entry_len) as usize], - ) - .unwrap(), - ); - } - Ok(()) - } - #[inline] - fn async_read_bytes(&self, handles: &mut Vec) -> Result>> { let mut res: Vec> = vec![]; - let mut ctx_append = self.pipes[LogQueue::Append as usize] - .file_system - .new_async_io_context(handles.len() as usize) - .unwrap(); - let mut ctx_rewrite = self.pipes[LogQueue::Rewrite as usize] - .file_system - .new_async_io_context(handles.len() as usize) - .unwrap(); + let fs = &self.pipes[LogQueue::Append as usize].file_system; + let mut ctx = fs.new_async_io_context(blocks.len() as usize).unwrap(); - for (seq, handle) in handles.iter_mut().enumerate() { - match handle.id.queue { - LogQueue::Append => { - self.pipes[LogQueue::Append as usize].submit_read_req( - handle, - &mut ctx_append, - ); - } - LogQueue::Rewrite => { - self.pipes[LogQueue::Rewrite as usize].submit_read_req( - handle, - &mut ctx_rewrite, - ); - } - } - } - let mut seq_append = 0; - let mut seq_rewrite = 0; - for handle in handles.iter_mut(){ - match handle.id.queue { + for (seq, block) in blocks.iter_mut().enumerate() { + match block.id.queue { LogQueue::Append => { - ctx_append.single_wait(seq_append); - res.push(ctx_append.data(seq_append)); - seq_append += 1; + self.pipes[LogQueue::Append as usize].async_read(block, &mut ctx); } LogQueue::Rewrite => { - ctx_rewrite.single_wait(seq_rewrite); - res.push(ctx_rewrite.data(seq_rewrite)); - seq_rewrite += 1; + self.pipes[LogQueue::Rewrite as usize].async_read(block, &mut ctx); } } } - + let res = fs.async_finish(&mut ctx).unwrap(); Ok(res) } diff --git a/src/pipe_log.rs b/src/pipe_log.rs index c85e667e..7ef3cfce 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -176,18 +176,8 @@ pub trait PipeLog: Sized { /// Reads some bytes from the specified position. fn read_bytes(&self, handle: FileBlockHandle) -> Result>; - /// Read entries from pipe logs using 'Async IO'. - fn async_entry_read>( - &self, - ents_idx: &mut Vec, - vec: &mut Vec, - ) -> Result<()>; - /// Reads bytes from multi blocks using 'Async IO'. - fn async_read_bytes( - &self, - handles: &mut Vec, - ) -> Result>>; + fn async_read_bytes(&self, ents_idx: &mut Vec) -> Result>>; /// Appends some bytes to the specified log queue. Returns file position of /// the written bytes.