diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index f9c1c66397f..09530277194 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -147,10 +147,10 @@ impl IndexingService { storage_resolver: StorageResolver, event_broker: EventBroker, ) -> anyhow::Result { - let split_store_space_quota = SplitStoreQuota::new( + let split_store_space_quota = SplitStoreQuota::try_new( indexer_config.split_store_max_num_splits, indexer_config.split_store_max_num_bytes, - ); + )?; let merge_io_throughput_limiter_opt = indexer_config.max_merge_write_throughput.map(io::limiter); let split_cache_dir_path = get_cache_directory_path(&data_dir_path); diff --git a/quickwit/quickwit-indexing/src/split_store/indexing_split_cache.rs b/quickwit/quickwit-indexing/src/split_store/indexing_split_cache.rs index 1d9cf939032..30c1459c813 100644 --- a/quickwit/quickwit-indexing/src/split_store/indexing_split_cache.rs +++ b/quickwit/quickwit-indexing/src/split_store/indexing_split_cache.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::io; use std::path::{Path, PathBuf}; @@ -106,20 +107,20 @@ fn split_id_from_split_folder(dir_path: &Path) -> Option<&str> { dir_path.file_name()?.to_str()?.strip_suffix(".split") } -/// The `IndexingSplitCache` is a local cache used to improve the performance of indexing nodes. -/// Its purpose is simple: when a new split is freshly created, it is usely merged +/// The [`IndexingSplitCache`] is a local cache used to improve the performance of indexing nodes. +/// Its purpose is simple: when a new split is freshly created, it is usually merged /// very rapidly after. /// -/// In order to prevent this merge to force its download, we store it in the -/// `IndexingSplitCache`. This store is just a cache: a cache miss is acceptable and -/// just means that the split will be redownloaded. +/// In order to prevent this merge from forcing its download, we store it in the +/// [`IndexingSplitCache`]. This store is just a cache: a cache miss is acceptable and +/// just means that the split will be downloaded again. /// /// The indexing split cache eviction policy however, is rather uncommon. /// On our happy path, a split is stored into the cache, and is then used only once /// to undergo a merge. /// /// For this reason, we simply offer a way to `move splits into the cache`, -/// and `move splits out of the cache`. A split is removed from the split store +/// and `move splits out of the cache`. A split is removed from the split cache /// after its first access. /// /// Of course a failed merge could require accessing a given split more than once. In that @@ -128,8 +129,8 @@ fn split_id_from_split_folder(dir_path: &Path) -> Option<&str> { /// The cache size is limited by 3 things: /// - a maximum number of splits as defined in the `SplitStoreQuota`. /// - a maximum number of bytes as defined in the `SplitStoreQuota`. -/// - finally, we evict older splits to make sure that the newest split and the oldest -/// split only differ by at most `SPLIT_MAX_AGE`. +/// - finally, we evict older splits to make sure that the newest split and the oldest split only +/// differ by at most `SPLIT_MAX_AGE`. /// /// The point of this final rule invariant is to make sure that the disk space will be /// released if the cache is NOT under pressure but some splits are actually useless. @@ -137,7 +138,6 @@ fn split_id_from_split_folder(dir_path: &Path) -> Option<&str> { /// When adding a new split into the cache, if adding the split would break one of the following /// limit, we simply remove split one by one starting by the oldest first, until the split /// can be added. - pub struct IndexingSplitCache { inner: Mutex, } @@ -153,9 +153,9 @@ struct SplitFolderRegistry { /// Splits ids are generated using ULID, so that they are sorted /// according to their creation date. /// - /// We evict the oldest split first. (Note this is not an LRU strategy + /// We evict the oldest split first. Note this is not an LRU strategy /// because we do not care about the last access time, but we only - /// consider the creation time.) + /// consider the creation time. split_folders: BTreeMap, /// The split store quota shared among all indexing split stores. split_store_quota: SplitStoreQuota, @@ -181,23 +181,24 @@ impl SplitFolderRegistry { } // Inserting the same split_folder with a different number of bytes panics. - fn insert(&mut self, split_folder: SplitFolder) { - if let Some(previous) = self - .split_folders - .insert(split_folder.split_id, split_folder.num_bytes) - { - assert_eq!(previous, split_folder.num_bytes); + fn insert(&mut self, split_folder: SplitFolder) -> bool { + if let Entry::Vacant(entry) = self.split_folders.entry(split_folder.split_id) { + entry.insert(split_folder.num_bytes); + self.split_store_quota.add_split(split_folder.num_bytes); + true + } else { + false } - self.split_store_quota.add_split(split_folder.num_bytes); } /// Returns true if the split was indeed present in the registry - fn remove(&mut self, split_id: Ulid) -> bool { - let Some(num_bytes) = self.split_folders.remove(&split_id) else { - return false; - }; + fn remove(&mut self, split_id: Ulid) -> Option { + let num_bytes = self.split_folders.remove(&split_id)?; self.split_store_quota.remove_split(num_bytes); - true + Some(SplitFolder { + num_bytes, + split_id, + }) } /// Returns the oldest split (oldest in the sense of the ULID = creation time). @@ -207,10 +208,9 @@ impl SplitFolderRegistry { } /// Removes the oldest split. - fn pop_oldest(&mut self) -> Option { + fn pop_oldest(&mut self) -> Option { let oldest_split_id = self.oldest_split()?; - assert!(self.remove(oldest_split_id)); - Some(oldest_split_id) + self.remove(oldest_split_id) } fn quota(&self) -> &SplitStoreQuota { @@ -227,38 +227,43 @@ impl InnerSplitCache { split_id: Ulid, to_folder: &Path, ) -> StorageResult> { - if !self.split_registry.remove(split_id) { + let Some(split_folder) = self.split_registry.remove(split_id) else { // The split is simply not in cache. return Ok(None); }; let from_path = self.split_path(split_id); let to_full_path = to_folder.join(from_path.file_name().unwrap()); - // We voluntarily use a non async operation: - // A rename is supposed to be short, and we want to keep this operation - // as transactional as possible: we don't want our task to be cancelled in the middle - // of an inconsistent state. + // We voluntarily use a non async operation: A rename is supposed to be + // quick, and we want to keep this operation as transactional as + // possible. In particular, we don't want our task to be cancelled in the + // middle of an inconsistent state. if let Err(io_err) = std::fs::rename(&from_path, &to_full_path) { // We do not simply rely on the `io::ErrorKind::NotFound` here - // because it could be about the destination and no the origin. - if let Ok(false) = from_path.try_exists() { - // This could happen if some files have been manually deleted from the FS - // for instance. - // - // No catastrophy here. - warn!(from_path=%from_path.display(), to_full_path=%to_full_path.display(), error=%io_err, "split somehow missing from local split cache"); - } else { - // At this point, we are probably in an inconsistent state. - // The split has been removed from our registry but the files are still in the cache - // directory. - error!(from_path=%from_path.display(), to_full_path=%to_full_path.display(), error=%io_err, "failed to move split directory out of cache"); - // Let's attempt to repair consistency of our indexing split cache - // by removing the now dangling split directory. - if let Err(io_err) = tokio::fs::remove_dir(&from_path).await { - error!(from_path=%from_path.display(), to_full_path=%to_full_path.display(), error=%io_err, "failed to remove dangling split directory from local split cache"); + // because it could be about the destination and not the origin. + return match from_path.try_exists() { + Ok(false) => { + // This could happen if some files have been manually + // deleted from the FS for instance. + warn!(from_path=%from_path.display(), to_full_path=%to_full_path.display(), error=%io_err, "cached split missing from local split directory"); + Ok(None) } - } - return Err(From::from(io_err)); + Ok(true) => { + // The file couldn't be copied out but is still in the + // cache, we put it back to the registry to keep the + // statistics accurate + warn!(from_path=%from_path.display(), to_full_path=%to_full_path.display(), error=%io_err, "split stuck in local split cache"); + self.split_registry.insert(split_folder); + Ok(None) + } + Err(_) => { + // At this point, we are probably in an inconsistent state. + // The split has been removed from our registry but we don't + // know whether the files are still in the cache directory. + error!(from_path=%from_path.display(), to_full_path=%to_full_path.display(), error=%io_err, "failed to move split directory out of cache"); + Err(From::from(io_err)) + } + }; } Ok(Some(to_full_path)) } @@ -278,7 +283,17 @@ impl InnerSplitCache { .split_registry .pop_oldest() .expect("No remaining split to remove"); - tokio::fs::remove_dir_all(&self.split_path(evicted_split)).await?; + let result = tokio::fs::remove_dir_all(&self.split_path(evicted_split.split_id)).await; + if let Err(io_err) = result { + if io_err.kind() == io::ErrorKind::NotFound { + // This could happen if some files have been manually deleted + // from the FS for instance. + warn!(split_id=%evicted_split.split_id, "cached split missing from local split directory"); + return Ok(()); + } else { + return Err(io_err); + } + } Ok(()) } @@ -286,8 +301,7 @@ impl InnerSplitCache { /// /// Move is not an image here. We are literally moving the directory. /// - /// If the cache capacity does not allow it, this function - /// just logs a warning and returns Ok(false). + /// If the cache capacity does not allow it returns Ok(false). /// /// Ok(true) means the file was effectively accepted. async fn move_into_cache(&mut self, split_id_str: &str, split_path: &Path) -> io::Result { @@ -298,7 +312,11 @@ impl InnerSplitCache { return Ok(false); } let to_full_path = self.split_path(split_id); - tokio::fs::rename(split_path, &to_full_path).await?; + if let Err(io_err) = tokio::fs::rename(split_path, &to_full_path).await { + // keep the registry stats accurate + self.split_registry.remove(split_id); + return Err(io_err); + } Ok(true) } @@ -333,10 +351,8 @@ impl InnerSplitCache { if let Some(creation_time_limit) = split_folder.creation_time().checked_sub(SPLIT_MAX_AGE) { self.remove_splits_older_than_limit(creation_time_limit) .await?; - } - - self.split_registry.insert(split_folder); - Ok(true) + }; + Ok(self.split_registry.insert(split_folder)) } } @@ -421,7 +437,7 @@ impl IndexingSplitCache { } #[cfg(any(test, feature = "testsuite"))] - pub async fn inspect(&self) -> std::collections::HashMap { + pub async fn inspect_registry(&self) -> std::collections::HashMap { self.inner .lock() .await @@ -431,6 +447,16 @@ impl IndexingSplitCache { .collect() } + #[cfg(any(test, feature = "testsuite"))] + pub async fn inspect_quota(&self) -> SplitStoreQuota { + self.inner + .lock() + .await + .split_registry + .split_store_quota + .clone() + } + /// Returns a cached split to performs a merge operation. /// /// For simplicity, this method optimistically assumes that the merge operation will be @@ -454,10 +480,7 @@ impl IndexingSplitCache { .move_out(split_ulid, output_dir_path) .await?; if split_file_opt.is_none() { - debug!( - split_id = split_id, - "Split file/folder is not in cache missing." - ); + debug!(split_id = split_id, "Split folder not in cache"); } Ok(split_file_opt) } @@ -487,6 +510,7 @@ mod tests { use std::io; use std::io::Write; use std::path::Path; + use std::time::Duration; use bytesize::ByteSize; use quickwit_directories::BundleDirectory; @@ -497,6 +521,7 @@ mod tests { use tokio::fs; use ulid::Ulid; + use super::SPLIT_MAX_AGE; use crate::split_store::{IndexingSplitCache, SplitStoreQuota}; async fn create_fake_split( @@ -521,7 +546,7 @@ mod tests { let split_store = IndexingSplitCache::open(temp_dir.path().to_path_buf(), split_store_space_quota) .await?; - let cache_content = split_store.inspect().await; + let cache_content = split_store.inspect_registry().await; assert_eq!(cache_content.len(), 2); assert_eq!(cache_content.get(split_id1).cloned(), Some(ByteSize(15))); assert_eq!(cache_content.get(split_id2).cloned(), Some(ByteSize(13))); @@ -543,12 +568,14 @@ mod tests { create_fake_split(dir.path(), "01GF521CZC1260V8QPA81T46X7", 45) .await .unwrap(); // 2 - let split_store_space_quota = SplitStoreQuota::new(2, ByteSize::kb(1)); + let split_store_space_quota = SplitStoreQuota::try_new(2, ByteSize::kb(1)).unwrap(); let local_split_store = IndexingSplitCache::open(dir.path().to_path_buf(), split_store_space_quota) .await .unwrap(); - assert_eq!(local_split_store.inspect().await.len(), 2); + assert_eq!(local_split_store.inspect_registry().await.len(), 2); + let quota = local_split_store.inspect_quota().await; + assert_eq!(quota.used_num_bytes(), ByteSize(50)); } #[tokio::test] @@ -566,14 +593,51 @@ mod tests { create_fake_split(dir.path(), "01GF521CZC1260V8QPA81T46X7", 45) .await .unwrap(); // 2 - let split_store_space_quota = SplitStoreQuota::new(6, ByteSize(61)); + let split_store_space_quota = SplitStoreQuota::try_new(6, ByteSize(61)).unwrap(); let local_split_store = IndexingSplitCache::open(dir.path().to_path_buf(), split_store_space_quota) .await .unwrap(); - let cache_content = local_split_store.inspect().await; + let cache_content = local_split_store.inspect_registry().await; assert_eq!(cache_content.len(), 2); assert_eq!(cache_content.values().map(|v| v.as_u64()).sum::(), 50); + let quota = local_split_store.inspect_quota().await; + assert_eq!(quota.used_num_bytes(), ByteSize(50)); + } + + #[tokio::test] + async fn test_big_split_evicts_all() { + let dir = tempdir().unwrap(); + create_fake_split(dir.path(), "01GF5215TMV48JT7GZ543BV193", 100) + .await + .unwrap(); // 1 + create_fake_split(dir.path(), "01GF520MTTRNCCTQZE264BBYWM", 100) + .await + .unwrap(); // 0 + create_fake_split(dir.path(), "01GF521M316V9AEHZWTHN76F2V", 100) + .await + .unwrap(); // 3 + create_fake_split(dir.path(), "01GF521CZC1260V8QPA81T46X7", 100) + .await + .unwrap(); // 2 + let split_store_space_quota = SplitStoreQuota::try_new(6, ByteSize::b(401)).unwrap(); + let local_split_store = + IndexingSplitCache::open(dir.path().to_path_buf(), split_store_space_quota) + .await + .unwrap(); + assert_eq!(local_split_store.inspect_registry().await.len(), 4); + + let extra_split = tempdir().unwrap(); + fs::write(extra_split.path().join("splitdata"), &vec![0u8; 400]) + .await + .unwrap(); + local_split_store + .move_into_cache("01GFCZJBMBMEPMAQSFD09VTST2", extra_split.path()) + .await + .unwrap(); + assert_eq!(local_split_store.inspect_registry().await.len(), 1); + let quota = local_split_store.inspect_quota().await; + assert_eq!(quota.used_num_bytes(), ByteSize(400)); } #[tokio::test] @@ -595,20 +659,46 @@ mod tests { create_fake_split(dir.path(), "01GF1ZJBMBMEPMAQSFD09VTST2", 1) .await .unwrap(); - let split_store_space_quota = SplitStoreQuota::new(6, ByteSize(100)); + let split_store_space_quota = SplitStoreQuota::try_new(6, ByteSize(100)).unwrap(); let local_split_store = IndexingSplitCache::open(dir.path().to_path_buf(), split_store_space_quota) .await .unwrap(); - let cache_content = local_split_store.inspect().await; + let cache_content = local_split_store.inspect_registry().await; assert_eq!(cache_content.len(), 3); - let extra_split = tempdir().unwrap(); - local_split_store - .move_into_cache("01GFCZJBMBMEPMAQSFD09VTST2", extra_split.path()) - .await - .unwrap(); - let cache_content = local_split_store.inspect().await; - assert_eq!(cache_content.len(), 2); + + // adding a split with a large time gap only keeps splits younger than SPLIT_MAX_AGE + assert_eq!( + SPLIT_MAX_AGE, + Duration::from_secs(2 * 24 * 3_600), + "update this test if SPLIT_MAX_AGE changes" + ); + { + let extra_split = tempdir().unwrap(); + local_split_store + // 2022-10-15T4:48:49.803Z + .move_into_cache("01GFCZJBMBMEPMAQSFD09VTST2", extra_split.path()) + .await + .unwrap(); + let cache_content = local_split_store.inspect_registry().await; + assert_eq!(cache_content.len(), 2); + let quota = local_split_store.inspect_quota().await; + assert_eq!(quota.used_num_bytes(), ByteSize(1)); + } + { + // adding a split with a huge time gap should empty the cache entirely first + let extra_split = tempdir().unwrap(); + let was_accepted = local_split_store + // 2025-01-13T14:28:17.364Z + .move_into_cache("01JHG11FAM8F2XPWHY24R3HF6M", extra_split.path()) + .await + .unwrap(); + assert!(was_accepted); + let cache_content = local_split_store.inspect_registry().await; + assert_eq!(cache_content.len(), 1); + let quota = local_split_store.inspect_quota().await; + assert_eq!(quota.used_num_bytes(), ByteSize(0)); + } } #[tokio::test] @@ -638,25 +728,129 @@ mod tests { async fn test_store_and_fetch() { let temp_dir_in = tempfile::tempdir().unwrap(); let split_id = Ulid::default().to_string(); - let split_dir = temp_dir_in.path().join(format!("scratch_{split_id}")); - tokio::fs::create_dir(&split_dir).await.unwrap(); let cache_dir = tempfile::tempdir().unwrap(); let quota = SplitStoreQuota::default(); let local_store = IndexingSplitCache::open(cache_dir.path().to_path_buf(), quota) .await .unwrap(); - assert!(split_dir.try_exists().unwrap()); - assert!(local_store - .move_into_cache(&split_id, &split_dir) + { + let split_dir = temp_dir_in.path().join(format!("scratch_{split_id}")); + tokio::fs::create_dir(&split_dir).await.unwrap(); + assert!(local_store + .move_into_cache(&split_id, &split_dir) + .await + .unwrap()); + assert!(!split_dir.try_exists().unwrap()); + } + { + let split_path = local_store + .get_cached_split(&split_id, temp_dir_in.path()) + .await + .unwrap() + .unwrap(); + assert!(split_path.try_exists().unwrap()); + assert_eq!(split_path.parent().unwrap(), temp_dir_in.path()); + } + { + // cache miss because the previous get_cached_split removed the split from the cache + let split_path_opt = local_store + .get_cached_split(&split_id, temp_dir_in.path()) + .await + .unwrap(); + assert_eq!(split_path_opt, None); + } + } + + async fn clear_dir_manually(dir: &Path) { + let mut entries = fs::read_dir(dir).await.unwrap(); + while let Some(entry) = entries.next_entry().await.unwrap() { + let path = entry.path(); + if path.is_dir() { + fs::remove_dir_all(&path).await.unwrap(); + } else { + fs::remove_file(&path).await.unwrap(); + } + } + } + + #[tokio::test] + async fn test_fetch_manually_deleted_split() { + let dir = tempdir().unwrap(); + create_fake_split(dir.path(), "01GF5215TMV48JT7GZ543BV193", 100) .await - .unwrap()); - assert!(!split_dir.try_exists().unwrap()); - let split_path = local_store - .get_cached_split(&split_id, temp_dir_in.path()) + .unwrap(); + let split_store_space_quota = SplitStoreQuota::try_new(6, ByteSize::b(401)).unwrap(); + let local_split_store = + IndexingSplitCache::open(dir.path().to_path_buf(), split_store_space_quota) + .await + .unwrap(); + assert_eq!(local_split_store.inspect_registry().await.len(), 1); + + clear_dir_manually(dir.path()).await; + + let target_dir = tempdir().unwrap(); + let path_opt = local_split_store + .get_cached_split("01GF5215TMV48JT7GZ543BV193", target_dir.path()) + .await + .unwrap(); + assert_eq!(path_opt, None); + assert_eq!(local_split_store.inspect_registry().await.len(), 0); + let quota = local_split_store.inspect_quota().await; + assert_eq!(quota.used_num_bytes(), ByteSize(0)); + } + + #[tokio::test] + async fn test_evict_manually_deleted_split() { + let dir = tempdir().unwrap(); + // // 2022-10-12T20:53:23.211Z + create_fake_split(dir.path(), "01GF6ZJBMBMEPMAQSFD09VTST2", 100) + .await + .unwrap(); + let split_store_space_quota = SplitStoreQuota::try_new(1, ByteSize::b(401)).unwrap(); + let local_split_store = + IndexingSplitCache::open(dir.path().to_path_buf(), split_store_space_quota) + .await + .unwrap(); + assert_eq!(local_split_store.inspect_registry().await.len(), 1); + + clear_dir_manually(dir.path()).await; + + let extra_split = tempdir().unwrap(); + let was_accepted = local_split_store + // 2022-10-12T02:14:54.347Z + .move_into_cache("01GF4ZJBMBMEPMAQSFD09VTST2", extra_split.path()) + .await + .unwrap(); + assert!(was_accepted); + assert_eq!(local_split_store.inspect_registry().await.len(), 1); + let quota = local_split_store.inspect_quota().await; + assert_eq!(quota.used_num_bytes(), ByteSize(0)); + } + + #[tokio::test] + async fn test_load_same_split_twice() { + let temp_dir = tempfile::tempdir().unwrap(); + let split_id = "01GF5449X7DA53TK9F9W2ZJST2"; + create_fake_split(temp_dir.path(), split_id, 15) + .await + .unwrap(); + let split_store_space_quota = SplitStoreQuota::default(); + let split_store = + IndexingSplitCache::open(temp_dir.path().to_path_buf(), split_store_space_quota) + .await + .unwrap(); + + let extra_split = tempdir().unwrap(); + let extra_split_filepath = temp_dir.path().join("splitfile"); + let mut extra_split_file = File::create(&extra_split_filepath).unwrap(); + extra_split_file.write_all(&[0u8; 15]).unwrap(); + + let was_accepted = split_store + .move_into_cache(split_id, extra_split.path()) .await - .unwrap() .unwrap(); - assert!(split_path.try_exists().unwrap()); - assert_eq!(split_path.parent().unwrap(), temp_dir_in.path()); + assert!(!was_accepted); + let quota = split_store.inspect_quota().await; + assert_eq!(quota.used_num_bytes(), ByteSize(15)); } } diff --git a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs index 45fc5a74448..1167cc6b617 100644 --- a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs +++ b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs @@ -217,7 +217,7 @@ impl IndexingSplitStore { /// Takes a snapshot of the cache view (only used for testing). #[cfg(any(test, feature = "testsuite"))] pub async fn inspect_split_cache(&self) -> HashMap { - self.inner.split_cache.inspect().await + self.inner.split_cache.inspect_registry().await } } @@ -229,7 +229,7 @@ mod tests { use bytesize::ByteSize; use quickwit_common::io::IoControls; use quickwit_metastore::{SplitMaturity, SplitMetadata}; - use quickwit_storage::{RamStorage, SplitPayloadBuilder}; + use quickwit_storage::{PutPayload, RamStorage, SplitPayloadBuilder}; use tempfile::tempdir; use time::OffsetDateTime; use tokio::fs; @@ -311,17 +311,37 @@ mod tests { Some(ByteSize(3)) ); + let io_controls = IoControls::default(); + { + let output = tempfile::tempdir()?; + let split1 = split_store + .fetch_and_open_split(&split_id1, output.path(), &io_controls) + .await?; + let local_store_stats = split_store.inspect_split_cache().await; + assert_eq!(local_store_stats.len(), 1); + assert!(split1.exists(std::path::Path::new("splitfile")).unwrap()); + } + { + let output = tempfile::tempdir()?; + let split2 = split_store + .fetch_and_open_split(&split_id2, output.path(), &io_controls) + .await?; + let local_store_stats = split_store.inspect_split_cache().await; + assert_eq!(local_store_stats.len(), 0); + assert!(split2.exists(std::path::Path::new("splitfile")).unwrap()); + } + Ok(()) } #[tokio::test] - async fn test_put_should_not_store_in_cache_when_max_num_files_reached() -> anyhow::Result<()> { + async fn test_evection_and_fallback_to_remote() -> anyhow::Result<()> { let temp_dir = tempfile::tempdir()?; let split_cache_dir = tempdir()?; let split_cache = IndexingSplitCache::open( split_cache_dir.path().to_path_buf(), - SplitStoreQuota::new(1, ByteSize::mb(1)), + SplitStoreQuota::try_new(1, ByteSize::mb(1)).unwrap(), ) .await?; @@ -329,7 +349,9 @@ mod tests { let split_store = IndexingSplitStore::new(remote_storage, Arc::new(split_cache)); let split_id1 = Ulid::new().to_string(); + let split_payload1 = SplitPayloadBuilder::get_split_payload(&[], &[], &[5, 5, 5])?; let split_id2 = Ulid::new().to_string(); + let split_payload2 = SplitPayloadBuilder::get_split_payload(&[], &[], &[5, 5, 5, 5])?; { let split_path = temp_dir.path().join(&split_id1); @@ -340,11 +362,7 @@ mod tests { .store_split( &split_metadata1, &split_path, - Box::new(SplitPayloadBuilder::get_split_payload( - &[], - &[], - &[5, 5, 5], - )?), + Box::new(split_payload1.clone()), ) .await?; assert!(!split_path.try_exists()?); @@ -369,11 +387,7 @@ mod tests { .store_split( &split_metadata2, &split_path, - Box::new(SplitPayloadBuilder::get_split_payload( - &[], - &[], - &[5, 5, 5], - )?), + Box::new(split_payload2.clone()), ) .await?; assert!(!split_path.try_exists()?); @@ -388,18 +402,36 @@ mod tests { Some(ByteSize(12)) ); } + let io_controls = IoControls::default(); { + // get from remote storage because split_id1 was evicted by split_id2 let output = tempfile::tempdir()?; - let io_controls = IoControls::default(); - // get from cache let _split1 = split_store .fetch_and_open_split(&split_id1, output.path(), &io_controls) .await?; - // get from remote storage + assert_eq!(io_controls.num_bytes(), split_payload1.len()); + } + { + // get from cache + let output = tempfile::tempdir()?; + let _split2 = split_store + .fetch_and_open_split(&split_id2, output.path(), &io_controls) + .await?; + // the number of downloaded by didn't change (still the size of split_payload1) + assert_eq!(io_controls.num_bytes(), split_payload1.len()); + } + { + // get from remote because getting from cache removes the split from the cache + let output = tempfile::tempdir()?; let _split2 = split_store .fetch_and_open_split(&split_id2, output.path(), &io_controls) .await?; + assert_eq!( + io_controls.num_bytes(), + split_payload1.len() + split_payload2.len() + ); } + Ok(()) } } diff --git a/quickwit/quickwit-indexing/src/split_store/split_store_quota.rs b/quickwit/quickwit-indexing/src/split_store/split_store_quota.rs index 75a4c01afcb..ab7c51d7b25 100644 --- a/quickwit/quickwit-indexing/src/split_store/split_store_quota.rs +++ b/quickwit/quickwit-indexing/src/split_store/split_store_quota.rs @@ -21,7 +21,7 @@ use bytesize::ByteSize; use quickwit_config::IndexerConfig; /// A struct for keeping in check multiple SplitStore. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SplitStoreQuota { /// Current number of splits in the cache. num_splits_in_cache: usize, @@ -29,7 +29,7 @@ pub struct SplitStoreQuota { size_in_bytes_in_cache: ByteSize, /// Maximum number of files allowed in the cache. max_num_splits: usize, - /// Maximum size in bytes allowed in the cache. + /// Maximum size in bytes allowed in the cache. 0 if max_num_splits=0. max_num_bytes: ByteSize, } @@ -45,17 +45,20 @@ impl Default for SplitStoreQuota { } impl SplitStoreQuota { - pub fn new(max_num_splits: usize, max_num_bytes: ByteSize) -> Self { - Self { + pub fn try_new(max_num_splits: usize, max_num_bytes: ByteSize) -> anyhow::Result { + if max_num_splits == 0 && max_num_bytes.as_u64() > 0 { + anyhow::bail!("max_num_bytes cannot be > 0 if max_num_splits is 0"); + } + Ok(Self { max_num_splits, max_num_bytes, ..Default::default() - } + }) } /// Space quota that prevents any caching. pub fn no_caching() -> Self { - Self::new(0, ByteSize::default()) + Self::try_new(0, ByteSize::default()).unwrap() } pub fn can_fit_split(&self, split_size_in_bytes: ByteSize) -> bool { @@ -85,6 +88,10 @@ impl SplitStoreQuota { pub fn max_num_bytes(&self) -> ByteSize { self.max_num_bytes } + + pub fn used_num_bytes(&self) -> ByteSize { + self.size_in_bytes_in_cache + } } #[cfg(test)] @@ -93,28 +100,33 @@ mod tests { use crate::split_store::SplitStoreQuota; + #[test] + fn test_invalid_quota() { + SplitStoreQuota::try_new(0, ByteSize(100)).unwrap_err(); + } + #[test] fn test_split_store_quota_max_bytes_accepted() { - let split_store_quota = SplitStoreQuota::new(3, ByteSize(100)); + let split_store_quota = SplitStoreQuota::try_new(3, ByteSize(100)).unwrap(); assert!(split_store_quota.can_fit_split(ByteSize(100))); } #[test] fn test_split_store_quota_exceeding_bytes() { - let split_store_quota = SplitStoreQuota::new(3, ByteSize(100)); + let split_store_quota = SplitStoreQuota::try_new(3, ByteSize(100)).unwrap(); assert!(!split_store_quota.can_fit_split(ByteSize(101))); } #[test] fn test_split_store_quota_max_num_files_accepted() { - let mut split_store_quota = SplitStoreQuota::new(2, ByteSize(100)); + let mut split_store_quota = SplitStoreQuota::try_new(2, ByteSize(100)).unwrap(); split_store_quota.add_split(ByteSize(1)); assert!(split_store_quota.can_fit_split(ByteSize(1))); } #[test] fn test_split_store_quota_exceeding_max_num_files() { - let mut split_store_quota = SplitStoreQuota::new(2, ByteSize(100)); + let mut split_store_quota = SplitStoreQuota::try_new(2, ByteSize(100)).unwrap(); split_store_quota.add_split(ByteSize(1)); split_store_quota.add_split(ByteSize(1)); assert!(!split_store_quota.can_fit_split(ByteSize(1)));