Skip to content

Commit

Permalink
feat: optimize gc with rocksdb (#668)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Aug 14, 2024
1 parent 4ac259a commit 71efbe9
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 75 deletions.
34 changes: 22 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 8 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.99"
version = "0.1.100"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.99" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.99" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.99" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.99" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.99" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.99" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.99" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.100" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.100" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.100" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.100" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.100" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.100" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.100" }
thiserror = "1.0"
dragonfly-api = "2.0.148"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
Expand Down Expand Up @@ -57,7 +57,6 @@ hex = "0.4"
rocksdb = "0.22.0"
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9"
serde_json = "1.0"
http = "1"
tonic = { version = "0.12.1", features = ["zstd"] }
tokio = { version = "1.39.2", features = ["full"] }
Expand Down
3 changes: 2 additions & 1 deletion dragonfly-client-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ chrono.workspace = true
reqwest.workspace = true
rocksdb.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
prost-wkt-types.workspace = true
tokio.workspace = true
Expand All @@ -27,6 +26,8 @@ sha2.workspace = true
crc32fast.workspace = true
base16ct.workspace = true
num_cpus = "1.0"
bincode = "1.3.3"
rayon = "1.10.0"

[dev-dependencies]
tempdir = "0.3"
5 changes: 0 additions & 5 deletions dragonfly-client-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,6 @@ impl Storage {
self.metadata.get_piece(task_id, number)
}

// get_pieces returns the piece metadatas.
pub fn get_pieces(&self, task_id: &str) -> Result<Vec<metadata::Piece>> {
self.metadata.get_pieces(task_id)
}

