Skip to content

Commit

Permalink
Some sanity rewrite of the LocalSplitStore
Browse files Browse the repository at this point in the history
- Renaming  `LocalSplitStore` -> `IndexingSplitCache`.
- Removing of the useless BinaryHeap, HashMap, using instead a single
  BTreeMap
- Isolating quota and BTreeMap to ensure their transactionality.
- Logging eventual io errors.
  • Loading branch information
fulmicoton authored and rdettai committed Jan 14, 2025
1 parent d8e98b7 commit 8e8d3f8
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 126 deletions.
6 changes: 3 additions & 3 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use super::{MergePlanner, MergeSchedulerService};
use crate::actors::merge_pipeline::FinishPendingMergesAndShutdownPipeline;
use crate::models::{DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, SpawnPipeline};
use crate::source::{AssignShards, Assignment};
use crate::split_store::{LocalSplitStore, SplitStoreQuota};
use crate::split_store::{IndexingSplitCache, SplitStoreQuota};
use crate::{IndexingPipeline, IndexingPipelineParams, IndexingSplitStore, IndexingStatistics};

/// Name of the indexing directory, usually located at `<data_dir_path>/indexing`.
Expand Down Expand Up @@ -113,7 +113,7 @@ pub struct IndexingService {
storage_resolver: StorageResolver,
indexing_pipelines: HashMap<PipelineUid, PipelineHandle>,
counters: IndexingServiceCounters,
local_split_store: Arc<LocalSplitStore>,
local_split_store: Arc<IndexingSplitCache>,
max_concurrent_split_uploads: usize,
merge_pipeline_handles: HashMap<MergePipelineId, MergePipelineHandle>,
cooperative_indexing_permits: Option<Arc<Semaphore>>,
Expand Down Expand Up @@ -155,7 +155,7 @@ impl IndexingService {
indexer_config.max_merge_write_throughput.map(io::limiter);
let split_cache_dir_path = get_cache_directory_path(&data_dir_path);
let local_split_store =
LocalSplitStore::open(split_cache_dir_path, split_store_space_quota).await?;
IndexingSplitCache::open(split_cache_dir_path, split_store_space_quota).await?;
let indexing_root_directory =
temp_dir::create_or_purge_directory(&data_dir_path.join(INDEXING_DIR_NAME)).await?;
let queue_dir_path = data_dir_path.join(QUEUES_DIR_NAME);
Expand Down
Loading

0 comments on commit 8e8d3f8

Please sign in to comment.