From 1c7a820b251e7d012f463e1aee722eac5b5a0440 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 3 Jul 2023 08:57:59 +0800 Subject: [PATCH 01/10] Update num-derive requirement from 0.3 to 0.4 (#321) Updates the requirements on [num-derive](https://github.com/rust-num/num-derive) to permit the latest version. - [Changelog](https://github.com/rust-num/num-derive/blob/master/RELEASES.md) - [Commits](https://github.com/rust-num/num-derive/compare/num-derive-0.3.0...num-derive-0.4.0) --- updated-dependencies: - dependency-name: num-derive dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ea8107df..54cf384e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug lz4-sys = "1.9" memmap2 = { version = "0.7", optional = true } nix = "0.26" -num-derive = "0.3" +num-derive = "0.4" num-traits = "0.2" parking_lot = "0.12" prometheus = { version = "0.13" } From 58041ad3e69ae76941309733bba936743bf8b5a5 Mon Sep 17 00:00:00 2001 From: Xinye Tao Date: Fri, 21 Jul 2023 14:32:23 +0800 Subject: [PATCH 02/10] fix issues on latest nightly and improve feature tests (#325) --- .github/workflows/rust.yml | 7 +++---- Cargo.toml | 6 ++---- Makefile | 24 +++++++++++++++++------- src/codec.rs | 1 + src/memtable.rs | 1 + src/metrics.rs | 2 +- src/purge.rs | 10 ++++------ src/swappy_allocator.rs | 22 +++++++++++----------- 8 files changed, 40 insertions(+), 33 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index c05c121d..2a5ce83d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -19,7 +19,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: nightly-2023-01-01 + toolchain: nightly-2023-07-01 override: true components: rustfmt, clippy, rust-src - uses: Swatinem/rust-cache@v1 @@ -87,7 +87,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: nightly-2023-01-01 + toolchain: nightly-2023-07-01 override: true components: llvm-tools-preview - uses: Swatinem/rust-cache@v1 @@ -97,8 +97,7 @@ jobs: run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov; fi - name: Run tests run: | - make test - env WITH_STABLE_TOOLCHAIN=auto make test + make test_matrix env: RUSTFLAGS: '-Zinstrument-coverage' LLVM_PROFILE_FILE: '%p-%m.profraw' diff --git a/Cargo.toml b/Cargo.toml index 54cf384e..9a558bc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,6 +72,7 @@ tempfile = "3.1" toml = "0.7" [features] +default = ["internals", "scripting"] internals = [] nightly = [ "prometheus/nightly", @@ -87,10 +88,7 @@ swap = [ "memmap2", ] -# Shortcuts -all_except_failpoints = ["internals", "scripting", "nightly", "swap"] -all_stable = ["internals", "scripting", "failpoints"] -all_stable_except_failpoints = ["internals", "scripting"] +nightly_group = ["nightly", "swap"] [patch.crates-io] raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" } diff --git a/Makefile b/Makefile index dc928e89..1f69789f 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ EXTRA_CARGO_ARGS ?= ## How to test stable toolchain. ## - auto: use current default toolchain, disable nightly features. -## - force: always use stable toolchain, disable nightly features. 
+## - force: explicitly use stable toolchain, disable nightly features.
WITH_STABLE_TOOLCHAIN ?=

WITH_NIGHTLY_FEATURES =
@@ -44,19 +44,29 @@ format:

## Run clippy.
clippy:
ifdef WITH_NIGHTLY_FEATURES
-	cargo ${TOOLCHAIN_ARGS} clippy --all --all-features --all-targets -- -D clippy::all
+	cargo ${TOOLCHAIN_ARGS} clippy --all --features nightly_group,failpoints --all-targets -- -D clippy::all
else
-	cargo ${TOOLCHAIN_ARGS} clippy --all --features all_stable --all-targets -- -D clippy::all
+	cargo ${TOOLCHAIN_ARGS} clippy --all --features failpoints --all-targets -- -D clippy::all
endif

## Run tests.
test:
ifdef WITH_NIGHTLY_FEATURES
-	cargo ${TOOLCHAIN_ARGS} test --all --features all_except_failpoints ${EXTRA_CARGO_ARGS} -- --nocapture
-	cargo ${TOOLCHAIN_ARGS} test --test failpoints --all-features ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
+	cargo ${TOOLCHAIN_ARGS} test --all --features nightly_group ${EXTRA_CARGO_ARGS} -- --nocapture
+	cargo ${TOOLCHAIN_ARGS} test --test failpoints --features nightly_group,failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
else
-	cargo ${TOOLCHAIN_ARGS} test --all --features all_stable_except_failpoints ${EXTRA_CARGO_ARGS} -- --nocapture
-	cargo ${TOOLCHAIN_ARGS} test --test failpoints --features all_stable ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
+	cargo ${TOOLCHAIN_ARGS} test --all ${EXTRA_CARGO_ARGS} -- --nocapture
+	cargo ${TOOLCHAIN_ARGS} test --test failpoints --features failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
+endif
+
+## Run tests with various features for maximum code coverage.
+ifndef WITH_NIGHTLY_FEATURES
+test_matrix:
+	$(error Must run test matrix with nightly features. Please reset WITH_STABLE_TOOLCHAIN.)
+else
+test_matrix: test
+	cargo ${TOOLCHAIN_ARGS} test --all ${EXTRA_CARGO_ARGS} -- --nocapture
+	cargo ${TOOLCHAIN_ARGS} test --test failpoints --features failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture
endif

## Build raft-engine-ctl.
diff --git a/src/codec.rs b/src/codec.rs
index af75b75b..b8b34b18 100644
--- a/src/codec.rs
+++ b/src/codec.rs
@@ -670,6 +670,7 @@ mod tests {
             decode_var_u64(&mut buf.as_slice()),
             ErrorKind::UnexpectedEof
         );
+        check_error!(decode_var_u64(&mut [].as_slice()), ErrorKind::UnexpectedEof);

         buf.push(0);
         assert_eq!(0, decode_var_u64(&mut buf.as_slice()).unwrap());
diff --git a/src/memtable.rs b/src/memtable.rs
index d46ba68b..dee9dcf4 100644
--- a/src/memtable.rs
+++ b/src/memtable.rs
@@ -2256,6 +2256,7 @@ mod tests {
                     7,
                     FileId::new(LogQueue::Rewrite, 1),
                 ));
+                memtable.replay_rewrite(Vec::new());
             }
         }
         memtable
diff --git a/src/metrics.rs b/src/metrics.rs
index b2304b1a..6ca10940 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -105,7 +105,7 @@ where
     }
 }

-#[macro_export(crate)]
+#[macro_export]
 macro_rules! perf_context {
     ($field: ident) => {
         $crate::metrics::PerfContextField::new(|perf_context| &mut perf_context.$field)
diff --git a/src/purge.rs b/src/purge.rs
index cb76f776..7fd3b777 100644
--- a/src/purge.rs
+++ b/src/purge.rs
@@ -103,12 +103,10 @@ where
             });

             // Ordering
-            // 1. Must rewrite tombstones AFTER acquiring
-            //    `append_queue_barrier`, or deletion marks might be lost
-            //    after restart.
-            // 2. Must rewrite tombstones BEFORE rewrite entries, or
-            //    entries from recreated region might be lost after
-            //    restart.
+            // 1. Must rewrite tombstones AFTER acquiring `append_queue_barrier`, or
+            //    deletion marks might be lost after restart.
+            // 2.
Must rewrite tombstones BEFORE rewrite entries, or entries from recreated + // region might be lost after restart. self.rewrite_append_queue_tombstones()?; should_compact.extend(self.rewrite_or_compact_append_queue( rewrite_watermark, diff --git a/src/swappy_allocator.rs b/src/swappy_allocator.rs index a6cacb21..c552005b 100644 --- a/src/swappy_allocator.rs +++ b/src/swappy_allocator.rs @@ -20,7 +20,7 @@ const DEFAULT_PAGE_SIZE: usize = 64 * 1024 * 1024; // 64MB struct SwappyAllocatorCore where - A: Allocator, + A: Allocator + Send + Sync, { budget: usize, path: PathBuf, @@ -39,9 +39,9 @@ where /// The allocations of its internal metadata are not managed (i.e. allocated via /// `std::alloc::Global`). Do NOT use it as the global allocator. #[derive(Clone)] -pub struct SwappyAllocator(Arc>); +pub struct SwappyAllocator(Arc>); -impl SwappyAllocator { +impl SwappyAllocator { pub fn new_over(path: &Path, budget: usize, alloc: A) -> SwappyAllocator { if path.exists() { if let Err(e) = std::fs::remove_dir_all(path) { @@ -106,7 +106,7 @@ impl SwappyAllocator { } } -unsafe impl Allocator for SwappyAllocator { +unsafe impl Allocator for SwappyAllocator { #[inline] fn allocate(&self, layout: Layout) -> Result, AllocError> { // Always use mem_allocator to allocate empty pointer. @@ -616,14 +616,14 @@ mod tests { assert_eq!(allocator.memory_usage(), 16); assert_eq!(global.stats(), (2, 1, 0, 0)); // Deallocate all pages, calls when memory use is low. - disk_vec.clear(); + disk_vec.truncate(1); disk_vec.shrink_to_fit(); - assert_eq!(allocator.memory_usage(), 16); + assert_eq!(allocator.memory_usage(), 16 + 1); assert_eq!(global.stats(), (3, 1, 0, 0)); assert_eq!(file_count(dir.path()), 0); // Grow calls now. mem_vec.resize(32, 0); - assert_eq!(allocator.memory_usage(), 32); + assert_eq!(allocator.memory_usage(), 32 + 1); assert_eq!(global.stats(), (3, 1, 1, 0)); } @@ -1134,7 +1134,7 @@ mod tests { { // issue-58952 let c = 2; - let bv = vec![2]; + let bv = [2]; let b = bv.iter().filter(|a| **a == c); let _a = collect( @@ -1159,8 +1159,8 @@ mod tests { } { // issue-54477 - let mut vecdeque_13 = collect(vec![].into_iter(), allocator.clone()); - let mut vecdeque_29 = collect(vec![0].into_iter(), allocator.clone()); + let mut vecdeque_13 = collect(vec![], allocator.clone()); + let mut vecdeque_29 = collect(vec![0], allocator.clone()); vecdeque_29.insert(0, 30); vecdeque_29.insert(1, 31); vecdeque_29.insert(2, 32); @@ -1172,7 +1172,7 @@ mod tests { assert_eq!( vecdeque_13, - collect(vec![30, 31, 32, 33, 34, 35, 0].into_iter(), allocator,) + collect(vec![30, 31, 32, 33, 34, 35, 0], allocator,) ); } From 2dcaf5beeea3d5de9ec9c7133a2451d00f508f52 Mon Sep 17 00:00:00 2001 From: LFC Date: Thu, 27 Jul 2023 10:45:14 +0800 Subject: [PATCH 03/10] build on windows (#322) Signed-off-by: luofucong --- .github/workflows/rust.yml | 2 +- Cargo.toml | 5 +- Makefile | 2 + src/env/default.rs | 229 ++++-------------------------- src/env/log_fd.rs | 11 ++ src/env/log_fd/plain.rs | 81 +++++++++++ src/env/log_fd/unix.rs | 185 ++++++++++++++++++++++++ src/env/mod.rs | 1 + src/file_pipe_log/pipe.rs | 4 + src/fork.rs | 6 +- src/lib.rs | 2 + src/swappy_allocator.rs | 7 + tests/failpoints/test_engine.rs | 12 +- tests/failpoints/test_io_error.rs | 30 ++-- 14 files changed, 353 insertions(+), 224 deletions(-) create mode 100644 src/env/log_fd.rs create mode 100644 src/env/log_fd/plain.rs create mode 100644 src/env/log_fd/unix.rs diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 2a5ce83d..e3eca258 100644 --- 
a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -10,7 +10,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ ubuntu-latest, macos-latest ] + os: [ ubuntu-latest, macos-latest, windows-latest ] steps: - uses: actions/checkout@v2 with: diff --git a/Cargo.toml b/Cargo.toml index 9a558bc6..973335ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,7 @@ kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = f raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] } rand = "0.8" rand_distr = "0.4" -tempfile = "3.1" +tempfile = "3.6" toml = "0.7" [features] @@ -87,6 +87,7 @@ swap = [ "nightly", "memmap2", ] +std_fs = [] nightly_group = ["nightly", "swap"] @@ -94,6 +95,8 @@ nightly_group = ["nightly", "swap"] raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" } protobuf = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" } protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" } +# TODO: Use official grpc-rs once https://github.com/tikv/grpc-rs/pull/622 is merged. +grpcio = { git = "https://github.com/tabokie/grpc-rs", branch = "v0.10.x-win" } [workspace] members = ["stress", "ctl"] diff --git a/Makefile b/Makefile index 1f69789f..4bab52f0 100644 --- a/Makefile +++ b/Makefile @@ -67,6 +67,8 @@ else test_matrix: test cargo ${TOOLCHAIN_ARGS} test --all ${EXTRA_CARGO_ARGS} -- --nocapture cargo ${TOOLCHAIN_ARGS} test --test failpoints --features failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture + cargo ${TOOLCHAIN_ARGS} test --all --features nightly_group,std_fs ${EXTRA_CARGO_ARGS} -- --nocapture + cargo ${TOOLCHAIN_ARGS} test --test failpoints --features nightly_group,std_fs,failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture endif ## Build raft-engine-ctl. diff --git a/src/env/default.rs b/src/env/default.rs index 9839e668..9aadc591 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -1,212 +1,14 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; -use std::os::unix::io::RawFd; +use std::io::{Error, ErrorKind, Read, Result as IoResult, Seek, SeekFrom, Write}; use std::path::Path; use std::sync::Arc; use fail::fail_point; -use log::error; -use nix::errno::Errno; -use nix::fcntl::{self, OFlag}; -use nix::sys::stat::Mode; -use nix::sys::uio::{pread, pwrite}; -use nix::unistd::{close, ftruncate, lseek, Whence}; -use nix::NixPath; +use crate::env::log_fd::LogFd; use crate::env::{FileSystem, Handle, Permission, WriteExt}; -fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { - let kind = std::io::Error::from(e).kind(); - std::io::Error::new(kind, custom) -} - -impl From for OFlag { - fn from(value: Permission) -> OFlag { - match value { - Permission::ReadOnly => OFlag::O_RDONLY, - Permission::ReadWrite => OFlag::O_RDWR, - } - } -} - -/// A RAII-style low-level file. Errors occurred during automatic resource -/// release are logged and ignored. -/// -/// A [`LogFd`] is essentially a thin wrapper around [`RawFd`]. It's only -/// supported on *Unix*, and primarily optimized for *Linux*. -/// -/// All [`LogFd`] instances are opened with read and write permission. -pub struct LogFd(RawFd); - -impl LogFd { - /// Opens a file with the given `path`. 
- pub fn open(path: &P, perm: Permission) -> IoResult { - fail_point!("log_fd::open::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - // Permission 644 - let mode = Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IROTH; - fail_point!("log_fd::open::fadvise_dontneed", |_| { - let fd = - LogFd(fcntl::open(path, perm.into(), mode).map_err(|e| from_nix_error(e, "open"))?); - #[cfg(target_os = "linux")] - unsafe { - extern crate libc; - libc::posix_fadvise64(fd.0, 0, fd.file_size()? as i64, libc::POSIX_FADV_DONTNEED); - } - Ok(fd) - }); - Ok(LogFd( - fcntl::open(path, perm.into(), mode).map_err(|e| from_nix_error(e, "open"))?, - )) - } - - /// Opens a file with the given `path`. The specified file will be created - /// first if not exists. - pub fn create(path: &P) -> IoResult { - fail_point!("log_fd::create::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - let flags = OFlag::O_RDWR | OFlag::O_CREAT; - // Permission 644 - let mode = Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IROTH; - let fd = fcntl::open(path, flags, mode).map_err(|e| from_nix_error(e, "open"))?; - Ok(LogFd(fd)) - } - - /// Closes the file. - pub fn close(&self) -> IoResult<()> { - fail_point!("log_fd::close::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - close(self.0).map_err(|e| from_nix_error(e, "close")) - } - - /// Reads some bytes starting at `offset` from this file into the specified - /// buffer. Returns how many bytes were read. - pub fn read(&self, mut offset: usize, buf: &mut [u8]) -> IoResult { - let mut readed = 0; - while readed < buf.len() { - fail_point!("log_fd::read::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - let bytes = match pread(self.0, &mut buf[readed..], offset as i64) { - Ok(bytes) => bytes, - Err(e) if e == Errno::EINTR => continue, - Err(e) => return Err(from_nix_error(e, "pread")), - }; - // EOF - if bytes == 0 { - break; - } - readed += bytes; - offset += bytes; - } - Ok(readed) - } - - /// Writes some bytes to this file starting at `offset`. Returns how many - /// bytes were written. - pub fn write(&self, mut offset: usize, content: &[u8]) -> IoResult { - fail_point!("log_fd::write::zero", |_| { Ok(0) }); - fail_point!("log_fd::write::no_space_err", |_| { - Err(from_nix_error(nix::Error::ENOSPC, "nospace")) - }); - let mut written = 0; - while written < content.len() { - let bytes = match pwrite(self.0, &content[written..], offset as i64) { - Ok(bytes) => bytes, - Err(e) if e == Errno::EINTR => continue, - Err(e) if e == Errno::ENOSPC => return Err(from_nix_error(e, "nospace")), - Err(e) => return Err(from_nix_error(e, "pwrite")), - }; - if bytes == 0 { - break; - } - written += bytes; - offset += bytes; - } - fail_point!("log_fd::write::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - Ok(written) - } - - /// Truncates all data after `offset`. - pub fn truncate(&self, offset: usize) -> IoResult<()> { - fail_point!("log_fd::truncate::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - ftruncate(self.0, offset as i64).map_err(|e| from_nix_error(e, "ftruncate")) - } - - /// Attempts to allocate space for `size` bytes starting at `offset`. 
- #[allow(unused_variables)] - pub fn allocate(&self, offset: usize, size: usize) -> IoResult<()> { - fail_point!("log_fd::allocate::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - #[cfg(target_os = "linux")] - { - if let Err(e) = fcntl::fallocate( - self.0, - fcntl::FallocateFlags::empty(), - offset as i64, - size as i64, - ) { - if e != nix::Error::EOPNOTSUPP { - return Err(from_nix_error(e, "fallocate")); - } - } - } - Ok(()) - } -} - -impl Handle for LogFd { - #[inline] - fn truncate(&self, offset: usize) -> IoResult<()> { - fail_point!("log_fd::truncate::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - ftruncate(self.0, offset as i64).map_err(|e| from_nix_error(e, "ftruncate")) - } - - #[inline] - fn file_size(&self) -> IoResult { - fail_point!("log_fd::file_size::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - lseek(self.0, 0, Whence::SeekEnd) - .map(|n| n as usize) - .map_err(|e| from_nix_error(e, "lseek")) - } - - #[inline] - fn sync(&self) -> IoResult<()> { - fail_point!("log_fd::sync::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - #[cfg(target_os = "linux")] - { - nix::unistd::fdatasync(self.0).map_err(|e| from_nix_error(e, "fdatasync")) - } - #[cfg(not(target_os = "linux"))] - { - nix::unistd::fsync(self.0).map_err(|e| from_nix_error(e, "fsync")) - } - } -} - -impl Drop for LogFd { - fn drop(&mut self) { - if let Err(e) = self.close() { - error!("error while closing file: {e}"); - } - } -} - /// A low-level file adapted for standard interfaces including [`Seek`], /// [`Write`] and [`Read`]. pub struct LogFile { @@ -226,7 +28,14 @@ impl LogFile { impl Write for LogFile { fn write(&mut self, buf: &[u8]) -> IoResult { + fail_point!("log_file::write::zero", |_| { Ok(0) }); + let len = self.inner.write(self.offset, buf)?; + + fail_point!("log_file::write::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + self.offset += len; Ok(len) } @@ -238,6 +47,10 @@ impl Write for LogFile { impl Read for LogFile { fn read(&mut self, buf: &mut [u8]) -> IoResult { + fail_point!("log_file::read::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + let len = self.inner.read(self.offset, buf)?; self.offset += len; Ok(len) @@ -260,12 +73,20 @@ impl Seek for LogFile { impl WriteExt for LogFile { fn truncate(&mut self, offset: usize) -> IoResult<()> { + fail_point!("log_file::truncate::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + self.inner.truncate(offset)?; self.offset = offset; Ok(()) } fn allocate(&mut self, offset: usize, size: usize) -> IoResult<()> { + fail_point!("log_file::allocate::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + self.inner.allocate(offset, size) } } @@ -278,10 +99,18 @@ impl FileSystem for DefaultFileSystem { type Writer = LogFile; fn create>(&self, path: P) -> IoResult { + fail_point!("default_fs::create::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + LogFd::create(path.as_ref()) } fn open>(&self, path: P, perm: Permission) -> IoResult { + fail_point!("default_fs::open::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + LogFd::open(path.as_ref(), perm) } diff --git a/src/env/log_fd.rs b/src/env/log_fd.rs new file mode 100644 index 00000000..23cc2b3f --- /dev/null +++ b/src/env/log_fd.rs @@ -0,0 +1,11 @@ +// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. 
+ +#[cfg(not(any(windows, feature = "std_fs")))] +mod unix; +#[cfg(not(any(windows, feature = "std_fs")))] +pub use unix::LogFd; + +#[cfg(any(windows, feature = "std_fs"))] +mod plain; +#[cfg(any(windows, feature = "std_fs"))] +pub use plain::LogFd; diff --git a/src/env/log_fd/plain.rs b/src/env/log_fd/plain.rs new file mode 100644 index 00000000..08c40135 --- /dev/null +++ b/src/env/log_fd/plain.rs @@ -0,0 +1,81 @@ +// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. + +use crate::env::{Handle, Permission}; + +use fail::fail_point; +use parking_lot::RwLock; + +use std::fs::{File, OpenOptions}; +use std::io::{Error, ErrorKind, Read, Result, Seek, SeekFrom, Write}; +use std::path::Path; +use std::sync::Arc; + +pub struct LogFd(Arc>); + +impl LogFd { + pub fn open>(path: P, _: Permission) -> Result { + OpenOptions::new() + .read(true) + .write(true) + .open(path) + .map(|x| Self(Arc::new(RwLock::new(x)))) + } + + pub fn create>(path: P) -> Result { + OpenOptions::new() + .create(true) + .read(true) + .write(true) + .open(path) + .map(|x| Self(Arc::new(RwLock::new(x)))) + } + + pub fn read(&self, offset: usize, buf: &mut [u8]) -> Result { + let mut file = self.0.write(); + let _ = file.seek(SeekFrom::Start(offset as u64))?; + file.read(buf) + } + + pub fn write(&self, offset: usize, content: &[u8]) -> Result { + fail_point!("log_fd::write::no_space_err", |_| { + Err(Error::new(ErrorKind::Other, "nospace")) + }); + + let mut file = self.0.write(); + let _ = file.seek(SeekFrom::Start(offset as u64))?; + file.write(content) + } + + pub fn truncate(&self, offset: usize) -> Result<()> { + let file = self.0.write(); + file.set_len(offset as u64) + } + + pub fn allocate(&self, _offset: usize, _size: usize) -> Result<()> { + Ok(()) + } +} + +impl Handle for LogFd { + fn truncate(&self, offset: usize) -> Result<()> { + self.truncate(offset) + } + + fn file_size(&self) -> Result { + fail_point!("log_fd::file_size::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + + let file = self.0.read(); + file.metadata().map(|x| x.len() as usize) + } + + fn sync(&self) -> Result<()> { + fail_point!("log_fd::sync::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + + let file = self.0.write(); + file.sync_all() + } +} diff --git a/src/env/log_fd/unix.rs b/src/env/log_fd/unix.rs new file mode 100644 index 00000000..608cca70 --- /dev/null +++ b/src/env/log_fd/unix.rs @@ -0,0 +1,185 @@ +// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. + +use crate::env::{Handle, Permission}; + +use fail::fail_point; +use log::error; + +use std::io::Result as IoResult; +use std::os::unix::io::RawFd; + +use nix::errno::Errno; +use nix::fcntl::{self, OFlag}; +use nix::sys::stat::Mode; +use nix::sys::uio::{pread, pwrite}; +use nix::unistd::{close, ftruncate, lseek, Whence}; +use nix::NixPath; + +fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { + let kind = std::io::Error::from(e).kind(); + std::io::Error::new(kind, custom) +} + +impl From for OFlag { + fn from(value: Permission) -> OFlag { + match value { + Permission::ReadOnly => OFlag::O_RDONLY, + Permission::ReadWrite => OFlag::O_RDWR, + } + } +} + +/// A RAII-style low-level file. Errors occurred during automatic resource +/// release are logged and ignored. +/// +/// A [`LogFd`] is essentially a thin wrapper around [`RawFd`]. It's only +/// supported on *Unix*, and primarily optimized for *Linux*. +/// +/// All [`LogFd`] instances are opened with read and write permission. 
+pub struct LogFd(RawFd); + +impl LogFd { + /// Opens a file with the given `path`. + pub fn open(path: &P, perm: Permission) -> IoResult { + // Permission 644 + let mode = Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IROTH; + fail_point!("log_fd::open::fadvise_dontneed", |_| { + let fd = + LogFd(fcntl::open(path, perm.into(), mode).map_err(|e| from_nix_error(e, "open"))?); + #[cfg(target_os = "linux")] + unsafe { + extern crate libc; + libc::posix_fadvise64(fd.0, 0, fd.file_size()? as i64, libc::POSIX_FADV_DONTNEED); + } + Ok(fd) + }); + Ok(LogFd( + fcntl::open(path, perm.into(), mode).map_err(|e| from_nix_error(e, "open"))?, + )) + } + + /// Opens a file with the given `path`. The specified file will be created + /// first if not exists. + pub fn create(path: &P) -> IoResult { + let flags = OFlag::O_RDWR | OFlag::O_CREAT; + // Permission 644 + let mode = Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IROTH; + let fd = fcntl::open(path, flags, mode).map_err(|e| from_nix_error(e, "open"))?; + Ok(LogFd(fd)) + } + + /// Closes the file. + pub fn close(&self) -> IoResult<()> { + fail_point!("log_fd::close::err", |_| { + Err(from_nix_error(nix::Error::EINVAL, "fp")) + }); + close(self.0).map_err(|e| from_nix_error(e, "close")) + } + + /// Reads some bytes starting at `offset` from this file into the specified + /// buffer. Returns how many bytes were read. + pub fn read(&self, mut offset: usize, buf: &mut [u8]) -> IoResult { + let mut readed = 0; + while readed < buf.len() { + let bytes = match pread(self.0, &mut buf[readed..], offset as i64) { + Ok(bytes) => bytes, + Err(e) if e == Errno::EINTR => continue, + Err(e) => return Err(from_nix_error(e, "pread")), + }; + // EOF + if bytes == 0 { + break; + } + readed += bytes; + offset += bytes; + } + Ok(readed) + } + + /// Writes some bytes to this file starting at `offset`. Returns how many + /// bytes were written. + pub fn write(&self, mut offset: usize, content: &[u8]) -> IoResult { + fail_point!("log_fd::write::no_space_err", |_| { + Err(from_nix_error(nix::Error::ENOSPC, "nospace")) + }); + let mut written = 0; + while written < content.len() { + let bytes = match pwrite(self.0, &content[written..], offset as i64) { + Ok(bytes) => bytes, + Err(e) if e == Errno::EINTR => continue, + Err(e) if e == Errno::ENOSPC => return Err(from_nix_error(e, "nospace")), + Err(e) => return Err(from_nix_error(e, "pwrite")), + }; + if bytes == 0 { + break; + } + written += bytes; + offset += bytes; + } + Ok(written) + } + + /// Truncates all data after `offset`. + pub fn truncate(&self, offset: usize) -> IoResult<()> { + ftruncate(self.0, offset as i64).map_err(|e| from_nix_error(e, "ftruncate")) + } + + /// Attempts to allocate space for `size` bytes starting at `offset`. 
+ #[allow(unused_variables)] + pub fn allocate(&self, offset: usize, size: usize) -> IoResult<()> { + #[cfg(target_os = "linux")] + { + if let Err(e) = fcntl::fallocate( + self.0, + fcntl::FallocateFlags::empty(), + offset as i64, + size as i64, + ) { + if e != nix::Error::EOPNOTSUPP { + return Err(from_nix_error(e, "fallocate")); + } + } + } + Ok(()) + } +} + +impl Handle for LogFd { + #[inline] + fn truncate(&self, offset: usize) -> IoResult<()> { + self.truncate(offset) + } + + #[inline] + fn file_size(&self) -> IoResult { + fail_point!("log_fd::file_size::err", |_| { + Err(from_nix_error(nix::Error::EINVAL, "fp")) + }); + lseek(self.0, 0, Whence::SeekEnd) + .map(|n| n as usize) + .map_err(|e| from_nix_error(e, "lseek")) + } + + #[inline] + fn sync(&self) -> IoResult<()> { + fail_point!("log_fd::sync::err", |_| { + Err(from_nix_error(nix::Error::EINVAL, "fp")) + }); + #[cfg(target_os = "linux")] + { + nix::unistd::fdatasync(self.0).map_err(|e| from_nix_error(e, "fdatasync")) + } + #[cfg(not(target_os = "linux"))] + { + nix::unistd::fsync(self.0).map_err(|e| from_nix_error(e, "fsync")) + } + } +} + +impl Drop for LogFd { + fn drop(&mut self) { + if let Err(e) = self.close() { + error!("error while closing file: {e}"); + } + } +} diff --git a/src/env/mod.rs b/src/env/mod.rs index 3e24d2be..06de495a 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -5,6 +5,7 @@ use std::path::Path; use std::sync::Arc; mod default; +mod log_fd; mod obfuscated; pub use default::DefaultFileSystem; diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 5a5916ea..f42ade1b 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -175,6 +175,10 @@ impl SinglePipe { /// filesystem. fn sync_dir(&self, path_id: PathId) -> Result<()> { debug_assert!(!self.paths.is_empty()); + + // Skip syncing directory in Windows. Refer to badger's discussion for more + // detail: https://github.com/dgraph-io/badger/issues/699 + #[cfg(not(windows))] std::fs::File::open(PathBuf::from(&self.paths[path_id])).and_then(|d| d.sync_all())?; Ok(()) } diff --git a/src/fork.rs b/src/fork.rs index 1bfc1251..cab65a92 100644 --- a/src/fork.rs +++ b/src/fork.rs @@ -1,10 +1,14 @@ // Copyright (c) 2023-present, PingCAP, Inc. Licensed under Apache-2.0. use std::fs::{copy, create_dir_all}; -use std::os::unix::fs::symlink; use std::path::Path; use std::sync::Arc; +#[cfg(not(windows))] +use std::os::unix::fs::symlink; +#[cfg(windows)] +use std::os::windows::fs::symlink_file as symlink; + use crate::config::{Config, RecoveryMode}; use crate::env::FileSystem; use crate::file_pipe_log::{FileNameExt, FilePipeLog, FilePipeLogBuilder}; diff --git a/src/lib.rs b/src/lib.rs index 9a786238..25ebadd0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,8 @@ #![cfg_attr(feature = "nightly", feature(test))] #![cfg_attr(feature = "swap", feature(allocator_api))] #![cfg_attr(feature = "swap", feature(slice_ptr_get))] +// Though the new nightly rust stablized this feature, keep it anyway +// because some other project (like TiKV) is still using the old. 
#![cfg_attr(feature = "swap", feature(nonnull_slice_from_raw_parts))] #![cfg_attr(feature = "swap", feature(slice_ptr_len))] #![cfg_attr(feature = "swap", feature(alloc_layout_extra))] diff --git a/src/swappy_allocator.rs b/src/swappy_allocator.rs index c552005b..8baa4835 100644 --- a/src/swappy_allocator.rs +++ b/src/swappy_allocator.rs @@ -342,6 +342,13 @@ impl Page { #[inline] fn release(self, root: &Path) { debug_assert_eq!(self.ref_counter, 0); + + // Somehow in Windows, we have to drop the mmap file handle first, otherwise + // the following file removal will return "Access Denied (OS Error 5)". + // Not using `#[cfg(windows)]` here is because it might do no harm in other + // operating systems - the mmap file handle is dropped anyhow. + drop(self.mmap); + let path = root.join(Self::page_file_name(self.seq)); if let Err(e) = std::fs::remove_file(path) { warn!("Failed to delete swap file: {e}"); diff --git a/tests/failpoints/test_engine.rs b/tests/failpoints/test_engine.rs index a2700532..8d74b70b 100644 --- a/tests/failpoints/test_engine.rs +++ b/tests/failpoints/test_engine.rs @@ -792,7 +792,7 @@ fn test_partial_rewrite_rewrite() { } { - let _f = FailGuard::new("log_fd::write::err", "10*off->return->off"); + let _f = FailGuard::new("log_file::write::err", "10*off->return->off"); assert!( catch_unwind_silent(|| engine.purge_manager().must_rewrite_rewrite_queue()).is_err() ); @@ -830,7 +830,7 @@ fn test_partial_rewrite_rewrite_online() { assert_eq!(engine.file_span(LogQueue::Append).0, old_active_file + 1); { - let _f = FailGuard::new("log_fd::write::err", "10*off->return->off"); + let _f = FailGuard::new("log_file::write::err", "10*off->return->off"); assert!( catch_unwind_silent(|| engine.purge_manager().must_rewrite_rewrite_queue()).is_err() ); @@ -897,13 +897,13 @@ fn test_split_rewrite_batch_imp(regions: u64, region_size: u64, split_size: u64, let count = AtomicU64::new(0); fail::cfg_callback("atomic_group::begin", move || { if count.fetch_add(1, Ordering::Relaxed) + 1 == i { - fail::cfg("log_fd::write::err", "return").unwrap(); + fail::cfg("log_file::write::err", "return").unwrap(); } }) .unwrap(); let r = catch_unwind_silent(|| engine.purge_manager().must_rewrite_rewrite_queue()); fail::remove("atomic_group::begin"); - fail::remove("log_fd::write::err"); + fail::remove("log_file::write::err"); if r.is_ok() { break; } @@ -917,13 +917,13 @@ fn test_split_rewrite_batch_imp(regions: u64, region_size: u64, split_size: u64, let count = AtomicU64::new(0); fail::cfg_callback("atomic_group::add", move || { if count.fetch_add(1, Ordering::Relaxed) + 1 == i { - fail::cfg("log_fd::write::err", "return").unwrap(); + fail::cfg("log_file::write::err", "return").unwrap(); } }) .unwrap(); let r = catch_unwind_silent(|| engine.purge_manager().must_rewrite_rewrite_queue()); fail::remove("atomic_group::add"); - fail::remove("log_fd::write::err"); + fail::remove("log_file::write::err"); if r.is_ok() { break; } diff --git a/tests/failpoints/test_io_error.rs b/tests/failpoints/test_io_error.rs index e3d3d574..d24ab049 100644 --- a/tests/failpoints/test_io_error.rs +++ b/tests/failpoints/test_io_error.rs @@ -23,11 +23,11 @@ fn test_file_open_error() { let fs = Arc::new(ObfuscatedFileSystem::default()); { - let _f = FailGuard::new("log_fd::create::err", "return"); + let _f = FailGuard::new("default_fs::create::err", "return"); assert!(Engine::open_with_file_system(cfg.clone(), fs.clone()).is_err()); } { - let _f = FailGuard::new("log_fd::open::err", "return"); + let _f = 
FailGuard::new("default_fs::open::err", "return"); let _ = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); assert!(Engine::open_with_file_system(cfg, fs).is_err()); } @@ -66,7 +66,7 @@ fn test_file_read_error() { engine.write(&mut kv_batch, true).unwrap(); let mut entries = Vec::new(); - let _f = FailGuard::new("log_fd::read::err", "return"); + let _f = FailGuard::new("log_file::read::err", "return"); engine .fetch_entries_to::(1, 0, 1, None, &mut entries) .unwrap(); @@ -95,7 +95,7 @@ fn test_file_write_error() { .write(&mut generate_batch(1, 1, 2, Some(&entry)), false) .unwrap(); { - let _f = FailGuard::new("log_fd::write::err", "return"); + let _f = FailGuard::new("log_file::write::err", "return"); engine .write(&mut generate_batch(1, 2, 3, Some(&entry)), false) .unwrap_err(); @@ -164,7 +164,7 @@ fn test_file_rotate_error() { } { // Fail to create new log file. - let _f = FailGuard::new("log_fd::create::err", "return"); + let _f = FailGuard::new("default_fs::create::err", "return"); assert!(catch_unwind_silent(|| { let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); }) @@ -173,7 +173,7 @@ fn test_file_rotate_error() { } { // Fail to write header of new log file. - let _f = FailGuard::new("log_fd::write::err", "1*off->return"); + let _f = FailGuard::new("log_file::write::err", "1*off->return"); assert!(catch_unwind_silent(|| { let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); }) @@ -220,7 +220,7 @@ fn test_concurrent_write_error() { let mut ctx = ConcurrentWriteContext::new(engine.clone()); // The second of three writes will fail. - fail::cfg("log_fd::write::err", "1*off->1*return->off").unwrap(); + fail::cfg("log_file::write::err", "1*off->1*return->off").unwrap(); let entry_clone = entry.clone(); ctx.write_ext(move |e| { e.write(&mut generate_batch(1, 1, 11, Some(&entry_clone)), false) @@ -258,8 +258,8 @@ fn test_concurrent_write_error() { ); { - let _f1 = FailGuard::new("log_fd::write::err", "return"); - let _f2 = FailGuard::new("log_fd::truncate::err", "return"); + let _f1 = FailGuard::new("log_file::write::err", "return"); + let _f2 = FailGuard::new("log_file::truncate::err", "return"); let entry_clone = entry.clone(); ctx.write_ext(move |e| { catch_unwind_silent(|| { @@ -305,7 +305,7 @@ fn test_non_atomic_write_error() { { // Write partially succeeds. We can reopen. let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); - let _f1 = FailGuard::new("log_fd::write::err", "return"); + let _f1 = FailGuard::new("log_file::write::err", "return"); engine .write(&mut generate_batch(rid, 0, 1, Some(&entry)), true) .unwrap_err(); @@ -317,7 +317,7 @@ fn test_non_atomic_write_error() { { // Write partially succeeds. We can overwrite. let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); - let _f1 = FailGuard::new("log_fd::write::err", "1*off->1*return->off"); + let _f1 = FailGuard::new("log_file::write::err", "1*off->1*return->off"); engine .write(&mut generate_batch(rid, 0, 1, Some(&entry)), true) .unwrap_err(); @@ -333,7 +333,7 @@ fn test_non_atomic_write_error() { { // Write partially succeeds and can't be reverted. We panic. 
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); - let _f1 = FailGuard::new("log_fd::write::err", "return"); + let _f1 = FailGuard::new("log_file::write::err", "return"); let _f2 = FailGuard::new("log_file::seek::err", "return"); assert!(catch_unwind_silent(|| { engine @@ -378,7 +378,7 @@ fn test_error_during_repair() { " .to_owned(); { - let _f = FailGuard::new("log_fd::write::err", "return"); + let _f = FailGuard::new("log_file::write::err", "return"); assert!( Engine::unsafe_repair_with_file_system(dir.path(), None, script, fs.clone()).is_err() ); @@ -429,7 +429,7 @@ fn test_file_allocate_error() { let fs = Arc::new(ObfuscatedFileSystem::default()); let entry = vec![b'x'; 1024]; { - let _f = FailGuard::new("log_fd::allocate::err", "return"); + let _f = FailGuard::new("log_file::allocate::err", "return"); let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); engine .write(&mut generate_batch(1, 1, 5, Some(&entry)), true) @@ -458,7 +458,7 @@ fn test_start_with_recycled_file_allocate_error() { // Mock that the engine starts with the circumstance where // the pref-reserved file with seqno[5] failed to be generated. { - let _f = FailGuard::new("log_fd::write::zero", "4*off->1*return->off"); + let _f = FailGuard::new("log_file::write::zero", "4*off->1*return->off"); Engine::open(cfg.clone()).unwrap(); } // Extra recycled files have been supplemented. From d4943a9f725b0d5907615154852545f539ae953a Mon Sep 17 00:00:00 2001 From: Xinye Tao Date: Fri, 1 Sep 2023 15:28:58 +0800 Subject: [PATCH 04/10] support configuring compression level (#311) * support config compression level Signed-off-by: tabokie * lock version Signed-off-by: tabokie * update doc Signed-off-by: tabokie * update changelog Signed-off-by: tabokie * address comment Signed-off-by: tabokie --------- Signed-off-by: tabokie --- CHANGELOG.md | 1 + Cargo.toml | 2 +- src/config.rs | 7 +++++++ src/engine.rs | 9 ++++++--- src/file_pipe_log/mod.rs | 2 +- src/filter.rs | 4 ++-- src/log_batch.rs | 24 ++++++++++++++++-------- src/purge.rs | 5 ++++- src/util.rs | 10 +++++++--- 9 files changed, 45 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7e3c9a6..c27405d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * Support preparing prefilled logs to enable log recycling when start-up. The amount of logs to prepare is controlled by `Config::prefill_limit`. * Add a new configuration `spill-dir` to allow automatic placement of logs into an auxiliary directory when `dir` is full. * Add a new method `Engine::fork` to duplicate an `Engine` to a new place, with a few disk file copies. +* Support configuring lz4 acceleration factor with `compression-level`. ## [0.3.0] - 2022-09-14 diff --git a/Cargo.toml b/Cargo.toml index 973335ca..177ae280 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,7 +61,7 @@ strum = { version = "0.25.0", features = ["derive"] } thiserror = "1.0" [dev-dependencies] -criterion = "0.5" +criterion = "0.4" ctor = "0.2" env_logger = "0.10" kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = ["protobuf-codec"] } diff --git a/src/config.rs b/src/config.rs index fd467571..30bf4082 100644 --- a/src/config.rs +++ b/src/config.rs @@ -58,6 +58,12 @@ pub struct Config { /// /// Default: "8KB" pub batch_compression_threshold: ReadableSize, + /// Acceleration factor for LZ4 compression. It can be fine tuned, with each + /// successive value providing roughly +~3% to speed. 
The value will be + /// capped within [1, 65537] by LZ4. + /// + /// Default: 1. + pub compression_level: Option, /// Deprecated. /// Incrementally sync log files after specified bytes have been written. /// Setting it to zero disables incremental sync. @@ -127,6 +133,7 @@ impl Default for Config { recovery_read_block_size: ReadableSize::kb(16), recovery_threads: 4, batch_compression_threshold: ReadableSize::kb(8), + compression_level: None, bytes_per_sync: None, format_version: Version::V2, target_file_size: ReadableSize::mb(128), diff --git a/src/engine.rs b/src/engine.rs index 42e55821..17321649 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -142,7 +142,10 @@ where return Ok(0); } let start = Instant::now(); - let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?; + let len = log_batch.finish_populate( + self.cfg.batch_compression_threshold.0 as usize, + self.cfg.compression_level, + )?; debug_assert!(len > 0); let mut attempt_count = 0_u64; @@ -2430,7 +2433,7 @@ pub(crate) mod tests { // Directly write to pipe log. let mut log_batch = LogBatch::default(); let flush = |lb: &mut LogBatch| { - lb.finish_populate(0).unwrap(); + lb.finish_populate(0, None).unwrap(); engine.pipe_log.append(LogQueue::Rewrite, lb).unwrap(); lb.drain(); }; @@ -2580,7 +2583,7 @@ pub(crate) mod tests { log_batch.put_unchecked(3, crate::make_internal_key(&[1]), value.clone()); log_batch.put_unchecked(4, crate::make_internal_key(&[1]), value); - log_batch.finish_populate(0).unwrap(); + log_batch.finish_populate(0, None).unwrap(); let block_handle = engine .pipe_log .append(LogQueue::Rewrite, &mut log_batch) diff --git a/src/file_pipe_log/mod.rs b/src/file_pipe_log/mod.rs index fd7c5036..64042e01 100644 --- a/src/file_pipe_log/mod.rs +++ b/src/file_pipe_log/mod.rs @@ -219,7 +219,7 @@ pub mod debug { for batch in bs.iter_mut() { let offset = writer.offset() as u64; let len = batch - .finish_populate(1 /* compression_threshold */) + .finish_populate(1 /* compression_threshold */, None) .unwrap(); batch.prepare_write(&log_file_format).unwrap(); writer diff --git a/src/filter.rs b/src/filter.rs index 8a4fb4b7..f992d788 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -315,7 +315,7 @@ impl RhaiFilterMachine { } // Batch 64KB. if log_batch.approximate_size() >= 64 * 1024 { - log_batch.finish_populate(0 /* compression_threshold */)?; + log_batch.finish_populate(0 /* compression_threshold */, None)?; log_batch.prepare_write(&log_file_context)?; writer.write( log_batch.encoded_bytes(), @@ -325,7 +325,7 @@ impl RhaiFilterMachine { } } if !log_batch.is_empty() { - log_batch.finish_populate(0 /* compression_threshold */)?; + log_batch.finish_populate(0 /* compression_threshold */, None)?; log_batch.prepare_write(&log_file_context)?; writer.write( log_batch.encoded_bytes(), diff --git a/src/log_batch.rs b/src/log_batch.rs index b12cf727..33587ea9 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -768,7 +768,11 @@ impl LogBatch { /// /// Internally, encodes and optionally compresses log entries. Sets the /// compression type to each entry index. 
- pub(crate) fn finish_populate(&mut self, compression_threshold: usize) -> Result { + pub(crate) fn finish_populate( + &mut self, + compression_threshold: usize, + compression_level: Option, + ) -> Result { let _t = StopWatch::new(perf_context!(log_populating_duration)); debug_assert!(self.buf_state == BufState::Open); if self.is_empty() { @@ -782,7 +786,11 @@ impl LogBatch { && self.buf.len() >= LOG_BATCH_HEADER_LEN + compression_threshold { let buf_len = self.buf.len(); - lz4::append_compress_block(&mut self.buf, LOG_BATCH_HEADER_LEN)?; + lz4::append_compress_block( + &mut self.buf, + LOG_BATCH_HEADER_LEN, + compression_level.unwrap_or(lz4::DEFAULT_LZ4_COMPRESSION_LEVEL), + )?; (buf_len - LOG_BATCH_HEADER_LEN, CompressionType::Lz4) } else { (0, CompressionType::None) @@ -1325,7 +1333,7 @@ mod tests { offset: 0, }; let old_approximate_size = batch.approximate_size(); - let len = batch.finish_populate(usize::from(compress)).unwrap(); + let len = batch.finish_populate(usize::from(compress), None).unwrap(); assert!(old_approximate_size >= len); assert_eq!(batch.approximate_size(), len); let mut batch_handle = mocked_file_block_handle; @@ -1490,7 +1498,7 @@ mod tests { batch1.merge(&mut batch2).unwrap(); assert!(batch2.is_empty()); - let len = batch1.finish_populate(0).unwrap(); + let len = batch1.finish_populate(0, None).unwrap(); batch1.prepare_write(&file_context).unwrap(); let encoded = batch1.encoded_bytes(); assert_eq!(len, encoded.len()); @@ -1546,7 +1554,7 @@ mod tests { offset: 0, }; let buf_len = batch.buf.len(); - let len = batch.finish_populate(1).unwrap(); + let len = batch.finish_populate(1, None).unwrap(); assert!(len == 0); assert_eq!(batch.buf_state, BufState::Encoded(buf_len, 0)); let file_context = LogFileContext::new(mocked_file_block_handles.id, Version::V2); @@ -1585,7 +1593,7 @@ mod tests { .put(region_id, b"key".to_vec(), b"value".to_vec()) .unwrap(); // enable compression so that len_and_type > len. - batch.finish_populate(1).unwrap(); + batch.finish_populate(1, None).unwrap(); let file_context = LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default()); batch.prepare_write(&file_context).unwrap(); let encoded = batch.encoded_bytes(); @@ -1626,7 +1634,7 @@ mod tests { .add_entries::(thread_rng().gen(), entries) .unwrap(); } - log_batch.finish_populate(0).unwrap(); + log_batch.finish_populate(0, None).unwrap(); let _ = log_batch.drain(); } let data: Vec = (0..128).map(|_| thread_rng().gen()).collect(); @@ -1668,7 +1676,7 @@ mod tests { }, ]; let old_approximate_size = batch.approximate_size(); - let len = batch.finish_populate(1).unwrap(); + let len = batch.finish_populate(1, None).unwrap(); assert!(old_approximate_size >= len); assert_eq!(batch.approximate_size(), len); let checksum = batch.item_batch.checksum; diff --git a/src/purge.rs b/src/purge.rs index 7fd3b777..b1183438 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -433,7 +433,10 @@ where self.pipe_log.sync(LogQueue::Rewrite)?; return Ok(None); } - log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?; + log_batch.finish_populate( + self.cfg.batch_compression_threshold.0 as usize, + self.cfg.compression_level, + )?; let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?; if sync { self.pipe_log.sync(LogQueue::Rewrite)? 
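The src/util.rs diff below is where the new `compression_level` finally takes effect: `LZ4_compress_default` is replaced by `LZ4_compress_fast`, whose extra argument is an acceleration factor (1 behaves like the default, larger values trade compression ratio for speed). As a minimal standalone sketch of that call, assuming only the `lz4-sys` bindings already declared in Cargo.toml; the length-prefix framing and error types of the real codec are omitted:

fn lz4_compress_fast(src: &[u8], acceleration: i32) -> Vec<u8> {
    unsafe {
        // Worst-case output size for this input length.
        let bound = lz4_sys::LZ4_compressBound(src.len() as i32);
        assert!(bound > 0, "input too large to compress");
        let mut dst: Vec<u8> = Vec::with_capacity(bound as usize);
        // The acceleration factor is clamped internally by LZ4; 1 matches
        // LZ4_compress_default, larger values are faster but compress less.
        let n = lz4_sys::LZ4_compress_fast(
            src.as_ptr() as _,
            dst.as_mut_ptr() as _,
            src.len() as i32,
            bound,
            acceleration,
        );
        assert!(n > 0, "compression failed");
        dst.set_len(n as usize);
        dst
    }
}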
diff --git a/src/util.rs b/src/util.rs index b86d7813..363ace71 100644 --- a/src/util.rs +++ b/src/util.rs @@ -223,8 +223,10 @@ pub mod lz4 { use crate::{Error, Result}; use std::{i32, ptr}; + pub const DEFAULT_LZ4_COMPRESSION_LEVEL: usize = 1; + /// Compress content in `buf[skip..]`, and append output to `buf`. - pub fn append_compress_block(buf: &mut Vec, skip: usize) -> Result<()> { + pub fn append_compress_block(buf: &mut Vec, skip: usize, level: usize) -> Result<()> { let buf_len = buf.len(); let content_len = buf_len - skip; if content_len > 0 { @@ -244,11 +246,12 @@ pub mod lz4 { let le_len = content_len.to_le_bytes(); ptr::copy_nonoverlapping(le_len.as_ptr(), buf_ptr.add(buf_len), 4); - let compressed = lz4_sys::LZ4_compress_default( + let compressed = lz4_sys::LZ4_compress_fast( buf_ptr.add(skip) as _, buf_ptr.add(buf_len + 4) as _, content_len as i32, bound, + level as i32, ); if compressed == 0 { return Err(Error::Other(box_err!("Compression failed"))); @@ -298,7 +301,8 @@ pub mod lz4 { let vecs: Vec> = vec![b"".to_vec(), b"123".to_vec(), b"12345678910".to_vec()]; for mut vec in vecs.into_iter() { let uncompressed_len = vec.len(); - super::append_compress_block(&mut vec, 0).unwrap(); + super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL) + .unwrap(); let res = super::decompress_block(&vec[uncompressed_len..]).unwrap(); assert_eq!(res, vec[..uncompressed_len].to_owned()); } From 0a33383758364358d9ee2f80b5fc96ff9f814e97 Mon Sep 17 00:00:00 2001 From: Xinye Tao Date: Fri, 1 Sep 2023 16:47:11 +0800 Subject: [PATCH 05/10] bump 0.4.0 (#330) * cleanup Signed-off-by: tabokie * lock version Signed-off-by: tabokie * bump version Signed-off-by: tabokie * update date Signed-off-by: tabokie --------- Signed-off-by: tabokie --- CHANGELOG.md | 2 ++ Cargo.toml | 2 +- Makefile | 5 +++-- README.md | 2 +- ctl/Cargo.toml | 4 ++-- src/config.rs | 8 +++++--- src/engine.rs | 30 +++++++++++++++++++++++++++++- src/env/default.rs | 4 +++- src/env/log_fd/plain.rs | 3 +++ src/file_pipe_log/pipe.rs | 1 - src/log_batch.rs | 9 ++------- src/memtable.rs | 3 +-- src/pipe_log.rs | 1 - src/util.rs | 2 +- stress/Cargo.toml | 2 +- 15 files changed, 54 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c27405d0..bddbd4f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## [Unreleased] +## [0.4.0] - 2023-09-01 + ### Behavior Changes * `LogBatch::put` returns a `Result<()>` instead of `()`. It errs when the key is reserved for internal use. diff --git a/Cargo.toml b/Cargo.toml index 177ae280..b88e3e59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "raft-engine" -version = "0.3.0" +version = "0.4.0" authors = ["The TiKV Project Developers"] edition = "2018" rust-version = "1.66.0" diff --git a/Makefile b/Makefile index 4bab52f0..da01ab1b 100644 --- a/Makefile +++ b/Makefile @@ -41,12 +41,13 @@ clean: format: cargo ${TOOLCHAIN_ARGS} fmt --all +CLIPPY_WHITELIST += -A clippy::bool_assert_comparison ## Run clippy. clippy: ifdef WITH_NIGHTLY_FEATURES - cargo ${TOOLCHAIN_ARGS} clippy --all --features nightly_group,failpoints --all-targets -- -D clippy::all + cargo ${TOOLCHAIN_ARGS} clippy --all --features nightly_group,failpoints --all-targets -- -D clippy::all ${CLIPPY_WHITELIST} else - cargo ${TOOLCHAIN_ARGS} clippy --all --features failpoints --all-targets -- -D clippy::all + cargo ${TOOLCHAIN_ARGS} clippy --all --features failpoints --all-targets -- -D clippy::all ${CLIPPY_WHITELIST} endif ## Run tests. 
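The src/config.rs test changes further below assert the soft half of `sanitize()`: out-of-range values such as `recovery-read-block-size = 1` or `recovery-threads = 0` are clamped up to internal minimums rather than rejected. A standalone sketch of that clamp-to-minimum pattern; the field names mirror the config, while the struct and the minimum constants are illustrative rather than the crate's actual definitions:

const MIN_RECOVERY_READ_BLOCK_SIZE: u64 = 512; // illustrative, not the crate's value
const MIN_RECOVERY_THREADS: usize = 1; // illustrative, not the crate's value

struct RecoveryConfig {
    recovery_read_block_size: u64,
    recovery_threads: usize,
}

impl RecoveryConfig {
    // Soft sanitize: silently raise out-of-range values to their minimums
    // instead of returning an error, which is what the `soft_sanitized`
    // assertions in the test exercise.
    fn sanitize(&mut self) {
        self.recovery_read_block_size = self
            .recovery_read_block_size
            .max(MIN_RECOVERY_READ_BLOCK_SIZE);
        self.recovery_threads = self.recovery_threads.max(MIN_RECOVERY_THREADS);
    }
}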
diff --git a/README.md b/README.md index f70a1b1b..0caff515 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ Put this in your Cargo.toml: ```rust [dependencies] -raft-engine = "0.3.0" +raft-engine = "0.4.0" ``` Available Cargo features: diff --git a/ctl/Cargo.toml b/ctl/Cargo.toml index c10b092e..54b7ee87 100644 --- a/ctl/Cargo.toml +++ b/ctl/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "raft-engine-ctl" -version = "0.3.0" +version = "0.4.0" authors = ["The TiKV Project Developers"] edition = "2018" rust-version = "1.61.0" @@ -11,4 +11,4 @@ license = "Apache-2.0" [dependencies] clap = { version = "3.1", features = ["derive", "cargo"] } env_logger = "0.10" -raft-engine = { path = "..", version = "0.3.0", features = ["scripting", "internals"] } +raft-engine = { path = "..", version = "0.4.0", features = ["scripting", "internals"] } diff --git a/src/config.rs b/src/config.rs index 30bf4082..d41774ae 100644 --- a/src/config.rs +++ b/src/config.rs @@ -284,6 +284,8 @@ mod tests { assert_eq!(load.target_file_size, ReadableSize::mb(1)); assert_eq!(load.purge_threshold, ReadableSize::mb(3)); assert_eq!(load.format_version, Version::V1); + assert_eq!(load.enable_log_recycle, false); + assert_eq!(load.prefill_for_recycle, false); load.sanitize().unwrap(); } @@ -297,7 +299,7 @@ mod tests { assert!(hard_load.sanitize().is_err()); let soft_error = r#" - recovery-read-block-size = "1KB" + recovery-read-block-size = 1 recovery-threads = 0 target-file-size = "5000MB" format-version = 2 @@ -305,6 +307,8 @@ mod tests { prefill-for-recycle = true "#; let soft_load: Config = toml::from_str(soft_error).unwrap(); + assert!(soft_load.recovery_read_block_size.0 < MIN_RECOVERY_READ_BLOCK_SIZE as u64); + assert!(soft_load.recovery_threads < MIN_RECOVERY_THREADS); let mut soft_sanitized = soft_load; soft_sanitized.sanitize().unwrap(); assert!(soft_sanitized.recovery_read_block_size.0 >= MIN_RECOVERY_READ_BLOCK_SIZE as u64); @@ -313,8 +317,6 @@ mod tests { soft_sanitized.purge_rewrite_threshold.unwrap(), soft_sanitized.target_file_size ); - assert_eq!(soft_sanitized.format_version, Version::V2); - assert!(soft_sanitized.enable_log_recycle); let recycle_error = r#" enable-log-recycle = true diff --git a/src/engine.rs b/src/engine.rs index 17321649..4467ca43 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1324,6 +1324,7 @@ pub(crate) mod tests { engine.append(rid, index, index + 1, Some(&data)); } } + engine.append(11, 1, 11, Some(&data)); // The engine needs purge, and all old entries should be rewritten. assert!(engine @@ -1342,8 +1343,9 @@ pub(crate) mod tests { }); } - // Recover with rewrite queue and append queue. + engine.clean(11); let cleaned_region_ids = engine.memtables.cleaned_region_ids(); + assert_eq!(cleaned_region_ids.len(), 1); let engine = engine.reopen(); assert_eq!(engine.memtables.cleaned_region_ids(), cleaned_region_ids); @@ -2438,6 +2440,7 @@ pub(crate) mod tests { lb.drain(); }; { + // begin. let mut builder = AtomicGroupBuilder::with_id(3); builder.begin(&mut log_batch); log_batch.put(rid, key.clone(), value.clone()).unwrap(); @@ -2445,6 +2448,7 @@ pub(crate) mod tests { engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { + // begin - unrelated - end. let mut builder = AtomicGroupBuilder::with_id(3); builder.begin(&mut log_batch); rid += 1; @@ -2464,6 +2468,7 @@ pub(crate) mod tests { engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { + // begin - middle - middle - end. 
let mut builder = AtomicGroupBuilder::with_id(3); builder.begin(&mut log_batch); rid += 1; @@ -2488,6 +2493,7 @@ pub(crate) mod tests { engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { + // begin - begin - end. let mut builder = AtomicGroupBuilder::with_id(3); builder.begin(&mut log_batch); rid += 1; @@ -2507,6 +2513,7 @@ pub(crate) mod tests { engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { + // end - middle - end. // We must change id to avoid getting merged with last group. // It is actually not possible in real life to only have "begin" missing. let mut builder = AtomicGroupBuilder::with_id(4); @@ -2528,6 +2535,7 @@ pub(crate) mod tests { engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { + // end - begin - end let mut builder = AtomicGroupBuilder::with_id(5); builder.begin(&mut LogBatch::default()); builder.end(&mut log_batch); @@ -2547,6 +2555,26 @@ pub(crate) mod tests { flush(&mut log_batch); engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } + { + // begin - end - begin - end. + let mut builder = AtomicGroupBuilder::with_id(6); + builder.begin(&mut log_batch); + rid += 1; + log_batch.put(rid, key.clone(), value.clone()).unwrap(); + data.insert(rid); + flush(&mut log_batch); + builder.end(&mut log_batch); + flush(&mut log_batch); + let mut builder = AtomicGroupBuilder::with_id(7); + builder.begin(&mut log_batch); + flush(&mut log_batch); + builder.end(&mut log_batch); + rid += 1; + log_batch.put(rid, key.clone(), value.clone()).unwrap(); + data.insert(rid); + flush(&mut log_batch); + engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); + } engine.pipe_log.sync(LogQueue::Rewrite).unwrap(); let engine = engine.reopen(); diff --git a/src/env/default.rs b/src/env/default.rs index 9aadc591..44f4fa18 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -1,6 +1,8 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::io::{Error, ErrorKind, Read, Result as IoResult, Seek, SeekFrom, Write}; +#[cfg(feature = "failpoints")] +use std::io::{Error, ErrorKind}; +use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; use std::path::Path; use std::sync::Arc; diff --git a/src/env/log_fd/plain.rs b/src/env/log_fd/plain.rs index 08c40135..03328e91 100644 --- a/src/env/log_fd/plain.rs +++ b/src/env/log_fd/plain.rs @@ -1,5 +1,8 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. +//! A naive file handle implementation based on standard `File`. All I/O +//! operations need to synchronize under a `RwLock`. + use crate::env::{Handle, Permission}; use fail::fail_point; diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index f42ade1b..09bc1f42 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -33,7 +33,6 @@ pub const DEFAULT_PATH_ID: PathId = 0; /// compatibility. pub const DEFAULT_FIRST_FILE_SEQ: FileSeq = 1; -#[derive(Debug)] pub struct File { pub seq: FileSeq, pub handle: Arc, diff --git a/src/log_batch.rs b/src/log_batch.rs index 33587ea9..d2bbaec2 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -383,11 +383,6 @@ impl LogItemBatch { self.items.drain(..) 
} - pub fn push(&mut self, item: LogItem) { - self.item_size += item.approximate_size(); - self.items.push(item); - } - pub fn merge(&mut self, rhs: &mut LogItemBatch) { for item in &mut rhs.items { if let LogItemContent::EntryIndexes(entry_indexes) = &mut item.content { @@ -1582,7 +1577,7 @@ mod tests { } #[test] - fn test_corruption() { + fn test_header_corruption() { let region_id = 7; let data = vec![b'x'; 16]; let mut batch = LogBatch::default(); @@ -1688,7 +1683,7 @@ mod tests { batch_handle.len = len; let file_context = LogFileContext::new(batch_handle.id, Version::V2); batch.prepare_write(&file_context).unwrap(); - // batch.finish_write(batch_handle); + assert_eq!(batch.approximate_size(), len); let encoded = batch.encoded_bytes(); assert_eq!(encoded.len(), len); let mut bytes_slice = encoded; diff --git a/src/memtable.rs b/src/memtable.rs index dee9dcf4..7a0ea41b 100644 --- a/src/memtable.rs +++ b/src/memtable.rs @@ -989,7 +989,7 @@ impl MemTableAccessor { /// [`MemTable`]s. /// /// This method is only used for recovery. - #[allow(dead_code)] + #[cfg(test)] pub fn cleaned_region_ids(&self) -> HashSet { let mut ids = HashSet::default(); let removed_memtables = self.removed_memtables.lock(); @@ -1202,7 +1202,6 @@ fn has_internal_key(item: &LogItem) -> bool { matches!(&item.content, LogItemContent::Kv(KeyValue { key, .. }) if crate::is_internal_key(key, None)) } -#[derive(Debug)] struct PendingAtomicGroup { status: AtomicGroupStatus, items: Vec, diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 57e94d1e..33ca4071 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -118,7 +118,6 @@ impl Display for Version { } } -#[derive(Debug, Clone)] pub struct LogFileContext { pub id: FileId, pub version: Version, diff --git a/src/util.rs b/src/util.rs index 363ace71..2e35a83e 100644 --- a/src/util.rs +++ b/src/util.rs @@ -286,7 +286,7 @@ pub mod lz4 { } } else if !src.is_empty() { Err(Error::Corruption(format!( - "Content to compress to short {}", + "Content to compress too short {}", src.len() ))) } else { diff --git a/stress/Cargo.toml b/stress/Cargo.toml index e89c92a5..7fa0b830 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stress" -version = "0.3.0" +version = "0.4.0" authors = ["The TiKV Authors"] edition = "2018" From ec6aa90163a9753e0928606b6f3f1fbfbf518e32 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 7 Sep 2023 13:51:11 +0800 Subject: [PATCH 06/10] enlarge append recycle log threshold (#331) * increate the log recycle threshold Signed-off-by: glorv * fix prefill Signed-off-by: glorv * keepd default prefill threshold the same value as recycle threshold Signed-off-by: glorv * fix comment Signed-off-by: glorv * fix unit test Signed-off-by: glorv * revert change Signed-off-by: glorv --------- Signed-off-by: glorv --- src/config.rs | 10 +++++----- src/engine.rs | 8 +++++--- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/config.rs b/src/config.rs index d41774ae..4283737f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -116,7 +116,7 @@ pub struct Config { pub prefill_for_recycle: bool, /// Maximum capacity for preparing log files for recycling when start. - /// If not `None`, its size is equal to `purge-threshold`. + /// If `None`, its size is equal to `purge-threshold`*1.5. /// Only available for `prefill-for-recycle` is true. 
@@ -219,10 +219,10 @@ impl Config {
         }
         if self.enable_log_recycle && self.purge_threshold.0 >= self.target_file_size.0 {
             // (1) At most u32::MAX so that the file number can be capped into an u32
-            // without colliding. (2) Add some more file as an additional buffer to
-            // avoid jitters.
+            // without colliding. (2) Increase the threshold by 50% to add some more
+            // files as an additional buffer to avoid jitters.
             std::cmp::min(
-                (self.purge_threshold.0 / self.target_file_size.0) as usize + 2,
+                (self.purge_threshold.0 / self.target_file_size.0) as usize * 3 / 2,
                 u32::MAX as usize,
             )
         } else {
@@ -241,7 +241,7 @@ impl Config {
         if self.prefill_for_recycle && prefill_limit >= self.target_file_size.0 {
             // Keep the same maximum as `recycle_capacity`.
             std::cmp::min(
-                (prefill_limit / self.target_file_size.0) as usize + 2,
+                (prefill_limit / self.target_file_size.0) as usize * 3 / 2,
                 u32::MAX as usize,
             )
         } else {
diff --git a/src/engine.rs b/src/engine.rs
index 4467ca43..0d055296 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -2162,6 +2162,7 @@ pub(crate) mod tests {
             prefill_for_recycle: true,
             ..Default::default()
         };
+        let recycle_capacity = cfg.recycle_capacity() as u64;
         let fs = Arc::new(DeleteMonitoredFileSystem::new());
         let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap();
 
@@ -2197,11 +2198,11 @@ pub(crate) mod tests {
         assert_eq!(reserved_start_2, reserved_start_3);
 
         // Reuse all of the reserved files.
-        for rid in 1..=50 {
+        for rid in 1..=recycle_capacity {
             engine.append(rid, 1, 11, Some(&entry_data));
         }
         assert!(fs.reserved_metadata.lock().unwrap().is_empty());
-        for rid in 1..=50 {
+        for rid in 1..=recycle_capacity {
             engine.clean(rid);
         }
         engine.purge_manager.must_rewrite_append_queue(None, None);
@@ -2704,6 +2705,7 @@ pub(crate) mod tests {
             purge_threshold: ReadableSize(40),
             ..cfg.clone()
         };
+        let recycle_capacity = cfg_2.recycle_capacity() as u64;
         let engine = RaftLogEngine::open_with_file_system(cfg_2, file_system.clone()).unwrap();
         assert!(number_of_files(spill_dir.path()) > 0);
         for rid in 1..=10 {
@@ -2718,7 +2720,7 @@ pub(crate) mod tests {
         );
         assert!(file_count > engine.file_count(None));
         // Append data, recycled files are reused.
-        for rid in 1..=30 {
+        for rid in 1..=recycle_capacity - 10 {
             engine.append(rid, 20, 30, Some(&entry_data));
         }
         // No new file is created.
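As a sanity check of the new arithmetic, a tiny standalone reproduction of the capacity formula (it mirrors the `std::cmp::min` expression in the hunks above but is not the crate's code):

```rust
// capacity = (purge_threshold / target_file_size) * 3 / 2, capped at u32::MAX.
fn recycle_capacity(purge_threshold: u64, target_file_size: u64) -> usize {
    std::cmp::min(
        (purge_threshold / target_file_size) as usize * 3 / 2,
        u32::MAX as usize,
    )
}

fn main() {
    const MIB: u64 = 1024 * 1024;
    // A 10 GiB purge threshold with 128 MiB files covers 80 live files,
    // so 120 files are kept for recycling after this patch.
    assert_eq!(recycle_capacity(10 * 1024 * MIB, 128 * MIB), 120);
    // The old "+ 2" buffer would have yielded only 82 here.
}
```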
From 3724bfc510b910e71a430863e9a357e28dab044b Mon Sep 17 00:00:00 2001
From: Xinye Tao
Date: Mon, 11 Sep 2023 16:05:28 +0800
Subject: [PATCH 07/10] disable windows build (#333)

* Empty commit

Signed-off-by: tabokie

* Use windows 2019

Signed-off-by: tabokie

* Disable windows build

Signed-off-by: tabokie

---------

Signed-off-by: tabokie
---
 .github/workflows/rust.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index e3eca258..2a5ce83d 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -10,7 +10,7 @@ jobs:
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
-        os: [ ubuntu-latest, macos-latest, windows-latest ]
+        os: [ ubuntu-latest, macos-latest ]
     steps:
     - uses: actions/checkout@v2
       with:

From 22dfb426cd994602b57725ef080287d3e53db479 Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Tue, 12 Sep 2023 18:40:42 +0900
Subject: [PATCH 08/10] refactor: reduce scope of internal keys (#335)

* refactor: reduce scope of internal keys

Signed-off-by: WenyXu

* chore: apply suggestions from CR

Signed-off-by: WenyXu

* Update CHANGELOG.md

Signed-off-by: WenyXu

* chore: apply suggestions from CR

Signed-off-by: WenyXu

---------

Signed-off-by: WenyXu
Co-authored-by: Xinye Tao
---
 CHANGELOG.md     |  3 +++
 src/lib.rs       | 26 ++++++++++++++++++++++++++
 src/log_batch.rs |  4 ++--
 3 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bddbd4f0..a06384af 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,9 @@
 
 ## [Unreleased]
 
+### Behavior Changes
+* Reduce the scope of keys reserved for internal use.
+
 ## [0.4.0] - 2023-09-01
 
 ### Behavior Changes
diff --git a/src/lib.rs b/src/lib.rs
index 25ebadd0..f282f6cc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -167,6 +167,7 @@ impl GlobalStats {
 }
 
 pub(crate) const INTERNAL_KEY_PREFIX: &[u8] = b"__";
 
 #[inline]
+#[cfg(test)]
 pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
     assert!(!k.is_empty());
     let mut v = INTERNAL_KEY_PREFIX.to_vec();
@@ -174,12 +175,23 @@ pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
     v
 }
 
+#[cfg(not(test))]
+pub(crate) fn make_internal_key(k: &[u8]) -> Vec<u8> {
+    use log_batch::ATOMIC_GROUP_KEY;
+
+    assert!(k == ATOMIC_GROUP_KEY);
+    let mut v = INTERNAL_KEY_PREFIX.to_vec();
+    v.extend_from_slice(k);
+    v
+}
+
 /// We ensure internal keys are not visible to the user by:
 /// (1) Writing internal keys will be rejected by `LogBatch::put`.
 /// (2) Internal keys are filtered out during apply and replay of both queues.
 /// This also makes sure future internal keys under the prefix won't become
 /// visible after downgrading.
 #[inline]
+#[cfg(test)]
 pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
     if let Some(ext) = ext {
         s.len() == INTERNAL_KEY_PREFIX.len() + ext.len()
@@ -191,6 +203,20 @@ pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
     }
 }
 
+#[inline]
+#[cfg(not(test))]
+pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
+    use log_batch::ATOMIC_GROUP_KEY;
+
+    if let Some(ext) = ext {
+        s.len() == INTERNAL_KEY_PREFIX.len() + ext.len()
+            && s[..INTERNAL_KEY_PREFIX.len()] == *INTERNAL_KEY_PREFIX
+            && s[INTERNAL_KEY_PREFIX.len()..] == *ext
+    } else {
+        is_internal_key(s, Some(ATOMIC_GROUP_KEY))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::log_batch::MessageExt;
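Stitched out of the hunks above into one runnable snippet, here is a sketch of the reserved-key scheme after this change. The constants are copied from the diff; the `cfg` split and `pub(crate)` visibility are dropped for brevity, so treat it as an illustration rather than the crate's exact code:

```rust
const INTERNAL_KEY_PREFIX: &[u8] = b"__";
const ATOMIC_GROUP_KEY: &[u8] = &[0x01];

// Prefix a reserved key with the internal marker.
fn make_internal_key(k: &[u8]) -> Vec<u8> {
    let mut v = INTERNAL_KEY_PREFIX.to_vec();
    v.extend_from_slice(k);
    v
}

// After this patch, only the atomic-group key counts as internal outside
// of tests; arbitrary "__"-prefixed user keys no longer match.
fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool {
    if let Some(ext) = ext {
        s.len() == INTERNAL_KEY_PREFIX.len() + ext.len()
            && s[..INTERNAL_KEY_PREFIX.len()] == *INTERNAL_KEY_PREFIX
            && s[INTERNAL_KEY_PREFIX.len()..] == *ext
    } else {
        is_internal_key(s, Some(ATOMIC_GROUP_KEY))
    }
}

fn main() {
    let key = make_internal_key(ATOMIC_GROUP_KEY);
    assert!(is_internal_key(&key, None));
    // A user key that merely shares the prefix is not internal anymore.
    assert!(!is_internal_key(b"__user", None));
}
```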
diff --git a/src/log_batch.rs b/src/log_batch.rs
index d2bbaec2..c6ce147c 100644
--- a/src/log_batch.rs
+++ b/src/log_batch.rs
@@ -994,7 +994,7 @@ fn verify_checksum_with_signature(buf: &[u8], signature: Option<u32>) -> Result<()> {
 lazy_static! {
     static ref ATOMIC_GROUP_ID: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
 }
-const ATOMIC_GROUP_KEY: &[u8] = &[0x01];
+pub(crate) const ATOMIC_GROUP_KEY: &[u8] = &[0x01];
 
 // const ATOMIC_GROUP_VALUE_LEN: usize = 1;
@@ -1570,7 +1570,7 @@ mod tests {
         ));
         assert!(matches!(
             batch
-                .put_message(0, crate::make_internal_key(&[1]), &Entry::new())
+                .put_message(0, crate::make_internal_key(ATOMIC_GROUP_KEY), &Entry::new())
                 .unwrap_err(),
             Error::InvalidArgument(_)
         ));

From 9aa124558cde12c51f8a9c30d1cccd793d1d5b0c Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Thu, 14 Sep 2023 08:27:57 +0800
Subject: [PATCH 09/10] Update toml requirement from 0.7 to 0.8 (#337)

Updates the requirements on [toml](https://github.com/toml-rs/toml) to permit the latest version.
- [Commits](https://github.com/toml-rs/toml/compare/toml-v0.7.0...toml-v0.8.0)

---
updated-dependencies:
- dependency-name: toml
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
 Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index b88e3e59..4330fa6b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -69,7 +69,7 @@ raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-fea
 rand = "0.8"
 rand_distr = "0.4"
 tempfile = "3.6"
-toml = "0.7"
+toml = "0.8"
 
 [features]
 default = ["internals", "scripting"]

From fa56f891fdf0b1cb5b7849b7bee3c5dadbb96103 Mon Sep 17 00:00:00 2001
From: Xinye Tao
Date: Thu, 14 Sep 2023 17:40:57 +0800
Subject: [PATCH 10/10] bump 0.4.1 (#336)

* bump 0.4.1

Signed-off-by: tabokie

* Update CHANGELOG.md

Signed-off-by: Xinye Tao

---------

Signed-off-by: tabokie
Signed-off-by: Xinye Tao
---
 CHANGELOG.md      | 4 ++++
 Cargo.toml        | 2 +-
 README.md         | 2 +-
 ctl/Cargo.toml    | 4 ++--
 stress/Cargo.toml | 2 +-
 5 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a06384af..e2aaa484 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,7 +2,11 @@
 
 ## [Unreleased]
 
+## [0.4.1] - 2023-09-14
+
 ### Behavior Changes
+
+* When log recycling is enabled, Raft Engine will now retain 50% more log files to reduce the chance of running out.
 * Reduce the scope of keys reserved for internal use.
 
 ## [0.4.0] - 2023-09-01
diff --git a/Cargo.toml b/Cargo.toml
index 4330fa6b..bf4a35ba 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "raft-engine"
-version = "0.4.0"
+version = "0.4.1"
 authors = ["The TiKV Project Developers"]
 edition = "2018"
 rust-version = "1.66.0"
diff --git a/README.md b/README.md
index 0caff515..bbba26ee 100644
--- a/README.md
+++ b/README.md
@@ -54,7 +54,7 @@ Put this in your Cargo.toml:
 
 ```rust
 [dependencies]
-raft-engine = "0.4.0"
+raft-engine = "0.4"
 ```
 
 Available Cargo features:
diff --git a/ctl/Cargo.toml b/ctl/Cargo.toml
index 54b7ee87..5bf23c8b 100644
--- a/ctl/Cargo.toml
+++ b/ctl/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "raft-engine-ctl"
-version = "0.4.0"
+version = "0.4.1"
 authors = ["The TiKV Project Developers"]
 edition = "2018"
 rust-version = "1.61.0"
@@ -11,4 +11,4 @@ license = "Apache-2.0"
 [dependencies]
 clap = { version = "3.1", features = ["derive", "cargo"] }
 env_logger = "0.10"
-raft-engine = { path = "..", version = "0.4.0", features = ["scripting", "internals"] }
+raft-engine = { path = "..", version = "0.4.1", features = ["scripting", "internals"] }
diff --git a/stress/Cargo.toml b/stress/Cargo.toml
index 7fa0b830..79d131e2 100644
--- a/stress/Cargo.toml
+++ b/stress/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "stress"
-version = "0.4.0"
+version = "0.4.1"
 authors = ["The TiKV Authors"]
 edition = "2018"
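To tie the 0.4.1 release together, a short end-to-end sketch of opening an engine with log recycling enabled. The `Engine::open` and `Config` usage follows the crate's README shown in the diff above; the directory, sizes, and the `ReadableSize` re-export are placeholders and assumptions of this note:

```rust
use raft_engine::{Config, Engine, ReadableSize};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative values only; `dir` is a throwaway path.
    let cfg = Config {
        dir: "/tmp/raft-engine-demo".to_owned(),
        enable_log_recycle: true,
        target_file_size: ReadableSize::mb(128),
        purge_threshold: ReadableSize::gb(1),
        ..Default::default()
    };
    let engine = Engine::open(cfg)?;
    // ... write and read Raft entries here ...
    drop(engine);
    Ok(())
}
```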