From 71efbe9abd5344ca4fddf86be7bf8c142445e642 Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 14 Aug 2024 21:14:42 +0800 Subject: [PATCH] feat: optimize gc with rocksdb (#668) Signed-off-by: Gaius --- Cargo.lock | 34 ++++++---- Cargo.toml | 17 +++-- dragonfly-client-storage/Cargo.toml | 3 +- dragonfly-client-storage/src/lib.rs | 5 -- dragonfly-client-storage/src/metadata.rs | 68 +++++++++++-------- .../src/storage_engine/mod.rs | 18 ++++- .../src/storage_engine/rocksdb.rs | 61 ++++++++++++++--- dragonfly-client/Cargo.toml | 2 +- dragonfly-client/src/resource/piece.rs | 6 -- 9 files changed, 139 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d59b48f8..222acda4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -548,6 +548,15 @@ dependencies = [ "regex", ] +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bindgen" version = "0.69.4" @@ -1120,7 +1129,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.1.99" +version = "0.1.100" dependencies = [ "anyhow", "blake3", @@ -1190,7 +1199,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.1.99" +version = "0.1.100" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1212,7 +1221,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.1.99" +version = "0.1.100" dependencies = [ "bytesize", "bytesize-serde", @@ -1233,7 +1242,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.1.99" +version = "0.1.100" dependencies = [ "hyper 1.4.1", "hyper-util", @@ -1248,7 +1257,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.1.99" +version = "0.1.100" dependencies = [ "anyhow", "clap", @@ -1264,9 +1273,10 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.1.99" +version = "0.1.100" dependencies = [ "base16ct", + "bincode", "chrono", "crc32fast", "dragonfly-api", @@ -1275,10 +1285,10 @@ dependencies = [ "dragonfly-client-util", "num_cpus", "prost-wkt-types", + "rayon", "reqwest", "rocksdb", "serde", - "serde_json", "sha2", "tempdir", "tokio", @@ -1288,7 +1298,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.1.99" +version = "0.1.100" dependencies = [ "base16ct", "blake3", @@ -1758,7 +1768,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.1.99" +version = "0.1.100" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", @@ -1977,7 +1987,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.5", "tokio", "tower-service", "tracing", @@ -3668,9 +3678,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.8.1" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa7237101a77a10773db45d62004a272517633fbcc3df19d96455ede1122e051" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" dependencies = [ "either", "rayon-core", diff --git a/Cargo.toml b/Cargo.toml index b55b2f79..64674b32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.1.99" +version = "0.1.100" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.1.99" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.99" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.99" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.99" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.99" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.99" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.99" } +dragonfly-client = { path = "dragonfly-client", version = "0.1.100" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.100" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.100" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.100" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.100" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.100" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.100" } thiserror = "1.0" dragonfly-api = "2.0.148" reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } @@ -57,7 +57,6 @@ hex = "0.4" rocksdb = "0.22.0" serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.9" -serde_json = "1.0" http = "1" tonic = { version = "0.12.1", features = ["zstd"] } tokio = { version = "1.39.2", features = ["full"] } diff --git a/dragonfly-client-storage/Cargo.toml b/dragonfly-client-storage/Cargo.toml index ac80db2c..41191762 100644 --- a/dragonfly-client-storage/Cargo.toml +++ b/dragonfly-client-storage/Cargo.toml @@ -18,7 +18,6 @@ chrono.workspace = true reqwest.workspace = true rocksdb.workspace = true serde.workspace = true -serde_json.workspace = true tracing.workspace = true prost-wkt-types.workspace = true tokio.workspace = true @@ -27,6 +26,8 @@ sha2.workspace = true crc32fast.workspace = true base16ct.workspace = true num_cpus = "1.0" +bincode = "1.3.3" +rayon = "1.10.0" [dev-dependencies] tempdir = "0.3" diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 8cb93408..5cb8f244 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -373,11 +373,6 @@ impl Storage { self.metadata.get_piece(task_id, number) } - // get_pieces returns the piece metadatas. - pub fn get_pieces(&self, task_id: &str) -> Result> { - self.metadata.get_pieces(task_id) - } - // piece_id returns the piece id. pub fn piece_id(&self, task_id: &str, number: u32) -> String { self.metadata.piece_id(task_id, number) diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs index ef931b39..e66482a0 100644 --- a/dragonfly-client-storage/src/metadata.rs +++ b/dragonfly-client-storage/src/metadata.rs @@ -18,6 +18,7 @@ use chrono::{NaiveDateTime, Utc}; use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::{Error, Result}; use dragonfly_client_util::http::reqwest_headermap_to_hashmap; +use rayon::prelude::*; use reqwest::header::HeaderMap; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -500,8 +501,19 @@ impl Metadata { // get_tasks gets the task metadatas. pub fn get_tasks(&self) -> Result> { - let iter = self.db.iter::()?; - iter.map(|ele| ele.map(|(_, task)| task)).collect() + let tasks = self + .db + .iter_raw::()? + .map(|ele| { + let (_, value) = ele?; + Ok(value) + }) + .collect::>>>()?; + + tasks + .par_iter() + .map(|task| Task::deserialize_from(task)) + .collect() } // delete_task deletes the task metadata. @@ -783,12 +795,6 @@ impl Metadata { self.db.get(self.piece_id(task_id, number).as_bytes()) } - // get_pieces gets the piece metadatas. - pub fn get_pieces(&self, task_id: &str) -> Result> { - let iter = self.db.prefix_iter::(task_id.as_bytes())?; - iter.map(|ele| ele.map(|(_, piece)| piece)).collect() - } - // delete_piece deletes the piece metadata. pub fn delete_piece(&self, task_id: &str, number: u32) -> Result<()> { info!("delete piece metadata {}", self.piece_id(task_id, number)); @@ -798,17 +804,29 @@ impl Metadata { // delete_pieces deletes the piece metadatas. pub fn delete_pieces(&self, task_id: &str) -> Result<()> { - let iter = self.db.prefix_iter::(task_id.as_bytes())?; - for ele in iter { - let (key, _) = ele?; - - info!( - "delete piece metadata {}", - std::str::from_utf8(&key).unwrap_or_default().to_string() - ); - self.db.delete::(&key)?; - } - + let piece_ids = self + .db + .prefix_iter_raw::(task_id.as_bytes())? + .map(|ele| { + let (key, _) = ele?; + Ok(key) + }) + .collect::>>>()?; + + let piece_ids_refs = piece_ids + .par_iter() + .map(|id| { + let id_ref = id.as_ref(); + info!( + "delete piece metadata {} in batch", + std::str::from_utf8(id_ref).unwrap_or_default(), + ); + + id_ref + }) + .collect::>(); + + self.db.batch_delete::(piece_ids_refs)?; Ok(()) } @@ -849,7 +867,6 @@ mod tests { let log_dir = dir.path().join("log"); let metadata = Metadata::new(Arc::new(Config::default()), dir.path(), &log_dir).unwrap(); assert!(metadata.get_tasks().unwrap().is_empty()); - assert!(metadata.get_pieces("task").unwrap().is_empty()); } #[test] @@ -960,13 +977,9 @@ mod tests { "piece should be updated after download_piece_finished" ); - // Test get_pieces. + // Test download_piece_failed. metadata.download_piece_started(task_id, 2).unwrap(); metadata.download_piece_started(task_id, 3).unwrap(); - let pieces = metadata.get_pieces(task_id).unwrap(); - assert_eq!(pieces.len(), 3, "should get 3 pieces in total"); - - // Test download_piece_failed. metadata.download_piece_failed(task_id, 2).unwrap(); let piece = metadata.get_piece(task_id, 2).unwrap(); assert!( @@ -1002,10 +1015,5 @@ mod tests { piece.uploading_count, 0, "piece should be updated after upload_piece_failed" ); - - // Test delete_pieces. - metadata.delete_pieces(task_id).unwrap(); - let pieces = metadata.get_pieces(task_id).unwrap(); - assert!(pieces.is_empty(), "should get 0 pieces after delete_pieces"); } } diff --git a/dragonfly-client-storage/src/storage_engine/mod.rs b/dragonfly-client-storage/src/storage_engine/mod.rs index be4c7eeb..c73d9da0 100644 --- a/dragonfly-client-storage/src/storage_engine/mod.rs +++ b/dragonfly-client-storage/src/storage_engine/mod.rs @@ -31,12 +31,12 @@ pub trait DatabaseObject: Serialize + DeserializeOwned { /// serialized serializes the object to bytes. fn serialized(&self) -> Result> { - Ok(serde_json::to_vec(self).or_err(ErrorType::SerializeError)?) + Ok(bincode::serialize(self).or_err(ErrorType::SerializeError)?) } /// deserialize_from deserializes the object from bytes. fn deserialize_from(bytes: &[u8]) -> Result { - Ok(serde_json::from_slice(bytes).or_err(ErrorType::SerializeError)?) + Ok(bincode::deserialize(bytes).or_err(ErrorType::SerializeError)?) } } @@ -61,9 +61,23 @@ pub trait Operations { /// iter iterates all objects. fn iter(&self) -> Result, O)>>>; + /// iter_raw iterates all objects without serialization. + fn iter_raw( + &self, + ) -> Result, Box<[u8]>)>>>; + /// prefix_iter iterates all objects with prefix. fn prefix_iter( &self, prefix: &[u8], ) -> Result, O)>>>; + + /// prefix_iter_raw iterates all objects with prefix without serialization. + fn prefix_iter_raw( + &self, + prefix: &[u8], + ) -> Result, Box<[u8]>)>>>; + + // batch_delete deletes objects by keys. + fn batch_delete(&self, keys: Vec<&[u8]>) -> Result<()>; } diff --git a/dragonfly-client-storage/src/storage_engine/rocksdb.rs b/dragonfly-client-storage/src/storage_engine/rocksdb.rs index fa3006f8..29709052 100644 --- a/dragonfly-client-storage/src/storage_engine/rocksdb.rs +++ b/dragonfly-client-storage/src/storage_engine/rocksdb.rs @@ -48,17 +48,17 @@ impl RocksdbStorageEngine { /// DEFAULT_DIR_NAME is the default directory name to store metadata. const DEFAULT_DIR_NAME: &'static str = "metadata"; - /// DEFAULT_MEMTABLE_MEMORY_BUDGET is the default memory budget for memtable, default is 256MB. - const DEFAULT_MEMTABLE_MEMORY_BUDGET: usize = 256 * 1024 * 1024; + /// DEFAULT_MEMTABLE_MEMORY_BUDGET is the default memory budget for memtable, default is 512MB. + const DEFAULT_MEMTABLE_MEMORY_BUDGET: usize = 512 * 1024 * 1024; - /// DEFAULT_MAX_OPEN_FILES is the default max open files for rocksdb. - const DEFAULT_MAX_OPEN_FILES: i32 = 10_000; + // DEFAULT_MAX_BACKGROUND_JOBS is the default max background jobs for rocksdb, default is 2. + const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 2; - /// DEFAULT_BLOCK_SIZE is the default block size for rocksdb. - const DEFAULT_BLOCK_SIZE: usize = 64 * 1024; + /// DEFAULT_BLOCK_SIZE is the default block size for rocksdb, default is 128KB. + const DEFAULT_BLOCK_SIZE: usize = 128 * 1024; - /// DEFAULT_CACHE_SIZE is the default cache size for rocksdb. - const DEFAULT_CACHE_SIZE: usize = 32 * 1024 * 1024; + /// DEFAULT_CACHE_SIZE is the default cache size for rocksdb, default is 512MB. + const DEFAULT_CACHE_SIZE: usize = 512 * 1024 * 1024; // DEFAULT_LOG_MAX_SIZE is the default max log size for rocksdb, default is 64MB. const DEFAULT_LOG_MAX_SIZE: usize = 64 * 1024 * 1024; @@ -75,7 +75,11 @@ impl RocksdbStorageEngine { options.create_missing_column_families(true); options.optimize_level_style_compaction(Self::DEFAULT_MEMTABLE_MEMORY_BUDGET); options.increase_parallelism(num_cpus::get() as i32); - options.set_max_open_files(Self::DEFAULT_MAX_OPEN_FILES); + options.set_compression_type(rocksdb::DBCompressionType::Lz4); + options.set_max_background_jobs(std::cmp::max( + num_cpus::get() as i32 / 2, + Self::DEFAULT_MAX_BACKGROUND_JOBS, + )); // Set rocksdb log options. options.set_db_log_dir(log_dir); @@ -156,6 +160,19 @@ impl Operations for RocksdbStorageEngine { })) } + // iter_raw iterates all objects without serialization. + fn iter_raw( + &self, + ) -> Result, Box<[u8]>)>>> { + let cf = cf_handle::(self)?; + Ok(self + .iterator_cf(cf, rocksdb::IteratorMode::Start) + .map(|ele| { + let (key, value) = ele.or_err(ErrorType::StorageError)?; + Ok((key, value)) + })) + } + // prefix_iter iterates all objects with prefix. fn prefix_iter( &self, @@ -168,6 +185,32 @@ impl Operations for RocksdbStorageEngine { Ok((key, O::deserialize_from(&value)?)) })) } + + // prefix_iter_raw iterates all objects with prefix without serialization. + fn prefix_iter_raw( + &self, + prefix: &[u8], + ) -> Result, Box<[u8]>)>>> { + let cf = cf_handle::(self)?; + Ok(self.prefix_iterator_cf(cf, prefix).map(|ele| { + let (key, value) = ele.or_err(ErrorType::StorageError)?; + Ok((key, value)) + })) + } + + fn batch_delete(&self, keys: Vec<&[u8]>) -> Result<()> { + let cf = cf_handle::(self)?; + let mut batch = rocksdb::WriteBatch::default(); + for key in keys { + batch.delete_cf(cf, key); + } + + let mut options = WriteOptions::default(); + options.set_sync(true); + Ok(self + .write_opt(batch, &options) + .or_err(ErrorType::StorageError)?) + } } // RocksdbStorageEngine implements the rocksdb of the storage engine. diff --git a/dragonfly-client/Cargo.toml b/dragonfly-client/Cargo.toml index cf10f4ed..152caf41 100644 --- a/dragonfly-client/Cargo.toml +++ b/dragonfly-client/Cargo.toml @@ -42,7 +42,6 @@ tracing.workspace = true validator.workspace = true humantime.workspace = true serde.workspace = true -serde_json.workspace = true chrono.workspace = true prost-wkt-types.workspace = true tokio.workspace = true @@ -62,6 +61,7 @@ blake3.workspace = true bytesize.workspace = true uuid.workspace = true percent-encoding.workspace = true +serde_json = "1.0" tracing-log = "0.2" tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "chrono"] } tracing-appender = "0.2.3" diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index 0c216bd0..a25bf4db 100644 --- a/dragonfly-client/src/resource/piece.rs +++ b/dragonfly-client/src/resource/piece.rs @@ -113,12 +113,6 @@ impl Piece { self.storage.get_piece(task_id, number) } - // get_all gets all pieces from the local storage. - #[instrument(skip_all)] - pub fn get_all(&self, task_id: &str) -> Result> { - self.storage.get_pieces(task_id) - } - // calculate_interested calculates the interested pieces by content_length and range. #[instrument(skip_all)] pub fn calculate_interested(