Skip to content

Commit

Permalink
[week1] 1.5 task2 is to fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
HeartLinked committed Oct 3, 2024
1 parent 200c148 commit c8a9d3c
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 43 deletions.
3 changes: 0 additions & 3 deletions mini-lsm-starter/src/iterators/merge_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#![allow(unused_variables)] // TODO(you): remove this lint after implementing this mod
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod

use std::cmp::{self};
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;
Expand Down
2 changes: 1 addition & 1 deletion mini-lsm-starter/src/iterators/two_merge_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl<
self.a.next()
} else if self.a.key() == self.b.key() {
self.a.next()?;
self.b.next() // 如果 key 相等,则两个迭代器都要增加
self.b.next() // 如果 key 相等,则两个迭代器都要增加
} else {
self.b.next()
}
Expand Down
69 changes: 44 additions & 25 deletions mini-lsm-starter/src/lsm_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,69 @@
use anyhow::{anyhow, Result};
use bytes::Bytes;
use std::ops::Bound;

use crate::iterators::two_merge_iterator::TwoMergeIterator;
use crate::table::SsTableIterator;
use crate::{
iterators::{merge_iterator::MergeIterator, StorageIterator},
mem_table::MemTableIterator,
};

/// Represents the internal type for an LSM iterator. This type will be changed across the tutorial for multiple times.
type LsmIteratorInner = MergeIterator<MemTableIterator>;
type LsmIteratorInner =
TwoMergeIterator<MergeIterator<MemTableIterator>, MergeIterator<SsTableIterator>>;

pub struct LsmIterator {
inner: LsmIteratorInner, // MergeIterator<MemTableIterator>
inner: LsmIteratorInner,
end: Bound<Bytes>,
}

