Skip to content

Commit 938d757

Browse files
authored
feat: expose SST index metadata via information schema (#7044)
Signed-off-by: Zhenchi <[email protected]>
1 parent 855eb54 commit 938d757

File tree

11 files changed

+238
-20
lines changed

11 files changed

+238
-20
lines changed

src/catalog/src/system_schema/information_schema.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use datatypes::schema::SchemaRef;
4848
use lazy_static::lazy_static;
4949
use paste::paste;
5050
use process_list::InformationSchemaProcessList;
51-
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
51+
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
5252
use store_api::storage::{ScanRequest, TableId};
5353
use table::TableRef;
5454
use table::metadata::TableType;
@@ -68,7 +68,7 @@ use crate::system_schema::information_schema::region_peers::InformationSchemaReg
6868
use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
6969
use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
7070
use crate::system_schema::information_schema::ssts::{
71-
InformationSchemaSstsManifest, InformationSchemaSstsStorage,
71+
InformationSchemaSstsIndexMeta, InformationSchemaSstsManifest, InformationSchemaSstsStorage,
7272
};
7373
use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
7474
use crate::system_schema::information_schema::tables::InformationSchemaTables;
@@ -263,6 +263,9 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
263263
SSTS_STORAGE => Some(Arc::new(InformationSchemaSstsStorage::new(
264264
self.catalog_manager.clone(),
265265
)) as _),
266+
SSTS_INDEX_META => Some(Arc::new(InformationSchemaSstsIndexMeta::new(
267+
self.catalog_manager.clone(),
268+
)) as _),
266269
_ => None,
267270
}
268271
}
@@ -342,6 +345,10 @@ impl InformationSchemaProvider {
342345
SSTS_STORAGE.to_string(),
343346
self.build_table(SSTS_STORAGE).unwrap(),
344347
);
348+
tables.insert(
349+
SSTS_INDEX_META.to_string(),
350+
self.build_table(SSTS_INDEX_META).unwrap(),
351+
);
345352
}
346353

347354
tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
@@ -456,6 +463,8 @@ pub enum DatanodeInspectKind {
456463
SstManifest,
457464
/// List SST entries discovered in storage layer
458465
SstStorage,
466+
/// List index metadata collected from manifest
467+
SstIndexMeta,
459468
}
460469

461470
impl DatanodeInspectRequest {
@@ -464,6 +473,7 @@ impl DatanodeInspectRequest {
464473
match self.kind {
465474
DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
466475
DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
476+
DatanodeInspectKind::SstIndexMeta => PuffinIndexMetaEntry::build_plan(self.scan),
467477
}
468478
}
469479
}

src/catalog/src/system_schema/information_schema/ssts.rs

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,22 @@
1515
use std::sync::{Arc, Weak};
1616

1717
use common_catalog::consts::{
18-
INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID, INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID,
18+
INFORMATION_SCHEMA_SSTS_INDEX_META_TABLE_ID, INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID,
19+
INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID,
1920
};
2021
use common_error::ext::BoxedError;
2122
use common_recordbatch::SendableRecordBatchStream;
2223
use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
2324
use datatypes::schema::SchemaRef;
2425
use snafu::ResultExt;
25-
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
26+
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
2627
use store_api::storage::{ScanRequest, TableId};
2728

2829
use crate::CatalogManager;
2930
use crate::error::{ProjectSchemaSnafu, Result};
3031
use crate::information_schema::{
31-
DatanodeInspectKind, DatanodeInspectRequest, InformationTable, SSTS_MANIFEST, SSTS_STORAGE,
32+
DatanodeInspectKind, DatanodeInspectRequest, InformationTable, SSTS_INDEX_META, SSTS_MANIFEST,
33+
SSTS_STORAGE,
3234
};
3335
use crate::system_schema::utils;
3436

@@ -140,3 +142,58 @@ impl InformationTable for InformationSchemaSstsStorage {
140142
)))
141143
}
142144
}
145+
146+
/// Information schema table for index metadata.
147+
pub struct InformationSchemaSstsIndexMeta {
148+
schema: SchemaRef,
149+
catalog_manager: Weak<dyn CatalogManager>,
150+
}
151+
152+
impl InformationSchemaSstsIndexMeta {
153+
pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
154+
Self {
155+
schema: PuffinIndexMetaEntry::schema(),
156+
catalog_manager,
157+
}
158+
}
159+
}
160+
161+
impl InformationTable for InformationSchemaSstsIndexMeta {
162+
fn table_id(&self) -> TableId {
163+
INFORMATION_SCHEMA_SSTS_INDEX_META_TABLE_ID
164+
}
165+
166+
fn table_name(&self) -> &'static str {
167+
SSTS_INDEX_META
168+
}
169+
170+
fn schema(&self) -> SchemaRef {
171+
self.schema.clone()
172+
}
173+
174+
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
175+
let schema = if let Some(p) = &request.projection {
176+
Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
177+
} else {
178+
self.schema.clone()
179+
};
180+
181+
let info_ext = utils::information_extension(&self.catalog_manager)?;
182+
let req = DatanodeInspectRequest {
183+
kind: DatanodeInspectKind::SstIndexMeta,
184+
scan: request,
185+
};
186+
187+
let future = async move {
188+
info_ext
189+
.inspect_datanode(req)
190+
.await
191+
.map_err(BoxedError::new)
192+
.context(common_recordbatch::error::ExternalSnafu)
193+
};
194+
Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
195+
schema,
196+
Box::pin(future),
197+
)))
198+
}
199+
}