// piece_id returns the piece id.
pub fn piece_id(&self, task_id: &str, number: u32) -> String {
self.metadata.piece_id(task_id, number)
Expand Down
68 changes: 38 additions & 30 deletions dragonfly-client-storage/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use chrono::{NaiveDateTime, Utc};
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::http::reqwest_headermap_to_hashmap;
use rayon::prelude::*;
use reqwest::header::HeaderMap;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
Expand Down Expand Up @@ -500,8 +501,19 @@ impl<E: StorageEngineOwned> Metadata<E> {

// get_tasks returns the metadata of every task stored in the database.
//
// Returns an error if iterating the database fails or if a stored value
// cannot be deserialized into a Task.
pub fn get_tasks(&self) -> Result<Vec<Task>> {
let iter = self.db.iter::<Task>()?;
iter.map(|ele| ele.map(|(_, task)| task)).collect()
// Collect only the raw value bytes of each entry; the keys are dropped and
// the first iteration error stops the collection.
let tasks = self
.db
.iter_raw::<Task>()?
.map(|ele| {
let (_, value) = ele?;
Ok(value)
})
.collect::<Result<Vec<Box<[u8]>>>>()?;

// Deserialize the collected values in parallel (par_iter comes from the
// rayon prelude imported by this file).
tasks
.par_iter()
.map(|task| Task::deserialize_from(task))
.collect()
}

// delete_task deletes the task metadata.
Expand Down Expand Up @@ -783,12 +795,6 @@ impl<E: StorageEngineOwned> Metadata<E> {
self.db.get(self.piece_id(task_id, number).as_bytes())
}

// get_pieces gets the piece metadatas.
pub fn get_pieces(&self, task_id: &str) -> Result<Vec<Piece>> {
let iter = self.db.prefix_iter::<Piece>(task_id.as_bytes())?;
iter.map(|ele| ele.map(|(_, piece)| piece)).collect()
}

// delete_piece deletes the piece metadata.
pub fn delete_piece(&self, task_id: &str, number: u32) -> Result<()> {
info!("delete piece metadata {}", self.piece_id(task_id, number));
Expand All @@ -798,17 +804,29 @@ impl<E: StorageEngineOwned> Metadata<E> {

// delete_pieces deletes the metadata of every piece whose key starts with
// the bytes of task_id.
//
// NOTE(review): this is a plain prefix match on task_id; presumably piece
// keys are built by piece_id(task_id, number) — confirm that one task id can
// never be a prefix of another task's piece keys.
pub fn delete_pieces(&self, task_id: &str) -> Result<()> {
let iter = self.db.prefix_iter::<Piece>(task_id.as_bytes())?;
for ele in iter {
let (key, _) = ele?;

info!(
"delete piece metadata {}",
std::str::from_utf8(&key).unwrap_or_default().to_string()
);
self.db.delete::<Piece>(&key)?;
}

// Collect the raw keys found under the task_id prefix; the values are ignored.
let piece_ids = self
.db
.prefix_iter_raw::<Piece>(task_id.as_bytes())?
.map(|ele| {
let (key, _) = ele?;
Ok(key)
})
.collect::<Result<Vec<Box<[u8]>>>>()?;

// Log every key and borrow it as a byte slice for batch_delete. A key that
// is not valid UTF-8 is logged as an empty string.
let piece_ids_refs = piece_ids
.par_iter()
.map(|id| {
let id_ref = id.as_ref();
info!(
"delete piece metadata {} in batch",
std::str::from_utf8(id_ref).unwrap_or_default(),
);

id_ref
})
.collect::<Vec<&[u8]>>();

// Hand all collected keys to a single batch_delete call.
self.db.batch_delete::<Piece>(piece_ids_refs)?;
Ok(())
}

Expand Down Expand Up @@ -849,7 +867,6 @@ mod tests {
let log_dir = dir.path().join("log");
let metadata = Metadata::new(Arc::new(Config::default()), dir.path(), &log_dir).unwrap();
assert!(metadata.get_tasks().unwrap().is_empty());
assert!(metadata.get_pieces("task").unwrap().is_empty());
}

#[test]
Expand Down Expand Up @@ -960,13 +977,9 @@ mod tests {
"piece should be updated after download_piece_finished"
);

// Test get_pieces.
// Test download_piece_failed.
metadata.download_piece_started(task_id, 2).unwrap();
metadata.download_piece_started(task_id, 3).unwrap();
let pieces = metadata.get_pieces(task_id).unwrap();
assert_eq!(pieces.len(), 3, "should get 3 pieces in total");

// Test download_piece_failed.
metadata.download_piece_failed(task_id, 2).unwrap();
let piece = metadata.get_piece(task_id, 2).unwrap();
assert!(
Expand Down Expand Up @@ -1002,10 +1015,5 @@ mod tests {
piece.uploading_count, 0,
"piece should be updated after upload_piece_failed"
);

// Test delete_pieces.
metadata.delete_pieces(task_id).unwrap();
let pieces = metadata.get_pieces(task_id).unwrap();
assert!(pieces.is_empty(), "should get 0 pieces after delete_pieces");
}
}
18 changes: 16 additions & 2 deletions dragonfly-client-storage/src/storage_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ pub trait DatabaseObject: Serialize + DeserializeOwned {

/// serialized serializes the object to bytes.
///
/// A serialization failure is converted with
/// `or_err(ErrorType::SerializeError)` before being returned.
fn serialized(&self) -> Result<Vec<u8>> {
Ok(serde_json::to_vec(self).or_err(ErrorType::SerializeError)?)
Ok(bincode::serialize(self).or_err(ErrorType::SerializeError)?)
}

/// deserialize_from deserializes the object from bytes.
///
/// Note that a decoding failure is reported with the same
/// `ErrorType::SerializeError` that `serialized` uses for encoding failures.
fn deserialize_from(bytes: &[u8]) -> Result<Self> {
Ok(serde_json::from_slice(bytes).or_err(ErrorType::SerializeError)?)
Ok(bincode::deserialize(bytes).or_err(ErrorType::SerializeError)?)
}
}

Expand All @@ -61,9 +61,23 @@ pub trait Operations {
/// iter iterates all objects, yielding each entry as a `(key, object)` pair
/// where the key is the raw key bytes and the object has type `O`.
fn iter<O: DatabaseObject>(&self) -> Result<impl Iterator<Item = Result<(Box<[u8]>, O)>>>;

/// iter_raw iterates all objects without deserialization, yielding each
/// entry as a raw `(key, value)` pair of byte buffers.
fn iter_raw<O: DatabaseObject>(
&self,
) -> Result<impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>>>;

/// prefix_iter iterates all objects with the given key prefix, yielding each
/// entry as a `(key, object)` pair where the object has type `O`.
fn prefix_iter<O: DatabaseObject>(
&self,
prefix: &[u8],
) -> Result<impl Iterator<Item = Result<(Box<[u8]>, O)>>>;

/// prefix_iter_raw iterates all objects with the given key prefix without
/// deserialization, yielding each entry as a raw `(key, value)` pair of byte
/// buffers.
fn prefix_iter_raw<O: DatabaseObject>(
&self,
prefix: &[u8],
) -> Result<impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>>>;

/// batch_delete deletes the objects identified by `keys`.
fn batch_delete<O: DatabaseObject>(&self, keys: Vec<&[u8]>) -> Result<()>;
}
Loading

0 comments on commit 71efbe9

Please sign in to comment.