Skip to content

Commit 5b42546

Browse files
committed
fix: implement a CacheStrategy to ensure compaction uses the cache correctly (#5254)
* feat: impl CacheStrategy
* refactor: replace Option<CacheManagerRef> with CacheStrategy
* feat: add disabled strategy
* ci: force update taplo
* refactor: rename CacheStrategy::Normal to CacheStrategy::EnableAll
* ci: force install cargo-gc-bin
* ci: force install
* chore: use CacheStrategy::Disabled as ScanInput default
* chore: fix compiler errors
1 parent 0678a31 commit 5b42546

File tree

16 files changed

+316
-156
lines changed

16 files changed

+316
-156
lines changed

.github/workflows/develop.yml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ jobs:
8484
# Shares across multiple jobs
8585
shared-key: "check-toml"
8686
- name: Install taplo
87-
run: cargo +stable install taplo-cli --version ^0.9 --locked
87+
run: cargo +stable install taplo-cli --version ^0.9 --locked --force
8888
- name: Run taplo
8989
run: taplo format --check
9090

@@ -107,7 +107,7 @@ jobs:
107107
shared-key: "build-binaries"
108108
- name: Install cargo-gc-bin
109109
shell: bash
110-
run: cargo install cargo-gc-bin
110+
run: cargo install cargo-gc-bin --force
111111
- name: Build greptime binaries
112112
shell: bash
113113
# `cargo gc` will invoke `cargo build` with specified args
@@ -163,7 +163,7 @@ jobs:
163163
run: |
164164
sudo apt-get install -y libfuzzer-14-dev
165165
rustup install nightly
166-
cargo +nightly install cargo-fuzz cargo-gc-bin
166+
cargo +nightly install cargo-fuzz cargo-gc-bin --force
167167
- name: Download pre-built binaries
168168
uses: actions/download-artifact@v4
169169
with:
@@ -220,7 +220,7 @@ jobs:
220220
shell: bash
221221
run: |
222222
sudo apt update && sudo apt install -y libfuzzer-14-dev
223-
cargo install cargo-fuzz cargo-gc-bin
223+
cargo install cargo-fuzz cargo-gc-bin --force
224224
- name: Download pre-built binariy
225225
uses: actions/download-artifact@v4
226226
with:
@@ -268,7 +268,7 @@ jobs:
268268
shared-key: "build-greptime-ci"
269269
- name: Install cargo-gc-bin
270270
shell: bash
271-
run: cargo install cargo-gc-bin
271+
run: cargo install cargo-gc-bin --force
272272
- name: Build greptime bianry
273273
shell: bash
274274
# `cargo gc` will invoke `cargo build` with specified args
@@ -338,7 +338,7 @@ jobs:
338338
run: |
339339
sudo apt-get install -y libfuzzer-14-dev
340340
rustup install nightly
341-
cargo +nightly install cargo-fuzz cargo-gc-bin
341+
cargo +nightly install cargo-fuzz cargo-gc-bin --force
342342
# Downloads ci image
343343
- name: Download pre-built binariy
344344
uses: actions/download-artifact@v4
@@ -487,7 +487,7 @@ jobs:
487487
run: |
488488
sudo apt-get install -y libfuzzer-14-dev
489489
rustup install nightly
490-
cargo +nightly install cargo-fuzz cargo-gc-bin
490+
cargo +nightly install cargo-fuzz cargo-gc-bin --force
491491
# Downloads ci image
492492
- name: Download pre-built binariy
493493
uses: actions/download-artifact@v4

src/mito2/src/cache.rs

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,195 @@ const FILE_TYPE: &str = "file";
5555
/// Metrics type key for selector result cache.
5656
const SELECTOR_RESULT_TYPE: &str = "selector_result";
5757

58+
/// Cache strategies that may only enable a subset of caches.
59+
#[derive(Clone)]
60+
pub enum CacheStrategy {
61+
/// Strategy for normal operations.
62+
/// Doesn't disable any cache.
63+
EnableAll(CacheManagerRef),
64+
/// Strategy for compaction.
65+
/// Disables some caches during compaction to avoid affecting queries.
66+
/// Enables the write cache so that the compaction can read files cached
67+
/// in the write cache and write the compacted files back to the write cache.
68+
Compaction(CacheManagerRef),
69+
/// Do not use any cache.
70+
Disabled,
71+
}
72+
73+
impl CacheStrategy {
74+
/// Calls [CacheManager::get_parquet_meta_data()].
75+
pub async fn get_parquet_meta_data(
76+
&self,
77+
region_id: RegionId,
78+
file_id: FileId,
79+
) -> Option<Arc<ParquetMetaData>> {
80+
match self {
81+
CacheStrategy::EnableAll(cache_manager) => {
82+
cache_manager
83+
.get_parquet_meta_data(region_id, file_id)
84+
.await
85+
}
86+
CacheStrategy::Compaction(cache_manager) => {
87+
cache_manager
88+
.get_parquet_meta_data(region_id, file_id)
89+
.await
90+
}
91+
CacheStrategy::Disabled => None,
92+
}
93+
}
94+
95+
/// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
96+
pub fn get_parquet_meta_data_from_mem_cache(
97+
&self,
98+
region_id: RegionId,
99+
file_id: FileId,
100+
) -> Option<Arc<ParquetMetaData>> {
101+
match self {
102+
CacheStrategy::EnableAll(cache_manager) => {
103+
cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id)
104+
}
105+
CacheStrategy::Compaction(cache_manager) => {
106+
cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id)
107+
}
108+
CacheStrategy::Disabled => None,
109+
}
110+
}
111+
112+
/// Calls [CacheManager::put_parquet_meta_data()].
113+
pub fn put_parquet_meta_data(
114+
&self,
115+
region_id: RegionId,
116+
file_id: FileId,
117+
metadata: Arc<ParquetMetaData>,
118+
) {
119+
match self {
120+
CacheStrategy::EnableAll(cache_manager) => {
121+
cache_manager.put_parquet_meta_data(region_id, file_id, metadata);
122+
}
123+
CacheStrategy::Compaction(cache_manager) => {
124+
cache_manager.put_parquet_meta_data(region_id, file_id, metadata);
125+
}
126+
CacheStrategy::Disabled => {}
127+
}
128+
}
129+
130+
/// Calls [CacheManager::remove_parquet_meta_data()].
131+
pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) {
132+
match self {
133+
CacheStrategy::EnableAll(cache_manager) => {
134+
cache_manager.remove_parquet_meta_data(region_id, file_id);
135+
}
136+
CacheStrategy::Compaction(cache_manager) => {
137+
cache_manager.remove_parquet_meta_data(region_id, file_id);
138+
}
139+
CacheStrategy::Disabled => {}
140+
}
141+
}
142+
143+
/// Calls [CacheManager::get_repeated_vector()].
144+
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
145+
pub fn get_repeated_vector(
146+
&self,
147+
data_type: &ConcreteDataType,
148+
value: &Value,
149+
) -> Option<VectorRef> {
150+
match self {
151+
CacheStrategy::EnableAll(cache_manager) => {
152+
cache_manager.get_repeated_vector(data_type, value)
153+
}
154+
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
155+
}
156+
}
157+
158+
/// Calls [CacheManager::put_repeated_vector()].
159+
/// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
160+
pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
161+
if let CacheStrategy::EnableAll(cache_manager) = self {
162+
cache_manager.put_repeated_vector(value, vector);
163+
}
164+
}
165+
166+
/// Calls [CacheManager::get_pages()].
167+
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
168+
pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
169+
match self {
170+
CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
171+
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
172+
}
173+
}
174+
175+
/// Calls [CacheManager::put_pages()].
176+
/// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
177+
pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
178+
if let CacheStrategy::EnableAll(cache_manager) = self {
179+
cache_manager.put_pages(page_key, pages);
180+
}
181+
}
182+
183+
/// Calls [CacheManager::get_selector_result()].
184+
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
185+
pub fn get_selector_result(
186+
&self,
187+
selector_key: &SelectorResultKey,
188+
) -> Option<Arc<SelectorResultValue>> {
189+
match self {
190+
CacheStrategy::EnableAll(cache_manager) => {
191+
cache_manager.get_selector_result(selector_key)
192+
}
193+
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
194+
}
195+
}
196+
197+
/// Calls [CacheManager::put_selector_result()].
198+
/// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
199+
pub fn put_selector_result(
200+
&self,
201+
selector_key: SelectorResultKey,
202+
result: Arc<SelectorResultValue>,
203+
) {
204+
if let CacheStrategy::EnableAll(cache_manager) = self {
205+
cache_manager.put_selector_result(selector_key, result);
206+
}
207+
}
208+
209+
/// Calls [CacheManager::write_cache()].
210+
/// It returns None if the strategy is [CacheStrategy::Disabled].
211+
pub fn write_cache(&self) -> Option<&WriteCacheRef> {
212+
match self {
213+
CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
214+
CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
215+
CacheStrategy::Disabled => None,
216+
}
217+
}
218+
219+
/// Calls [CacheManager::index_cache()].
220+
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
221+
pub fn index_cache(&self) -> Option<&InvertedIndexCacheRef> {
222+
match self {
223+
CacheStrategy::EnableAll(cache_manager) => cache_manager.index_cache(),
224+
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
225+
}
226+
}
227+
228+
/// Calls [CacheManager::bloom_filter_index_cache()].
229+
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
230+
pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
231+
match self {
232+
CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
233+
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
234+
}
235+
}
236+
237+
/// Calls [CacheManager::puffin_metadata_cache()].
238+
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
239+
pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
240+
match self {
241+
CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
242+
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
243+
}
244+
}
245+
}
246+
58247
/// Manages cached data for the engine.
59248
///
60249
/// All caches are disabled by default.

