Skip to content

Commit

Permalink
fix: SST deletion (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
byte-sourcerer authored Sep 18, 2024
1 parent c4966d7 commit 8543300
Show file tree
Hide file tree
Showing 13 changed files with 141 additions and 94 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ tracing-futures = { version = "0.2.5", features = ["futures-03"] }
num-traits = "0.2.19"
maplit = "1.0.2"
rocksdb = { version = "0.22" }
atomic_enum = "0.3.0"

[dev-dependencies]
tempfile = "3"
Expand Down
16 changes: 16 additions & 0 deletions src/persistent/dummy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use crate::persistent::SstHandle;
use anyhow::anyhow;

impl SstHandle for () {
async fn read(&self, _offset: u64, _len: usize) -> anyhow::Result<Vec<u8>> {
Err(anyhow!("unimplemented"))
}

fn size(&self) -> u64 {
unreachable!()
}

async fn delete(&self) -> anyhow::Result<()> {
Ok(())
}
}
30 changes: 20 additions & 10 deletions src/persistent/file_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl LocalFs {
Self { dir: dir.into() }
}

fn build_sst_path(&self, id: usize) -> PathBuf {
pub fn build_sst_path(&self, id: usize) -> PathBuf {
self.dir.join(format!("{}.sst", id))
}

Expand All @@ -47,14 +47,18 @@ impl Persistent for LocalFs {
async fn create_sst(&self, id: usize, data: Vec<u8>) -> anyhow::Result<Self::SstHandle> {
let size = data.len().try_into()?;
let path = self.build_sst_path(id);
let file = spawn_blocking(move || {
std::fs::write(&path, &data)?;
File::open(&path)?.sync_all()?;
let file = File::options().read(true).append(true).open(&path)?;
Ok::<_, anyhow::Error>(Arc::new(file))
})
.await??;
let handle = FileObject { file, size };
let file = {
let path = path.clone();
spawn_blocking(move || {
// todo: avoid clone
std::fs::write(&path, &data)?;
File::open(&path)?.sync_all()?;
let file = File::options().read(true).append(true).open(&path)?;
Ok::<_, anyhow::Error>(Arc::new(file))
})
.await??
};
let handle = FileObject { file, size, path };
Ok(handle)
}

Expand All @@ -68,7 +72,7 @@ impl Persistent for LocalFs {
.with_context(|| format!("id: {}, path: {:?}", id, &path))?;
let file = Arc::new(file);
let size = file.metadata()?.len();
let handle = FileObject { file, size };
let handle = FileObject { file, size, path };
Ok::<_, anyhow::Error>(handle)
})
.await??;
Expand Down Expand Up @@ -110,6 +114,7 @@ impl Persistent for LocalFs {
/// A file object.
pub struct FileObject {
file: Arc<File>,
path: PathBuf,
size: u64,
}

Expand All @@ -129,6 +134,11 @@ impl SstHandle for FileObject {
fn size(&self) -> u64 {
self.size
}

async fn delete(&self) -> anyhow::Result<()> {
tokio::fs::remove_file(&self.path).await?;
Ok(())
}
}

impl FileObject {}
11 changes: 1 addition & 10 deletions src/persistent/interface.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::future::Future;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};

pub trait Persistent: Send + Sync + Clone + 'static {
Expand Down Expand Up @@ -30,16 +29,8 @@ pub trait SstHandle: Send + Sync + 'static {
-> impl Future<Output = anyhow::Result<Vec<u8>>> + Send;

fn size(&self) -> u64;
}

impl<T: SstHandle> SstHandle for Arc<T> {
async fn read(&self, offset: u64, len: usize) -> anyhow::Result<Vec<u8>> {
self.as_ref().read(offset, len).await
}

fn size(&self) -> u64 {
self.as_ref().size()
}
fn delete(&self) -> impl Future<Output = anyhow::Result<()>> + Send;
}

pub trait WalHandle: AsyncWrite + AsyncRead + Send + Sync + Unpin + 'static {
Expand Down
1 change: 1 addition & 0 deletions src/persistent/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod dummy;
pub mod file_object;
pub mod interface;
mod manifest_handle;
Expand Down
45 changes: 3 additions & 42 deletions src/sst/compact/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,48 +133,6 @@ where
}
}

// pub async fn compact_with_task<P: Persistent>(
// sstables: &mut Sstables<P::SstHandle>,
// next_sst_id: SstIdGeneratorImpl,
// options: Arc<SstOptions>,
// persistent: P,
// task: &CompactionTask,
// watermark: Option<u64>,
// ) -> anyhow::Result<Vec<usize>> {
// let source = task.source();
// let source_level: Vec<_> = match task.source_index() {
// SourceIndex::Index { index } => {
// let source_id = *sstables.table_ids(source).get(index).unwrap();
// let source_level = sstables.sstables.get(&source_id).unwrap().as_ref();
// let source = iter::once(source_level);
// source.collect()
// }
// SourceIndex::Full { .. } => {
// let source = sstables.tables(source);
// source.collect()
// }
// };
//
// let destination = task.destination();
//
// let new_sst = assert_send(compact_generate_new_sst(
// source_level,
// sstables.tables(destination),
// next_sst_id,
// options,
// persistent,
// watermark,
// ))
// .await?;
//
// let new_sst_ids: Vec<_> = new_sst.iter().map(|table| table.id()).copied().collect();
//
// sstables.apply_compaction_sst(new_sst, task);
// sstables.apply_compaction_sst_ids(task, new_sst_ids.clone());
//
// Ok(new_sst_ids)
// }

