Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): add bloom filter read metrics #5239

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/mito2/src/read/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ impl PruneReader {
let num_rows_before_filter = batch.num_rows();
let Some(batch_filtered) = self.context.precise_filter(batch)? else {
// the entire batch is filtered out
self.metrics.filter_metrics.num_rows_precise_filtered += num_rows_before_filter;
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
return Ok(None);
};

// update metric
let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
self.metrics.filter_metrics.num_rows_precise_filtered += filtered_rows;
self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;

if !batch_filtered.is_empty() {
Ok(Some(batch_filtered))
Expand Down
138 changes: 77 additions & 61 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,8 @@ impl ParquetReaderBuilder {
return BTreeMap::default();
}

metrics.num_row_groups_before_filtering += num_row_groups;
metrics.num_rows_in_row_group_before_filtering += num_rows as usize;
metrics.rg_total += num_row_groups;
metrics.rows_total += num_rows as usize;

let mut output = (0..num_row_groups).map(|i| (i, None)).collect();

Expand Down Expand Up @@ -398,7 +398,7 @@ impl ParquetReaderBuilder {
Err(err) => {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to apply full-text index, region_id: {}, file_id: {}, err: {}",
"Failed to apply full-text index, region_id: {}, file_id: {}, err: {:?}",
self.file_handle.region_id(),
self.file_handle.file_id(),
err
Expand All @@ -420,8 +420,8 @@ impl ParquetReaderBuilder {
parquet_meta,
row_group_to_row_ids,
output,
&mut metrics.num_row_groups_fulltext_index_filtered,
&mut metrics.num_rows_in_row_group_fulltext_index_filtered,
&mut metrics.rg_fulltext_filtered,
&mut metrics.rows_fulltext_filtered,
);

true
Expand Down Expand Up @@ -482,7 +482,7 @@ impl ParquetReaderBuilder {
Err(err) => {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to apply inverted index, region_id: {}, file_id: {}, err: {}",
"Failed to apply inverted index, region_id: {}, file_id: {}, err: {:?}",
self.file_handle.region_id(),
self.file_handle.file_id(),
err
Expand Down Expand Up @@ -521,8 +521,8 @@ impl ParquetReaderBuilder {
parquet_meta,
ranges_in_row_groups,
output,
&mut metrics.num_row_groups_inverted_index_filtered,
&mut metrics.num_rows_in_row_group_inverted_index_filtered,
&mut metrics.rg_inverted_filtered,
&mut metrics.rows_inverted_filtered,
);

true
Expand Down Expand Up @@ -564,7 +564,7 @@ impl ParquetReaderBuilder {
.collect::<BTreeMap<_, _>>();

let row_groups_after = res.len();
metrics.num_row_groups_min_max_filtered += row_groups_before - row_groups_after;
metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

*output = res;
true
Expand Down Expand Up @@ -627,7 +627,7 @@ impl ParquetReaderBuilder {
&self,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
_metrics: &mut ReaderFilterMetrics,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.bloom_filter_index_applier else {
return false;
Expand All @@ -637,8 +637,10 @@ impl ParquetReaderBuilder {
return false;
}

let before_rg = output.len();

let file_size_hint = self.file_handle.meta_ref().bloom_filter_index_size();
match index_applier
if let Err(err) = index_applier
.apply(
self.file_handle.file_id(),
file_size_hint,
Expand All @@ -647,26 +649,27 @@ impl ParquetReaderBuilder {
)
.await
{
Ok(output) => output,
Err(err) => {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {}",
self.file_handle.region_id(),
self.file_handle.file_id(),
err
);
} else {
warn!(
err; "Failed to apply bloom filter index, region_id: {}, file_id: {}",
self.file_handle.region_id(), self.file_handle.file_id()
);
}

return false;
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {:?}",
self.file_handle.region_id(),
self.file_handle.file_id(),
err
);
} else {
warn!(
err; "Failed to apply bloom filter index, region_id: {}, file_id: {}",
self.file_handle.region_id(), self.file_handle.file_id()
);
}

return false;
};

let after_rg = output.len();
// Update metrics.
metrics.rg_bloom_filtered += before_rg - after_rg;

true
}

Expand Down Expand Up @@ -727,64 +730,77 @@ impl ParquetReaderBuilder {
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
/// Number of row groups before filtering.
pub(crate) num_row_groups_before_filtering: usize,
pub(crate) rg_total: usize,
/// Number of row groups filtered by fulltext index.
pub(crate) num_row_groups_fulltext_index_filtered: usize,
pub(crate) rg_fulltext_filtered: usize,
/// Number of row groups filtered by inverted index.
pub(crate) num_row_groups_inverted_index_filtered: usize,
pub(crate) rg_inverted_filtered: usize,
/// Number of row groups filtered by min-max index.
pub(crate) num_row_groups_min_max_filtered: usize,
/// Number of rows filtered by precise filter.
pub(crate) num_rows_precise_filtered: usize,
pub(crate) rg_minmax_filtered: usize,
/// Number of row groups filtered by bloom filter index.
pub(crate) rg_bloom_filtered: usize,

/// Number of rows in row group before filtering.
pub(crate) num_rows_in_row_group_before_filtering: usize,
pub(crate) rows_total: usize,
/// Number of rows in row group filtered by fulltext index.
pub(crate) num_rows_in_row_group_fulltext_index_filtered: usize,
pub(crate) rows_fulltext_filtered: usize,
/// Number of rows in row group filtered by inverted index.
pub(crate) num_rows_in_row_group_inverted_index_filtered: usize,
pub(crate) rows_inverted_filtered: usize,
/// Number of rows in row group filtered by bloom filter index.
pub(crate) rows_bloom_filtered: usize,
/// Number of rows filtered by precise filter.
pub(crate) rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
/// Adds `other` metrics to this metrics.
pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
self.num_row_groups_before_filtering += other.num_row_groups_before_filtering;
self.num_row_groups_fulltext_index_filtered += other.num_row_groups_fulltext_index_filtered;
self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered;
self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered;
self.num_rows_precise_filtered += other.num_rows_precise_filtered;
self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering;
self.num_rows_in_row_group_fulltext_index_filtered +=
other.num_rows_in_row_group_fulltext_index_filtered;
self.num_rows_in_row_group_inverted_index_filtered +=
other.num_rows_in_row_group_inverted_index_filtered;
self.rg_total += other.rg_total;
self.rg_fulltext_filtered += other.rg_fulltext_filtered;
self.rg_inverted_filtered += other.rg_inverted_filtered;
self.rg_minmax_filtered += other.rg_minmax_filtered;
self.rg_bloom_filtered += other.rg_bloom_filtered;

self.rows_total += other.rows_total;
self.rows_fulltext_filtered += other.rows_fulltext_filtered;
self.rows_inverted_filtered += other.rows_inverted_filtered;
self.rows_bloom_filtered += other.rows_bloom_filtered;
self.rows_precise_filtered += other.rows_precise_filtered;
}

/// Reports metrics.
pub(crate) fn observe(&self) {
READ_ROW_GROUPS_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(self.num_row_groups_before_filtering as u64);
.inc_by(self.rg_total as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["fulltext_index_filtered"])
.inc_by(self.num_row_groups_fulltext_index_filtered as u64);
.inc_by(self.rg_fulltext_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(self.num_row_groups_inverted_index_filtered as u64);
.inc_by(self.rg_inverted_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["minmax_index_filtered"])
.inc_by(self.num_row_groups_min_max_filtered as u64);
.inc_by(self.rg_minmax_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["bloom_filter_index_filtered"])
.inc_by(self.rg_bloom_filtered as u64);

PRECISE_FILTER_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(self.num_rows_precise_filtered as u64);
.inc_by(self.rows_precise_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(self.num_rows_in_row_group_before_filtering as u64);
.inc_by(self.rows_total as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["fulltext_index_filtered"])
.inc_by(self.num_rows_in_row_group_fulltext_index_filtered as u64);
.inc_by(self.rows_fulltext_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(self.num_rows_in_row_group_inverted_index_filtered as u64);
.inc_by(self.rows_inverted_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["bloom_filter_index_filtered"])
.inc_by(self.rows_bloom_filtered as u64);
}
}

Expand Down Expand Up @@ -1040,12 +1056,12 @@ impl Drop for ParquetReader {
self.context.reader_builder().file_handle.region_id(),
self.context.reader_builder().file_handle.file_id(),
self.context.reader_builder().file_handle.time_range(),
metrics.filter_metrics.num_row_groups_before_filtering
- metrics
.filter_metrics
.num_row_groups_inverted_index_filtered
- metrics.filter_metrics.num_row_groups_min_max_filtered,
metrics.filter_metrics.num_row_groups_before_filtering,
metrics.filter_metrics.rg_total
- metrics.filter_metrics.rg_inverted_filtered
- metrics.filter_metrics.rg_minmax_filtered
- metrics.filter_metrics.rg_fulltext_filtered
- metrics.filter_metrics.rg_bloom_filtered,
metrics.filter_metrics.rg_total,
metrics
);

Expand Down
Loading