From 1b0aa584bbb33a0b7005dfb046040cb5f4abbd16 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Thu, 17 Oct 2024 15:04:33 +0900
Subject: [PATCH] Optionally prefix S3-compatible storage keys with a hash of
 the key.

The point is to work around S3 rate limiting. Since rate limiting is based on
keys, our ULID naming scheme can lead to hotspots in the keyspace.

This solution has a downside: external scripts listing files will have their
job multiplied. For this reason, the prefix cardinality is configurable.

Closes #4824
---
 quickwit/Cargo.lock                              |  1 +
 quickwit/Cargo.toml                              |  1 +
 .../quickwit-config/src/storage_config.rs        | 36 +++++++-
 quickwit/quickwit-storage/Cargo.toml             |  1 +
 .../object_storage/s3_compatible_storage.rs      | 89 +++++++++++++++----
 5 files changed, 110 insertions(+), 18 deletions(-)

diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 721160f1ed4..735dc321972 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -6577,6 +6577,7 @@ dependencies = [
  "lru",
  "md5",
  "mockall",
+ "murmurhash32",
  "once_cell",
  "opendal",
  "pin-project",
diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml
index 6fcf67c2e10..4e34668f3a0 100644
--- a/quickwit/Cargo.toml
+++ b/quickwit/Cargo.toml
@@ -152,6 +152,7 @@ matches = "0.1.9"
 md5 = "0.7"
 mime_guess = "2.0.4"
 mockall = "0.11"
+murmurhash32 = "0.3"
 mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "306c0a7" }
 new_string_template = "1.5.1"
 nom = "7.1.3"
diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs
index 651271d0c61..4d6da03f2b7 100644
--- a/quickwit/quickwit-config/src/storage_config.rs
+++ b/quickwit/quickwit-config/src/storage_config.rs
@@ -26,6 +26,7 @@ use itertools::Itertools;
 use quickwit_common::get_bool_from_env;
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, EnumMap};
+use tracing::warn;

 /// Lists the storage backends supported by Quickwit.
 #[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
@@ -93,6 +94,9 @@ impl StorageConfigs {
     }

     pub fn validate(&self) -> anyhow::Result<()> {
+        for storage_config in self.0.iter() {
+            storage_config.validate()?;
+        }
         let backends: Vec = self
             .0
             .iter()
@@ -216,6 +220,16 @@ impl StorageConfig {
             _ => None,
         }
     }
+
+    pub fn validate(&self) -> anyhow::Result<()> {
+        if let StorageConfig::S3(config) = self {
+            config.validate()
+        } else {
+            Ok(())
+        }
+
+    }
+
 }

 impl From for StorageConfig {
@@ -313,6 +327,9 @@ impl fmt::Debug for AzureStorageConfig {
     }
 }

