Skip to content

Commit

Permalink
refactor: remove box of iters (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
byte-sourcerer authored Apr 12, 2024
1 parent 08183d3 commit ae99763
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 34 deletions.
10 changes: 4 additions & 6 deletions src/iterators/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod heap;
/// iterators, prefer the one with smaller index.
pub type MergeIterator<Item, I> = NoDuplication<MergeIteratorInner<Item, I>>;

pub async fn create_merge_iter<Item, I>(iters: impl Stream<Item = Box<I>>) -> MergeIterator<Item, I>
pub async fn create_merge_iter<Item, I>(iters: impl Stream<Item = I>) -> MergeIterator<Item, I>
where
Item: Ord + Debug,
I: Stream<Item = anyhow::Result<Item>> + Unpin,
Expand All @@ -27,7 +27,7 @@ where
}

pub async fn create_merge_iter_from_non_empty_iters<Item, I>(
iters: impl Stream<Item = NonEmptyStream<Item, Box<I>>>,
iters: impl Stream<Item = NonEmptyStream<Item, I>>,
) -> MergeIterator<Item, I>
where
Item: Ord + Debug,
Expand All @@ -49,17 +49,15 @@ where
I: Stream<Item = anyhow::Result<Item>> + Unpin,
Item: Ord + Debug,
{
pub async fn create(iters: impl Stream<Item = Box<I>>) -> Self {
pub async fn create(iters: impl Stream<Item = I>) -> Self {
let iters = iters
.map(NonEmptyStream::try_new)
.flat_map(FutureExt::into_stream)
.filter_map(|x| ready(x.ok().flatten()));
Self::from_non_empty_iters(iters).await
}

pub async fn from_non_empty_iters(
iters: impl Stream<Item = NonEmptyStream<Item, Box<I>>>,
) -> Self {
pub async fn from_non_empty_iters(iters: impl Stream<Item = NonEmptyStream<Item, I>>) -> Self {
let iters: BinaryHeap<_> = iters
.enumerate()
.map(|(index, iter)| HeapWrapper { index, iter })
Expand Down
2 changes: 1 addition & 1 deletion src/iterators/merge/heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::cmp;

pub(super) struct HeapWrapper<Item, I> {
pub index: usize,
pub iter: NonEmptyStream<Item, Box<I>>,
pub iter: NonEmptyStream<Item, I>,
}

impl<Item, I> HeapWrapper<Item, I>
Expand Down
4 changes: 2 additions & 2 deletions src/memtable/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type SkipMapRangeIter<'a> = map::Range<'a, [u8], BytesBound<'a>, Bytes, Bytes>;

type SkipMapRangeEntry<'a> = map::Entry<'a, Bytes, Bytes>;

pub type NonEmptyMemTableIterRef<'a> = NonEmptyStream<Entry, Box<MemTableIterator<'a>>>;
pub type MaybeEmptyMemTableIterRef<'a> = MaybeEmptyStream<Entry, Box<MemTableIterator<'a>>>;
pub type NonEmptyMemTableIterRef<'a> = NonEmptyStream<Entry, MemTableIterator<'a>>;
pub type MaybeEmptyMemTableIterRef<'a> = MaybeEmptyStream<Entry, MemTableIterator<'a>>;

#[cfg(test)]
mod test {
Expand Down
21 changes: 5 additions & 16 deletions src/memtable/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,26 +111,15 @@ impl MemTable {
lower: Bound<&'a [u8]>,
upper: Bound<&'a [u8]>,
) -> Result<MaybeEmptyMemTableIterRef<'a>> {
let iter = foo(&self.map, lower, upper);
// let range = (lower, upper);
// let iter = self.map.range(range);
let iter = self.map.range(BytesBound {
start: lower,
end: upper,
});
let iter = new_memtable_iter(iter);
NonEmptyStream::try_new(Box::new(iter)).await
NonEmptyStream::try_new(iter).await
}
}

fn foo<'a, 'b>(
m: &'a SkipMap<Bytes, Bytes>,
lower: Bound<&'b [u8]>,
upper: Bound<&'b [u8]>,
) -> Range<'a, [u8], BytesBound<'b>, Bytes, Bytes> {
let iter = m.range(BytesBound {
start: lower,
end: upper,
});
iter
}

#[cfg(test)]
impl MemTable {
pub fn for_testing_put_slice(&self, key: &[u8], value: &[u8]) -> Result<()> {
Expand Down
14 changes: 5 additions & 9 deletions src/sst/sstables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,22 +98,18 @@ where
None
} else {
let iter = SsTableIterator::scan(table, lower, upper);
NonEmptyStream::try_new(Box::new(iter)).await.ok().flatten()
NonEmptyStream::try_new(iter).await.ok().flatten()
}
}
});
create_merge_iter_from_non_empty_iters(iters).await
};

let levels = {
let iters = self
.levels
.iter()
.filter_map(move |(_, ids)| {
let tables = ids.iter().map(|id| self.sstables.get(id).unwrap());
scan_sst_concat(tables, lower, upper).ok()
})
.map(Box::new);
let iters = self.levels.iter().filter_map(move |(_, ids)| {
let tables = ids.iter().map(|id| self.sstables.get(id).unwrap());
scan_sst_concat(tables, lower, upper).ok()
});
let iters = stream::iter(iters);
create_merge_iter(iters).await
};
Expand Down

0 comments on commit ae99763

Please sign in to comment.