Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support MVCC #68

Merged
merged 69 commits into from
Aug 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
bccf249
init
Jun 1, 2024
2bc1f69
add tests
Jun 1, 2024
ea9ef89
add test
Jun 1, 2024
c13c21a
add test
Jun 1, 2024
7f8de7a
add test
Jun 1, 2024
e6a4a86
add test
Jun 1, 2024
4159641
add test
Jun 1, 2024
3450d93
add test
Jun 1, 2024
22537a3
support watermark
Jun 2, 2024
631a4f5
wip
Jun 2, 2024
cd84f3e
wip
Jun 3, 2024
a680bdd
wip
Jun 3, 2024
c87ccf0
wip
Jun 9, 2024
cdad59d
wip
Jun 10, 2024
cc05cab
fix: cargo check
Jun 10, 2024
158587c
fix: check
Jun 13, 2024
722359a
wip
Jun 13, 2024
05f088c
wip
Jun 14, 2024
ab97b93
wip
Jun 23, 2024
03cf677
fix: cargo check --tests
Jun 23, 2024
e003bf3
fix: cargo fmt
Jun 23, 2024
08d6ea9
refactor test utils
Jun 23, 2024
1af0539
fix: clippy
Jun 23, 2024
47253e5
fix: clippy & fmt
Jun 23, 2024
328d67e
fix: clippy & fmt
Jun 24, 2024
21945d6
fix: block meta encoding/decoding
Jul 18, 2024
f588be2
fix: block builder test
Jul 18, 2024
696b7b7
fix: test
Jul 18, 2024
cdfbd4d
fix: tests
Jul 19, 2024
220d703
fix: test
Jul 20, 2024
d3d7bbe
fix: time dedup iter
Jul 20, 2024
9c356c1
add test
Jul 21, 2024
1f8cb7f
wip: txn iterator
Jul 22, 2024
bcbadea
fix: txn iterator
Jul 23, 2024
a91ee88
wip
Jul 25, 2024
14c1a54
wip
Jul 25, 2024
90ef246
wip: time provider
Jul 25, 2024
9b2c423
feat: use external time provider
Aug 2, 2024
41130b7
fix: scan with ts
Aug 2, 2024
7e500e0
wip
Aug 2, 2024
7d0e16e
fix: test_task2_memtable_mvcc
Aug 4, 2024
45a70bd
fix: test_task1_storage_get
Aug 4, 2024
5ee394a
fix: test_wal_integration
Aug 4, 2024
ed5ac31
wip
Aug 4, 2024
f66da10
add tests
Aug 4, 2024
3649653
wip
Aug 5, 2024
df8f7b7
wip
Aug 5, 2024
4794080
wip
Aug 5, 2024
02b4a15
wip
Aug 7, 2024
b2bd12b
wip
Aug 7, 2024
65b164b
wip
Aug 8, 2024
964da58
wip
Aug 8, 2024
b136bc9
use ATPIT instead of TAIT
Aug 11, 2024
325bfd8
upgrade toolchain
Aug 11, 2024
7795bd3
fix: test
Aug 13, 2024
40105d6
fix: warning
Aug 13, 2024
619781c
fix: test
Aug 13, 2024
372ff76
add inspect iterator
Aug 13, 2024
22d9b25
fix: Cargo.toml
Aug 14, 2024
8080fc4
add test & fix clippy
Aug 14, 2024
67b0977
add test
Aug 14, 2024
6f7ff34
fix: test
Aug 14, 2024
8b5a715
feat: add WatermarkGcIter
Aug 15, 2024
bfdb22f
fix: clippy
Aug 15, 2024
e568af4
try fix mvcc gc
Aug 16, 2024
281004d
add full compaction
Aug 17, 2024
9d2ae1f
fix: test_task3_mvcc_compaction
Aug 17, 2024
96f0d3c
fix: async runtime panic
Aug 17, 2024
89098e1
fix: ycsb bench
Aug 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ tokio-util = "0.7.11"
futures-concurrency = "7.6.1"
ordered-float = "4.2.2"
getset = "0.1.2"
derive_more = "1.0.0"
derive_more = { version = "1.0.0", features = ["full"] }
tracing-futures = { version = "0.2.5", features = ["futures-03"] }
num-traits = "0.2.19"

[dev-dependencies]
tempfile = "3"
Expand All @@ -56,7 +57,6 @@ criterion = { version = "0.5.1", features = ["async_tokio"] }
maplit = "1.0.2"

[lints.rust]
unused = "allow"
unsafe_code = "forbid"