pub async fn force_compact<P: Persistent + Clone>(
old_sstables: Arc<Sstables<P::SstHandle>>,
sstables: &mut Sstables<P::SstHandle>,
Expand Down Expand Up @@ -240,6 +198,9 @@ pub async fn force_compact<P: Persistent + Clone>(
.iter()
.chain(record.task.destination_ids.iter())
{
if let Some(sst) = sstables.sstables.get(old_id) {
sst.set_to_delete();
}
sstables.sstables.remove(old_id);
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/sst/compact/full.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::persistent::SstHandle;
use crate::sst::compact::common::NewCompactionTask;
use crate::sst::Sstables;

#[derive(Debug, Clone, Copy)]
pub struct LeveledCompactionOptions;

pub fn generate_full_compaction_task<File>(sstables: &Sstables<File>) -> Option<NewCompactionTask> {
pub fn generate_full_compaction_task<File: SstHandle>(
sstables: &Sstables<File>,
) -> Option<NewCompactionTask> {
let source_ids = sstables.table_ids(0).clone();
let destination_level = 1;
let destination_ids = sstables.table_ids(1).clone();
Expand Down
40 changes: 26 additions & 14 deletions src/sst/compact/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub fn generate_tasks<File: SstHandle>(
}
}

fn generate_tasks_for_other_level<'a, File>(
fn generate_tasks_for_other_level<'a, File: SstHandle>(
source_level: usize,
sstables: &'a Sstables<File>,
tables_in_compaction: &'a mut HashSet<usize>,
Expand Down Expand Up @@ -203,7 +203,7 @@ fn generate_tasks_for_other_level<'a, File>(
.flatten()
}

fn generate_task_for_l0<'a, File>(
fn generate_task_for_l0<'a, File: SstHandle>(
source_level: usize,
sstables: &'a Sstables<File>,
target_sizes: &[u64],
Expand Down Expand Up @@ -258,7 +258,7 @@ pub async fn compact_task<'a, P: Persistent>(
.await
}

fn generate_next_level_table_ids<File>(
fn generate_next_level_table_ids<File: SstHandle>(
tables_in_compaction: &mut HashSet<usize>,
sstables: &Sstables<File>,
source_key_range: &MinMax<KeyBytes>,
Expand All @@ -284,7 +284,6 @@ mod tests {

use std::collections::HashSet;

use nom::AsBytes;
use std::sync::Arc;
use tempfile::{tempdir, TempDir};
use tokio::sync::Mutex;
Expand All @@ -299,7 +298,7 @@ mod tests {
use crate::sst::compact::{CompactionOptions, LeveledCompactionOptions};

use crate::sst::{SsTable, SstOptions, Sstables};
use crate::state::{LsmStorageState, Map};
use crate::state::LsmStorageState;
use crate::test_utils::insert_sst;

#[test]
Expand Down Expand Up @@ -407,7 +406,7 @@ mod tests {
}
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_force_compaction() {
let dir = tempdir().unwrap();
let (state, mut sstables) = prepare_sstables(&dir).await;
Expand All @@ -427,6 +426,10 @@ mod tests {
)
.await
.unwrap();

let persistent = state.persistent.clone();
drop(state);

{
assert_eq!(sstables.l0_sstables, Vec::<usize>::new());
assert_eq!(
Expand All @@ -436,16 +439,25 @@ mod tests {
assert_eq!(sstables.sstables.len(), 8);
}

for i in 0..5 {
let begin = i * 100;
let range = begin..begin + 100;
for i in range {
let key = format!("key-{:04}", i);
let expected_value = format!("value-{:04}", i);
let value = state.get(key.as_bytes()).await.unwrap().unwrap();
assert_eq!(expected_value.as_bytes(), value.as_bytes());
// check old sst deleted
{
for id in 0..9 {
let path = persistent.build_sst_path(id);
assert!(!path.as_path().exists(), "sst {} still exists", id);
}
}

// todo: check keys
// for i in 0..5 {
// let begin = i * 100;
// let range = begin..begin + 100;
// for i in range {
// let key = format!("key-{:04}", i);
// let expected_value = format!("value-{:04}", i);
// let value = state.get(key.as_bytes()).await.unwrap().unwrap();
// assert_eq!(expected_value.as_bytes(), value.as_bytes());
// }
// }
}

async fn prepare_sstables(dir: &TempDir) -> (LsmStorageState<LocalFs>, Sstables<FileObject>) {
Expand Down
6 changes: 3 additions & 3 deletions src/sst/iterator/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ where
}

#[pin_project]
pub struct SsTableIterator<'a, File> {
pub struct SsTableIterator<'a, File: SstHandle> {
table: &'a SsTable<File>,
#[pin]
inner: InnerIter<'a>,
bloom: Option<&'a Bloom>,
}

impl<'a, File> SsTableIterator<'a, File> {
impl<'a, File: SstHandle> SsTableIterator<'a, File> {
pub fn may_contain(&self, key: &[u8]) -> bool {
bloom::may_contain(self.bloom, key)
}
Expand Down Expand Up @@ -161,7 +161,7 @@ where
}

// todo: 感觉没必要 impl Stream,使用 (Bloom, InnerIter) 比较好?
impl<'a, File> Stream for SsTableIterator<'a, File> {
impl<'a, File: SstHandle> Stream for SsTableIterator<'a, File> {
type Item = anyhow::Result<InnerEntry>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
Loading

0 comments on commit 8543300

Please sign in to comment.