src/catalog/src/system_schema/information_schema/table_names.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,4 @@ pub const REGION_STATISTICS: &str = "region_statistics";
5050
pub const PROCESS_LIST: &str = "process_list";
5151
pub const SSTS_MANIFEST: &str = "ssts_manifest";
5252
pub const SSTS_STORAGE: &str = "ssts_storage";
53+
pub const SSTS_INDEX_META: &str = "ssts_index_meta";

src/common/catalog/src/consts.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ pub const INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID: u32 = 36;
108108
pub const INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID: u32 = 37;
109109
/// id for information_schema.ssts_storage
110110
pub const INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID: u32 = 38;
111+
/// id for information_schema.ssts_index_meta
112+
pub const INFORMATION_SCHEMA_SSTS_INDEX_META_TABLE_ID: u32 = 39;
111113

112114
// ----- End of information_schema tables -----
113115

src/datanode/src/region_server/catalog.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,20 @@ use datafusion_expr::{LogicalPlan, TableSource};
2727
use futures::TryStreamExt;
2828
use session::context::QueryContextRef;
2929
use snafu::{OptionExt, ResultExt};
30-
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
30+
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
3131
use store_api::storage::RegionId;
3232

3333
use crate::error::{DataFusionSnafu, ListStorageSstsSnafu, Result, UnexpectedSnafu};
3434
use crate::region_server::RegionServer;
3535

3636
/// Reserved internal table kinds used.
3737
/// These are recognized by reserved table names and mapped to providers.
38+
#[allow(clippy::enum_variant_names)]
3839
#[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)]
3940
enum InternalTableKind {
4041
InspectSstManifest,
4142
InspectSstStorage,
43+
InspectSstIndexMeta,
4244
}
4345

4446
impl InternalTableKind {
@@ -50,6 +52,9 @@ impl InternalTableKind {
5052
if name.eq_ignore_ascii_case(StorageSstEntry::reserved_table_name_for_inspection()) {
5153
return Some(Self::InspectSstStorage);
5254
}
55+
if name.eq_ignore_ascii_case(PuffinIndexMetaEntry::reserved_table_name_for_inspection()) {
56+
return Some(Self::InspectSstIndexMeta);
57+
}
5358
None
5459
}
5560

@@ -58,6 +63,7 @@ impl InternalTableKind {
5863
match self {
5964
Self::InspectSstManifest => server.inspect_sst_manifest_provider().await,
6065
Self::InspectSstStorage => server.inspect_sst_storage_provider().await,
66+
Self::InspectSstIndexMeta => server.inspect_sst_index_meta_provider().await,
6167
}
6268
}
6369
}
@@ -103,6 +109,25 @@ impl RegionServer {
103109
let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
104110
Ok(Arc::new(table))
105111
}
112+
113+
/// Expose index metadata across the engine as an in-memory table.
114+
pub async fn inspect_sst_index_meta_provider(&self) -> Result<Arc<dyn TableProvider>> {
115+
let mito = {
116+
let guard = self.inner.mito_engine.read().unwrap();
117+
guard.as_ref().cloned().context(UnexpectedSnafu {
118+
violated: "mito engine not available",
119+
})?
120+
};
121+
122+
let entries = mito.all_index_metas().await;
123+
let schema = PuffinIndexMetaEntry::schema().arrow_schema().clone();
124+
let batch = PuffinIndexMetaEntry::to_record_batch(&entries)
125+
.map_err(DataFusionError::from)
126+
.context(DataFusionSnafu)?;
127+
128+
let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
129+
Ok(Arc::new(table))
130+
}
106131
}
107132

108133
/// A catalog list that resolves `TableProvider` by table name:

src/store-api/src/sst_entry.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,15 @@ impl PuffinIndexMetaEntry {
356356
pub fn reserved_table_name_for_inspection() -> &'static str {
357357
"__inspect/__mito/__puffin_index_meta"
358358
}
359+
360+
/// Builds a logical plan for scanning puffin index metadata entries.
361+
pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
362+
build_plan_helper(
363+
scan_request,
364+
Self::reserved_table_name_for_inspection(),
365+
Self::schema(),
366+
)
367+
}
359368
}
360369

361370
fn build_plan_helper(

0 commit comments

Comments
 (0)