[[bench]]
Expand Down
19 changes: 12 additions & 7 deletions benches/ycsb.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
use better_mini_lsm::fibonacci;
use better_mini_lsm::persistent::LocalFs;
use better_mini_lsm::sst::SstOptions;
use better_mini_lsm::state::{LsmStorageState, Map};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use maplit::hashmap;
use std::collections::HashMap;
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use maplit::hashmap;
use tempfile::tempdir;
use ycsb::db::DB;
use ycsb::properties::Properties;
use ycsb::workload::CoreWorkload;

use better_mini_lsm::persistent::LocalFs;
use better_mini_lsm::sst::SstOptions;
use better_mini_lsm::state::{LsmStorageState, Map};

#[derive(Clone)]
struct LsmStorageStateBench(Arc<LsmStorageState<LocalFs>>);

impl IsSend for LsmStorageStateBench {}
impl IsSync for LsmStorageStateBench {}

#[allow(dead_code)]
trait IsSend: Send {}

#[allow(dead_code)]
trait IsSync: Sync {}

impl DB for LsmStorageStateBench {
Expand Down Expand Up @@ -60,6 +64,7 @@ fn ycsb_bench(c: &mut Criterion) {
.num_memtable_limit(1000)
.compaction_option(Default::default())
.enable_wal(false)
.enable_mvcc(true)
.build();
let runtime = tokio::runtime::Runtime::new().unwrap();
let state =
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "nightly-2024-02-09"
channel = "nightly-2024-08-11"
45 changes: 29 additions & 16 deletions src/block/blocks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::key::KeyBytes;
use bytes::Bytes;
use bytes::{Buf, Bytes};

use crate::entry::Entry;
use crate::entry::InnerEntry;
use crate::key::{KeyBytes, KeySlice};

/// A block is the smallest unit of read and caching in LSM tree. It is a collection of sorted key-value pairs.
#[derive(Debug)]
Expand Down Expand Up @@ -51,7 +51,7 @@ impl Block {
self.offsets.len()
}

pub fn get_entry_ref(&self, index: usize) -> (&[u8], &[u8]) {
pub fn get_entry_ref(&self, index: usize) -> (KeySlice, &[u8]) {
// get key
let (data, key) = self.parse_key_ref(index);

Expand All @@ -67,28 +67,34 @@ impl Block {
(data, key, value)
}

fn parse_key_ref(&self, index: usize) -> (&[u8], &[u8]) {
fn parse_key_ref(&self, index: usize) -> (&[u8], KeySlice) {
let data = &self.data[self.offsets[index] as usize..];

if index == 0 {
Self::get_uncompressed_key_ref(data)
} else {
let first_key = self.first_key_ref();
Self::get_compressed_key_ref(first_key, data)
Self::get_compressed_key_ref(first_key.raw_ref(), data)
}
}

fn get_uncompressed_key_ref(data: &[u8]) -> (&[u8], &[u8]) {
get_value(data)
fn get_uncompressed_key_ref(data: &[u8]) -> (&[u8], KeySlice) {
let (data, raw_key) = get_value(data);
let (data, timestamp) = get_u64(data);
let output = KeySlice::new(raw_key, timestamp);
(data, output)
}

fn get_compressed_key_ref<'b>(first_key: &[u8], data: &'b [u8]) -> (&'b [u8], &'b [u8]) {
// structure: [common_prefix_len, postfix_len, postfix, timestamp]
fn get_compressed_key_ref<'b>(first_key: &[u8], data: &'b [u8]) -> (&'b [u8], KeySlice<'b>) {
let (data, common_prefix_len) = get_u16(data);
let prefix = &first_key[..common_prefix_len];

let (data, postfix_len) = get_u16(data);
let (data, postfix) = get_data_by_len(data, postfix_len);

let (data, timestamp) = get_u64(data);

// todo: 这里需要能把 (prefix: &[u8], postfix: &[u8]) 当作 &[u8] 的相关数据结构 (tuple of slices)
let key = prefix
.iter()
Expand All @@ -97,29 +103,30 @@ impl Block {
.collect::<Vec<_>>()
.leak();

let key = KeySlice::new(key, timestamp);

(data, key)
}

pub fn get_entry(&self, index: usize) -> Entry {
pub fn get_entry(&self, index: usize) -> InnerEntry {
let (key, value) = self.get_entry_ref(index);
let key = Bytes::copy_from_slice(key);
let key = key.copy_to_key_bytes();
let value = Bytes::copy_from_slice(value);
Entry { key, value }
InnerEntry { key, value }
}

pub fn first_key(&self) -> KeyBytes {
let key = self.first_key_ref();
KeyBytes::from_bytes(Bytes::copy_from_slice(key))
self.first_key_ref().copy_to_key_bytes()
}

fn first_key_ref(&self) -> &[u8] {
fn first_key_ref(&self) -> KeySlice {
let (_, key) = self.parse_key_ref(0);
key
}

pub fn last_key(&self) -> KeyBytes {
let (_, key) = self.parse_key_ref(self.offsets.len() - 1);
KeyBytes::from_bytes(Bytes::copy_from_slice(key))
key.copy_to_key_bytes()
}
}

Expand All @@ -135,6 +142,12 @@ fn get_u16(data: &[u8]) -> (&[u8], usize) {
(new_data, value)
}

fn get_u64(data: &[u8]) -> (&[u8], u64) {
let new_data = &data[8..];
let value = (&data[..8]).get_u64();
(new_data, value)
}

fn get_data_by_len(data: &[u8], len: usize) -> (&[u8], &[u8]) {
(&data[len..], &data[..len])
}
30 changes: 21 additions & 9 deletions src/block/builder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::key::{KeySlice, KeyVec};
use bytes::BufMut;
use std::iter;

use bytes::BufMut;

use crate::key::{KeySlice, KeyVec};

use super::Block;

/// Builds a block.
Expand Down Expand Up @@ -74,8 +76,7 @@ impl BlockBuilder {
compress_key(first_key, key, &mut self.data);
} else {
// first key
self.data.extend((key.len() as u16).to_be_bytes());
self.data.extend(key.raw_ref());
encode_key(key, &mut self.data);
}

self.data.extend((value.len() as u16).to_be_bytes());
Expand All @@ -91,6 +92,7 @@ impl BlockBuilder {

fn compress_key(first_key: &KeyVec, key: KeySlice, buffer: &mut Vec<u8>) {
let first_key = first_key.raw_ref();
let timestamp = key.timestamp();
let key = key.raw_ref();

let common_prefix = iter::zip(first_key.iter(), key.iter())
Expand All @@ -102,15 +104,25 @@ fn compress_key(first_key: &KeyVec, key: KeySlice, buffer: &mut Vec<u8>) {
if postfix > 0 {
buffer.extend_from_slice(&key[common_prefix..]);
}
buffer.put_u64(timestamp);
}

// todo: 太多的 encoding 方法了,需要统一
fn encode_key(key: KeySlice, buffer: &mut Vec<u8>) {
buffer.put_u16(key.len() as u16);
buffer.extend(key.raw_ref());
buffer.put_u64(key.timestamp());
}

#[cfg(test)]
mod tests {
use crate::block::{Block, BlockBuilder, BlockIterator};
use crate::key::{KeySlice, KeyVec};
use std::sync::Arc;

use bytes::Bytes;
use nom::AsBytes;
use std::sync::Arc;

use crate::block::{Block, BlockBuilder, BlockIterator};
use crate::key::{KeySlice, KeyVec};

#[test]
fn test_block_build_single_key() {
Expand Down Expand Up @@ -199,7 +211,7 @@ mod tests {
for _ in 0..5 {
let mut iter = BlockIterator::create_and_seek_to_first(block.clone());
for i in 0..num_of_keys() {
let entry = iter.next().unwrap().unwrap();
let entry = iter.next().unwrap().unwrap().prune_ts();
let key = entry.key.as_bytes();
let value = entry.value.as_bytes();
assert_eq!(
Expand All @@ -226,7 +238,7 @@ mod tests {
let mut iter = BlockIterator::create_and_seek_to_key(block, key_of(0).as_key_slice());
for offset in 1..=5 {
for i in 0..num_of_keys() {
let entry = iter.next().unwrap().unwrap();
let entry = iter.next().unwrap().unwrap().prune_ts();
let key = entry.key.as_bytes();
let value = entry.value.as_bytes();
assert_eq!(
Expand Down
8 changes: 3 additions & 5 deletions src/block/iterator.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::block::blocks::Block;
use crate::entry::Entry;
use crate::key::{Key, KeySlice};
use crate::entry::InnerEntry;
use crate::key::KeySlice;
use std::sync::Arc;
use tracing::info;

// Iterates on a block.
pub struct BlockIterator {
Expand Down Expand Up @@ -36,7 +35,6 @@ impl BlockIterator {
let mut current = self.block.len();
for index in 0..self.block.len() {
let (this_key, _) = self.block.get_entry_ref(index);
let this_key = Key::from_slice(this_key);
if this_key >= key {
current = index;
break;
Expand All @@ -47,7 +45,7 @@ impl BlockIterator {
}

impl Iterator for BlockIterator {
type Item = anyhow::Result<Entry>;
type Item = anyhow::Result<InnerEntry>;

fn next(&mut self) -> Option<Self::Item> {
if self.idx >= self.block.len() {
Expand Down
Loading