Skip to content

Commit 06a2498

Browse files
committed
refactor: consolidate RegionManifestOptions creation logic
Signed-off-by: WenyXu <[email protected]>
1 parent 605f327 commit 06a2498

File tree

4 files changed

+39
-60
lines changed

4 files changed

+39
-60
lines changed

src/mito2/src/compaction/compactor.rs

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,9 @@ use crate::error::{
4141
EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result,
4242
};
4343
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
44-
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
45-
use crate::manifest::storage::manifest_compress_type;
44+
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
4645
use crate::metrics;
4746
use crate::read::{FlatSource, Source};
48-
use crate::region::opener::new_manifest_dir;
4947
use crate::region::options::RegionOptions;
5048
use crate::region::version::VersionRef;
5149
use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
@@ -162,19 +160,9 @@ pub async fn open_compaction_region(
162160
};
163161

164162
let manifest_manager = {
165-
let region_manifest_options = RegionManifestOptions {
166-
manifest_dir: new_manifest_dir(&region_dir_from_table_dir(
167-
&req.table_dir,
168-
req.region_id,
169-
req.path_type,
170-
)),
171-
object_store: object_store.clone(),
172-
compress_type: manifest_compress_type(mito_config.compress_manifest),
173-
checkpoint_distance: mito_config.manifest_checkpoint_distance,
174-
remove_file_options: RemoveFileOptions {
175-
enable_gc: mito_config.gc.enable,
176-
},
177-
};
163+
let region_dir = region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type);
164+
let region_manifest_options =
165+
RegionManifestOptions::new(mito_config, &region_dir, object_store);
178166

179167
RegionManifestManager::open(region_manifest_options, &Default::default())
180168
.await?

src/mito2/src/manifest/manager.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use store_api::metadata::RegionMetadataRef;
2424
use store_api::storage::FileId;
2525
use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
2626

27+
use crate::config::MitoConfig;
2728
use crate::error::{
2829
self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
2930
};
@@ -33,7 +34,8 @@ use crate::manifest::action::{
3334
};
3435
use crate::manifest::checkpointer::Checkpointer;
3536
use crate::manifest::storage::{
36-
ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file,
37+
ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, manifest_compress_type,
38+
manifest_dir,
3739
};
3840
use crate::metrics::MANIFEST_OP_ELAPSED;
3941
use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState};
@@ -52,6 +54,23 @@ pub struct RegionManifestOptions {
5254
pub remove_file_options: RemoveFileOptions,
5355
}
5456

57+
impl RegionManifestOptions {
58+
/// Creates a new [RegionManifestOptions] with the given region directory, object store, and configuration.
59+
pub fn new(config: &MitoConfig, region_dir: &str, object_store: &ObjectStore) -> Self {
60+
RegionManifestOptions {
61+
manifest_dir: manifest_dir(region_dir),
62+
object_store: object_store.clone(),
63+
// We don't allow users to set the compression algorithm as we use it as a file suffix.
64+
// Currently, the manifest storage doesn't have good support for changing compression algorithms.
65+
compress_type: manifest_compress_type(config.compress_manifest),
66+
checkpoint_distance: config.manifest_checkpoint_distance,
67+
remove_file_options: RemoveFileOptions {
68+
enable_gc: config.gc.enable,
69+
},
70+
}
71+
}
72+
}
73+
5574
/// Options for updating `removed_files` field in [RegionManifest].
5675
#[derive(Debug, Clone)]
5776
#[cfg_attr(any(test, feature = "test"), derive(Default))]

src/mito2/src/manifest/storage.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crc32fast::Hasher;
2424
use futures::TryStreamExt;
2525
use futures::future::try_join_all;
2626
use lazy_static::lazy_static;
27+
use object_store::util::join_dir;
2728
use object_store::{Entry, ErrorKind, Lister, ObjectStore, util};
2829
use regex::Regex;
2930
use serde::{Deserialize, Serialize};
@@ -49,6 +50,11 @@ const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip
4950
const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
5051
const FETCH_MANIFEST_PARALLELISM: usize = 16;
5152

