From cfdfd8e1ecaa8cea14cc84ed93eb153ff175ebdb Mon Sep 17 00:00:00 2001
From: HeartLinked
Date: Mon, 5 Aug 2024 14:34:32 +0800
Subject: [PATCH] [week1] 1.1 task1

---
 mini-lsm-starter/src/mem_table.rs        |  17 ++++++++++++++---
 mini-lsm-starter/src/tests.rs            |   3 +
 mini-lsm-starter/src/tests/harness.rs    | 442 +++++++++++++++++++++++
 mini-lsm-starter/src/tests/week1_day1.rs | 148 ++++++++
 4 files changed, 607 insertions(+), 3 deletions(-)
 create mode 100644 mini-lsm-starter/src/tests/harness.rs
 create mode 100644 mini-lsm-starter/src/tests/week1_day1.rs

diff --git a/mini-lsm-starter/src/mem_table.rs b/mini-lsm-starter/src/mem_table.rs
index dc4fa5a..2bba9f9 100644
--- a/mini-lsm-starter/src/mem_table.rs
+++ b/mini-lsm-starter/src/mem_table.rs
@@ -38,7 +38,12 @@ pub(crate) fn map_bound(bound: Bound<&[u8]>) -> Bound<Bytes> {
 impl MemTable {
     /// Create a new mem-table.
     pub fn create(_id: usize) -> Self {
-        unimplemented!()
+        Self {
+            map: Arc::new(SkipMap::new()),
+            wal: None, // assume no WAL at creation (it is attached in week 2, day 6)
+            id: _id,
+            approximate_size: Arc::new(AtomicUsize::new(0)),
+        }
     }
 
     /// Create a new mem-table with WAL
@@ -69,7 +74,7 @@ impl MemTable {
 
     /// Get a value by key.
     pub fn get(&self, _key: &[u8]) -> Option<Bytes> {
-        unimplemented!()
+        self.map.get(_key).map(|entry| entry.value().clone())
     }
 
     /// Put a key-value pair into the mem-table.
@@ -78,7 +83,13 @@
     /// In week 2, day 6, also flush the data to WAL.
     /// In week 3, day 5, modify the function to use the batch API.
     pub fn put(&self, _key: &[u8], _value: &[u8]) -> Result<()> {
-        unimplemented!()
+        let estimated_size = _key.len() + _value.len();
+        self.map
+            .insert(Bytes::copy_from_slice(_key), Bytes::copy_from_slice(_value));
+        // track the approximate size so that freeze-on-capacity (task 3) can trigger
+        self.approximate_size
+            .fetch_add(estimated_size, std::sync::atomic::Ordering::Relaxed);
+        Ok(())
     }
 
     /// Implement this in week 3, day 5.
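Note: a minimal sketch of how the memtable API implemented above is exercised,
mirroring test_task1_memtable_get from the test file added below (illustrative only;
for_testing_put_slice/for_testing_get_slice used by the tests are the starter's thin
wrappers around the put/get implemented in this hunk):

    let memtable = MemTable::create(0);
    memtable.put(b"key1", b"value1").unwrap();
    assert_eq!(&memtable.get(b"key1").unwrap()[..], b"value1");
    assert_eq!(memtable.get(b"missing"), None); // absent keys yield None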
diff --git a/mini-lsm-starter/src/tests.rs b/mini-lsm-starter/src/tests.rs
index 688adfa..283b258 100644
--- a/mini-lsm-starter/src/tests.rs
+++ b/mini-lsm-starter/src/tests.rs
@@ -1,2 +1,5 @@
 //! DO NOT MODIFY -- Mini-LSM tests modules
 //! This file will be automatically rewritten by the copy-test command.
+
+mod harness;
+mod week1_day1;
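Note: harness.rs below is the shared test harness copied in by the copy-test command.
MockIterator is an in-memory StorageIterator over (Bytes, Bytes) pairs that can be
configured to fail at a given call index, and the check_* helpers assert that an
iterator yields exactly an expected key/value sequence. A minimal usage sketch
(illustrative, not part of the patch):

    let mut iter = MockIterator::new(vec![
        (Bytes::from("a"), Bytes::from("1")),
        (Bytes::from("b"), Bytes::from("2")),
    ]);
    check_iter_result_by_key(
        &mut iter,
        vec![
            (Bytes::from("a"), Bytes::from("1")),
            (Bytes::from("b"), Bytes::from("2")),
        ],
    );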
diff --git a/mini-lsm-starter/src/tests/harness.rs b/mini-lsm-starter/src/tests/harness.rs
new file mode 100644
index 0000000..b41745b
--- /dev/null
+++ b/mini-lsm-starter/src/tests/harness.rs
@@ -0,0 +1,442 @@
+use std::{
+    collections::BTreeMap, ops::Bound, os::unix::fs::MetadataExt, path::Path, sync::Arc,
+    time::Duration,
+};
+
+use anyhow::{bail, Result};
+use bytes::Bytes;
+
+use crate::{
+    compact::{
+        CompactionOptions, LeveledCompactionOptions, SimpleLeveledCompactionOptions,
+        TieredCompactionOptions,
+    },
+    iterators::{merge_iterator::MergeIterator, StorageIterator},
+    key::{KeySlice, TS_ENABLED},
+    lsm_storage::{BlockCache, LsmStorageInner, LsmStorageState, MiniLsm},
+    table::{SsTable, SsTableBuilder, SsTableIterator},
+};
+
+#[derive(Clone)]
+pub struct MockIterator {
+    pub data: Vec<(Bytes, Bytes)>,
+    pub error_when: Option<usize>,
+    pub index: usize,
+}
+
+impl MockIterator {
+    pub fn new(data: Vec<(Bytes, Bytes)>) -> Self {
+        Self {
+            data,
+            index: 0,
+            error_when: None,
+        }
+    }
+
+    pub fn new_with_error(data: Vec<(Bytes, Bytes)>, error_when: usize) -> Self {
+        Self {
+            data,
+            index: 0,
+            error_when: Some(error_when),
+        }
+    }
+}
+
+impl StorageIterator for MockIterator {
+    type KeyType<'a> = KeySlice<'a>;
+
+    fn next(&mut self) -> Result<()> {
+        if self.index < self.data.len() {
+            self.index += 1;
+        }
+        if let Some(error_when) = self.error_when {
+            if self.index == error_when {
+                bail!("fake error!");
+            }
+        }
+        Ok(())
+    }
+
+    fn key(&self) -> KeySlice {
+        if let Some(error_when) = self.error_when {
+            if self.index >= error_when {
+                panic!("invalid access after next returns an error!");
+            }
+        }
+        KeySlice::for_testing_from_slice_no_ts(self.data[self.index].0.as_ref())
+    }
+
+    fn value(&self) -> &[u8] {
+        if let Some(error_when) = self.error_when {
+            if self.index >= error_when {
+                panic!("invalid access after next returns an error!");
+            }
+        }
+        self.data[self.index].1.as_ref()
+    }
+
+    fn is_valid(&self) -> bool {
+        if let Some(error_when) = self.error_when {
+            if self.index >= error_when {
+                panic!("invalid access after next returns an error!");
+            }
+        }
+        self.index < self.data.len()
+    }
+}
+
+pub fn as_bytes(x: &[u8]) -> Bytes {
+    Bytes::copy_from_slice(x)
+}
+
+pub fn check_iter_result_by_key<I>(iter: &mut I, expected: Vec<(Bytes, Bytes)>)
+where
+    I: for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
+{
+    for (k, v) in expected {
+        assert!(iter.is_valid());
+        assert_eq!(
+            k,
+            iter.key().for_testing_key_ref(),
+            "expected key: {:?}, actual key: {:?}",
+            k,
+            as_bytes(iter.key().for_testing_key_ref()),
+        );
+        assert_eq!(
+            v,
+            iter.value(),
+            "expected value: {:?}, actual value: {:?}",
+            v,
+            as_bytes(iter.value()),
+        );
+        iter.next().unwrap();
+    }
+    assert!(!iter.is_valid());
+}
+
+#[allow(dead_code)]
+pub fn check_iter_result_by_key_and_ts<I>(iter: &mut I, expected: Vec<((Bytes, u64), Bytes)>)
+where
+    I: for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
+{
+    for ((k, ts), v) in expected {
+        assert!(iter.is_valid());
+        assert_eq!(
+            (&k[..], ts),
+            (
+                iter.key().for_testing_key_ref(),
+                iter.key().for_testing_ts()
+            ),
+            "expected key: {:?}@{}, actual key: {:?}@{}",
+            k,
+            ts,
+            as_bytes(iter.key().for_testing_key_ref()),
+            iter.key().for_testing_ts(),
+        );
+        assert_eq!(
+            v,
+            iter.value(),
+            "expected value: {:?}, actual value: {:?}",
+            v,
+            as_bytes(iter.value()),
+        );
+        iter.next().unwrap();
+    }
+    assert!(!iter.is_valid());
+}
+
+pub fn check_lsm_iter_result_by_key<I>(iter: &mut I, expected: Vec<(Bytes, Bytes)>)
+where
+    I: for<'a> StorageIterator<KeyType<'a> = &'a [u8]>,
+{
+    for (k, v) in expected {
+        assert!(iter.is_valid());
+        assert_eq!(
+            k,
+            iter.key(),
+            "expected key: {:?}, actual key: {:?}",
+            k,
+            as_bytes(iter.key()),
+        );
+        assert_eq!(
+            v,
+            iter.value(),
+            "expected value: {:?}, actual value: {:?}",
+            v,
+            as_bytes(iter.value()),
+        );
+        iter.next().unwrap();
+    }
+    assert!(!iter.is_valid());
+}
+
+pub fn expect_iter_error(mut iter: impl StorageIterator) {
+    loop {
+        match iter.next() {
+            Ok(_) if iter.is_valid() => continue,
+            Ok(_) => panic!("expect an error"),
+            Err(_) => break,
+        }
+    }
+}
+
+pub fn generate_sst(
+    id: usize,
+    path: impl AsRef<Path>,
+    data: Vec<(Bytes, Bytes)>,
+    block_cache: Option<Arc<BlockCache>>,
+) -> SsTable {
+    let mut builder = SsTableBuilder::new(128);
+    for (key, value) in data {
+        builder.add(KeySlice::for_testing_from_slice_no_ts(&key[..]), &value[..]);
+    }
+    builder.build(id, block_cache, path.as_ref()).unwrap()
+}
+
+#[allow(dead_code)]
+pub fn generate_sst_with_ts(
+    id: usize,
+    path: impl AsRef<Path>,
+    data: Vec<((Bytes, u64), Bytes)>,
+    block_cache: Option<Arc<BlockCache>>,
+) -> SsTable {
+    let mut builder = SsTableBuilder::new(128);
+    for ((key, ts), value) in data {
+        builder.add(
+            KeySlice::for_testing_from_slice_with_ts(&key[..], ts),
+            &value[..],
+        );
+    }
+    builder.build(id, block_cache, path.as_ref()).unwrap()
+}
+
+pub fn sync(storage: &LsmStorageInner) {
+    storage
+        .force_freeze_memtable(&storage.state_lock.lock())
+        .unwrap();
+    storage.force_flush_next_imm_memtable().unwrap();
+}
+
+pub fn compaction_bench(storage: Arc<MiniLsm>) {
+    let mut key_map = BTreeMap::<usize, usize>::new();
+    let gen_key = |i| format!("{:010}", i); // 10B
+    let gen_value = |i| format!("{:0110}", i); // 110B
+    let mut max_key = 0;
+    let overlaps = if TS_ENABLED { 10000 } else { 20000 };
+    for iter in 0..10 {
+        let range_begin = iter * 5000;
+        for i in range_begin..(range_begin + overlaps) {
+            // 120B per key, 4MB data populated
+            let key: String = gen_key(i);
+            let version = key_map.get(&i).copied().unwrap_or_default() + 1;
+            let value = gen_value(version);
+            key_map.insert(i, version);
+            storage.put(key.as_bytes(), value.as_bytes()).unwrap();
+            max_key = max_key.max(i);
+        }
+    }
+
+    std::thread::sleep(Duration::from_secs(1)); // wait until all memtables flush
+    while {
+        let snapshot = storage.inner.state.read();
+        !snapshot.imm_memtables.is_empty()
+    } {
+        storage.inner.force_flush_next_imm_memtable().unwrap();
+    }
+
+    let mut prev_snapshot = storage.inner.state.read().clone();
+    while {
+        std::thread::sleep(Duration::from_secs(1));
+        let snapshot = storage.inner.state.read().clone();
+        let to_cont = prev_snapshot.levels != snapshot.levels
+            || prev_snapshot.l0_sstables != snapshot.l0_sstables;
+        prev_snapshot = snapshot;
+        to_cont
+    } {
+        println!("waiting for compaction to converge");
+    }
+
+    let mut expected_key_value_pairs = Vec::new();
+    for i in 0..(max_key + 40000) {
+        let key = gen_key(i);
+        let value = storage.get(key.as_bytes()).unwrap();
+        if let Some(val) = key_map.get(&i) {
+            let expected_value = gen_value(*val);
+            assert_eq!(value, Some(Bytes::from(expected_value.clone())));
+            expected_key_value_pairs.push((Bytes::from(key), Bytes::from(expected_value)));
+        } else {
+            assert!(value.is_none());
+        }
+    }
+
+    check_lsm_iter_result_by_key(
+        &mut storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
+        expected_key_value_pairs,
+    );
+
+    storage.dump_structure();
+
+    println!("This test case does not guarantee your compaction algorithm produces a LSM state as expected. It only does minimal checks on the size of the levels. Please use the compaction simulator to check if the compaction is correctly going on.");
+}
+
+pub fn check_compaction_ratio(storage: Arc<MiniLsm>) {
+    let state = storage.inner.state.read().clone();
+    let compaction_options = storage.inner.options.compaction_options.clone();
+    let mut level_size = Vec::new();
+    let l0_sst_num = state.l0_sstables.len();
+    for (_, files) in &state.levels {
+        let size = match &compaction_options {
+            CompactionOptions::Leveled(_) => files
+                .iter()
+                .map(|x| state.sstables.get(x).as_ref().unwrap().table_size())
+                .sum::<u64>(),
+            CompactionOptions::Simple(_) | CompactionOptions::Tiered(_) => files.len() as u64,
+            _ => unreachable!(),
+        };
+        level_size.push(size);
+    }
+    let extra_iterators = if TS_ENABLED {
+        1 /* txn local iterator for OCC */
+    } else {
+        0
+    };
+    let num_iters = storage
+        .scan(Bound::Unbounded, Bound::Unbounded)
+        .unwrap()
+        .num_active_iterators();
+    let num_memtables = storage.inner.state.read().imm_memtables.len() + 1;
+    match compaction_options {
+        CompactionOptions::NoCompaction => unreachable!(),
+        CompactionOptions::Simple(SimpleLeveledCompactionOptions {
+            size_ratio_percent,
+            level0_file_num_compaction_trigger,
+            max_levels,
+        }) => {
+            assert!(l0_sst_num < level0_file_num_compaction_trigger);
+            assert!(level_size.len() <= max_levels);
+            for idx in 1..level_size.len() {
+                let prev_size = level_size[idx - 1];
+                let this_size = level_size[idx];
+                if prev_size == 0 && this_size == 0 {
+                    continue;
+                }
+                assert!(
+                    this_size as f64 / prev_size as f64 >= size_ratio_percent as f64 / 100.0,
+                    "L{}/L{}, {}/{}<{}%",
+                    state.levels[idx - 1].0,
+                    state.levels[idx].0,
+                    this_size,
+                    prev_size,
+                    size_ratio_percent
+                );
+            }
+            assert!(
+                num_iters <= l0_sst_num + num_memtables + max_levels + extra_iterators,
+                "we found {num_iters} iterators in your implementation, (l0_sst_num={l0_sst_num}, num_memtables={num_memtables}, max_levels={max_levels}) did you use concat iterators?"
+            );
+        }
+        CompactionOptions::Leveled(LeveledCompactionOptions {
+            level_size_multiplier,
+            level0_file_num_compaction_trigger,
+            max_levels,
+            ..
+        }) => {
+            assert!(l0_sst_num < level0_file_num_compaction_trigger);
+            assert!(level_size.len() <= max_levels);
+            let last_level_size = *level_size.last().unwrap();
+            let mut multiplier = 1.0;
+            for idx in (1..level_size.len()).rev() {
+                multiplier *= level_size_multiplier as f64;
+                let this_size = level_size[idx - 1];
+                assert!(
+                    // do not add hard requirement on level size multiplier considering bloom filters...
+                    this_size as f64 / last_level_size as f64 <= 1.0 / multiplier + 0.5,
+                    "L{}/L_max, {}/{}>>1.0/{}",
+                    state.levels[idx - 1].0,
+                    this_size,
+                    last_level_size,
+                    multiplier
+                );
+            }
+            assert!(
+                num_iters <= l0_sst_num + num_memtables + max_levels + extra_iterators,
+                "we found {num_iters} iterators in your implementation, (l0_sst_num={l0_sst_num}, num_memtables={num_memtables}, max_levels={max_levels}) did you use concat iterators?"
+            );
+        }
+        CompactionOptions::Tiered(TieredCompactionOptions {
+            num_tiers,
+            max_size_amplification_percent,
+            size_ratio,
+            min_merge_width,
+        }) => {
+            let size_ratio_trigger = (100.0 + size_ratio as f64) / 100.0;
+            assert_eq!(l0_sst_num, 0);
+            assert!(level_size.len() <= num_tiers);
+            let mut sum_size = level_size[0];
+            for idx in 1..level_size.len() {
+                let this_size = level_size[idx];
+                if level_size.len() > min_merge_width {
+                    assert!(
+                        sum_size as f64 / this_size as f64 <= size_ratio_trigger,
+                        "violation of size ratio: sum(⬆️L{})/L{}, {}/{}>{}",
+                        state.levels[idx - 1].0,
+                        state.levels[idx].0,
+                        sum_size,
+                        this_size,
+                        size_ratio_trigger
+                    );
+                }
+                if idx + 1 == level_size.len() {
+                    assert!(
+                        sum_size as f64 / this_size as f64
+                            <= max_size_amplification_percent as f64 / 100.0,
+                        "violation of space amp: sum(⬆️L{})/L{}, {}/{}>{}%",
+                        state.levels[idx - 1].0,
+                        state.levels[idx].0,
+                        sum_size,
+                        this_size,
+                        max_size_amplification_percent
+                    );
+                }
+                sum_size += this_size;
+            }
+            assert!(
+                num_iters <= num_memtables + num_tiers + extra_iterators,
+                "we found {num_iters} iterators in your implementation, (num_memtables={num_memtables}, num_tiers={num_tiers}) did you use concat iterators?"
+            );
+        }
+    }
+}
+
+pub fn dump_files_in_dir(path: impl AsRef<Path>) {
+    println!("--- DIR DUMP ---");
+    for f in path.as_ref().read_dir().unwrap() {
+        let f = f.unwrap();
+        print!("{}", f.path().display());
+        println!(
+            ", size={:.3}KB",
+            f.metadata().unwrap().size() as f64 / 1024.0
+        );
+    }
+}
+
+pub fn construct_merge_iterator_over_storage(
+    state: &LsmStorageState,
+) -> MergeIterator<SsTableIterator> {
+    let mut iters = Vec::new();
+    for t in &state.l0_sstables {
+        iters.push(Box::new(
+            SsTableIterator::create_and_seek_to_first(state.sstables.get(t).cloned().unwrap())
+                .unwrap(),
+        ));
+    }
+    for (_, files) in &state.levels {
+        for f in files {
+            iters.push(Box::new(
+                SsTableIterator::create_and_seek_to_first(state.sstables.get(f).cloned().unwrap())
+                    .unwrap(),
+            ));
+        }
+    }
+    MergeIterator::create(iters)
+}
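Note: in test_task3_storage_integration below, the `>= 15` lower bound on
approximate_size is exactly the raw payload inserted before the first freeze:
keys b"1", b"2", b"3" (1 byte each) plus values b"233", b"2333", b"23333"
(3 + 4 + 5 bytes), i.e. 3 + 12 = 15 bytes, which is what the
key.len() + value.len() accounting in `put` above records.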
b"value22" + ); + assert_eq!( + &memtable.for_testing_get_slice(b"key3").unwrap()[..], + b"value33" + ); +} + +#[test] +fn test_task2_storage_integration() { + let dir = tempdir().unwrap(); + let storage = Arc::new( + LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(), + ); + assert_eq!(&storage.get(b"0").unwrap(), &None); + storage.put(b"1", b"233").unwrap(); + storage.put(b"2", b"2333").unwrap(); + storage.put(b"3", b"23333").unwrap(); + assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"233"); + assert_eq!(&storage.get(b"2").unwrap().unwrap()[..], b"2333"); + assert_eq!(&storage.get(b"3").unwrap().unwrap()[..], b"23333"); + storage.delete(b"2").unwrap(); + assert!(storage.get(b"2").unwrap().is_none()); + storage.delete(b"0").unwrap(); // should NOT report any error +} + +#[test] +fn test_task3_storage_integration() { + let dir = tempdir().unwrap(); + let storage = Arc::new( + LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(), + ); + storage.put(b"1", b"233").unwrap(); + storage.put(b"2", b"2333").unwrap(); + storage.put(b"3", b"23333").unwrap(); + storage + .force_freeze_memtable(&storage.state_lock.lock()) + .unwrap(); + assert_eq!(storage.state.read().imm_memtables.len(), 1); + let previous_approximate_size = storage.state.read().imm_memtables[0].approximate_size(); + assert!(previous_approximate_size >= 15); + storage.put(b"1", b"2333").unwrap(); + storage.put(b"2", b"23333").unwrap(); + storage.put(b"3", b"233333").unwrap(); + storage + .force_freeze_memtable(&storage.state_lock.lock()) + .unwrap(); + assert_eq!(storage.state.read().imm_memtables.len(), 2); + assert!( + storage.state.read().imm_memtables[1].approximate_size() == previous_approximate_size, + "wrong order of memtables?" + ); + assert!(storage.state.read().imm_memtables[0].approximate_size() > previous_approximate_size); +} + +#[test] +fn test_task3_freeze_on_capacity() { + let dir = tempdir().unwrap(); + let mut options = LsmStorageOptions::default_for_week1_test(); + options.target_sst_size = 1024; + options.num_memtable_limit = 1000; + let storage = Arc::new(LsmStorageInner::open(dir.path(), options).unwrap()); + for _ in 0..1000 { + storage.put(b"1", b"2333").unwrap(); + } + let num_imm_memtables = storage.state.read().imm_memtables.len(); + assert!(num_imm_memtables >= 1, "no memtable frozen?"); + for _ in 0..1000 { + storage.delete(b"1").unwrap(); + } + assert!( + storage.state.read().imm_memtables.len() > num_imm_memtables, + "no more memtable frozen?" 
+
+#[test]
+fn test_task4_storage_integration() {
+    let dir = tempdir().unwrap();
+    let storage = Arc::new(
+        LsmStorageInner::open(dir.path(), LsmStorageOptions::default_for_week1_test()).unwrap(),
+    );
+    assert_eq!(&storage.get(b"0").unwrap(), &None);
+    storage.put(b"1", b"233").unwrap();
+    storage.put(b"2", b"2333").unwrap();
+    storage.put(b"3", b"23333").unwrap();
+    storage
+        .force_freeze_memtable(&storage.state_lock.lock())
+        .unwrap();
+    storage.delete(b"1").unwrap();
+    storage.delete(b"2").unwrap();
+    storage.put(b"3", b"2333").unwrap();
+    storage.put(b"4", b"23333").unwrap();
+    storage
+        .force_freeze_memtable(&storage.state_lock.lock())
+        .unwrap();
+    storage.put(b"1", b"233333").unwrap();
+    storage.put(b"3", b"233333").unwrap();
+    assert_eq!(storage.state.read().imm_memtables.len(), 2);
+    assert_eq!(&storage.get(b"1").unwrap().unwrap()[..], b"233333");
+    assert_eq!(&storage.get(b"2").unwrap(), &None);
+    assert_eq!(&storage.get(b"3").unwrap().unwrap()[..], b"233333");
+    assert_eq!(&storage.get(b"4").unwrap().unwrap()[..], b"23333");
+}
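Note: to reproduce locally, assuming the standard mini-lsm workflow (commands are from
the mini-lsm book, not part of this patch): copy the day's tests into the starter crate
with `cargo x copy-test --week 1 --day 1`, then run `cargo x scheck` (or `cargo test`
inside mini-lsm-starter) to execute the week1_day1 suite.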