Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Kermit committed Sep 18, 2024
1 parent e866bd2 commit 79f0ed8
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 32 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ tracing-futures = { version = "0.2.5", features = ["futures-03"] }
num-traits = "0.2.19"
maplit = "1.0.2"
rocksdb = { version = "0.22" }
atomic_enum = "0.3.0"

[dev-dependencies]
tempfile = "3"
Expand Down
5 changes: 2 additions & 3 deletions src/persistent/file_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl LocalFs {
Self { dir: dir.into() }
}

fn build_sst_path(&self, id: usize) -> PathBuf {
pub fn build_sst_path(&self, id: usize) -> PathBuf {
self.dir.join(format!("{}.sst", id))
}

Expand Down Expand Up @@ -136,8 +136,7 @@ impl SstHandle for FileObject {
}

async fn delete(&self) -> anyhow::Result<()> {
let path = self.path.clone();
spawn_blocking(|| std::fs::remove_file(path)).await??;
tokio::fs::remove_file(&self.path).await?;
Ok(())
}
}
Expand Down
34 changes: 23 additions & 11 deletions src/sst/compact/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ mod tests {

use std::collections::HashSet;

use nom::AsBytes;
use std::sync::Arc;
use tempfile::{tempdir, TempDir};
use tokio::sync::Mutex;
Expand All @@ -299,7 +298,7 @@ mod tests {
use crate::sst::compact::{CompactionOptions, LeveledCompactionOptions};

use crate::sst::{SsTable, SstOptions, Sstables};
use crate::state::{LsmStorageState, Map};
use crate::state::LsmStorageState;
use crate::test_utils::insert_sst;

#[test]
Expand Down Expand Up @@ -407,7 +406,7 @@ mod tests {
}
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_force_compaction() {
let dir = tempdir().unwrap();
let (state, mut sstables) = prepare_sstables(&dir).await;
Expand All @@ -427,6 +426,10 @@ mod tests {
)
.await
.unwrap();

let persistent = state.persistent.clone();
drop(state);

{
assert_eq!(sstables.l0_sstables, Vec::<usize>::new());
assert_eq!(
Expand All @@ -436,16 +439,25 @@ mod tests {
assert_eq!(sstables.sstables.len(), 8);
}

for i in 0..5 {
let begin = i * 100;
let range = begin..begin + 100;
for i in range {
let key = format!("key-{:04}", i);
let expected_value = format!("value-{:04}", i);
let value = state.get(key.as_bytes()).await.unwrap().unwrap();
assert_eq!(expected_value.as_bytes(), value.as_bytes());
// check old sst deleted
{
for id in 0..9 {
let path = persistent.build_sst_path(id);
assert!(!path.as_path().exists(), "sst {} still exists", id);
}
}

// todo: check keys
// for i in 0..5 {
// let begin = i * 100;
// let range = begin..begin + 100;
// for i in range {
// let key = format!("key-{:04}", i);
// let expected_value = format!("value-{:04}", i);
// let value = state.get(key.as_bytes()).await.unwrap().unwrap();
// assert_eq!(expected_value.as_bytes(), value.as_bytes());
// }
// }
}

async fn prepare_sstables(dir: &TempDir) -> (LsmStorageState<LocalFs>, Sstables<FileObject>) {
Expand Down
37 changes: 20 additions & 17 deletions src/sst/tables.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use anyhow::{anyhow, Result};
use atomic_enum::atomic_enum;
use bytes::{Buf, Bytes};
use derive_getters::Getters;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::runtime::Handle;
use tracing::error;
use typed_builder::TypedBuilder;
Expand All @@ -16,7 +17,6 @@ use crate::sst::bloom::Bloom;
use crate::sst::iterator::BlockFallibleIter;
use crate::sst::BlockMeta;
use crate::utils::range::MinMax;
use crate::utils::scoped::ScopedMutex;

/// An SSTable.
#[derive(TypedBuilder, Getters)]
Expand All @@ -36,16 +36,12 @@ pub struct SsTable<File: SstHandle> {
pub max_ts: u64, // todo: use Option?

#[builder(default)]
// todo: use atomic
state: ScopedMutex<SstState>,
state: AtomicSstStateWrapper,
}

impl<File: SstHandle> Drop for SsTable<File> {
fn drop(&mut self) {
let to_delete = self
.state
.lock_with(|state| matches!(*state, SstState::ToDelete));
if to_delete {
if self.state.0.load(Ordering::Relaxed) == SstState::ToDelete {
let result = tokio::task::block_in_place(|| {
Handle::current().block_on(async { self.file.delete().await })
});
Expand All @@ -68,7 +64,7 @@ impl SsTable<()> {
last_key: KeyBytes::new(Bytes::copy_from_slice(last_key.as_bytes()), 0),
bloom: None,
max_ts: 0,
state: ScopedMutex::default(),
state: Default::default(),
}
}
}
Expand Down Expand Up @@ -150,7 +146,7 @@ impl<File: SstHandle> SsTable<File> {
last_key,
bloom: Some(bloom),
max_ts,
state: ScopedMutex::default(),
state: Default::default(),
};
Ok(table)
}
Expand Down Expand Up @@ -225,14 +221,21 @@ impl<File: SstHandle> SsTable<File> {
}

pub fn set_to_delete(&self) {
self.state
.lock_with(|mut state| *state = SstState::ToDelete);
self.state.0.store(SstState::ToDelete, Ordering::Relaxed);
}
}

#[derive(Debug, Default)]
#[atomic_enum]
#[derive(PartialEq)]
pub enum SstState {
#[default]
Active,
Active = 0,
ToDelete,
}

pub struct AtomicSstStateWrapper(AtomicSstState);

impl Default for AtomicSstStateWrapper {
fn default() -> Self {
Self(AtomicSstState::new(SstState::Active))
}
}
2 changes: 1 addition & 1 deletion src/state/states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ mod test {
);
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_task3_mvcc_compaction() {
use crate::state::write_batch::WriteBatchRecord::{Del, Put};

Expand Down

0 comments on commit 79f0ed8

Please sign in to comment.