diff --git a/Cargo.lock b/Cargo.lock
index 4f2cfd860..687b319e6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1012,6 +1012,7 @@ dependencies = [
  "usdt",
  "uuid",
  "version_check",
+ "zerocopy 0.7.32",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 35e39bb24..6fb4dbe2a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -110,6 +110,7 @@ tracing-subscriber = "0.3.18"
 twox-hash = "1.6.3"
 usdt = "0.5.0"
 uuid = { version = "1", features = [ "serde", "v4" ] }
+zerocopy = "0.7.32"
 
 # git
 dropshot = { git = "https://github.com/oxidecomputer/dropshot", branch = "main", features = [ "usdt-probes" ] }
diff --git a/downstairs/Cargo.toml b/downstairs/Cargo.toml
index bd0e3de84..6d0c36305 100644
--- a/downstairs/Cargo.toml
+++ b/downstairs/Cargo.toml
@@ -54,6 +54,7 @@ tracing-subscriber.workspace = true
 tracing.workspace = true
 usdt.workspace = true
 uuid.workspace = true
+zerocopy.workspace = true
 crucible-workspace-hack.workspace = true
 
 [dev-dependencies]
diff --git a/downstairs/src/extent.rs b/downstairs/src/extent.rs
index a4ac70f84..79cb5cfb4 100644
--- a/downstairs/src/extent.rs
+++ b/downstairs/src/extent.rs
@@ -141,6 +141,7 @@ pub const EXTENT_META_SQLITE: u32 = 1;
 ///
 /// See [`extent_inner_raw::RawInner`] for the implementation.
 pub const EXTENT_META_RAW: u32 = 2;
+pub const EXTENT_META_RAW_V2: u32 = 3;
 
 impl ExtentMeta {
     pub fn new(ext_version: u32) -> ExtentMeta {
@@ -294,6 +295,7 @@ impl Extent {
         def: &RegionDefinition,
         number: ExtentId,
         read_only: bool,
+        recordsize: u64,
         log: &Logger,
     ) -> Result<Extent> {
         /*
@@ -445,6 +447,11 @@ impl Extent {
                     dir, def, number, read_only, log,
                 )?)
             }
+            EXTENT_META_RAW_V2 => {
+                Box::new(extent_inner_raw_v2::RawInnerV2::open(
+                    dir, def, number, read_only, recordsize, log,
+                )?)
+            }
             i => {
                 return Err(CrucibleError::IoError(format!(
                     "raw extent {number} has unknown tag {i}"
@@ -489,6 +496,7 @@ impl Extent {
         def: &RegionDefinition,
         number: ExtentId,
         backend: Backend,
+        recordsize: u64,
     ) -> Result<Extent> {
         /*
          * Store extent data in files within a directory hierarchy so that
@@ -507,9 +515,15 @@ impl Extent {
         remove_copy_cleanup_dir(dir, number)?;
 
         let inner: Box<dyn ExtentInner> = match backend {
+            #[cfg(any(test, feature = "integration-tests"))]
             Backend::RawFile => {
                 Box::new(extent_inner_raw::RawInner::create(dir, def, number)?)
             }
+            Backend::RawFileV2 => {
+                Box::new(extent_inner_raw_v2::RawInnerV2::create(
+                    dir, def, number, recordsize,
+                )?)
+            }
             #[cfg(any(test, feature = "integration-tests"))]
             Backend::SQLite => Box::new(
                 extent_inner_sqlite::SqliteInner::create(dir, def, number)?,
diff --git a/downstairs/src/extent_inner_raw.rs b/downstairs/src/extent_inner_raw.rs
index a2bad6695..a7c58c7e1 100644
--- a/downstairs/src/extent_inner_raw.rs
+++ b/downstairs/src/extent_inner_raw.rs
@@ -358,6 +358,11 @@ impl ExtentInner for RawInner {
                 req.offset.0 as i64 * block_size as i64,
             )
         };
+
+        cdt::extent__read__file__done!(|| {
+            (job_id.0, self.extent_number.0, num_blocks)
+        });
+
         // Check against the expected number of bytes. We could do more
         // robust error handling here (e.g. retrying in a loop), but for
         // now, simply bailing out seems wise.
@@ -380,10 +385,6 @@ impl ExtentInner for RawInner {
             buf.set_len(expected_bytes);
         }
 
-        cdt::extent__read__file__done!(|| {
-            (job_id.0, self.extent_number.0, num_blocks)
-        });
-
         Ok(ExtentReadResponse { data: buf, blocks })
     }
 
@@ -1104,18 +1105,11 @@ impl RawInner {
     }
 
 /// Data structure that implements the on-disk layout of a raw extent file
+#[derive(Debug)]
 struct RawLayout {
     extent_size: Block,
 }
 
-impl std::fmt::Debug for RawLayout {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("RawLayout")
-            .field("extent_size", &self.extent_size)
-            .finish()
-    }
-}
-
 impl RawLayout {
     fn new(extent_size: Block) -> Self {
         RawLayout { extent_size }
diff --git a/downstairs/src/extent_inner_raw_v2.rs b/downstairs/src/extent_inner_raw_v2.rs
new file mode 100644
index 000000000..92a0242e0
--- /dev/null
+++ b/downstairs/src/extent_inner_raw_v2.rs
@@ -0,0 +1,1025 @@
+// Copyright 2024 Oxide Computer Company
+use crate::{
+    cdt,
+    extent::{check_input, extent_path, ExtentInner, EXTENT_META_RAW_V2},
+    extent_inner_raw_common::{
+        pread_all, pwrite_all, OnDiskMeta, BLOCK_META_SIZE_BYTES,
+    },
+    mkdir_for_file,
+    region::JobOrReconciliationId,
+    Block, CrucibleError, ExtentReadRequest, ExtentReadResponse, ExtentWrite,
+    JobId, RegionDefinition,
+};
+
+use crucible_common::{BlockOffset, ExtentId};
+use crucible_protocol::ReadBlockContext;
+use slog::{error, Logger};
+
+use std::io::{BufReader, Read};
+use std::os::fd::{AsFd, AsRawFd};
+use std::path::Path;
+use std::{
+    fs::{File, OpenOptions},
+    io::IoSlice,
+};
+use zerocopy::AsBytes;
+
+/// Recordsize for non-ZFS systems (matching the ZFS default)
+pub(crate) const DUMMY_RECORDSIZE: u64 = 128 * 1024;
+
+/// Maximum size of a serialized block context (which is a [`ReadBlockContext`])
+pub(crate) const BLOCK_CONTEXT_SIZE_BYTES: u64 = 32;
+
+/// `RawInnerV2` is a wrapper around a [`std::fs::File`] representing an extent
+///
+/// The file contains [`extent_size`](RawInnerV2::extent_size) blocks, which I'm
+/// writing as `N` for simplicity here.
+///
+/// # File organization
+/// The file is structured in four parts. Getting specific offsets within the
+/// file is implemented in the [`RawLayout`] helper class.
+///
+/// ## Block data and contexts
+/// Block data and per-block contexts are paired up as follows:
+/// ```text
+/// [ ----- block ----- | context ] x R
+/// [ --- padding to recordsize --- ]
+/// [ ----- block ----- | context ] x R
+/// [ --- padding to recordsize --- ]
+/// [ ----- block ----- | context ] x R
+/// [ --- padding to recordsize --- ]
+/// ```
+///
+/// where `R = recordsize / (block size + context size)`
+///
+/// We don't want to split blocks or block-context pairs across ZFS records,
+/// because that could make writes non-atomic.
+///
+/// ## Expected recordsize
+/// After the block data, we store a single `u64` representing the expected
+/// recordsize when the file was written. When the file is reopened, we detect
+/// if its recordsize has changed, which would be surprising!
+///
+/// ## Written blocks array
+/// The next section of the file contains a bit-packed array indicating whether
+/// each block is written (where 0 is unwritten and 1 is written). It takes up
+/// `N.div_ceil(8)` bytes. It is only valid when the `dirty` bit is cleared.
+/// This is an optimization that speeds up opening a clean extent file;
+/// otherwise, we would have to read every block to find whether it has been
+/// written or not.
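+///
+/// As a worked example, assuming 512-byte blocks (the block size itself is
+/// configurable) and the default 128 KiB recordsize: each block-context pair
+/// takes 512 + 32 = 544 bytes, so `R = 131072 / 544 = 240` pairs fit in each
+/// record, leaving 512 bytes of padding; a 1000-block extent then needs
+/// `1000.div_ceil(8) = 125` bytes for this array.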
+///
+/// ## File metadata
+/// The last [`BLOCK_META_SIZE_BYTES`] bytes of the file contain an
+/// [`OnDiskMeta`] serialized using `bincode`. The first byte of this range is
+/// `dirty`, serialized as a `u8` (where `1` is dirty and `0` is clean).
+///
+/// There are a few considerations that led to this particular ordering:
+/// - The written blocks array and metadata must be contiguous, because we want
+///   to write them atomically when clearing the `dirty` flag.
+/// - We have multiple different raw file formats, but they all place an
+///   [`OnDiskMeta`] in the last [`BLOCK_META_SIZE_BYTES`] bytes of the file.
+///   This means we can read the metadata and pick the correct extent version.
+///
+/// # Safety
+/// This raw file format relies heavily on ZFS for guarantees. Specifically, it
+/// relies on the following properties:
+///
+/// - Data will not be corrupted on-disk (enforced by ZFS's checksums)
+/// - A single `pwritev` call which only touches a single ZFS record is
+///   guaranteed to be all-or-nothing; it cannot be split or partially land.
+///
+/// The latter is not obvious, and requires reading the ZFS source code; see
+/// [RFD 490](https://rfd.shared.oxide.computer/rfd/490#_designing_for_crash_consistency)
+/// for a more detailed analysis.
+///
+/// To enforce all-or-nothing writes, data which must land together cannot span
+/// multiple ZFS records. This means that our file layout is dependent on
+/// recordsize (see [`RawLayout`]). In addition, we store the expected
+/// recordsize in the file and check it when reopening.
+///
+/// Changing the dataset's recordsize (or copying an extent file to a dataset
+/// with a different recordsize) will produce an error when loading the extent
+/// file.
+#[derive(Debug)]
+pub struct RawInnerV2 {
+    file: File,
+
+    /// Our extent number
+    extent_number: ExtentId,
+
+    /// Extent size, in blocks
+    extent_size: Block,
+
+    /// Helper `struct` controlling layout within the file
+    layout: RawLayout,
+
+    /// Has this block been written?
+    block_written: Vec<bool>,
+
+    /// Local cache for the `dirty` value
+    ///
+    /// This allows us to only write the flag when the value changes
+    dirty: bool,
+}
+
+impl ExtentInner for RawInnerV2 {
+    fn flush_number(&self) -> Result<u64, CrucibleError> {
+        self.get_metadata().map(|v| v.flush_number)
+    }
+
+    fn gen_number(&self) -> Result<u64, CrucibleError> {
+        self.get_metadata().map(|v| v.gen_number)
+    }
+
+    fn dirty(&self) -> Result<bool, CrucibleError> {
+        Ok(self.dirty)
+    }
+
+    fn write(
+        &mut self,
+        job_id: JobId,
+        write: &ExtentWrite,
+        only_write_unwritten: bool,
+        iov_max: usize,
+    ) -> Result<(), CrucibleError> {
+        check_input(self.extent_size, write.offset, write.data.len())?;
+        let n_blocks = write.block_contexts.len();
+
+        let start_block = write.offset;
+        let block_size = self.extent_size.block_size_in_bytes();
+        let mut block = start_block;
+
+        struct WriteChunk<'a> {
+            offset: BlockOffset,
+            iovecs: Vec<IoSlice<'a>>,
+        }
+        let mut iovecs = vec![];
+
+        let ctxs = write
+            .block_contexts
+            .iter()
+            .map(|ctx| {
+                let mut buf = [0u8; BLOCK_CONTEXT_SIZE_BYTES as usize];
+                let ctx = ReadBlockContext::from(*ctx);
+                bincode::serialize_into(buf.as_mut_slice(), &ctx).unwrap();
+                buf
+            })
+            .collect::<Vec<_>>();
+
+        let padding = vec![0u8; self.layout.padding_size() as usize];
+        let mut expected_bytes = 0;
+        let mut start_new_chunk = true;
+        for (data, ctx) in write.data.chunks(block_size as usize).zip(&ctxs) {
+            if only_write_unwritten && self.block_written[block.0 as usize] {
+                start_new_chunk = true;
+                block.0 += 1;
+                continue;
+            }
+            if start_new_chunk {
+                iovecs.push(WriteChunk {
+                    offset: block,
+                    iovecs: vec![],
+                });
+                start_new_chunk = false;
+            }
+            let vs = &mut iovecs.last_mut().unwrap().iovecs;
+            vs.push(IoSlice::new(data));
+            vs.push(IoSlice::new(ctx));
+            expected_bytes += data.len() + ctx.len();
+            if self.layout.has_padding_after(block) {
+                vs.push(IoSlice::new(&padding));
+                expected_bytes += padding.len();
+            }
+            block.0 += 1;
+
+            // If the next write would exceed iov_max, set the `start_new_chunk`
+            // flag so that we begin a new chunk before pushing the write.
+            start_new_chunk |=
+                vs.len() + 2 + self.layout.has_padding_after(block) as usize
+                    > iov_max;
+        }
+
+        if iovecs.is_empty() {
+            return Ok(());
+        }
+
+        self.set_dirty()?;
+
+        cdt::extent__write__file__start!(|| {
+            (job_id.0, self.extent_number.0, n_blocks as u64)
+        });
+
+        // Now execute each chunk in a separate `pwritev` call
+        let mut total_bytes = 0;
+        let mut err = None;
+        for c in iovecs {
+            let start_pos = self.layout.block_pos(c.offset);
+            let r = nix::sys::uio::pwritev(
+                self.file.as_fd(),
+                &c.iovecs,
+                start_pos as i64,
+            )
+            .map_err(|e| {
+                CrucibleError::IoError(format!(
+                    "extent {}: write failed: {e}",
+                    self.extent_number
+                ))
+            });
+            match r {
+                Err(e) => err = Some(e),
+                Ok(num_bytes) => total_bytes += num_bytes,
+            };
+        }
+
+        let r = match err {
+            Some(e) => Err(e),
+            None if total_bytes != expected_bytes => {
+                Err(CrucibleError::IoError(format!(
+                    "extent {}: incomplete write \
+                     (expected {expected_bytes}, got {total_bytes})",
+                    self.extent_number
+                )))
+            }
+            None => Ok(()),
+        };
+
+        if r.is_err() {
+            for i in 0..n_blocks {
+                let block = BlockOffset(write.offset.0 + i as u64);
+
+                // If the write failed, then the block may or may not have
+                // landed on disk. Read it back and update `self.block_written`
+                // accordingly. If this fails, then we _really_ can't recover,
+                // so bail out unceremoniously.
+                self.recompute_block_written_from_file(block).unwrap();
+            }
+        } else {
+            // Now that writes have gone through, mark as written
+            self.block_written[(write.offset.0) as usize..][..n_blocks]
+                .fill(true);
+        }
+        cdt::extent__write__file__done!(|| {
+            (job_id.0, self.extent_number.0, n_blocks as u64)
+        });
+
+        r
+    }
+
+    fn read(
+        &mut self,
+        job_id: JobId,
+        req: ExtentReadRequest,
+        iov_max: usize,
+    ) -> Result<ExtentReadResponse, CrucibleError> {
+        let mut buf = req.data;
+        let block_size = self.extent_size.block_size_in_bytes() as usize;
+        let start_block = req.offset;
+        let num_blocks = buf.capacity() / block_size;
+        check_input(self.extent_size, start_block, buf.capacity())?;
+
+        let start_pos = self.layout.block_pos(start_block);
+        let mut buf_ptr =
+            buf.spare_capacity_mut().as_mut_ptr() as *mut libc::c_void;
+
+        let mut ctxs =
+            vec![[0u8; BLOCK_CONTEXT_SIZE_BYTES as usize]; num_blocks];
+        let mut padding = vec![0u8; self.layout.padding_size() as usize];
+        let mut iovecs = Vec::with_capacity(num_blocks * 3);
+
+        let mut block = start_block;
+        let mut padding_count = 0;
+        for ctx in &mut ctxs {
+            iovecs.push(libc::iovec {
+                iov_base: buf_ptr,
+                iov_len: block_size,
+            });
+            iovecs.push(libc::iovec {
+                iov_base: ctx as *mut _ as *mut _,
+                iov_len: BLOCK_CONTEXT_SIZE_BYTES as usize,
+            });
+            if self.layout.has_padding_after(block) {
+                iovecs.push(libc::iovec {
+                    iov_base: padding.as_mut_ptr() as *mut _,
+                    iov_len: padding.len(),
+                });
+                padding_count += 1;
+            }
+            buf_ptr = buf_ptr.wrapping_add(block_size);
+            block.0 += 1;
+        }
+
+        // How many bytes do we expect `preadv` to return?
+        let expected_bytes = num_blocks
+            * (block_size + BLOCK_CONTEXT_SIZE_BYTES as usize)
+            + padding.len() * padding_count as usize;
+
+        // Finally we get to read the actual data. That's why we're here
+        cdt::extent__read__file__start!(|| {
+            (job_id.0, self.extent_number.0, num_blocks as u64)
+        });
+        let mut total_bytes = 0;
+        for iov in iovecs.chunks(iov_max) {
+            let r = unsafe {
+                libc::preadv(
+                    self.file.as_raw_fd(),
+                    iov.as_ptr(),
+                    iov.len() as libc::c_int,
+                    start_pos as i64 + total_bytes as i64,
+                )
+            };
+
+            // Check against the expected number of bytes. We could do more
+            // robust error handling here (e.g. retrying in a loop), but for
+            // now, simply bailing out seems wise.
+            let r = nix::errno::Errno::result(r).map(|r| r as usize);
+            let num_bytes = match r.map_err(|e| {
+                CrucibleError::IoError(format!(
+                    "extent {}: read failed: {e}",
+                    self.extent_number
+                ))
+            }) {
+                Ok(n) => n,
+                Err(e) => {
+                    // Early exit, so fire the DTrace probe here
+                    cdt::extent__read__file__done!(|| {
+                        (job_id.0, self.extent_number.0, num_blocks as u64)
+                    });
+                    return Err(e);
+                }
+            };
+            total_bytes += num_bytes;
+        }
+
+        // Fire the DTrace probe to indicate the read is done
+        cdt::extent__read__file__done!(|| {
+            (job_id.0, self.extent_number.0, num_blocks as u64)
+        });
+
+        if total_bytes != expected_bytes as usize {
+            return Err(CrucibleError::IoError(format!(
+                "extent {}: incomplete read \
+                 (expected {expected_bytes}, got {total_bytes})",
+                self.extent_number
+            )));
+        }
+
+        // SAFETY: we just initialized this chunk of the buffer
+        unsafe {
+            buf.set_len(num_blocks * block_size);
+        }
+
+        let blocks = ctxs
+            .into_iter()
+            .map(|ctx| bincode::deserialize(&ctx))
+            .collect::<Result<Vec<_>, _>>()
+            .map_err(|e| CrucibleError::BadContextSlot(e.to_string()))?;
+
+        Ok(ExtentReadResponse { data: buf, blocks })
+    }
+
+    fn flush(
+        &mut self,
+        new_flush: u64,
+        new_gen: u64,
+        job_id: JobOrReconciliationId,
+    ) -> Result<(), CrucibleError> {
+        cdt::extent__flush__start!(|| {
+            (job_id.get(), self.extent_number.0, 0)
+        });
+
+        // We put all of our metadata updates into a single write to make this
+        // operation atomic.
+        self.set_flush_number(new_flush, new_gen)?;
+
+        // Now, we fsync to ensure data is flushed to disk. It's okay to crash
+        // before this point, because setting the flush number is atomic.
+        cdt::extent__flush__file__start!(|| {
+            (job_id.get(), self.extent_number.0, 0)
+        });
+        if let Err(e) = self.file.sync_all() {
+            /*
+             * XXX Retry? Mark extent as broken?
+             */
+            return Err(CrucibleError::IoError(format!(
+                "extent {}: fsync 1 failure: {e:?}",
+                self.extent_number,
+            )));
+        }
+        cdt::extent__flush__file__done!(|| {
+            (job_id.get(), self.extent_number.0, 0)
+        });
+
+        cdt::extent__flush__done!(|| {
+            (job_id.get(), self.extent_number.0, 0)
+        });
+
+        Ok(())
+    }
+
+    #[cfg(test)]
+    fn get_block_contexts(
+        &mut self,
+        _block: u64,
+        _count: u64,
+    ) -> Result<Vec<Vec<crate::extent::DownstairsBlockContext>>, CrucibleError>
+    {
+        panic!("cannot get block contexts outside of a read");
+    }
+
+    #[cfg(test)]
+    fn set_dirty_and_block_context(
+        &mut self,
+        _block_context: &crate::extent::DownstairsBlockContext,
+    ) -> Result<(), CrucibleError> {
+        panic!("cannot set block contexts outside of a write");
+    }
+}
+
+impl RawInnerV2 {
+    pub fn create(
+        dir: &Path,
+        def: &RegionDefinition,
+        extent_number: ExtentId,
+        extent_recordsize: u64,
+    ) -> Result<Self, CrucibleError> {
+        let path = extent_path(dir, extent_number);
+        let extent_size = def.extent_size();
+        let layout = RawLayout::new(extent_size, extent_recordsize);
+        let size = layout.file_size();
+
+        mkdir_for_file(&path)?;
+        let file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .open(&path)?;
+
+        // All 0s are fine for everything except recordsize and metadata
+        file.set_len(size)?;
+        layout.write_recordsize(&file, extent_recordsize)?;
+        let mut out = Self {
+            file,
+            dirty: false,
+            extent_size,
+            block_written: vec![false; def.extent_size().value as usize],
+            layout,
+            extent_number,
+        };
+        // Setting the flush number also writes the extent version, since
+        // they're serialized together in the same block.
+        out.set_flush_number(0, 0)?;
+
+        // Sync the file to disk, to avoid any questions
+        if let Err(e) = out.file.sync_all() {
+            return Err(CrucibleError::IoError(format!(
+                "extent {}: fsync 1 failure during initial sync: {e}",
+                out.extent_number,
+            )));
+        }
+        Ok(out)
+    }
+
+    /// Constructs a new `Inner` object from files that already exist on disk
+    pub fn open(
+        dir: &Path,
+        def: &RegionDefinition,
+        extent_number: ExtentId,
+        read_only: bool,
+        recordsize: u64,
+        log: &Logger,
+    ) -> Result<Self, CrucibleError> {
+        let path = extent_path(dir, extent_number);
+        let extent_size = def.extent_size();
+        let layout = RawLayout::new(extent_size, recordsize);
+        let size = layout.file_size();
+
+        /*
+         * Open the extent file and verify the size is as we expect.
+         */
+        let file =
+            match OpenOptions::new().read(true).write(!read_only).open(&path) {
+                Err(e) => {
+                    error!(
+                        log,
+                        "Open of {path:?} for extent#{extent_number} \
+                         returned: {e}",
+                    );
+                    return Err(CrucibleError::IoError(format!(
+                        "extent {extent_number}: open of {path:?} failed: {e}",
+                    )));
+                }
+                Ok(f) => {
+                    let cur_size = f.metadata().unwrap().len();
+                    if size != cur_size {
+                        return Err(CrucibleError::IoError(format!(
+                            "extent {extent_number}: file size {cur_size:?} \
+                             does not match expected {size:?}",
+                        )));
+                    }
+                    f
+                }
+            };
+
+        // Just in case, let's be very sure that the file on disk is what it
+        // should be
+        if !read_only {
+            if let Err(e) = file.sync_all() {
+                return Err(CrucibleError::IoError(format!(
+                    "extent {extent_number}: \
+                     fsync 1 failure during initial rehash: {e}",
+                )));
+            }
+        }
+
+        let expected_recordsize = layout.get_recordsize(&file)?;
+        if expected_recordsize != recordsize {
+            return Err(CrucibleError::IoError(format!(
+                "recordsize for extent {extent_number} has changed: \
+                 expected {expected_recordsize}, \
+                 got {recordsize} from filesystem",
+            )));
+        }
+
+        let meta = layout.get_metadata(&file)?;
+
+        // If the file is dirty, then we have to recompute whether blocks are
+        // written or not. This is slow, but can't be avoided; the file was
+        // closed without a flush, so we can't be confident about the data that
+        // was on disk.
+        let block_written = if !meta.dirty {
+            // Easy case first: if it's **not** dirty, then just read the
+            // bitpacked written blocks array from the file.
+            layout.get_block_written_array(&file)?
+        } else {
+            // The file is dirty, so the written blocks array can't be trusted;
+            // instead, read each block's context slot and mark the block as
+            // written if its context is anything other than
+            // `ReadBlockContext::Empty` (variant tag 0).
+            let block_size = extent_size.block_size_in_bytes() as u64;
+            let mut file_buffered = BufReader::with_capacity(64 * 1024, &file);
+            let mut block_written = vec![];
+            for block in (0..layout.block_count()).map(BlockOffset) {
+                // Skip the bulk data, landing on this block's context slot
+                file_buffered.seek_relative(block_size as i64)?;
+
+                // Read the variant tag, which is 0 for ReadBlockContext::Empty
+                let mut tag = 0u32;
+                file_buffered.read_exact(tag.as_bytes_mut())?;
+                block_written.push(tag != 0);
+
+                // Skip the rest of the context slot, plus any padding before
+                // the next record
+                let mut skip = BLOCK_CONTEXT_SIZE_BYTES - 4;
+                if layout.has_padding_after(block) {
+                    skip += layout.padding_size();
+                }
+                file_buffered.seek_relative(skip as i64)?;
+            }
+            block_written
+        };
+
+        Ok(Self {
+            file,
+            dirty: meta.dirty,
+            extent_number,
+            extent_size: def.extent_size(),
+            block_written,
+            layout,
+        })
+    }
+
+    fn set_dirty(&mut self) -> Result<(), CrucibleError> {
+        if !self.dirty {
+            self.layout.set_dirty(&self.file)?;
+            self.dirty = true;
+        }
+        Ok(())
+    }
+
+    /// Updates `self.block_written[block]` based on data read from the file
+    ///
+    /// Specifically, if the context is written (has a non-zero `tag`), then the
+    /// block is guaranteed to be written, because they are always written
+    /// together in an atomic operation.
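+    ///
+    /// (A freshly created extent file is all zeroes, and an all-zero context
+    /// slot deserializes as `ReadBlockContext::Empty`, whose variant tag is 0;
+    /// this is why a zero tag marks a block as unwritten.)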
+    ///
+    /// We expect to call this function rarely, so it does not attempt to
+    /// minimize the number of syscalls it executes.
+    fn recompute_block_written_from_file(
+        &mut self,
+        block: BlockOffset,
+    ) -> Result<(), CrucibleError> {
+        let pos = self.layout.context_slot(block) as i64;
+        let mut tag = 0u32;
+        pread_all(self.file.as_fd(), tag.as_bytes_mut(), pos).map_err(|e| {
+            CrucibleError::IoError(format!(
+                "extent {}: reading block {} data failed: {e}",
+                self.extent_number, block.0
+            ))
+        })?;
+
+        self.block_written[block.0 as usize] = tag != 0;
+        Ok(())
+    }
+
+    fn get_metadata(&self) -> Result<OnDiskMeta, CrucibleError> {
+        self.layout.get_metadata(&self.file)
+    }
+
+    /// Update the flush number, generation number, and clear the dirty bit
+    fn set_flush_number(
+        &mut self,
+        new_flush: u64,
+        new_gen: u64,
+    ) -> Result<(), CrucibleError> {
+        self.layout.write_block_written_and_metadata(
+            &self.file,
+            &self.block_written,
+            false, // dirty
+            new_flush,
+            new_gen,
+        )?;
+        self.dirty = false;
+        Ok(())
+    }
+}
+
+/// Data structure that implements the on-disk layout of a raw extent file
+#[derive(Debug)]
+struct RawLayout {
+    extent_size: Block,
+    recordsize: u64,
+}
+
+impl RawLayout {
+    fn new(extent_size: Block, recordsize: u64) -> Self {
+        RawLayout {
+            extent_size,
+            recordsize,
+        }
+    }
+
+    /// Sets the dirty flag in the file to true
+    ///
+    /// This unconditionally writes to the file; to avoid extra syscalls, it
+    /// would be wise to cache this at a higher level and only write if it has
+    /// changed.
+    fn set_dirty(&self, file: &File) -> Result<(), CrucibleError> {
+        let offset = self.metadata_offset();
+        pwrite_all(file.as_fd(), &[1u8], offset as i64).map_err(|e| {
+            CrucibleError::IoError(format!("writing dirty byte failed: {e}",))
+        })?;
+        Ok(())
+    }
+
+    /// Returns the total size of the raw data file
+    ///
+    /// This includes block data, context slots, the expected recordsize, the
+    /// written blocks array, and metadata.
+    fn file_size(&self) -> u64 {
+        self.metadata_offset() + BLOCK_META_SIZE_BYTES
+    }
+
+    /// Number of blocks in the extent file
+    fn block_count(&self) -> u64 {
+        self.extent_size.value
+    }
+
+    /// Returns the byte offset of the 8-byte recordsize field
+    fn recordsize_offset(&self) -> u64 {
+        let bpr = self.blocks_per_record();
+        let bc = self.block_count();
+
+        if bc % bpr == 0 {
+            (bc / bpr) * self.recordsize
+        } else {
+            let record_count = bc / bpr;
+            let trailing_blocks = bc - record_count * bpr;
+
+            // Make sure that metadata isn't torn across two records; if that's
+            // the case, then snap to the next recordsize boundary
+            let start_offset = record_count * self.recordsize
+                + trailing_blocks
+                    * (self.block_size() + BLOCK_CONTEXT_SIZE_BYTES);
+            let end_offset = start_offset
+                + 8
+                + self.block_written_array_size()
+                + BLOCK_META_SIZE_BYTES;
+
+            let start_record = start_offset / self.recordsize;
+            let end_record = end_offset / self.recordsize;
+            if start_record == end_record {
+                start_offset
+            } else {
+                end_record * self.recordsize
+            }
+        }
+    }
+
+    /// Returns the byte offset of the `block_written` bitpacked array
+    fn block_written_array_offset(&self) -> u64 {
+        self.recordsize_offset() + std::mem::size_of::<u64>() as u64
+    }
+
+    /// Returns the size of the `block_written` bitpacked array, in bytes
+    fn block_written_array_size(&self) -> u64 {
+        self.block_count().div_ceil(8)
+    }
+
+    /// Returns the offset of the metadata chunk of the file
+    fn metadata_offset(&self) -> u64 {
+        self.block_written_array_offset() + self.block_written_array_size()
+    }
+
+    /// Number of bytes in each block
+    fn block_size(&self) -> u64 {
+        self.extent_size.block_size_in_bytes() as u64
+    }
+
+    /// Reads the expected recordsize from the file
+    fn get_recordsize(&self, file: &File) -> Result<u64, CrucibleError> {
+        let mut v = 0u64;
+        let offset = self.recordsize_offset();
+        pread_all(file.as_fd(), v.as_bytes_mut(), offset as i64).map_err(
+            |e| {
+                CrucibleError::IoError(format!(
+                    "reading recordsize failed: {e}"
+                ))
+            },
+        )?;
+        Ok(v)
+    }
+
+    /// Writes the expected recordsize to the file
+    fn write_recordsize(
+        &self,
+        file: &File,
+        recordsize: u64,
+    ) -> Result<(), CrucibleError> {
+        let offset = self.recordsize_offset();
+        pwrite_all(file.as_fd(), recordsize.as_bytes(), offset as i64)
+            .map_err(|e| {
+                CrucibleError::IoError(format!(
+                    "writing recordsize failed: {e}"
+                ))
+            })?;
+        Ok(())
+    }
+
+    /// Reads metadata from the file
+    fn get_metadata(&self, file: &File) -> Result<OnDiskMeta, CrucibleError> {
+        let mut buf = [0u8; BLOCK_META_SIZE_BYTES as usize];
+        let offset = self.metadata_offset();
+        pread_all(file.as_fd(), &mut buf, offset as i64).map_err(|e| {
+            CrucibleError::IoError(format!("reading metadata failed: {e}"))
+        })?;
+        let out: OnDiskMeta = bincode::deserialize(&buf)
+            .map_err(|e| CrucibleError::BadMetadata(e.to_string()))?;
+        Ok(out)
+    }
+
+    /// Write out the metadata section of the file
+    ///
+    /// This is done in a single write, so it should be atomic.
+    ///
+    /// # Panics
+    /// `block_written.len()` must match `self.block_count()`, and the function
+    /// will panic otherwise.
+    fn write_block_written_and_metadata(
+        &self,
+        file: &File,
+        block_written: &[bool],
+        dirty: bool,
+        flush_number: u64,
+        gen_number: u64,
+    ) -> Result<(), CrucibleError> {
+        assert_eq!(block_written.len(), self.block_count() as usize);
+
+        // Pack the `block_written` array into bytes, LSB first
+        let mut buf = vec![];
+        for c in block_written.chunks(8) {
+            let mut v = 0;
+            for (i, w) in c.iter().enumerate() {
+                v |= (*w as u8) << i;
+            }
+            buf.push(v);
+        }
+
+        let d = OnDiskMeta {
+            dirty,
+            flush_number,
+            gen_number,
+            ext_version: EXTENT_META_RAW_V2,
+        };
+        let mut meta = [0u8; BLOCK_META_SIZE_BYTES as usize];
+        bincode::serialize_into(meta.as_mut_slice(), &d).unwrap();
+        buf.extend(meta);
+
+        let offset = self.block_written_array_offset();
+
+        pwrite_all(file.as_fd(), &buf, offset as i64).map_err(|e| {
+            CrucibleError::IoError(format!("writing metadata failed: {e}"))
+        })?;
+
+        Ok(())
+    }
+
+    /// Decodes the block written array from the given file
+    ///
+    /// The file descriptor offset is not changed by this function
+    fn get_block_written_array(
+        &self,
+        file: &File,
+    ) -> Result<Vec<bool>, CrucibleError> {
+        let mut buf = vec![0u8; self.block_written_array_size() as usize];
+        let offset = self.block_written_array_offset();
+        pread_all(file.as_fd(), &mut buf, offset as i64).map_err(|e| {
+            CrucibleError::IoError(format!(
+                "could not read written blocks array: {e}"
+            ))
+        })?;
+
+        let mut block_written = vec![];
+        for bit in buf
+            .iter()
+            .flat_map(|b| (0..8).map(move |i| b & (1 << i)))
+            .take(self.block_count() as usize)
+        {
+            // Unpack bits from each byte
+            block_written.push(bit != 0);
+        }
+        assert_eq!(block_written.len(), self.block_count() as usize);
+        Ok(block_written)
+    }
+
+    /// Returns the starting point in the file for the given block
+    fn block_pos(&self, block: BlockOffset) -> u64 {
+        let bpr = self.blocks_per_record();
+        let record = block.0 / bpr;
+        let block = block.0 % bpr;
+        record * self.recordsize
+            + block * (self.block_size() + BLOCK_CONTEXT_SIZE_BYTES)
+    }
+
+    /// Returns the position of the given block's context
+    fn context_slot(&self, block: BlockOffset) -> u64 {
+        self.block_pos(block) + self.block_size()
+    }
+
+    /// Returns the number of blocks that fit into a ZFS recordsize
+    fn blocks_per_record(&self) -> u64 {
+        // Each block contains data and a single context slot
+        let bytes_per_block = self.block_size() + BLOCK_CONTEXT_SIZE_BYTES;
+        self.recordsize / bytes_per_block
+    }
+
+    /// Checks whether there is padding after the given block
+    fn has_padding_after(&self, block: BlockOffset) -> bool {
+        // No padding at the end of the data section
+        if block.0 == self.block_count() - 1 {
+            return false;
+        }
+        // Otherwise, there's padding at the end of each block-pair-group
+        let bpr = self.blocks_per_record();
+        (block.0 % bpr) == bpr - 1
+    }
+
+    /// Returns the size of `recordsize` padding
+    fn padding_size(&self) -> u64 {
+        let bpr = self.blocks_per_record();
+        self.recordsize - bpr * (self.block_size() + BLOCK_CONTEXT_SIZE_BYTES)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use anyhow::Result;
+    use bytes::{Bytes, BytesMut};
+    use crucible_common::{build_logger, integrity_hash};
+    use crucible_protocol::BlockContext;
+    use tempfile::tempdir;
+
+    const IOV_MAX_TEST: usize = 1000;
+
+    fn new_region_definition() -> RegionDefinition {
+        let opt = crate::region::test::new_region_options();
+        RegionDefinition::from_options(&opt).unwrap()
+    }
+
+    #[test]
+    fn test_recordsize_change() -> Result<()> {
+        let dir = tempdir()?;
+
+        let def = new_region_definition();
+        let eid = ExtentId(0);
+        let inner =
+            RawInnerV2::create(dir.as_ref(), &def, eid, DUMMY_RECORDSIZE)
+                .unwrap();
+        let recordsize = inner.layout.recordsize;
+
+        // Manually tweak the recordsize in the raw file's on-disk data
+        inner
+            .layout
+            .write_recordsize(&inner.file, recordsize / 2)
+            .unwrap();
+
+        // Reopen, which should fail due to a recordsize mismatch
+        let reopen = RawInnerV2::open(
+            dir.as_ref(),
+            &def,
+            eid,
+            false,
+            DUMMY_RECORDSIZE,
+            &build_logger(),
+        );
+        assert!(reopen.is_err());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_metadata_position() {
+        let layout = RawLayout::new(Block::new(240, 9), DUMMY_RECORDSIZE);
+        assert!(layout.file_size() > DUMMY_RECORDSIZE);
+        assert!(layout.recordsize_offset() == DUMMY_RECORDSIZE);
+
+        let layout = RawLayout::new(Block::new(230, 9), DUMMY_RECORDSIZE);
+        assert!(layout.file_size() < DUMMY_RECORDSIZE);
+        assert!(layout.recordsize_offset() < DUMMY_RECORDSIZE);
+    }
+
+    #[test]
+    fn test_write_unwritten_without_flush() -> Result<()> {
+        let dir = tempdir()?;
+        let mut inner = RawInnerV2::create(
+            dir.as_ref(),
+            &new_region_definition(),
+            ExtentId(0),
+            DUMMY_RECORDSIZE,
+        )
+        .unwrap();
+
+        // Write a block, but don't flush.
+        let data = Bytes::from(vec![0x55; 512]);
+        let hash = integrity_hash(&[&data[..]]);
+        let write = ExtentWrite {
+            offset: BlockOffset(0),
+            data,
+            block_contexts: vec![BlockContext {
+                encryption_context: None,
+                hash,
+            }],
+        };
+        inner.write(JobId(10), &write, false, IOV_MAX_TEST)?;
+        let prev_hash = hash;
+
+        // The context should be in place, though we haven't flushed yet
+
+        // Therefore, we expect that write_unwritten to the first block won't
+        // do anything.
+        {
+            let data = Bytes::from(vec![0x66; 512]);
+            let hash = integrity_hash(&[&data[..]]);
+            let block_context = BlockContext {
+                encryption_context: None,
+                hash,
+            };
+            let write = ExtentWrite {
+                offset: BlockOffset(0),
+                data: data.clone(),
+                block_contexts: vec![block_context],
+            };
+            inner.write(JobId(20), &write, true, IOV_MAX_TEST)?;
+
+            let read = ExtentReadRequest {
+                offset: BlockOffset(0),
+                data: BytesMut::with_capacity(512),
+            };
+            let resp = inner.read(JobId(21), read, IOV_MAX_TEST)?;
+
+            // We should get back our old data, because block 0 was written.
+            assert_eq!(
+                resp.blocks,
+                vec![ReadBlockContext::Unencrypted { hash: prev_hash }]
+            );
+            assert_ne!(resp.data, BytesMut::from(data.as_ref()));
+        }
+
+        // But, writing to the second block still should!
+        {
+            let data = Bytes::from(vec![0x66; 512]);
+            let hash = integrity_hash(&[&data[..]]);
+            let block_context = BlockContext {
+                encryption_context: None,
+                hash,
+            };
+            let write = ExtentWrite {
+                offset: BlockOffset(1),
+                data: data.clone(),
+                block_contexts: vec![block_context],
+            };
+            inner.write(JobId(30), &write, true, IOV_MAX_TEST)?;
+
+            let read = ExtentReadRequest {
+                offset: BlockOffset(1),
+                data: BytesMut::with_capacity(512),
+            };
+            let resp = inner.read(JobId(31), read, IOV_MAX_TEST)?;
+
+            // We should get back our data! Block 1 was never written.
+            assert_eq!(
+                resp.blocks,
+                vec![ReadBlockContext::Unencrypted { hash }]
+            );
+            assert_eq!(resp.data, BytesMut::from(data.as_ref()));
+        }
+
+        Ok(())
+    }
+}
diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs
index 686c7595d..5dc896ae5 100644
--- a/downstairs/src/lib.rs
+++ b/downstairs/src/lib.rs
@@ -46,6 +46,7 @@ mod stats;
 
 mod extent_inner_raw;
 pub(crate) mod extent_inner_raw_common;
+mod extent_inner_raw_v2;
 mod extent_inner_sqlite;
 
 use extent::ExtentState;
@@ -3416,9 +3417,12 @@ enum WrappedStream {
 /// tests, it can be useful to create volumes using older backends.
 #[derive(Copy, Clone, Default, Debug, PartialEq)]
 pub enum Backend {
-    #[default]
+    #[cfg(any(test, feature = "integration-tests"))]
     RawFile,
 
+    #[default]
+    RawFileV2,
+
+    #[cfg(any(test, feature = "integration-tests"))]
     SQLite,
 }
 
@@ -5463,7 +5467,10 @@ mod test {
         Ok(())
     }
 
-    fn build_test_downstairs(read_only: bool) -> Result<Downstairs> {
+    fn build_test_downstairs(
+        read_only: bool,
+        dir: &Path,
+    ) -> Result<Downstairs> {
         let block_size: u64 = 512;
         let extent_size = 4;
 
@@ -5476,13 +5483,12 @@ mod test {
         ));
         region_options.set_uuid(Uuid::new_v4());
 
-        let dir = tempdir()?;
-        mkdir_for_file(dir.path())?;
+        mkdir_for_file(dir)?;
 
-        let mut region = Region::create(&dir, region_options, csl())?;
+        let mut region = Region::create(dir, region_options, csl())?;
         region.extend(2, Backend::default())?;
 
-        let path_dir = dir.as_ref().to_path_buf();
+        let path_dir = dir.to_path_buf();
 
         let mut ds = Downstairs::new_builder(&path_dir, read_only)
             .set_logger(csl())
@@ -5493,7 +5499,8 @@ mod test {
 
     #[test]
     fn test_promote_to_active_one_read_write() -> Result<()> {
-        let mut ds = build_test_downstairs(false)?;
+        let dir = tempdir()?;
+        let mut ds = build_test_downstairs(false, dir.path())?;
 
         let upstairs_connection = UpstairsConnection {
             upstairs_id: Uuid::new_v4(),
@@ -5511,7 +5518,8 @@ mod test {
 
     #[test]
     fn test_promote_to_active_one_read_only() -> Result<()> {
-        let mut ds = build_test_downstairs(true)?;
+        let dir = tempdir()?;
+        let mut ds = build_test_downstairs(true, dir.path())?;
 
         let upstairs_connection = UpstairsConnection {
             upstairs_id: Uuid::new_v4(),
@@ -5532,7 +5540,8 @@ mod test {
     ) -> Result<()> {
         // Attempting to activate multiple read-write (where it's different
         // Upstairs) but with the same gen should be blocked
-        let mut ds = build_test_downstairs(false)?;
+        let dir = tempdir()?;
+        let mut ds = build_test_downstairs(false, dir.path())?;
 
         let upstairs_connection_1 = UpstairsConnection {
             upstairs_id: Uuid::new_v4(),
@@ -5577,7 +5586,8 @@ mod test {
     ) -> Result<()> {
         // Attempting to activate multiple read-write (where it's different
         // Upstairs) but with a lower gen should be blocked.
-        let mut ds = build_test_downstairs(false)?;
+        let dir = tempdir()?;
+        let mut ds = build_test_downstairs(false, dir.path())?;
 
         let upstairs_connection_1 = UpstairsConnection {
             upstairs_id: Uuid::new_v4(),
@@ -5626,7 +5636,8 @@ mod test {
         // Attempting to activate multiple read-write (where it's the same
         // Upstairs but a different session) will block the "new" connection
         // if it has the same generation number.
-        let mut ds = build_test_downstairs(false)?;
+        let dir = tempdir()?;
+        let mut ds = build_test_downstairs(false, dir.path())?;
 
         let upstairs_connection_1 = UpstairsConnection {
             upstairs_id: Uuid::new_v4(),
@@ -5671,7 +5682,8 @@ mod test {
         // Attempting to activate multiple read-write where it's the same
         // Upstairs, but a different session, and with a larger generation
         // should allow the new connection to take over.
-        let mut ds = build_test_downstairs(false)?;
+        let dir = tempdir()?;
+        let mut ds = build_test_downstairs(false, dir.path())?;
 
         let upstairs_connection_1 = UpstairsConnection {
             upstairs_id: Uuid::new_v4(),
@@ -5713,7 +5725,8 @@ mod test {
     fn test_promote_to_active_multi_read_only_different_uuid() -> Result<()> {
         // Activating multiple read-only with different Upstairs UUIDs should
         // work.
-        let mut ds = build_test_downstairs(true)?;
+        let dir = tempdir()?;
+        let mut ds = build_test_downstairs(true, dir.path())?;
 
         let upstairs_connection_1 = UpstairsConnection {
             upstairs_id: Uuid::new_v4(),
@@ -5754,7 +5767,8 @@ mod test {
     fn test_promote_to_active_multi_read_only_same_uuid() -> Result<()> {
         // Activating multiple read-only with the same Upstairs UUID should
         // kick out the other active one.
-        let mut ds = build_test_downstairs(true)?;
+        let dir = tempdir()?;
+        let mut ds = build_test_downstairs(true, dir.path())?;
 
         let upstairs_connection_1 = UpstairsConnection {
             upstairs_id: Uuid::new_v4(),
@@ -5795,7 +5809,8 @@ mod test {
     #[test]
     fn test_multiple_read_only_no_job_id_collision() -> Result<()> {
         // Two read-only Upstairs shouldn't see each other's jobs
-        let mut ds = build_test_downstairs(true)?;
+        let dir = tempdir()?;
+        let mut ds = build_test_downstairs(true, dir.path())?;
 
         let upstairs_connection_1 = UpstairsConnection {
             upstairs_id: Uuid::new_v4(),
diff --git a/downstairs/src/region.rs b/downstairs/src/region.rs
index 5c06a8e1f..66ec7b416 100644
--- a/downstairs/src/region.rs
+++ b/downstairs/src/region.rs
@@ -354,6 +354,9 @@ impl Region {
     fn open_extents(&mut self) -> Result<()> {
         let next_eid = self.extents.len() as u32;
 
+        std::fs::create_dir_all(&self.dir)?;
+        let recordsize = self.get_recordsize()?;
+
         let eid_range = next_eid..self.def.extent_count();
         for eid in eid_range.map(ExtentId) {
             let extent = Extent::open(
@@ -361,6 +364,7 @@ impl Region {
                 &self.def,
                 eid,
                 self.read_only,
+                recordsize,
                 &self.log,
             )?;
 
@@ -380,8 +384,13 @@ impl Region {
 
         let eid_range = next_eid..self.def.extent_count();
 
+        // Get ZFS recordsize, which matters for certain extent formats
+        std::fs::create_dir_all(&self.dir)?;
+        let recordsize = self.get_recordsize()?;
+
         for eid in eid_range.map(ExtentId) {
-            let extent = Extent::create(&self.dir, &self.def, eid, backend)?;
+            let extent =
+                Extent::create(&self.dir, &self.def, eid, backend, recordsize)?;
             self.extents.push(ExtentState::Opened(extent));
         }
         self.check_extents();
@@ -389,6 +398,57 @@ impl Region {
         Ok(())
     }
 
+    #[cfg(not(target_os = "illumos"))]
+    fn get_recordsize(&self) -> Result<u64, CrucibleError> {
+        Ok(extent_inner_raw_v2::DUMMY_RECORDSIZE)
+    }
+
+    /// Looks up the recordsize for the base path
+    #[cfg(target_os = "illumos")]
+    fn get_recordsize(&self) -> Result<u64, CrucibleError> {
+        let recordsize = {
+            let p = std::process::Command::new("zfs")
+                .arg("get")
+                .arg("-Hp") // scripting mode
+                .arg("-ovalue")
+                .arg("recordsize")
+                .arg(&self.dir)
+                .output();
+            match p {
+                Ok(p) => {
+                    let err = std::str::from_utf8(&p.stderr).map_err(|e| {
+                        CrucibleError::IoError(format!(
+                            "zfs returned invalid UTF-8 string: {e}"
+                        ))
+                    })?;
+                    if err.contains("not a ZFS filesystem") {
+                        extent_inner_raw_v2::DUMMY_RECORDSIZE
+                    } else {
+                        let out =
+                            std::str::from_utf8(&p.stdout).map_err(|e| {
+                                CrucibleError::IoError(format!(
+                                    "zfs returned invalid UTF-8 string: {e}, \
+                                     stderr: {err}"
+                                ))
+                            })?;
+                        out.trim().parse::<u64>().map_err(|e| {
+                            CrucibleError::IoError(format!(
+                                "zfs returned non-integer for recordsize: \
+                                 {out:?} ({e}), stderr: {err}"
+                            ))
+                        })?
+                    }
+                }
+                Err(e) => {
+                    return Err(CrucibleError::IoError(format!(
+                        "could not call `zfs` executable: {e}"
+                    )))
+                }
+            }
+        };
+        Ok(recordsize)
+    }
+
     /// Checks that all extents are open and have the correct `number`
     ///
     /// # Panics
@@ -427,19 +487,37 @@ impl Region {
             }
         }
 
+        let recordsize = self.get_recordsize()?;
+
         for eid in to_open {
-            self.reopen_extent(eid)?;
+            self.reopen_extent_with_recordsize(eid, recordsize)?;
         }
 
         Ok(())
     }
 
+    /// Reopens an extent that was previously closed
+    ///
+    /// This function is expensive, because it calls `zfs` to get the current
+    /// recordsize for the dataset. If you are going to open many extents, it's
+    /// recommended to call [`Self::get_recordsize`] once, then use
+    /// [`reopen_extent_with_recordsize`](Self::reopen_extent_with_recordsize)
+    /// to open each extent.
+    pub fn reopen_extent(
+        &mut self,
+        eid: ExtentId,
+    ) -> Result<(), CrucibleError> {
+        let recordsize = self.get_recordsize()?;
+        self.reopen_extent_with_recordsize(eid, recordsize)
+    }
+
     /**
      * Re open an extent that was previously closed
      */
-    pub fn reopen_extent(
+    pub fn reopen_extent_with_recordsize(
         &mut self,
         eid: ExtentId,
+        recordsize: u64,
     ) -> Result<(), CrucibleError> {
         /*
          * Make sure the extent :
@@ -452,8 +530,14 @@ impl Region {
         assert!(matches!(mg, ExtentState::Closed));
         assert!(!self.read_only);
 
-        let new_extent =
-            Extent::open(&self.dir, &self.def, eid, self.read_only, &self.log)?;
+        let new_extent = Extent::open(
+            &self.dir,
+            &self.def,
+            eid,
+            self.read_only,
+            recordsize,
+            &self.log,
+        )?;
 
         if new_extent.dirty() {
             self.dirty_extents.insert(eid);
@@ -696,6 +780,7 @@ impl Region {
 
         let current_dir = extent_dir(&self.dir, eid);
         sync_path(current_dir, &self.log)?;
+
         Ok(())
     }
 
@@ -1162,6 +1247,7 @@ pub(crate) mod test {
         completed_dir, copy_dir, extent_path, remove_copy_cleanup_dir,
         DownstairsBlockContext,
     };
+    use crate::extent_inner_raw_v2::DUMMY_RECORDSIZE;
 
     use super::*;
 
@@ -2060,14 +2146,31 @@ pub(crate) mod test {
         let extent_data_size =
             (ddef.extent_size().value * ddef.block_size()) as usize;
         for i in (0..ddef.extent_count()).map(ExtentId) {
+            let path = extent_path(dir, i);
+            let data = std::fs::read(path).expect("Unable to read file");
+
             match backend {
                 Backend::RawFile | Backend::SQLite => {
-                    let path = extent_path(dir, i);
-                    let data =
-                        std::fs::read(path).expect("Unable to read file");
-
                     out.extend(&data[..extent_data_size]);
                 }
+                Backend::RawFileV2 => {
+                    use extent_inner_raw_v2::BLOCK_CONTEXT_SIZE_BYTES;
+                    let blocks_per_record = (DUMMY_RECORDSIZE
+                        / (ddef.block_size() + BLOCK_CONTEXT_SIZE_BYTES))
+                        as usize;
+                    println!("BLOCKS PER RECORD: {blocks_per_record}");
+                    for i in 0..ddef.extent_size().value as usize {
+                        let record = i / blocks_per_record;
+                        let block = i % blocks_per_record;
+                        let start = record * DUMMY_RECORDSIZE as usize
+                            + block
+                                * (ddef.block_size() + BLOCK_CONTEXT_SIZE_BYTES)
+                                    as usize;
+                        out.extend(
+                            &data[start..][..ddef.block_size() as usize],
+                        );
+                    }
+                }
             }
         }
         out
@@ -2211,6 +2314,12 @@ pub(crate) mod test {
     }
 
     fn test_region_open_removes_partial_writes(backend: Backend) {
+        // The RawFileV2 backend cannot write contexts separately from block
+        // data, so there's no such thing as a partial write.
+        if backend == Backend::RawFileV2 {
+            return;
+        }
+
        // Opening a dirty extent should fully rehash the extent to remove any
        // contexts that don't correlate with data on disk. This is necessary
        // for write_unwritten to work after a crash, and to move us into a
@@ -3680,6 +3789,10 @@ pub(crate) mod test {
         use super::*;
         region_test_suite!(RawFile);
     }
+    mod raw_file_v2 {
+        use super::*;
+        region_test_suite!(RawFileV2);
+    }
     mod sqlite {
         use super::*;
         region_test_suite!(SQLite);
diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs
index a88911da4..cc42642e5 100644
--- a/integration_tests/src/lib.rs
+++ b/integration_tests/src/lib.rs
@@ -187,7 +187,10 @@ mod test {
 
     impl TestDownstairsSet {
         /// Spin off three downstairs, with a 5120b region
-        pub async fn small(read_only: bool) -> Result<TestDownstairsSet> {
+        pub async fn small(
+            read_only: bool,
+            backend: Backend,
+        ) -> Result<TestDownstairsSet> {
             // 5 * 2 * 512 = 5120b
             let blocks_per_extent = 5;
             let extent_count = 2;
@@ -196,7 +199,7 @@ mod test {
                 blocks_per_extent,
                 extent_count,
                 false,
-                Backend::RawFile,
+                backend,
             )
             .await
         }
@@ -219,7 +222,10 @@ mod test {
         }
 
         /// Spin off three downstairs, with a 50 MB region
-        pub async fn big(read_only: bool) -> Result<TestDownstairsSet> {
+        pub async fn big(
+            read_only: bool,
+            backend: Backend,
+        ) -> Result<TestDownstairsSet> {
             // 512 * 188 * 512 = 49283072b ~= 50MB
             let blocks_per_extent = 512;
             let extent_count = 188;
@@ -228,13 +234,13 @@ mod test {
                 blocks_per_extent,
                 extent_count,
                 false,
-                Backend::RawFile,
+                backend,
             )
             .await
         }
 
         /// Spin off three problematic downstairs, with a 10 MB region
-        pub async fn problem() -> Result<TestDownstairsSet> {
+        pub async fn problem(backend: Backend) -> Result<TestDownstairsSet> {
             // 512 * 40 * 512 = 10485760b = 10MB
             let blocks_per_extent = 512;
             let extent_count = 188;
@@ -243,7 +249,7 @@ mod test {
                 blocks_per_extent,
                 extent_count,
                 true, // problematic
-                Backend::RawFile,
+                backend,
             )
             .await
         }
@@ -393,12 +399,11 @@ mod test {
         }
     }
 
-    #[tokio::test]
-    async fn integration_test_region() -> Result<()> {
+    async fn integration_test_region(backend: Backend) -> Result<()> {
         // Test a simple single layer volume with a read, write, read
         const BLOCK_SIZE: usize = 512;
 
-        let tds = TestDownstairsSet::small(false).await?;
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
 
         let vcr = VolumeConstructionRequest::Volume {
@@ -441,13 +446,12 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_region_huge_io() -> Result<()> {
+    async fn integration_test_region_huge_io(backend: Backend) -> Result<()> {
         // Test a simple single layer volume with a read, write, read, with IOs
         // that exceed our MDTS (and should be split automatically)
         const BLOCK_SIZE: usize = 512;
 
-        let tds = TestDownstairsSet::big(false).await?;
+        let tds = TestDownstairsSet::big(false, backend).await?;
         let opts = tds.opts();
 
         let vcr = VolumeConstructionRequest::Volume {
@@ -499,11 +503,10 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn volume_zero_length_io() -> Result<()> {
+    async fn volume_zero_length_io(backend: Backend) -> Result<()> {
         const BLOCK_SIZE: usize = 512;
 
-        let tds = TestDownstairsSet::small(false).await?;
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
 
         let vcr = VolumeConstructionRequest::Volume {
@@ -535,16 +538,16 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_two_layers() -> Result<()> {
-        let tds = TestDownstairsSet::small(false).await?;
+    async fn integration_test_two_layers(backend: Backend) -> Result<()> {
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
         integration_test_two_layers_common(tds, opts, false).await
     }
 
-    #[tokio::test]
-    async fn integration_test_two_layers_write_unwritten() -> Result<()> {
-        let tds = TestDownstairsSet::small(false).await?;
+    async fn integration_test_two_layers_write_unwritten(
+        backend: Backend,
+    ) -> Result<()> {
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
         integration_test_two_layers_common(tds, opts, true).await
     }
@@ -629,11 +632,10 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_three_layers() -> Result<()> {
+    async fn integration_test_three_layers(backend: Backend) -> Result<()> {
         const BLOCK_SIZE: usize = 512;
 
-        let tds = TestDownstairsSet::small(false).await?;
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
 
         // Create in memory block io full of 11
@@ -711,11 +713,10 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_url() -> Result<()> {
+    async fn integration_test_url(backend: Backend) -> Result<()> {
         const BLOCK_SIZE: usize = 512;
 
-        let tds = TestDownstairsSet::small(false).await?;
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
 
         let server = Server::run();
@@ -782,12 +783,11 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_just_read() -> Result<()> {
+    async fn integration_test_just_read(backend: Backend) -> Result<()> {
         // Just do a read of a new volume.
         const BLOCK_SIZE: usize = 512;
 
-        let tds = TestDownstairsSet::small(true).await?;
+        let tds = TestDownstairsSet::small(true, backend).await?;
         let opts = tds.opts();
 
         let vcr = VolumeConstructionRequest::Volume {
@@ -817,8 +817,9 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_volume_write_unwritten_1() -> Result<()> {
+    async fn integration_test_volume_write_unwritten_1(
+        backend: Backend,
+    ) -> Result<()> {
         // Test a simple single layer volume, verify write_unwritten
         // works as expected.
         // Volume with a subvolume:
@@ -833,7 +834,7 @@ mod test {
         //    |AAAAAAAAAA|
         const BLOCK_SIZE: usize = 512;
 
-        let tds = TestDownstairsSet::small(false).await?;
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
 
         let vcr = VolumeConstructionRequest::Volume {
@@ -884,8 +885,9 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_volume_write_unwritten_2() -> Result<()> {
+    async fn integration_test_volume_write_unwritten_2(
+        backend: Backend,
+    ) -> Result<()> {
         // Test a simple single layer volume, verify a first write_unwritten
         // won't be altered by a 2nd write_unwritten.
         // Volume with a subvolume:
@@ -899,7 +901,7 @@ mod test {
         // Should result in:
         //    |AAAAAAAAAA|
         const BLOCK_SIZE: usize = 512;
-        let tds = TestDownstairsSet::small(false).await?;
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
 
         let vcr = VolumeConstructionRequest::Volume {
@@ -950,8 +952,9 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_volume_write_unwritten_sparse() -> Result<()> {
+    async fn integration_test_volume_write_unwritten_sparse(
+        backend: Backend,
+    ) -> Result<()> {
         // Test a simple single layer volume
         // Perform a smaller write, then a larger write_unwritten and
         // verify the smaller write is not over-written.
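Note: removing the `#[tokio::test]` attributes above implies these functions are now invoked elsewhere with a concrete `Backend`; that instantiation point is outside this diff. The sketch below is a hypothetical illustration (the macro name and test list are invented here) of how such backend-parameterized tests could be stamped out per backend, in the spirit of `region_test_suite!` from region.rs:

```rust
// Hypothetical wrapper macro: for one Backend variant, emit a module
// containing a #[tokio::test] shim for each parameterized test function.
macro_rules! backend_test_suite {
    ($backend:ident: $($test:ident),* $(,)?) => {
        #[allow(non_snake_case)]
        mod $backend {
            use super::*;
            $(
                #[tokio::test]
                async fn $test() -> Result<()> {
                    // Forward to the backend-parameterized function above
                    super::$test(Backend::$backend).await
                }
            )*
        }
    };
}

// Example instantiation (test list abridged):
backend_test_suite!(RawFileV2: integration_test_region, integration_test_just_read);
```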
@@ -967,7 +970,7 @@ mod test {
         //    |ABBBBBBBBBB|
         const BLOCK_SIZE: usize = 512;
 
-        let tds = TestDownstairsSet::small(false).await?;
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
 
         let vcr = VolumeConstructionRequest::Volume {
@@ -1019,8 +1022,9 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_volume_write_unwritten_subvols() -> Result<()> {
+    async fn integration_test_volume_write_unwritten_subvols(
+        backend: Backend,
+    ) -> Result<()> {
         // Test a single layer volume with two subvolumes,
         // verify a first write_unwritten that crosses the subvols
         // works as expected.
@@ -1036,7 +1040,7 @@ mod test {
         const BLOCK_SIZE: usize = 512;
 
         let mut sv = Vec::new();
-        let tds1 = TestDownstairsSet::small(false).await?;
+        let tds1 = TestDownstairsSet::small(false, backend).await?;
         let opts = tds1.opts();
         sv.push(VolumeConstructionRequest::Region {
             block_size: BLOCK_SIZE as u64,
@@ -1045,7 +1049,7 @@ mod test {
             opts,
             gen: 1,
         });
-        let tds2 = TestDownstairsSet::small(false).await?;
+        let tds2 = TestDownstairsSet::small(false, backend).await?;
         let opts = tds2.opts();
         sv.push(VolumeConstructionRequest::Region {
             block_size: BLOCK_SIZE as u64,
@@ -1099,8 +1103,8 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
     async fn integration_test_volume_write_unwritten_subvols_sparse(
+        backend: Backend,
     ) -> Result<()> {
         // Test a single layer volume with two subvolumes,
         // verify a first write_unwritten that crosses the subvols
@@ -1118,7 +1122,7 @@ mod test {
         const BLOCK_SIZE: usize = 512;
 
         let mut sv = Vec::new();
-        let tds1 = TestDownstairsSet::small(false).await?;
+        let tds1 = TestDownstairsSet::small(false, backend).await?;
         let opts = tds1.opts();
         sv.push(VolumeConstructionRequest::Region {
             block_size: BLOCK_SIZE as u64,
@@ -1127,7 +1131,7 @@ mod test {
             opts,
             gen: 1,
         });
-        let tds2 = TestDownstairsSet::small(false).await?;
+        let tds2 = TestDownstairsSet::small(false, backend).await?;
         let opts = tds2.opts();
         sv.push(VolumeConstructionRequest::Region {
             block_size: BLOCK_SIZE as u64,
@@ -1198,8 +1202,9 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_volume_write_unwritten_subvols_3() -> Result<()> {
+    async fn integration_test_volume_write_unwritten_subvols_3(
+        backend: Backend,
+    ) -> Result<()> {
         // Test a single layer volume with two subvolumes,
         // A first write_unwritten that crosses the subvols
         // A 2nd write unwritten.
@@ -1219,7 +1224,7 @@ mod test {
         const BLOCK_SIZE: usize = 512;
 
         let mut sv = Vec::new();
-        let tds1 = TestDownstairsSet::small(false).await?;
+        let tds1 = TestDownstairsSet::small(false, backend).await?;
         let opts = tds1.opts();
         sv.push(VolumeConstructionRequest::Region {
             block_size: BLOCK_SIZE as u64,
@@ -1228,7 +1233,7 @@ mod test {
             opts,
             gen: 1,
         });
-        let tds2 = TestDownstairsSet::small(false).await?;
+        let tds2 = TestDownstairsSet::small(false, backend).await?;
         let opts = tds2.opts();
         sv.push(VolumeConstructionRequest::Region {
             block_size: BLOCK_SIZE as u64,
@@ -1294,17 +1299,18 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_two_layers_parent_smaller() -> Result<()> {
-        let tds = TestDownstairsSet::small(false).await?;
+    async fn integration_test_two_layers_parent_smaller(
+        backend: Backend,
+    ) -> Result<()> {
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
         integration_test_two_layers_small_common(tds, opts, false).await
     }
 
-    #[tokio::test]
-    async fn integration_test_two_layers_parent_smaller_unwritten() -> Result<()>
-    {
-        let tds = TestDownstairsSet::small(false).await?;
+    async fn integration_test_two_layers_parent_smaller_unwritten(
+        backend: Backend,
+    ) -> Result<()> {
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
         integration_test_two_layers_small_common(tds, opts, true).await
     }
@@ -1381,8 +1387,7 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_scrub() -> Result<()> {
+    async fn integration_test_scrub(backend: Backend) -> Result<()> {
         // Volume with a subvolume and a RO parent:
         // SV: |----------|
         // RO: |1111111111|
@@ -1396,7 +1401,7 @@ mod test {
         //     |1111111111|
         const BLOCK_SIZE: usize = 512;
 
-        let tds = TestDownstairsSet::small(false).await?;
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
 
         // Create in_memory block_io
@@ -1459,8 +1464,7 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_scrub_short() -> Result<()> {
+    async fn integration_test_scrub_short(backend: Backend) -> Result<()> {
         // Volume with a subvolume and a smaller RO parent:
         // SV: |----------|
         // RO: |11111|
@@ -1475,7 +1479,7 @@ mod test {
         //     |1111155555|
         const BLOCK_SIZE: usize = 512;
 
-        let tds = TestDownstairsSet::small(false).await?;
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
 
         // Create in_memory block_io
@@ -1550,8 +1554,9 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_scrub_short_sparse() -> Result<()> {
+    async fn integration_test_scrub_short_sparse(
+        backend: Backend,
+    ) -> Result<()> {
         // Volume with a subvolume and a smaller RO parent:
         // SV: |----------|
         // RO: |11111|
@@ -1570,7 +1575,7 @@ mod test {
         //     |1121100300|
         const BLOCK_SIZE: usize = 512;
 
-        let tds = TestDownstairsSet::small(false).await?;
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
 
         // Create in_memory block_io
@@ -1647,8 +1652,7 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_scrub_useless() -> Result<()> {
+    async fn integration_test_scrub_useless(backend: Backend) -> Result<()> {
         // Volume with a subvolume and a RO parent:
         // SV: |----------|
         // RO: |1111111111|
@@ -1663,7 +1667,7 @@ mod test {
         // SV  |5555555555|
         const BLOCK_SIZE: usize = 512;
 
-        let tds = TestDownstairsSet::small(false).await?;
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
 
         // Create in_memory block_io
@@ -1724,9 +1728,9 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_volume_subvols_parent_scrub_sparse() -> Result<()>
-    {
+    async fn integration_test_volume_subvols_parent_scrub_sparse(
+        backend: Backend,
+    ) -> Result<()> {
         // Test a volume with two sub volumes, and RO parent
         // verify two writes, one that crosses the subvolume boundary
         // are persisted and data from the RO parent fills in
@@ -1761,7 +1765,7 @@ mod test {
             .await?;
 
         let mut sv = Vec::new();
-        let tds1 = TestDownstairsSet::small(false).await?;
+        let tds1 = TestDownstairsSet::small(false, backend).await?;
         let opts = tds1.opts();
         sv.push(VolumeConstructionRequest::Region {
             block_size: BLOCK_SIZE as u64,
@@ -1770,7 +1774,7 @@ mod test {
             opts,
             gen: 1,
         });
-        let tds2 = TestDownstairsSet::small(false).await?;
+        let tds2 = TestDownstairsSet::small(false, backend).await?;
         let opts = tds2.opts();
         sv.push(VolumeConstructionRequest::Region {
             block_size: BLOCK_SIZE as u64,
@@ -1844,8 +1848,8 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
     async fn integration_test_volume_subvols_parent_scrub_sparse_2(
+        backend: Backend,
     ) -> Result<()> {
         // Test a volume with two sub volumes, and 3/4th RO parent
         // Write a few spots, one spanning the sub vols.
@@ -1882,7 +1886,7 @@ mod test {
             .await?;
 
         let mut sv = Vec::new();
-        let tds1 = TestDownstairsSet::small(false).await?;
+        let tds1 = TestDownstairsSet::small(false, backend).await?;
         let opts = tds1.opts();
         sv.push(VolumeConstructionRequest::Region {
             block_size: BLOCK_SIZE as u64,
@@ -1891,7 +1895,7 @@ mod test {
             opts,
             gen: 1,
         });
-        let tds2 = TestDownstairsSet::small(false).await?;
+        let tds2 = TestDownstairsSet::small(false, backend).await?;
         let opts = tds2.opts();
         sv.push(VolumeConstructionRequest::Region {
             block_size: BLOCK_SIZE as u64,
@@ -1981,11 +1985,10 @@ mod test {
     }
 
     // Test that multiple upstairs can connect to a single read-only downstairs
-    #[tokio::test]
-    async fn integration_test_multi_read_only() -> Result<()> {
+    async fn integration_test_multi_read_only(backend: Backend) -> Result<()> {
        const BLOCK_SIZE: usize = 512;
 
-        let tds = TestDownstairsSet::small(true).await?;
+        let tds = TestDownstairsSet::small(true, backend).await?;
         let mut opts = tds.opts();
 
         let vcr_1 = VolumeConstructionRequest::Volume {
@@ -2042,8 +2045,7 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_scrub_no_rop() -> Result<()> {
+    async fn integration_test_scrub_no_rop(backend: Backend) -> Result<()> {
         // Volume with a subvolume and no RO parent; verify the scrub
         // does no work
         // SV: |----------|
@@ -2058,7 +2060,7 @@ mod test {
         // SV  |55555-----|
         const BLOCK_SIZE: usize = 512;
 
-        let tds = TestDownstairsSet::small(false).await?;
+        let tds = TestDownstairsSet::small(false, backend).await?;
         let opts = tds.opts();
 
         let mut volume = Volume::new(BLOCK_SIZE as u64, csl());
@@ -2108,8 +2110,9 @@ mod test {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn integration_test_snapshot_backed_vol() -> Result<()> {
+    async fn integration_test_snapshot_backed_vol(
+        backend: Backend,
+    ) -> Result<()> {
         // Test using a "snapshot" (for this test, downstairs booted read-only)
         // as a read-only parent.
 
@@ -2117,7 +2120,8 @@ mod test {
         // boot three downstairs, write some data to them, then change to
         // read-only.
- let mut test_downstairs_set = TestDownstairsSet::small(false).await?; + let mut test_downstairs_set = + TestDownstairsSet::small(false, backend).await?; let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); volume @@ -2191,7 +2195,7 @@ mod test { // create a new volume, layering a new set of downstairs on top of the // read-only one we just (re)booted - let top_layer_tds = TestDownstairsSet::small(false).await?; + let top_layer_tds = TestDownstairsSet::small(false, backend).await?; let top_layer_opts = top_layer_tds.opts(); let bottom_layer_opts = test_downstairs_set.opts(); @@ -2262,8 +2266,9 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_sqlite_backed_vol() -> Result<()> { + async fn integration_test_sqlite_backed_vol( + backend: Backend, + ) -> Result<()> { // Test using an old SQLite backend as a read-only parent. const BLOCK_SIZE: usize = 512; @@ -2361,17 +2366,19 @@ mod test { // create a new volume, layering a new set of downstairs on top of the // read-only one we just (re)booted - let top_layer_tds = TestDownstairsSet::small(false).await?; + let top_layer_tds = TestDownstairsSet::small(false, backend).await?; let top_layer_opts = top_layer_tds.opts(); let bottom_layer_opts = test_downstairs_set.opts(); // The new volume is **not** using the SQLite backend! - assert!(!top_layer_tds - .downstairs1 - .tempdir - .path() - .join("00/000/000.db") - .exists()); + if !matches!(backend, Backend::SQLite) { + assert!(!top_layer_tds + .downstairs1 + .tempdir + .path() + .join("00/000/000.db") + .exists()); + } let vcr = VolumeConstructionRequest::Volume { id: Uuid::new_v4(), @@ -2530,10 +2537,9 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_clone_raw() -> Result<()> { + async fn integration_test_clone_raw(backend: Backend) -> Result<()> { // Test downstairs region clone. - // Create three downstairs with raw backend, write some data to them. + // Create three downstairs with some backend, write some data to them. // Restart them all read only. // Create a new downstairs. // Clone a read only downstairs to the new downstairs @@ -2546,7 +2552,8 @@ mod test { // boot three downstairs, write some data to them, then change to // read-only. - let mut test_downstairs_set = TestDownstairsSet::small(false).await?; + let mut test_downstairs_set = + TestDownstairsSet::small(false, backend).await?; let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); volume @@ -3012,13 +3019,15 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_volume_replace_downstairs() -> Result<()> { + async fn integration_test_volume_replace_downstairs( + backend: Backend, + ) -> Result<()> { // Replace a downstairs with a new one const BLOCK_SIZE: usize = 512; // boot three downstairs, write some data to them - let test_downstairs_set = TestDownstairsSet::small(false).await?; + let test_downstairs_set = + TestDownstairsSet::small(false, backend).await?; let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); volume @@ -3099,16 +3108,17 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_volume_clone_replace_ro_downstairs() -> Result<()> - { + async fn integration_test_volume_clone_replace_ro_downstairs( + backend: Backend, + ) -> Result<()> { // Replace a read only downstairs with a new one, that we cloned // from the original ro downstairs. const BLOCK_SIZE: usize = 512; // boot three downstairs, write some data to them, then change to // read-only. 
- let mut test_downstairs_set = TestDownstairsSet::small(false).await?; + let mut test_downstairs_set = + TestDownstairsSet::small(false, backend).await?; let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); volume @@ -3230,13 +3240,15 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_volume_replace_bad_downstairs() -> Result<()> { + async fn integration_test_volume_replace_bad_downstairs( + backend: Backend, + ) -> Result<()> { // Attempt to replace a downstairs that we don't have. const BLOCK_SIZE: usize = 512; // Create three downstairs. - let test_downstairs_set = TestDownstairsSet::small(false).await?; + let test_downstairs_set = + TestDownstairsSet::small(false, backend).await?; let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); volume @@ -3273,14 +3285,15 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_volume_inactive_replace_downstairs() -> Result<()> - { + async fn integration_test_volume_inactive_replace_downstairs( + backend: Backend, + ) -> Result<()> { // Replace a downstairs before the volume is active. const BLOCK_SIZE: usize = 512; // Create three downstairs. - let test_downstairs_set = TestDownstairsSet::small(false).await?; + let test_downstairs_set = + TestDownstairsSet::small(false, backend).await?; let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); volume @@ -3313,13 +3326,15 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_volume_twice_replace_downstairs() -> Result<()> { + async fn integration_test_volume_twice_replace_downstairs( + backend: Backend, + ) -> Result<()> { // Replace a downstairs, then replace it again. const BLOCK_SIZE: usize = 512; // Create three downstairs. - let test_downstairs_set = TestDownstairsSet::small(false).await?; + let test_downstairs_set = + TestDownstairsSet::small(false, backend).await?; let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); volume @@ -3365,15 +3380,17 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_volume_replace_active() -> Result<()> { + async fn integration_test_volume_replace_active( + backend: Backend, + ) -> Result<()> { // Attempt to replace a downstairs with one of our other // active downstairs. This should return error as its is not // a legal replacement. const BLOCK_SIZE: usize = 512; // Create three downstairs. - let test_downstairs_set = TestDownstairsSet::small(false).await?; + let test_downstairs_set = + TestDownstairsSet::small(false, backend).await?; let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); volume @@ -3416,9 +3433,14 @@ mod test { Ok(()) } - #[tokio::test] async fn integration_test_volume_replace_downstairs_then_takeover( + backend: Backend, ) -> Result<()> { + // This test is only valid for backends that will undergo replacement + if backend == Backend::SQLite { + return Ok(()); + } + let log = csl(); // Replace a downstairs with a new one which will kick off // LiveRepair. 
Then spin up a new Upstairs with a newer @@ -3427,7 +3449,8 @@ mod test { const BLOCK_SIZE: usize = 512; // boot three downstairs, write some data to them - let test_downstairs_set = TestDownstairsSet::big(false).await?; + let test_downstairs_set = + TestDownstairsSet::big(false, backend).await?; let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); volume @@ -3535,13 +3558,14 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_problematic_downstairs() -> Result<()> { + async fn integration_test_problematic_downstairs( + backend: Backend, + ) -> Result<()> { // Make sure problematic downstairs don't cause problems Upstairs. const BLOCK_SIZE: usize = 512; // Create three problematic downstairs. - let test_downstairs_set = TestDownstairsSet::problem().await?; + let test_downstairs_set = TestDownstairsSet::problem(backend).await?; let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); volume @@ -3606,14 +3630,13 @@ mod test { // ZZZ Make a test of guest.activate_with_gen both fail and pass. // Maybe in a different place? We need downstairs to do this. - #[tokio::test] - async fn integration_test_guest_downstairs() -> Result<()> { + async fn integration_test_guest_downstairs(backend: Backend) -> Result<()> { // Test using the guest layer to verify a new region is // what we expect, and a write and read work as expected const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await?; + let tds = TestDownstairsSet::small(false, backend).await?; let opts = tds.opts(); let (guest, io) = Guest::new(None); @@ -3647,10 +3670,9 @@ mod test { } /// Drop the guest right away and confirm that the worker thread stops - #[tokio::test] - async fn integration_test_guest_drop_early() -> Result<()> { + async fn integration_test_guest_drop_early(backend: Backend) -> Result<()> { // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await?; + let tds = TestDownstairsSet::small(false, backend).await?; let opts = tds.opts(); let (guest, io) = Guest::new(None); @@ -3666,11 +3688,11 @@ mod test { /// Same as `integration_test_guest_downstairs`, but dropping the Guest at /// the end and confirming that the worker thread stops - #[tokio::test] - async fn integration_test_guest_drop() -> Result<()> { + async fn integration_test_guest_drop(backend: Backend) -> Result<()> { const BLOCK_SIZE: usize = 512; + // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await?; + let tds = TestDownstairsSet::small(false, backend).await?; let opts = tds.opts(); let (guest, io) = Guest::new(None); @@ -3706,13 +3728,14 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_guest_zero_length_io() -> Result<()> { + async fn integration_test_guest_zero_length_io( + backend: Backend, + ) -> Result<()> { // Test the guest layer with a write and read of zero length const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. 
- let tds = TestDownstairsSet::small(false).await?; + let tds = TestDownstairsSet::small(false, backend).await?; let opts = tds.opts(); let (guest, io) = Guest::new(None); @@ -3732,14 +3755,15 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_guest_replace_downstairs() -> Result<()> { + async fn integration_test_guest_replace_downstairs( + backend: Backend, + ) -> Result<()> { // Test using the guest layer to verify we can replace // a downstairs const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await?; + let tds = TestDownstairsSet::small(false, backend).await?; let opts = tds.opts(); let log = csl(); @@ -3806,14 +3830,15 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_guest_replace_ds_before_active() -> Result<()> { + async fn integration_test_guest_replace_ds_before_active( + backend: Backend, + ) -> Result<()> { // Test using the guest layer to verify we can replace a downstairs // before the upstairs is active. const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await?; + let tds = TestDownstairsSet::small(false, backend).await?; let opts = tds.opts(); let log = csl(); @@ -3854,12 +3879,13 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_upstairs_read_only_rejects_write() -> Result<()> { + async fn integration_test_upstairs_read_only_rejects_write( + backend: Backend, + ) -> Result<()> { const BLOCK_SIZE: usize = 512; // Spin up three read-only downstairs - let tds = TestDownstairsSet::small(true).await?; + let tds = TestDownstairsSet::small(true, backend).await?; let opts = tds.opts(); let (guest, io) = Guest::new(None); @@ -3885,13 +3911,14 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_guest_replace_many_downstairs() -> Result<()> { + async fn integration_test_guest_replace_many_downstairs( + backend: Backend, + ) -> Result<()> { // Test using the guest layer to verify we can replace one // downstairs, but not another while the replace is active. // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await?; + let tds = TestDownstairsSet::small(false, backend).await?; let opts = tds.opts(); let log = csl(); @@ -3930,15 +3957,16 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_guest_downstairs_unwritten() -> Result<()> { + async fn integration_test_guest_downstairs_unwritten( + backend: Backend, + ) -> Result<()> { // Test using the guest layer to verify a new region is // what we expect, and a write_unwritten and read work as expected // The size here spans two extents. const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await?; + let tds = TestDownstairsSet::small(false, backend).await?; let opts = tds.opts(); let (guest, io) = Guest::new(None); @@ -3995,16 +4023,16 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_guest_downstairs_unwritten_sparse_1() -> Result<()> - { + async fn integration_test_guest_downstairs_unwritten_sparse_1( + backend: Backend, + ) -> Result<()> { // Test using the guest layer to verify a new region is // what we expect, and a write_unwritten and read work as expected, // this time with sparse writes const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. 
- let tds = TestDownstairsSet::small(false).await?; + let tds = TestDownstairsSet::small(false, backend).await?; let opts = tds.opts(); let (guest, io) = Guest::new(None); @@ -4048,8 +4076,8 @@ mod test { Ok(()) } - #[tokio::test] async fn integration_test_guest_downstairs_unwritten_sparse_mid( + backend: Backend, ) -> Result<()> { // Test using the guest layer to verify a new region is // what we expect, and a write_unwritten and read work as expected, @@ -4057,7 +4085,7 @@ mod test { const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await?; + let tds = TestDownstairsSet::small(false, backend).await?; let opts = tds.opts(); let (guest, io) = Guest::new(None); @@ -4105,15 +4133,15 @@ mod test { Ok(()) } - #[tokio::test] async fn integration_test_guest_downstairs_unwritten_sparse_end( + backend: Backend, ) -> Result<()> { // Test write_unwritten and read work as expected, // this time with sparse writes where the last block is written const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await?; + let tds = TestDownstairsSet::small(false, backend).await?; let opts = tds.opts(); let (guest, io) = Guest::new(None); @@ -4157,14 +4185,15 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_guest_downstairs_unwritten_span() -> Result<()> { + async fn integration_test_guest_downstairs_unwritten_span( + backend: Backend, + ) -> Result<()> { // Test write_unwritten and read work as expected, // Have the IO span an extent boundary. const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await?; + let tds = TestDownstairsSet::small(false, backend).await?; let opts = tds.opts(); let (guest, io) = Guest::new(None); @@ -4207,15 +4236,15 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_guest_downstairs_unwritten_span_2() -> Result<()> - { + async fn integration_test_guest_downstairs_unwritten_span_2( + backend: Backend, + ) -> Result<()> { // Test write_unwritten and read work as expected, // Have the IO span an extent boundary. const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await?; + let tds = TestDownstairsSet::small(false, backend).await?; let opts = tds.opts(); let (guest, io) = Guest::new(None); @@ -4258,13 +4287,12 @@ mod test { Ok(()) } - #[tokio::test] - async fn integration_test_io_out_of_range() { + async fn integration_test_io_out_of_range(backend: Backend) -> Result<()> { // Test reads and writes outside the valid region return error. const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await.unwrap(); + let tds = TestDownstairsSet::small(false, backend).await.unwrap(); let opts = tds.opts(); let (guest, io) = Guest::new(None); @@ -4288,16 +4316,18 @@ mod test { let res = guest.read(BlockIndex(11), &mut buffer).await; assert!(res.is_err()); + Ok(()) } - #[tokio::test] - async fn integration_test_io_span_out_of_range() { + async fn integration_test_io_span_out_of_range( + backend: Backend, + ) -> Result<()> { // Test reads and writes that start inside and extend // past the end of the region will return error. const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. 
- let tds = TestDownstairsSet::small(false).await.unwrap(); + let tds = TestDownstairsSet::small(false, backend).await.unwrap(); let opts = tds.opts(); let (guest, io) = Guest::new(None); @@ -4321,6 +4351,7 @@ mod test { let res = guest.read(BlockIndex(10), &mut buffer).await; assert!(res.is_err()); + Ok(()) } // The following tests are for the Pantry @@ -4385,12 +4416,11 @@ mod test { (pantry, volume_id, client) } - #[tokio::test] - async fn test_pantry_import_from_url_ovmf() { + async fn test_pantry_import_from_url_ovmf(backend: Backend) -> Result<()> { const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::big(false).await.unwrap(); + let tds = TestDownstairsSet::big(false, backend).await.unwrap(); let opts = tds.opts(); // Start a pantry, and get the client for it @@ -4477,12 +4507,14 @@ mod test { assert_eq!(&bytes[..][start..end], &buffer[start..end]); eprintln!("{} {} ok", start, end); } + Ok(()) } - #[tokio::test] - async fn test_pantry_import_from_url_ovmf_bad_digest() { + async fn test_pantry_import_from_url_ovmf_bad_digest( + backend: Backend, + ) -> Result<()> { // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::big(false).await.unwrap(); + let tds = TestDownstairsSet::big(false, backend).await.unwrap(); // Start a pantry, and get the client for it let (_pantry, volume_id, client) = @@ -4522,10 +4554,12 @@ mod test { assert!(!result.job_result_ok); client.detach(&volume_id.to_string()).await.unwrap(); + Ok(()) } - #[tokio::test] - async fn test_pantry_import_from_local_server() { + async fn test_pantry_import_from_local_server( + backend: Backend, + ) -> Result<()> { const BLOCK_SIZE: usize = 512; let log = csl(); @@ -4546,7 +4580,7 @@ mod test { // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await.unwrap(); + let tds = TestDownstairsSet::small(false, backend).await.unwrap(); let opts = tds.opts(); let volume_id = Uuid::new_v4(); @@ -4622,12 +4656,12 @@ mod test { volume.read(BlockIndex(0), &mut buffer).await.unwrap(); assert_eq!(vec![0x55; 5120], &buffer[..]); + Ok(()) } - #[tokio::test] - async fn test_pantry_snapshot() { + async fn test_pantry_snapshot(backend: Backend) -> Result<()> { // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await.unwrap(); + let tds = TestDownstairsSet::small(false, backend).await.unwrap(); // Start a pantry, get the client for it, then use it to snapshot let (_pantry, volume_id, client) = @@ -4644,15 +4678,15 @@ mod test { .unwrap(); client.detach(&volume_id.to_string()).await.unwrap(); + Ok(()) } - #[tokio::test] - async fn test_pantry_bulk_write() { + async fn test_pantry_bulk_write(backend: Backend) -> Result<()> { const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await.unwrap(); + let tds = TestDownstairsSet::small(false, backend).await.unwrap(); let opts = tds.opts(); // Start a pantry, get the client for it, then use it to bulk_write in data @@ -4702,15 +4736,17 @@ mod test { let end = (i + 1) * 512; assert_eq!(vec![i as u8; 512], buffer_data[start..end]); } + Ok(()) } - #[tokio::test] - async fn test_pantry_bulk_write_max_chunk_size() { + async fn test_pantry_bulk_write_max_chunk_size( + backend: Backend, + ) -> Result<()> { const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. 
- let tds = TestDownstairsSet::big(false).await.unwrap(); + let tds = TestDownstairsSet::big(false, backend).await.unwrap(); let opts = tds.opts(); // Start a pantry, get the client for it, then use it to bulk_write in data @@ -4758,14 +4794,16 @@ mod test { volume.read(BlockIndex(0), &mut buffer).await.unwrap(); assert_eq!(vec![0x99; PantryEntry::MAX_CHUNK_SIZE], &buffer[..]); + Ok(()) } /// Assert that the Pantry will fail for non-block sized writes - #[tokio::test] - async fn test_pantry_fail_bulk_write_one_byte() { + async fn test_pantry_fail_bulk_write_one_byte( + backend: Backend, + ) -> Result<()> { // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await.unwrap(); + let tds = TestDownstairsSet::small(false, backend).await.unwrap(); // Start a pantry, get the client for it, then use it to bulk_write in data let (_pantry, volume_id, client) = @@ -4790,14 +4828,16 @@ mod test { assert_eq!(e.status(), reqwest::StatusCode::BAD_REQUEST); client.detach(&volume_id.to_string()).await.unwrap(); + Ok(()) } /// Assert that the Pantry will fail for non-block sized reads - #[tokio::test] - async fn test_pantry_fail_bulk_read_one_byte() { + async fn test_pantry_fail_bulk_read_one_byte( + backend: Backend, + ) -> Result<()> { // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await.unwrap(); + let tds = TestDownstairsSet::small(false, backend).await.unwrap(); // Start a pantry, get the client for it, then use it to bulk_read data let (_pantry, volume_id, client) = @@ -4821,10 +4861,10 @@ mod test { assert_eq!(e.status(), reqwest::StatusCode::BAD_REQUEST); client.detach(&volume_id.to_string()).await.unwrap(); + Ok(()) } - #[tokio::test] - async fn test_pantry_scrub() { + async fn test_pantry_scrub(backend: Backend) -> Result<()> { // Test scrubbing the OVMF image from a URL // XXX httptest::Server does not support range requests, otherwise that // should be used here instead. @@ -4855,7 +4895,7 @@ mod test { // Spin off three downstairs, build our Crucible struct (with a // read-only parent pointing to the random data above) - let tds = TestDownstairsSet::big(false).await.unwrap(); + let tds = TestDownstairsSet::big(false, backend).await.unwrap(); let opts = tds.opts(); let volume_id = Uuid::new_v4(); @@ -4980,13 +5020,13 @@ mod test { volume.read(BlockIndex(0), &mut buffer).await.unwrap(); assert_eq!(data, &buffer[..]); + Ok(()) } - #[tokio::test] - async fn test_pantry_bulk_read() { + async fn test_pantry_bulk_read(backend: Backend) -> Result<()> { // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await.unwrap(); + let tds = TestDownstairsSet::small(false, backend).await.unwrap(); // Start a pantry, get the client for it, then use it to bulk_write then // bulk_read in data @@ -5062,13 +5102,15 @@ mod test { ); client.detach(&volume_id.to_string()).await.unwrap(); + Ok(()) } - #[tokio::test] - async fn test_pantry_bulk_read_max_chunk_size() { + async fn test_pantry_bulk_read_max_chunk_size( + backend: Backend, + ) -> Result<()> { // Spin off three downstairs, build our Crucible struct. 
- let tds = TestDownstairsSet::big(false).await.unwrap(); + let tds = TestDownstairsSet::big(false, backend).await.unwrap(); // Start a pantry, get the client for it, then use it to bulk_write in // data @@ -5115,15 +5157,15 @@ mod test { ); client.detach(&volume_id.to_string()).await.unwrap(); + Ok(()) } - #[tokio::test] - async fn test_pantry_validate() { + async fn test_pantry_validate(backend: Backend) -> Result<()> { const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await.unwrap(); + let tds = TestDownstairsSet::small(false, backend).await.unwrap(); // Start a pantry, get the client for it, then use it to bulk_write in // data @@ -5244,16 +5286,16 @@ mod test { assert!(response.job_result_ok); client.detach(&volume_id.to_string()).await.unwrap(); + Ok(()) } // Test validating a subset of the beginning of the volume - #[tokio::test] - async fn test_pantry_validate_subset() { + async fn test_pantry_validate_subset(backend: Backend) -> Result<()> { const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await.unwrap(); + let tds = TestDownstairsSet::small(false, backend).await.unwrap(); // Start a pantry, get the client for it, then use it to bulk_write in // data @@ -5375,14 +5417,14 @@ mod test { assert!(response.job_result_ok); client.detach(&volume_id.to_string()).await.unwrap(); + Ok(()) } // Test validating a non-block size amount fails - #[tokio::test] - async fn test_pantry_validate_fail() { + async fn test_pantry_validate_fail(backend: Backend) -> Result<()> { // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::small(false).await.unwrap(); + let tds = TestDownstairsSet::small(false, backend).await.unwrap(); // Start a pantry, get the client for it let (_pantry, volume_id, client) = @@ -5417,10 +5459,10 @@ mod test { assert!(!response.job_result_ok); client.detach(&volume_id.to_string()).await.unwrap(); + Ok(()) } - #[tokio::test] - async fn test_volume_replace_vcr() { + async fn test_volume_replace_vcr(backend: Backend) -> Result<()> { // Test of a replacement of a downstairs given two // VolumeConstructionRequests. // We create a volume, write some data to it, then replace a downstairs @@ -5431,7 +5473,7 @@ mod test { info!(log, "test_volume_replace of a volume"); // Make three downstairs - let tds = TestDownstairsSet::small(false).await.unwrap(); + let tds = TestDownstairsSet::small(false, backend).await.unwrap(); let opts = tds.opts(); let volume_id = Uuid::new_v4(); @@ -5499,17 +5541,17 @@ mod test { volume.read(BlockIndex(0), &mut buffer).await.unwrap(); assert_eq!(vec![0x55_u8; BLOCK_SIZE * 10], &buffer[..]); + Ok(()) } - #[tokio::test] - async fn test_volume_replace_rop_in_vcr() { + async fn test_volume_replace_rop_in_vcr(backend: Backend) -> Result<()> { // Verify that we can do replacement of a read_only_parent target // for a volume. const BLOCK_SIZE: usize = 512; let log = csl(); // Make three downstairs - let tds = TestDownstairsSet::small(false).await.unwrap(); + let tds = TestDownstairsSet::small(false, backend).await.unwrap(); let opts = tds.opts(); let volume_id = Uuid::new_v4(); @@ -5552,7 +5594,7 @@ mod test { // Make a new volume, and make the original volume the read only // parent. // Make three new downstairs for the new volume. 
- let sv_tds = TestDownstairsSet::small(false).await.unwrap(); + let sv_tds = TestDownstairsSet::small(false, backend).await.unwrap(); let sv_opts = sv_tds.opts(); let sv_volume_id = Uuid::new_v4(); @@ -5620,16 +5662,18 @@ mod test { volume.read(BlockIndex(0), &mut buffer).await.unwrap(); assert_eq!(vec![0x55_u8; BLOCK_SIZE * 10], &buffer[..]); + Ok(()) } /// Getting a volume's status should work even if something else took over - #[tokio::test] - async fn test_pantry_get_status_after_activation() { + async fn test_pantry_get_status_after_activation( + backend: Backend, + ) -> Result<()> { const BLOCK_SIZE: usize = 512; // Spin off three downstairs, build our Crucible struct. - let tds = TestDownstairsSet::big(false).await.unwrap(); + let tds = TestDownstairsSet::big(false, backend).await.unwrap(); let opts = tds.opts(); // Start a pantry, get the client for it, then use it to bulk_write @@ -5683,5 +5727,109 @@ mod test { num_job_handles: 0, } )); + Ok(()) + } + + /// Macro defining the full integration test suite + /// + /// Functions in the test suite should take a `b: Backend` parameter and + /// return a `Result<()>`, which is unwrapped by the generated test function + /// + /// Add new functions here to ensure that they're tested for every backend! + macro_rules! integration_test_suite { + ($b:ident) => { + integration_test_suite!( + $b, + integration_test_region, + integration_test_region_huge_io, + volume_zero_length_io, + integration_test_two_layers, + integration_test_two_layers_write_unwritten, + integration_test_three_layers, + integration_test_url, + integration_test_just_read, + integration_test_volume_write_unwritten_1, + integration_test_volume_write_unwritten_2, + integration_test_volume_write_unwritten_sparse, + integration_test_volume_write_unwritten_subvols, + integration_test_volume_write_unwritten_subvols_sparse, + integration_test_volume_write_unwritten_subvols_3, + integration_test_two_layers_parent_smaller, + integration_test_two_layers_parent_smaller_unwritten, + integration_test_scrub, + integration_test_scrub_short, + integration_test_scrub_short_sparse, + integration_test_scrub_useless, + integration_test_volume_subvols_parent_scrub_sparse, + integration_test_volume_subvols_parent_scrub_sparse_2, + integration_test_multi_read_only, + integration_test_scrub_no_rop, + integration_test_snapshot_backed_vol, + test_pantry_get_status_after_activation, + integration_test_clone_raw, + integration_test_volume_clone_replace_ro_downstairs, + integration_test_volume_replace_bad_downstairs, + integration_test_volume_twice_replace_downstairs, + integration_test_volume_replace_active, + integration_test_volume_replace_downstairs, + integration_test_volume_inactive_replace_downstairs, + integration_test_volume_replace_downstairs_then_takeover, + integration_test_problematic_downstairs, + integration_test_sqlite_backed_vol, + integration_test_guest_replace_many_downstairs, + integration_test_guest_downstairs_unwritten, + integration_test_guest_drop, + integration_test_guest_zero_length_io, + integration_test_guest_replace_downstairs, + integration_test_guest_replace_ds_before_active, + integration_test_upstairs_read_only_rejects_write, + integration_test_guest_downstairs_unwritten_sparse_1, + integration_test_guest_downstairs_unwritten_sparse_mid, + integration_test_guest_downstairs_unwritten_sparse_end, + integration_test_guest_downstairs_unwritten_span, + integration_test_guest_downstairs_unwritten_span_2, + integration_test_io_out_of_range, + 
integration_test_io_span_out_of_range, + test_pantry_import_from_url_ovmf, + test_pantry_import_from_url_ovmf_bad_digest, + test_pantry_import_from_local_server, + test_pantry_snapshot, + test_pantry_bulk_write, + test_pantry_bulk_write_max_chunk_size, + test_pantry_fail_bulk_write_one_byte, + test_pantry_fail_bulk_read_one_byte, + test_pantry_scrub, + test_pantry_bulk_read, + test_pantry_bulk_read_max_chunk_size, + test_pantry_validate, + test_pantry_validate_subset, + test_pantry_validate_fail, + test_volume_replace_vcr, + test_volume_replace_rop_in_vcr, + integration_test_guest_drop_early, + integration_test_guest_downstairs, + ); + }; + + ($b:ident, $($fs:ident),+ $(,)?) => { + $( + #[tokio::test] + async fn $fs() { + super::$fs(Backend::$b).await.unwrap() + } + )+}; + } + + mod raw_file { + use super::*; + integration_test_suite!(RawFile); + } + mod raw_file_v2 { + use super::*; + integration_test_suite!(RawFileV2); + } + mod sqlite { + use super::*; + integration_test_suite!(SQLite); } } diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index d493a6318..b3c596ade 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -580,6 +580,15 @@ pub enum ReadBlockContext { Unencrypted { hash: u64 }, } +impl From<BlockContext> for ReadBlockContext { + fn from(ctx: BlockContext) -> ReadBlockContext { + match ctx.encryption_context { + Some(ctx) => ReadBlockContext::Encrypted { ctx }, + None => ReadBlockContext::Unencrypted { hash: ctx.hash }, + } + } +} + #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] pub struct ReadResponseHeader { pub upstairs_id: Uuid,
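// A minimal sketch of the new `From<BlockContext>` conversion in use.
// `block_ctx` is a hypothetical `BlockContext` value (e.g. one read back
// from an extent file), and `handle_encrypted` / `handle_hash` are
// placeholder functions, not names from this patch. Only the two arms shown
// can come out of this conversion, since a `BlockContext` always carries a
// hash and only optionally carries an encryption context:
//
//     let wire_ctx: ReadBlockContext = block_ctx.into();
//     match wire_ctx {
//         // The block's encryption context is passed through verbatim
//         ReadBlockContext::Encrypted { ctx } => handle_encrypted(ctx),
//         // Otherwise, only the integrity hash crosses the wire
//         ReadBlockContext::Unencrypted { hash } => handle_hash(hash),
//         // Any other `ReadBlockContext` variant is never produced by
//         // this `From` impl
//         _ => unreachable!(),
//     }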
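// For reference, a sketch of what `integration_test_suite!` generates: the
// macro's second arm wraps each listed function in a per-backend
// `#[tokio::test]`, so `mod raw_file_v2 { integration_test_suite!(RawFileV2); }`
// expands, for each function in the list, to roughly
//
//     #[tokio::test]
//     async fn integration_test_region() {
//         super::integration_test_region(Backend::RawFileV2).await.unwrap()
//     }
//
// letting one shared async test body run against the RawFile, RawFileV2, and
// SQLite backends without three hand-written copies of every test.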