diff --git a/mini-lsm-starter/src/iterators/merge_iterator.rs b/mini-lsm-starter/src/iterators/merge_iterator.rs index 16d0051..481ab6d 100644 --- a/mini-lsm-starter/src/iterators/merge_iterator.rs +++ b/mini-lsm-starter/src/iterators/merge_iterator.rs @@ -1,6 +1,3 @@ -#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod -#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod - use std::cmp::{self}; use std::collections::binary_heap::PeekMut; use std::collections::BinaryHeap; diff --git a/mini-lsm-starter/src/iterators/two_merge_iterator.rs b/mini-lsm-starter/src/iterators/two_merge_iterator.rs index 8b02241..b9ac199 100644 --- a/mini-lsm-starter/src/iterators/two_merge_iterator.rs +++ b/mini-lsm-starter/src/iterators/two_merge_iterator.rs @@ -66,7 +66,7 @@ impl< self.a.next() } else if self.a.key() == self.b.key() { self.a.next()?; - self.b.next() // 如果 key 相等,则两个迭代器都要增加 + self.b.next() // 如果 key 相等,则两个迭代器都要增加 } else { self.b.next() } diff --git a/mini-lsm-starter/src/lsm_iterator.rs b/mini-lsm-starter/src/lsm_iterator.rs index 1eea6c1..019cc53 100644 --- a/mini-lsm-starter/src/lsm_iterator.rs +++ b/mini-lsm-starter/src/lsm_iterator.rs @@ -1,28 +1,52 @@ use anyhow::{anyhow, Result}; +use bytes::Bytes; +use std::ops::Bound; +use crate::iterators::two_merge_iterator::TwoMergeIterator; +use crate::table::SsTableIterator; use crate::{ iterators::{merge_iterator::MergeIterator, StorageIterator}, mem_table::MemTableIterator, }; /// Represents the internal type for an LSM iterator. This type will be changed across the tutorial for multiple times. -type LsmIteratorInner = MergeIterator; +type LsmIteratorInner = + TwoMergeIterator, MergeIterator>; pub struct LsmIterator { - inner: LsmIteratorInner, // MergeIterator + inner: LsmIteratorInner, + end: Bound, } impl LsmIterator { - pub(crate) fn new(iter: LsmIteratorInner) -> Result { - let mut res = Self { inner: iter }; - res.skip_delete_key().unwrap(); - Ok(res) + pub(crate) fn new(inner: LsmIteratorInner, end: Bound<&[u8]>) -> Result { + let end = match end { + Bound::Included(x) | Bound::Excluded(x) => Bound::Included(Bytes::copy_from_slice(x)), + Bound::Unbounded => Bound::Unbounded, + }; + + let mut iter = Self { inner, end }; + iter.skip_delete_key()?; + Ok(iter) } - pub fn skip_delete_key(&mut self) -> Result<()> { + fn is_valid(&self) -> bool { + if !self.inner.is_valid() { + return false; + }; + + match &self.end { + Bound::Included(x) => self.inner.key().raw_ref() <= x && !self.value().is_empty(), + Bound::Excluded(x) => self.inner.key().raw_ref() < x && !self.value().is_empty(), + Bound::Unbounded => true, + } + } + + fn skip_delete_key(&mut self) -> Result<()> { while self.is_valid() && !self.inner.key().is_empty() && self.inner.value().is_empty() { self.inner.next()?; } + Ok(()) } } @@ -30,16 +54,16 @@ impl LsmIterator { impl StorageIterator for LsmIterator { type KeyType<'a> = &'a [u8]; - fn value(&self) -> &[u8] { - self.inner.value() + fn is_valid(&self) -> bool { + self.is_valid() } fn key(&self) -> &[u8] { self.inner.key().raw_ref() } - fn is_valid(&self) -> bool { - return self.inner.is_valid(); + fn value(&self) -> &[u8] { + self.inner.value() } fn next(&mut self) -> Result<()> { @@ -53,7 +77,7 @@ impl StorageIterator for LsmIterator { /// invalid. If an iterator is already invalid, `next` does not do anything. If `next` returns an error, /// `is_valid` should return false, and `next` should always return an error. pub struct FusedIterator { - iter: I, // 一个迭代器,可以自身错误(无法获得 key),也可以自身仍有效,但无法执行 next,这是两种不同的情况 + iter: I, has_errored: bool, } @@ -69,30 +93,25 @@ impl FusedIterator { impl StorageIterator for FusedIterator { type KeyType<'a> = I::KeyType<'a> where Self: 'a; - fn value(&self) -> &[u8] { - self.iter.value() + fn is_valid(&self) -> bool { + !self.has_errored && self.iter.is_valid() } fn key(&self) -> Self::KeyType<'_> { self.iter.key() } - fn is_valid(&self) -> bool { - // 这里二者的顺序不能调换,如果在迭代器包含错误的情况下调用is_valid,可能会直接 panic - (!self.has_errored) && self.iter.is_valid() + fn value(&self) -> &[u8] { + self.iter.value() } fn next(&mut self) -> Result<()> { if self.has_errored { - // 有错误一定不能next - return Err(anyhow!("The iterator is invalid!")); - } - if let Ok(res) = self.iter.next() { - self.has_errored = false; - Ok(res) + Err(anyhow!("Invalid iterator")) } else { - self.has_errored = true; - Err(anyhow!("The iterator is invalid after next operation!")) + let res = self.iter.next(); + self.has_errored = res.is_err(); + res } } } diff --git a/mini-lsm-starter/src/lsm_storage.rs b/mini-lsm-starter/src/lsm_storage.rs index 6757753..0187c99 100644 --- a/mini-lsm-starter/src/lsm_storage.rs +++ b/mini-lsm-starter/src/lsm_storage.rs @@ -14,11 +14,14 @@ use crate::compact::{ SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController, }; use crate::iterators::merge_iterator::MergeIterator; +use crate::iterators::two_merge_iterator::TwoMergeIterator; +use crate::iterators::StorageIterator; +use crate::key::KeySlice; use crate::lsm_iterator::{FusedIterator, LsmIterator}; use crate::manifest::Manifest; use crate::mem_table::MemTable; use crate::mvcc::LsmMvccInner; -use crate::table::SsTable; +use crate::table::{SsTable, SsTableIterator}; pub type BlockCache = moka::sync::Cache<(usize, usize), Arc>; @@ -423,20 +426,80 @@ impl LsmStorageInner { _lower: Bound<&[u8]>, _upper: Bound<&[u8]>, ) -> Result> { - let state = self.state.read(); - // memtable - let memtable = state.as_ref().memtable.as_ref(); - let memtable_iter = memtable.scan(_lower, _upper); - let mut iters = vec![Box::new(memtable_iter)]; - // imm_memtable - for imm_memtable in &state.imm_memtables { - let imm_memtable = Arc::as_ref(imm_memtable); - let imm_memtable_iter = imm_memtable.scan(_lower, _upper); - iters.push(Box::new(imm_memtable_iter)); + // let state = self.state.read(); + // // memtable + // let memtable = state.as_ref().memtable.as_ref(); + // + // let memtable_iter = memtable.scan(_lower, _upper); + // let mut iters = vec![Box::new(memtable_iter)]; + // // imm_memtable + // for imm_memtable in &state.imm_memtables { + // let imm_memtable = Arc::as_ref(imm_memtable); + // let imm_memtable_iter = imm_memtable.scan(_lower, _upper); + // iters.push(Box::new(imm_memtable_iter)); + // } + // // 用 vec 创建 + // let memtable_iterator = MergeIterator::create(iters); + // // SST + // let l0_sst = &state.as_ref().l0_sstables; + // let mut sst_iters = vec![]; + // for sst in l0_sst { + // let sst_ptr = state.sstables.get(sst).unwrap(); + // let sst_iter = SsTableIterator::create_and_seek_to_first(sst_ptr.clone())?; + // sst_iters.push(Box::new(sst_iter)); + // } + // let sst_iterator = MergeIterator::create(sst_iters); + // let lsm_iterator_inner = TwoMergeIterator::create(memtable_iterator, sst_iterator)?; + // let iter = LsmIterator::new(lsm_iterator_inner, _upper)?; + // Ok(FusedIterator::new(iter)) + let state = { + let guard = self.state.read(); + Arc::clone(&guard) + }; + let mut mem_iters = vec![]; + mem_iters.push(Box::new(state.memtable.scan(_lower, _upper))); + mem_iters.append( + &mut state + .imm_memtables + .clone() + .into_iter() + .map(|m| Box::new(m.scan(_lower, _upper))) + .collect(), + ); + + let mut sst_iters = vec![]; + let key_slice = match _lower { + Bound::Included(x) => KeySlice::from_slice(x), + Bound::Excluded(x) => KeySlice::from_slice(x), + Bound::Unbounded => KeySlice::default(), + }; + for idx in &state.l0_sstables { + let sst_table = match state.sstables.get(idx) { + Some(sst_table) => sst_table.clone(), + None => continue, + }; + let iter = { + let mut iter = SsTableIterator::create_and_seek_to_key(sst_table, key_slice)?; + if let Bound::Excluded(x) = _lower { + if x == key_slice.raw_ref() { + iter.next()?; + } + } + if !iter.is_valid() { + continue; + } + iter + }; + sst_iters.push(Box::new(iter)); } - // 用 vec 创建 - let lsm_iterator_inner = MergeIterator::create(iters); - let iter = LsmIterator::new(lsm_iterator_inner)?; + + let iter = LsmIterator::new( + TwoMergeIterator::create( + MergeIterator::create(mem_iters), + MergeIterator::create(sst_iters), + )?, + _upper, + )?; Ok(FusedIterator::new(iter)) } }