diff --git a/src/engine.rs b/src/engine.rs index 9a6984eb..fc2c0f03 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -380,7 +380,7 @@ where 0 => { ctx.single_wait(0).unwrap(); LogBatch::decode_entries_block( - &ctx.data(0).lock().unwrap(), + &ctx.data(0), ents_idx[0].entries.unwrap(), ents_idx[0].compression_type, ) @@ -391,7 +391,7 @@ where seq += 1; ctx.single_wait(seq).unwrap(); LogBatch::decode_entries_block( - &ctx.data(seq).lock().unwrap(), + &ctx.data(seq), i.entries.unwrap(), i.compression_type, ) diff --git a/src/env/default.rs b/src/env/default.rs index 7295f7b2..4380672d 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -103,17 +103,16 @@ impl LogFd { } pub fn read_aio(&self, seq: usize, ctx: &mut AioContext, offset: u64) { - let mut buf = ctx.buf_vec.last().unwrap().lock().unwrap(); unsafe { let aior = &mut ctx.aio_vec[seq]; aior.aio_fildes = self.0; - aior.aio_buf = buf.as_mut_ptr() as *mut c_void; aior.aio_reqprio = 0; aior.aio_sigevent = SigEvent::new(SigevNotify::SigevNone).sigevent(); - aior.aio_nbytes = buf.len() as usize; + aior.aio_nbytes = ctx.buf_vec[seq].len() as usize; + aior.aio_buf = ctx.buf_vec[seq].as_mut_ptr() as *mut c_void; aior.aio_lio_opcode = libc::LIO_READ; aior.aio_offset = offset as off_t; - libc::aio_read(&mut ctx.aio_vec[seq]); + libc::aio_read(aior); } } @@ -279,7 +278,7 @@ impl WriteExt for LogFile { pub struct AioContext { aio_vec: Vec, - pub(crate) buf_vec: Vec>>>, + pub(crate) buf_vec: Vec>, } impl AioContext { pub fn new(block_sum: usize) -> Self { @@ -298,7 +297,7 @@ impl AioContext { impl AsyncContext for AioContext { fn single_wait(&mut self, seq: usize) -> IoResult { - let buf_len = self.buf_vec[seq].lock().unwrap().len(); + let buf_len = self.buf_vec[seq].len(); unsafe { loop { libc::aio_suspend( @@ -324,8 +323,8 @@ impl AsyncContext for AioContext { Ok(total as usize) } - fn data(&self, seq: usize) -> Arc>> { - self.buf_vec[seq].clone() + fn data(&self, seq: usize) -> Vec { + self.buf_vec[seq].to_vec() } } diff --git a/src/env/mod.rs b/src/env/mod.rs index 70fb2317..f60385af 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -85,5 +85,5 @@ pub trait WriteExt { pub trait AsyncContext { fn wait(&mut self) -> Result; fn single_wait(&mut self, seq: usize) -> Result; - fn data(&self, seq: usize) -> Arc>>; + fn data(&self, seq: usize) -> Vec; } diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 8ccbcc9b..c5c9db72 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -358,8 +358,7 @@ impl SinglePipe { ) -> Result<()> { unsafe { let fd = self.get_fd(handle.id.seq)?; - let buf = vec![0 as u8; handle.len]; - let buf = Arc::new(SyncMutex::new(buf)); + let mut buf = vec![0 as u8; handle.len]; ctx.buf_vec.push(buf); self.file_system .as_ref()