src/mito2/src/cache/write_cache.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ mod tests {
332332
use super::*;
333333
use crate::access_layer::OperationType;
334334
use crate::cache::test_util::new_fs_store;
335-
use crate::cache::CacheManager;
335+
use crate::cache::{CacheManager, CacheStrategy};
336336
use crate::region::options::IndexOptions;
337337
use crate::sst::file::FileId;
338338
use crate::sst::location::{index_file_path, sst_file_path};
@@ -495,7 +495,7 @@ mod tests {
495495

496496
// Read metadata from write cache
497497
let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
498-
.cache(Some(cache_manager.clone()));
498+
.cache(CacheStrategy::EnableAll(cache_manager.clone()));
499499
let reader = builder.build().await.unwrap();
500500

501501
// Check parquet metadata

src/mito2/src/compaction.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use table::predicate::Predicate;
4343
use tokio::sync::mpsc::{self, Sender};
4444

4545
use crate::access_layer::AccessLayerRef;
46-
use crate::cache::CacheManagerRef;
46+
use crate::cache::{CacheManagerRef, CacheStrategy};
4747
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
4848
use crate::compaction::picker::{new_picker, CompactionTask};
4949
use crate::compaction::task::CompactionTaskImpl;
@@ -573,6 +573,7 @@ pub struct SerializedCompactionOutput {
573573
struct CompactionSstReaderBuilder<'a> {
574574
metadata: RegionMetadataRef,
575575
sst_layer: AccessLayerRef,
576+
cache: CacheManagerRef,
576577
inputs: &'a [FileHandle],
577578
append_mode: bool,
578579
filter_deleted: bool,
@@ -586,7 +587,8 @@ impl<'a> CompactionSstReaderBuilder<'a> {
586587
let mut scan_input = ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata)?)
587588
.with_files(self.inputs.to_vec())
588589
.with_append_mode(self.append_mode)
589-
.with_cache(None)
590+
// We use a special cache strategy for compaction.
591+
.with_cache(CacheStrategy::Compaction(self.cache))
590592
.with_filter_deleted(self.filter_deleted)
591593
// We ignore file not found error during compaction.
592594
.with_ignore_file_not_found(true)

src/mito2/src/compaction/compactor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ impl Compactor for DefaultCompactor {
307307
let reader = CompactionSstReaderBuilder {
308308
metadata: region_metadata.clone(),
309309
sst_layer: sst_layer.clone(),
310+
cache: cache_manager.clone(),
310311
inputs: &output.inputs,
311312
append_mode,
312313
filter_deleted: output.filter_deleted,

src/mito2/src/engine.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
8484
use store_api::storage::{RegionId, ScanRequest};
8585
use tokio::sync::{oneshot, Semaphore};
8686

87+
use crate::cache::CacheStrategy;
8788
use crate::config::MitoConfig;
8889
use crate::error::{
8990
InvalidRequestSnafu, JoinSnafu, RecvSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu,
@@ -428,7 +429,7 @@ impl EngineInner {
428429
version,
429430
region.access_layer.clone(),
430431
request,
431-
Some(cache_manager),
432+
CacheStrategy::EnableAll(cache_manager),
432433
)
433434
.with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
434435
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())

0 commit comments

Comments
 (0)