+
+const MAX_S3_HASH_PREFIX_CARDINALITY: usize = 16usize.pow(3);
+
 #[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
 #[serde(deny_unknown_fields)]
 pub struct S3StorageConfig {
@@ -334,9 +351,26 @@ pub struct S3StorageConfig {
     pub disable_multi_object_delete: bool,
     #[serde(default)]
     pub disable_multipart_upload: bool,
+    #[serde(default)]
+    #[serde(skip_serializing_if = "lower_than_2")]
+    pub hash_prefix_cardinality: usize,
+}
+
+fn lower_than_2(n: &usize) -> bool {
+    *n < 2
 }

 impl S3StorageConfig {
+    fn validate(&self) -> anyhow::Result<()> {
+        if self.hash_prefix_cardinality == 1 {
+            warn!("A hash prefix of 1 will be ignored.");
+        }
+        if self.hash_prefix_cardinality > MAX_S3_HASH_PREFIX_CARDINALITY {
+            anyhow::bail!("hash_prefix_cardinality can take values of at most {MAX_S3_HASH_PREFIX_CARDINALITY}, currently set to {}", self.hash_prefix_cardinality);
+        }
+        Ok(())
+    }
+
     fn apply_flavor(&mut self) {
         match self.flavor {
             Some(StorageBackendFlavor::DigitalOcean) => {
@@ -383,7 +417,7 @@ impl S3StorageConfig {
 }

 impl fmt::Debug for S3StorageConfig {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         f.debug_struct("S3StorageConfig")
             .field("access_key_id", &self.access_key_id)
             .field(
diff --git a/quickwit/quickwit-storage/Cargo.toml b/quickwit/quickwit-storage/Cargo.toml
index c883c61c265..385ec152683 100644
--- a/quickwit/quickwit-storage/Cargo.toml
+++ b/quickwit/quickwit-storage/Cargo.toml
@@ -26,6 +26,7 @@ once_cell = { workspace = true }
 pin-project = { workspace = true }
 rand = { workspace = true }
 regex = { workspace = true }
+murmurhash32 = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 tantivy = { workspace = true }
diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
index 86ef692c671..88b0470bd34 100644
--- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
+++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
@@ -86,11 +86,13 @@ pub struct S3CompatibleObjectStorage {
     s3_client: S3Client,
     uri: Uri,
     bucket: String,
-    prefix: PathBuf,
+    prefix: String,
     multipart_policy: MultiPartPolicy,
     retry_params: RetryParams,
     disable_multi_object_delete: bool,
     disable_multipart_upload: bool,
+    // If 0, we don't have any prefix
+    hash_prefix_cardinality: usize,
 }

 impl fmt::Debug for S3CompatibleObjectStorage {
@@ -99,6 +101,7 @@ impl fmt::Debug for S3CompatibleObjectStorage {
             .debug_struct("S3CompatibleObjectStorage")
             .field("bucket", &self.bucket)
             .field("prefix", &self.prefix)
+            .field("hash_prefix_cardinality", &self.hash_prefix_cardinality)
             .finish()
     }
 }
@@ -181,11 +184,12 @@ impl S3CompatibleObjectStorage {
             s3_client,
             uri: uri.clone(),
             bucket,
-            prefix,
+            prefix: prefix.to_string_lossy().to_string(),
             multipart_policy: MultiPartPolicy::default(),
             retry_params,
             disable_multi_object_delete,
             disable_multipart_upload,
+            hash_prefix_cardinality: s3_storage_config.hash_prefix_cardinality,
         })
     }

@@ -193,7 +197,7 @@ impl S3CompatibleObjectStorage {
     ///
     /// This method overrides any existing prefix. (It does NOT
     /// append the argument to any existing prefix.)
-    pub fn with_prefix(self, prefix: PathBuf) -> Self {
+    pub fn with_prefix(self, prefix: String) -> Self {
         Self {
             s3_client: self.s3_client,
             uri: self.uri,
@@ -203,6 +207,7 @@
             retry_params: self.retry_params,
             disable_multi_object_delete: self.disable_multi_object_delete,
             disable_multipart_upload: self.disable_multipart_upload,
+            hash_prefix_cardinality: self.hash_prefix_cardinality,
         }
     }

@@ -262,12 +267,47 @@ async fn compute_md5(mut read: T) -> io::Resu
         }
     }
 }
+const HEX_ALPHABET: [u8; 16] = *b"0123456789abcdef";
+const UNINITIALIZED_HASH_PREFIX: &str = "00000000";
+
+fn build_key(prefix: &str, relative_path: &str, hash_prefix_cardinality: usize) -> String {
+    let mut key = String::with_capacity(UNINITIALIZED_HASH_PREFIX.len() + 1 + prefix.len() + 1 + relative_path.len());
+    if hash_prefix_cardinality > 1 {
+        key.push_str(UNINITIALIZED_HASH_PREFIX);
+        key.push('/');
+    }
+    key.push_str(prefix);
+    if key.as_bytes().last().copied() != Some(b'/') {
+        key.push('/');
+    }
+    key.push_str(relative_path);
+    // We then set up the prefix.
+    if hash_prefix_cardinality > 1 {
+        let key_without_prefix = &key.as_bytes()[UNINITIALIZED_HASH_PREFIX.len() + 1..];
+        let mut prefix_hash: usize =
+            murmurhash32::murmurhash3(key_without_prefix) as usize % hash_prefix_cardinality;
+        unsafe {
+            let prefix_buf: &mut [u8] = &mut key.as_bytes_mut()[..UNINITIALIZED_HASH_PREFIX.len()];
+            for prefix_byte in prefix_buf.iter_mut() {
+                let hex: u8 = HEX_ALPHABET[(prefix_hash % 16) as usize];
+                *prefix_byte = hex;
+                if prefix_hash < 16 {
+                    break;
+                }
+                prefix_hash /= 16;
+            }
+        }
+    }
+    key
+}

 impl S3CompatibleObjectStorage {
     fn key(&self, relative_path: &Path) -> String {
-        // FIXME: This may not work on Windows.
-        let key_path = self.prefix.join(relative_path);
-        key_path.to_string_lossy().to_string()
+        build_key(
+            &self.prefix,
+            relative_path.to_string_lossy().as_ref(),
+            self.hash_prefix_cardinality,
+        )
     }

     fn relative_path(&self, key: &str) -> PathBuf {
@@ -945,13 +985,13 @@ mod tests {
         let s3_client = S3Client::new(&sdk_config);
         let uri = Uri::for_test("s3://bucket/indexes");
         let bucket = "bucket".to_string();
-        let prefix = PathBuf::new();

         let mut s3_storage = S3CompatibleObjectStorage {
             s3_client,
             uri,
             bucket,
-            prefix,
+            prefix: String::new(),
+            hash_prefix_cardinality: 0,
             multipart_policy: MultiPartPolicy::default(),
             retry_params: RetryParams::for_test(),
             disable_multi_object_delete: false,
@@ -962,7 +1002,7 @@ mod tests {
             PathBuf::from("indexes/foo")
         );

-        s3_storage.prefix = PathBuf::from("indexes");
+        s3_storage.prefix = "indexes".to_string();

         assert_eq!(
             s3_storage.relative_path("indexes/foo"),
@@ -1000,13 +1040,13 @@ mod tests {
         let s3_client = S3Client::from_conf(config);
         let uri = Uri::for_test("s3://bucket/indexes");
         let bucket = "bucket".to_string();
-        let prefix = PathBuf::new();

         let s3_storage = S3CompatibleObjectStorage {
             s3_client,
             uri,
             bucket,
-            prefix,
+            prefix: String::new(),
+            hash_prefix_cardinality: 0,
             multipart_policy: MultiPartPolicy::default(),
             retry_params: RetryParams::for_test(),
             disable_multi_object_delete: true,
@@ -1041,13 +1081,13 @@ mod tests {
         let s3_client = S3Client::from_conf(config);
         let uri = Uri::for_test("s3://bucket/indexes");
         let bucket = "bucket".to_string();
-        let prefix = PathBuf::new();

         let s3_storage = S3CompatibleObjectStorage {
             s3_client,
             uri,
             bucket,
-            prefix,
+            prefix: String::new(),
+            hash_prefix_cardinality: 0,
             multipart_policy: MultiPartPolicy::default(),
             retry_params: RetryParams::for_test(),
             disable_multi_object_delete: false,
@@ -1123,13 +1163,13 @@ mod tests {
         let s3_client = S3Client::from_conf(config);
         let uri = Uri::for_test("s3://bucket/indexes");
         let bucket = "bucket".to_string();
-        let prefix = PathBuf::new();

         let s3_storage = S3CompatibleObjectStorage {
             s3_client,
             uri,
             bucket,
-            prefix,
+            prefix: String::new(),
+            hash_prefix_cardinality: 0,
             multipart_policy: MultiPartPolicy::default(),
             retry_params: RetryParams::for_test(),
             disable_multi_object_delete: false,
@@ -1216,13 +1256,13 @@ mod tests {
         let s3_client = S3Client::from_conf(config);
         let uri = Uri::for_test("s3://bucket/indexes");
         let bucket = "bucket".to_string();
-        let prefix = PathBuf::new();

         let s3_storage = S3CompatibleObjectStorage {
             s3_client,
             uri,
             bucket,
-            prefix,
+            prefix: String::new(),
+            hash_prefix_cardinality: 0,
             multipart_policy: MultiPartPolicy::default(),
             retry_params: RetryParams::for_test(),
             disable_multi_object_delete: false,
@@ -1233,4 +1273,19 @@ mod tests {
             .await
             .unwrap();
     }
+
+    #[test]
+    fn test_build_key() {
+        assert_eq!(build_key("hello", "coucou", 0), "hello/coucou");
+        assert_eq!(build_key("hello/", "coucou", 0), "hello/coucou");
+        assert_eq!(build_key("hello/", "coucou", 1), "hello/coucou");
+        assert_eq!(build_key("hello", "coucou", 1), "hello/coucou");
+        assert_eq!(build_key("hello/", "coucou", 2), "10000000/hello/coucou");
+        assert_eq!(build_key("hello", "coucou", 2), "10000000/hello/coucou");
+        assert_eq!(build_key("hello/", "coucou", 16), "d0000000/hello/coucou");
+        assert_eq!(build_key("hello", "coucou", 16), "d0000000/hello/coucou");
+        assert_eq!(build_key("hello/", "coucou", 17), "50000000/hello/coucou");
+        assert_eq!(build_key("hello", "coucou", 17), "50000000/hello/coucou");
+        assert_eq!(build_key("hello/", "coucou", 70), "f0000000/hello/coucou");
+    }
 }