impl LsmIterator {
pub(crate) fn new(iter: LsmIteratorInner) -> Result<Self> {
let mut res = Self { inner: iter };
res.skip_delete_key().unwrap();
Ok(res)
pub(crate) fn new(inner: LsmIteratorInner, end: Bound<&[u8]>) -> Result<Self> {
let end = match end {
Bound::Included(x) | Bound::Excluded(x) => Bound::Included(Bytes::copy_from_slice(x)),
Bound::Unbounded => Bound::Unbounded,
};

let mut iter = Self { inner, end };
iter.skip_delete_key()?;
Ok(iter)
}

pub fn skip_delete_key(&mut self) -> Result<()> {
fn is_valid(&self) -> bool {
if !self.inner.is_valid() {
return false;
};

match &self.end {
Bound::Included(x) => self.inner.key().raw_ref() <= x && !self.value().is_empty(),
Bound::Excluded(x) => self.inner.key().raw_ref() < x && !self.value().is_empty(),
Bound::Unbounded => true,
}
}

fn skip_delete_key(&mut self) -> Result<()> {
while self.is_valid() && !self.inner.key().is_empty() && self.inner.value().is_empty() {
self.inner.next()?;
}

Ok(())
}
}

impl StorageIterator for LsmIterator {
type KeyType<'a> = &'a [u8];

fn value(&self) -> &[u8] {
self.inner.value()
fn is_valid(&self) -> bool {
self.is_valid()
}

fn key(&self) -> &[u8] {
self.inner.key().raw_ref()
}

fn is_valid(&self) -> bool {
return self.inner.is_valid();
fn value(&self) -> &[u8] {
self.inner.value()
}

fn next(&mut self) -> Result<()> {
Expand All @@ -53,7 +77,7 @@ impl StorageIterator for LsmIterator {
/// invalid. If an iterator is already invalid, `next` does not do anything. If `next` returns an error,
/// `is_valid` should return false, and `next` should always return an error.
pub struct FusedIterator<I: StorageIterator> {
iter: I, // 一个迭代器,可以自身错误(无法获得 key),也可以自身仍有效,但无法执行 next,这是两种不同的情况
iter: I,
has_errored: bool,
}

Expand All @@ -69,30 +93,25 @@ impl<I: StorageIterator> FusedIterator<I> {
impl<I: StorageIterator> StorageIterator for FusedIterator<I> {
type KeyType<'a> = I::KeyType<'a> where Self: 'a;

fn value(&self) -> &[u8] {
self.iter.value()
fn is_valid(&self) -> bool {
!self.has_errored && self.iter.is_valid()
}

fn key(&self) -> Self::KeyType<'_> {
self.iter.key()
}

fn is_valid(&self) -> bool {
// 这里二者的顺序不能调换,如果在迭代器包含错误的情况下调用is_valid,可能会直接 panic
(!self.has_errored) && self.iter.is_valid()
fn value(&self) -> &[u8] {
self.iter.value()
}

fn next(&mut self) -> Result<()> {
if self.has_errored {
// 有错误一定不能next
return Err(anyhow!("The iterator is invalid!"));
}
if let Ok(res) = self.iter.next() {
self.has_errored = false;
Ok(res)
Err(anyhow!("Invalid iterator"))
} else {
self.has_errored = true;
Err(anyhow!("The iterator is invalid after next operation!"))
let res = self.iter.next();
self.has_errored = res.is_err();
res
}
}
}
91 changes: 77 additions & 14 deletions mini-lsm-starter/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ use crate::compact::{
SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, TieredCompactionController,
};
use crate::iterators::merge_iterator::MergeIterator;
use crate::iterators::two_merge_iterator::TwoMergeIterator;
use crate::iterators::StorageIterator;
use crate::key::KeySlice;
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::manifest::Manifest;
use crate::mem_table::MemTable;
use crate::mvcc::LsmMvccInner;
use crate::table::SsTable;
use crate::table::{SsTable, SsTableIterator};

pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;

Expand Down Expand Up @@ -423,20 +426,80 @@ impl LsmStorageInner {
_lower: Bound<&[u8]>,
_upper: Bound<&[u8]>,
) -> Result<FusedIterator<LsmIterator>> {
let state = self.state.read();
// memtable
let memtable = state.as_ref().memtable.as_ref();
let memtable_iter = memtable.scan(_lower, _upper);
let mut iters = vec![Box::new(memtable_iter)];
// imm_memtable
for imm_memtable in &state.imm_memtables {
let imm_memtable = Arc::as_ref(imm_memtable);
let imm_memtable_iter = imm_memtable.scan(_lower, _upper);
iters.push(Box::new(imm_memtable_iter));
// let state = self.state.read();
// // memtable
// let memtable = state.as_ref().memtable.as_ref();
//
// let memtable_iter = memtable.scan(_lower, _upper);
// let mut iters = vec![Box::new(memtable_iter)];
// // imm_memtable
// for imm_memtable in &state.imm_memtables {
// let imm_memtable = Arc::as_ref(imm_memtable);
// let imm_memtable_iter = imm_memtable.scan(_lower, _upper);
// iters.push(Box::new(imm_memtable_iter));
// }
// // 用 vec 创建
// let memtable_iterator = MergeIterator::create(iters);
// // SST
// let l0_sst = &state.as_ref().l0_sstables;
// let mut sst_iters = vec![];
// for sst in l0_sst {
// let sst_ptr = state.sstables.get(sst).unwrap();
// let sst_iter = SsTableIterator::create_and_seek_to_first(sst_ptr.clone())?;
// sst_iters.push(Box::new(sst_iter));
// }
// let sst_iterator = MergeIterator::create(sst_iters);
// let lsm_iterator_inner = TwoMergeIterator::create(memtable_iterator, sst_iterator)?;
// let iter = LsmIterator::new(lsm_iterator_inner, _upper)?;
// Ok(FusedIterator::new(iter))
let state = {
let guard = self.state.read();
Arc::clone(&guard)
};
let mut mem_iters = vec![];
mem_iters.push(Box::new(state.memtable.scan(_lower, _upper)));
mem_iters.append(
&mut state
.imm_memtables
.clone()
.into_iter()
.map(|m| Box::new(m.scan(_lower, _upper)))
.collect(),
);

let mut sst_iters = vec![];
let key_slice = match _lower {
Bound::Included(x) => KeySlice::from_slice(x),
Bound::Excluded(x) => KeySlice::from_slice(x),
Bound::Unbounded => KeySlice::default(),
};
for idx in &state.l0_sstables {
let sst_table = match state.sstables.get(idx) {
Some(sst_table) => sst_table.clone(),
None => continue,
};
let iter = {
let mut iter = SsTableIterator::create_and_seek_to_key(sst_table, key_slice)?;
if let Bound::Excluded(x) = _lower {
if x == key_slice.raw_ref() {
iter.next()?;
}
}
if !iter.is_valid() {
continue;
}
iter
};
sst_iters.push(Box::new(iter));
}
// 用 vec 创建
let lsm_iterator_inner = MergeIterator::create(iters);
let iter = LsmIterator::new(lsm_iterator_inner)?;

let iter = LsmIterator::new(
TwoMergeIterator::create(
MergeIterator::create(mem_iters),
MergeIterator::create(sst_iters),
)?,
_upper,
)?;
Ok(FusedIterator::new(iter))
}
}

0 comments on commit c8a9d3c

Please sign in to comment.