Skip to content

Commit 96fc8ab

Browse files
feat: use OceanBase as object store (#887)
## Related Issues Closes # ## Detailed Changes OceanBase is a highly reliable, distributed relational database. Currently, it can be used as a write-ahead log (WAL) for CeresDB. Using OceanBase as the object store for CeresDB as well would reduce CeresDB's dependencies and improve stability. This change set implements the `ObjectStore` trait on top of OceanBase. ## Test Plan Add new unit tests for the new module. --------- Co-authored-by: WEI Xikai <[email protected]>
1 parent b11574c commit 96fc8ab

File tree

12 files changed

+1612
-16
lines changed

12 files changed

+1612
-16
lines changed

Cargo.lock

Lines changed: 6 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

analytic_engine/src/setup.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55
use std::{num::NonZeroUsize, path::Path, pin::Pin, sync::Arc};
66

77
use async_trait::async_trait;
8-
use common_util::{define_result, runtime::Runtime};
8+
use common_util::define_result;
99
use futures::Future;
1010
use message_queue::kafka::kafka_impl::KafkaImpl;
1111
use object_store::{
1212
aliyun,
1313
disk_cache::DiskCacheStore,
1414
mem_cache::{MemCache, MemCacheStore},
1515
metrics::StoreWithMetrics,
16+
obkv,
1617
prefix::StoreWithPrefix,
1718
LocalFileSystem, ObjectStoreRef,
1819
};
@@ -110,11 +111,8 @@ pub struct EngineBuilder<'a> {
110111

111112
impl<'a> EngineBuilder<'a> {
112113
pub async fn build(self) -> Result<TableEngineRef> {
113-
let opened_storages = open_storage(
114-
self.config.storage.clone(),
115-
self.engine_runtimes.io_runtime.clone(),
116-
)
117-
.await?;
114+
let opened_storages =
115+
open_storage(self.config.storage.clone(), self.engine_runtimes.clone()).await?;
118116
let manifest_storages = ManifestStorages {
119117
wal_manager: self.opened_wals.manifest_wal.clone(),
120118
oss_storage: opened_storages.default_store().clone(),
@@ -412,7 +410,7 @@ impl ObjectStorePicker for OpenedStorages {
412410
// ```
413411
fn open_storage(
414412
opts: StorageOptions,
415-
runtime: Arc<Runtime>,
413+
engine_runtimes: Arc<EngineRuntimes>,
416414
) -> Pin<Box<dyn Future<Output = Result<OpenedStorages>> + Send>> {
417415
Box::pin(async move {
418416
let mut store = match opts.object_store {
@@ -444,6 +442,26 @@ fn open_storage(
444442
let store_with_prefix = StoreWithPrefix::new(aliyun_opts.prefix, oss);
445443
Arc::new(store_with_prefix.context(OpenObjectStore)?) as _
446444
}
445+
ObjectStoreOptions::Obkv(obkv_opts) => {
446+
let obkv_config = obkv_opts.client;
447+
let obkv = engine_runtimes
448+
.write_runtime
449+
.spawn_blocking(move || ObkvImpl::new(obkv_config).context(OpenObkv))
450+
.await
451+
.context(RuntimeExec)??;
452+
453+
let oss: ObjectStoreRef = Arc::new(
454+
obkv::ObkvObjectStore::try_new(
455+
Arc::new(obkv),
456+
obkv_opts.shard_num,
457+
obkv_opts.part_size.0 as usize,
458+
obkv_opts.max_object_size.0 as usize,
459+
obkv_opts.upload_parallelism,
460+
)
461+
.context(OpenObjectStore)?,
462+
);
463+
Arc::new(StoreWithPrefix::new(obkv_opts.prefix, oss).context(OpenObjectStore)?) as _
464+
}
447465
};
448466

449467
if opts.disk_cache_capacity.as_byte() > 0 {
@@ -464,7 +482,10 @@ fn open_storage(
464482
) as _;
465483
}
466484

467-
store = Arc::new(StoreWithMetrics::new(store, runtime));
485+
store = Arc::new(StoreWithMetrics::new(
486+
store,
487+
engine_runtimes.io_runtime.clone(),
488+
));
468489

469490
if opts.mem_cache_capacity.as_byte() > 0 {
470491
let mem_cache = Arc::new(

analytic_engine/src/storage_options.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::time::Duration;
44

55
use common_util::config::{ReadableDuration, ReadableSize};
66
use serde::{Deserialize, Serialize};
7+
use table_kv::config::ObkvConfig;
78

89
#[derive(Debug, Clone, Deserialize, Serialize)]
910
#[serde(default)]
@@ -39,9 +40,11 @@ impl Default for StorageOptions {
3940

4041
#[derive(Debug, Clone, Deserialize, Serialize)]
4142
#[serde(tag = "type")]
43+
#[allow(clippy::large_enum_variant)]
4244
pub enum ObjectStoreOptions {
4345
Local(LocalOptions),
4446
Aliyun(AliyunOptions),
47+
Obkv(ObkvOptions),
4548
}
4649

4750
#[derive(Debug, Clone, Deserialize, Serialize)]
@@ -83,3 +86,36 @@ impl AliyunOptions {
8386
ReadableDuration::from(Duration::from_secs(2))
8487
}
8588
}
89+
90+
#[derive(Debug, Clone, Serialize, Deserialize)]
91+
pub struct ObkvOptions {
92+
pub prefix: String,
93+
#[serde(default = "ObkvOptions::default_shard_num")]
94+
pub shard_num: usize,
95+
#[serde(default = "ObkvOptions::default_part_size")]
96+
pub part_size: ReadableSize,
97+
#[serde(default = "ObkvOptions::default_max_object_size")]
98+
pub max_object_size: ReadableSize,
99+
#[serde(default = "ObkvOptions::default_upload_parallelism")]
100+
pub upload_parallelism: usize,
101+
/// Obkv client config
102+
pub client: ObkvConfig,
103+
}
104+
105+
impl ObkvOptions {
106+
fn default_max_object_size() -> ReadableSize {
107+
ReadableSize::gb(1)
108+
}
109+
110+
fn default_part_size() -> ReadableSize {
111+
ReadableSize::mb(1)
112+
}
113+
114+
fn default_shard_num() -> usize {
115+
512
116+
}
117+
118+
fn default_upload_parallelism() -> usize {
119+
8
120+
}
121+
}

components/object_store/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@ lru = { workspace = true }
2525
prometheus = { workspace = true }
2626
prometheus-static-metric = { workspace = true }
2727
prost = { workspace = true }
28+
rand = { workspace = true }
2829
serde = { workspace = true }
2930
serde_json = { workspace = true }
3031
snafu = { workspace = true }
32+
table_kv = { workspace = true }
3133
tokio = { workspace = true }
34+
twox-hash = "1.6"
3235
upstream = { package = "object_store", version = "0.5.6", features = [ "aws" ] }
3336

3437
[dev-dependencies]

components/object_store/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub mod disk_cache;
1414
pub mod mem_cache;
1515
pub mod metrics;
1616
pub mod multipart;
17+
pub mod obkv;
1718
pub mod prefix;
1819

1920
pub type ObjectStoreRef = Arc<dyn ObjectStore>;

0 commit comments

Comments
 (0)