Skip to content

Commit

Permalink
Code structure optimization (rough)
Browse files Browse the repository at this point in the history
  • Loading branch information
ustc-wxy committed Feb 25, 2023
1 parent b6fbaae commit 2d16b58
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 158 deletions.
54 changes: 44 additions & 10 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,8 @@ where
start.elapsed().as_micros()
);

self.pipe_log
.async_entry_read::<M>(&mut ents_idx, vec)
.unwrap();
let bytes = self.pipe_log.async_read_bytes(&mut ents_idx).unwrap();
parse_entries_from_bytes::<M>(bytes, &mut ents_idx, vec);

ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);
println!(
Expand Down Expand Up @@ -582,7 +581,36 @@ impl BlockCache {
thread_local! {
static BLOCK_CACHE: BlockCache = BlockCache::new();
}

pub(crate) fn parse_entries_from_bytes<M: MessageExt>(
bytes: Vec<Vec<u8>>,
ents_idx: &mut Vec<EntryIndex>,
vec: &mut Vec<M::Entry>,
) {
let mut decode_buf = vec![];
let mut seq: i32 = -1;
for (t, idx) in ents_idx.iter().enumerate() {
decode_buf =
match t == 0 || ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap() {
true => {
seq += 1;
bytes[seq as usize].to_vec()
}
false => decode_buf,
};
vec.push(
parse_from_bytes(
&LogBatch::decode_entries_block(
&decode_buf,
idx.entries.unwrap(),
idx.compression_type,
)
.unwrap()
[idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize],
)
.unwrap(),
);
}
}
pub(crate) fn read_entry_from_file<M, P>(pipe_log: &P, idx: &EntryIndex) -> Result<M::Entry>
where
M: MessageExt,
Expand Down Expand Up @@ -834,7 +862,7 @@ mod tests {

#[test]
fn test_async_read() {
let normal_batch_size = 4096;
let normal_batch_size = 10;
let compressed_batch_size = 5120;
for &entry_size in &[normal_batch_size] {
if entry_size == normal_batch_size {
Expand Down Expand Up @@ -2086,14 +2114,20 @@ mod tests {
type Handle = <ObfuscatedFileSystem as FileSystem>::Handle;
type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;
type AsyncIoContext = AioContext;
type AsyncIoContext = <ObfuscatedFileSystem as FileSystem>::AsyncIoContext;

fn new_async_reader(
fn async_read(
&self,
handle: Arc<Self::Handle>,
ctx: &mut Self::AsyncIoContext,
handle: Arc<Self::Handle>,
buf: Vec<u8>,
block: &mut FileBlockHandle,
) -> std::io::Result<()> {
ctx.new_reader(handle)
todo!()
}

fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> std::io::Result<Vec<Vec<u8>>> {
todo!()
}

fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
Expand Down Expand Up @@ -2156,7 +2190,7 @@ mod tests {
}

fn new_async_io_context(&self, block_sum: usize) -> std::io::Result<Self::AsyncIoContext> {
todo!()
self.inner.new_async_io_context(block_sum)
}
}

Expand Down
31 changes: 23 additions & 8 deletions src/env/default.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.

use std::ffi::c_void;
use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write,Error, ErrorKind};
use std::os::unix::io::RawFd;
use std::path::Path;
use std::sync::{Arc, Mutex};
Expand All @@ -19,7 +19,9 @@ use nix::unistd::{close, ftruncate, lseek, Whence};
use nix::NixPath;

use crate::env::{AsyncContext, FileSystem, Handle, WriteExt};
use crate::Error;
use crate::pipe_log::FileBlockHandle;

const MAX_ASYNC_READ_TRY_TIME:usize = 10;

fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error {
let kind = std::io::Error::from(e).kind();
Expand Down Expand Up @@ -300,9 +302,8 @@ impl AioContext {
}
}

pub fn new_reader(&mut self, fd: Arc<LogFd>) -> IoResult<()> {
pub fn set_fd(&mut self, fd: Arc<LogFd>) {
self.inner = Some(fd);
Ok(())
}
}

Expand All @@ -324,8 +325,9 @@ impl AsyncContext for AioContext {

fn single_wait(&mut self, seq: usize) -> IoResult<usize> {
let buf_len = self.buf_vec[seq].len();

unsafe {
loop {
for _ in 0..MAX_ASYNC_READ_TRY_TIME{
libc::aio_suspend(
vec![&mut self.aio_vec[seq]].as_ptr() as *const *const aiocb,
1 as i32,
Expand All @@ -336,6 +338,7 @@ impl AsyncContext for AioContext {
}
}
}
Err(Error::new(ErrorKind::Other, "Async IO panic."))
}

fn submit_read_req(&mut self, buf: Vec<u8>, offset: u64) -> IoResult<()> {
Expand All @@ -362,12 +365,24 @@ impl FileSystem for DefaultFileSystem {
type Writer = LogFile;
type AsyncIoContext = AioContext;

fn new_async_reader(
fn async_read(
&self,
handle: Arc<Self::Handle>,
ctx: &mut Self::AsyncIoContext,
handle: Arc<Self::Handle>,
buf: Vec<u8>,
block: &mut FileBlockHandle,
) -> IoResult<()> {
ctx.new_reader(handle)
ctx.set_fd(handle);
ctx.submit_read_req(buf, block.offset)
}

fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> IoResult<Vec<Vec<u8>>> {
let mut res = vec![];
for seq in 0..ctx.index {
ctx.single_wait(seq);
res.push(ctx.data(seq).to_vec());
}
Ok(res)
}

fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
Expand Down
8 changes: 6 additions & 2 deletions src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,22 @@ pub use default::AioContext;
pub use default::DefaultFileSystem;
pub use obfuscated::ObfuscatedFileSystem;

use crate::pipe_log::FileBlockHandle;
/// FileSystem
pub trait FileSystem: Send + Sync {
type Handle: Send + Sync + Handle;
type Reader: Seek + Read + Send;
type Writer: Seek + Write + Send + WriteExt;
type AsyncIoContext: AsyncContext;

fn new_async_reader(
fn async_read(
&self,
handle: Arc<Self::Handle>,
ctx: &mut Self::AsyncIoContext,
handle: Arc<Self::Handle>,
buf: Vec<u8>,
block: &mut FileBlockHandle,
) -> Result<()>;
fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> Result<Vec<Vec<u8>>>;

fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;

Expand Down
48 changes: 43 additions & 5 deletions src/env/obfuscated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use std::sync::{Arc, Mutex};
use crate::env::default::AioContext;
use crate::env::{DefaultFileSystem, FileSystem, WriteExt};

use super::AsyncContext;
use crate::pipe_log::FileBlockHandle;
pub struct ObfuscatedReader(<DefaultFileSystem as FileSystem>::Reader);

impl Read for ObfuscatedReader {
Expand Down Expand Up @@ -86,19 +88,53 @@ impl ObfuscatedFileSystem {
self.files.load(Ordering::Relaxed)
}
}
pub struct ObfuscatedContext(<DefaultFileSystem as FileSystem>::AsyncIoContext);
impl AsyncContext for ObfuscatedContext {
fn wait(&mut self) -> IoResult<usize> {
self.0.wait()
}

fn data(&self, seq: usize) -> Vec<u8> {
self.0.data(seq)
}

fn single_wait(&mut self, seq: usize) -> IoResult<usize> {
self.0.single_wait(seq)
}

fn submit_read_req(&mut self, buf: Vec<u8>, offset: u64) -> IoResult<()> {
self.0.submit_read_req(buf, offset)
}
}

impl FileSystem for ObfuscatedFileSystem {
type Handle = <DefaultFileSystem as FileSystem>::Handle;
type Reader = ObfuscatedReader;
type Writer = ObfuscatedWriter;
type AsyncIoContext = AioContext;
type AsyncIoContext = ObfuscatedContext;

fn new_async_reader(
fn async_read(
&self,
handle: Arc<Self::Handle>,
ctx: &mut Self::AsyncIoContext,
handle: Arc<Self::Handle>,
buf: Vec<u8>,
block: &mut FileBlockHandle,
) -> IoResult<()> {
ctx.new_reader(handle)
self.inner.async_read(&mut ctx.0, handle, buf, block)
}

fn async_finish(&self, ctx: &mut Self::AsyncIoContext) -> IoResult<Vec<Vec<u8>>> {
let base = self.inner.async_finish(&mut ctx.0).unwrap();
let mut res = vec![];
for v in base {
let mut temp = vec![];
//do obfuscation.
for c in v {
temp.push(c.wrapping_sub(1));
}
res.push(temp);
}
Ok(res)
}

fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
Expand Down Expand Up @@ -140,6 +176,8 @@ impl FileSystem for ObfuscatedFileSystem {
}

fn new_async_io_context(&self, block_sum: usize) -> IoResult<Self::AsyncIoContext> {
Ok(AioContext::new(block_sum))
Ok(ObfuscatedContext(
self.inner.new_async_io_context(block_sum)?,
))
}
}
Loading

0 comments on commit 2d16b58

Please sign in to comment.