53+
/// Returns the directory to the manifest files.
54+
pub fn manifest_dir(region_dir: &str) -> String {
55+
join_dir(region_dir, "manifest")
56+
}
57+
5258
/// Returns the [CompressionType] according to whether to compress manifest files.
5359
pub const fn manifest_compress_type(compress: bool) -> CompressionType {
5460
if compress {

src/mito2/src/region/opener.rs

Lines changed: 9 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use log_store::kafka::log_store::KafkaLogStore;
2828
use log_store::noop::log_store::NoopLogStore;
2929
use log_store::raft_engine::log_store::RaftEngineLogStore;
3030
use object_store::manager::ObjectStoreManagerRef;
31-
use object_store::util::{join_dir, normalize_dir};
31+
use object_store::util::normalize_dir;
3232
use snafu::{OptionExt, ResultExt, ensure};
3333
use store_api::logstore::LogStore;
3434
use store_api::logstore::provider::Provider;
@@ -49,8 +49,7 @@ use crate::error::{
4949
Result, StaleLogEntrySnafu,
5050
};
5151
use crate::manifest::action::RegionManifest;
52-
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
53-
use crate::manifest::storage::manifest_compress_type;
52+
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
5453
use crate::memtable::MemtableBuilderProvider;
5554
use crate::memtable::bulk::part::BulkPart;
5655
use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
@@ -272,7 +271,7 @@ impl RegionOpener {
272271
};
273272
// Create a manifest manager for this region and writes regions to the manifest file.
274273
let region_manifest_options =
275-
Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
274+
RegionManifestOptions::new(config, &region_dir, &object_store);
276275
// For remote WAL, we need to set flushed_entry_id to current topic's latest entry id.
277276
let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
278277
let manifest_manager = RegionManifestManager::new(
@@ -406,13 +405,9 @@ impl RegionOpener {
406405
) -> Result<Option<MitoRegionRef>> {
407406
let now = Instant::now();
408407
let mut region_options = self.options.as_ref().unwrap().clone();
409-
410-
let region_manifest_options = Self::manifest_options(
411-
config,
412-
&region_options,
413-
&self.region_dir(),
414-
&self.object_store_manager,
415-
)?;
408+
let object_storage = get_object_store(&region_options.storage, &self.object_store_manager)?;
409+
let region_manifest_options =
410+
RegionManifestOptions::new(config, &self.region_dir(), &object_storage);
416411
let Some(manifest_manager) =
417412
RegionManifestManager::open(region_manifest_options, &self.stats).await?
418413
else {
@@ -576,27 +571,6 @@ impl RegionOpener {
576571

577572
Ok(Some(region))
578573
}
579-
580-
/// Returns a new manifest options.
581-
fn manifest_options(
582-
config: &MitoConfig,
583-
options: &RegionOptions,
584-
region_dir: &str,
585-
object_store_manager: &ObjectStoreManagerRef,
586-
) -> Result<RegionManifestOptions> {
587-
let object_store = get_object_store(&options.storage, object_store_manager)?;
588-
Ok(RegionManifestOptions {
589-
manifest_dir: new_manifest_dir(region_dir),
590-
object_store,
591-
// We don't allow users to set the compression algorithm as we use it as a file suffix.
592-
// Currently, the manifest storage doesn't have good support for changing compression algorithms.
593-
compress_type: manifest_compress_type(config.compress_manifest),
594-
checkpoint_distance: config.manifest_checkpoint_distance,
595-
remove_file_options: RemoveFileOptions {
596-
enable_gc: config.gc.enable,
597-
},
598-
})
599-
}
600574
}
601575

602576
/// Creates a version builder from a region manifest.
@@ -678,12 +652,9 @@ impl RegionMetadataLoader {
678652
region_dir: &str,
679653
region_options: &RegionOptions,
680654
) -> Result<Option<Arc<RegionManifest>>> {
681-
let region_manifest_options = RegionOpener::manifest_options(
682-
&self.config,
683-
region_options,
684-
region_dir,
685-
&self.object_store_manager,
686-
)?;
655+
let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;
656+
let region_manifest_options =
657+
RegionManifestOptions::new(&self.config, region_dir, &object_store);
687658
let Some(manifest_manager) =
688659
RegionManifestManager::open(region_manifest_options, &Default::default()).await?
689660
else {
@@ -848,11 +819,6 @@ where
848819
Ok(last_entry_id)
849820
}
850821

851-
/// Returns the directory to the manifest files.
852-
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
853-
join_dir(region_dir, "manifest")
854-
}
855-
856822
/// A task to load and fill the region file cache.
857823
pub(crate) struct RegionLoadCacheTask {
858824
region: MitoRegionRef,

0 commit comments

Comments
 (0)