Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions be/src/exec/common/variant_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ bool should_materialize_nested_group_regular_subcolumns(
(info_it != uid_to_variant_extended_info.end() && info_it->second.has_nested_group);
}

bool can_skip_missing_physical_column(const TabletColumn& column) {
return column.has_default_value() || column.is_nullable();
}

std::unordered_set<int32_t> collect_nested_group_compaction_root_uids(
const TabletSchemaSPtr& target,
const std::unordered_map<int32_t, VariantExtendedInfo>& uid_to_variant_extended_info) {
Expand Down Expand Up @@ -865,8 +869,14 @@ Status VariantCompactionUtil::aggregate_path_to_stats(
for (const auto& segment : segment_cache.get_segments()) {
std::shared_ptr<ColumnReader> column_reader;
OlapReaderStatistics stats;
RETURN_IF_ERROR(
segment->get_column_reader(column->unique_id(), &column_reader, &stats));
auto st = segment->get_column_reader(column->unique_id(), &column_reader, &stats);
if (st.is<ErrorCode::NOT_FOUND>()) {
if (!can_skip_missing_physical_column(*column)) {
return st;
}
continue;
}
RETURN_IF_ERROR(st);
if (!column_reader) {
continue;
}
Expand Down Expand Up @@ -910,8 +920,14 @@ Status VariantCompactionUtil::aggregate_variant_extended_info(
for (const auto& segment : segment_cache.get_segments()) {
std::shared_ptr<ColumnReader> column_reader;
OlapReaderStatistics stats;
RETURN_IF_ERROR(
segment->get_column_reader(column->unique_id(), &column_reader, &stats));
auto st = segment->get_column_reader(column->unique_id(), &column_reader, &stats);
if (st.is<ErrorCode::NOT_FOUND>()) {
if (!can_skip_missing_physical_column(*column)) {
return st;
}
continue;
}
RETURN_IF_ERROR(st);
if (!column_reader) {
continue;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/index/inverted/inverted_index_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ void InvertedIndexIterator::add_reader(InvertedIndexReaderType type,
}

Status InvertedIndexIterator::read_from_index(const IndexParam& param) {
_last_read_index_id = -1;
const auto* i_param_ptr = std::get_if<InvertedIndexParam*>(&param);
if (i_param_ptr == nullptr) {
return Status::Error<ErrorCode::INDEX_INVALID_PARAMETERS>(
Expand All @@ -79,6 +80,7 @@ Status InvertedIndexIterator::read_from_index(const IndexParam& param) {
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"inverted index reader is null");
}
_last_read_index_id = reader->get_index_id();
auto* runtime_state = _context->runtime_state;
if (!i_param->skip_try && reader->type() == InvertedIndexReaderType::BKD) {
if (runtime_state != nullptr &&
Expand Down
5 changes: 4 additions & 1 deletion be/src/storage/index/inverted/inverted_index_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class InvertedIndexIterator : public IndexIterator {
[[nodiscard]] Result<InvertedIndexReaderPtr> select_best_reader(
const std::string& analyzer_key);

[[nodiscard]] int64_t last_read_index_id() const { return _last_read_index_id; }

private:
ENABLE_FACTORY_CREATOR(InvertedIndexIterator);

Expand Down Expand Up @@ -98,10 +100,11 @@ class InvertedIndexIterator : public IndexIterator {
// These two phases are guaranteed not to overlap, so no synchronization is needed.
// Do NOT call add_reader() after any read_from_index() call on the same iterator.
std::vector<ReaderEntry> _reader_entries;
int64_t _last_read_index_id = -1;

// Index for O(1) lookup by analyzer_key. Maps normalized key to indices in _reader_entries.
// Built incrementally in add_reader().
std::unordered_map<std::string, std::vector<size_t>> _key_to_entries;
};

} // namespace doris::segment_v2
} // namespace doris::segment_v2
42 changes: 42 additions & 0 deletions be/src/storage/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <gen_cpp/Types_types.h>
#include <gen_cpp/olap_file.pb.h>
#include <netinet/in.h>

#include <atomic>
Expand All @@ -27,13 +28,15 @@
#include <list>
#include <map>
#include <memory>
#include <optional>
#include <ostream>
#include <sstream>
#include <string>
#include <typeinfo>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/cast_set.h"
#include "common/config.h"
Expand Down Expand Up @@ -112,6 +115,43 @@ struct TabletInfo {
UniqueId tablet_uid;
};

enum class IndexProbeSource {
COLUMN_PREDICATE,
EXPR_PUSHDOWN,
SEARCH_FUNCTION,
};

enum class IndexProbeState {
APPLIED,
FALLBACK,
NOT_ATTEMPTED,
};

enum class IndexFallbackReason {
NONE,
MISSING_INDEX,
BYPASS,
NO_TERMS,
CORRUPTED,
EVALUATE_SKIPPED,
QUERY_DISABLED,
NOT_SUPPORTED,
};

struct IndexProbeEvent {
int32_t column_uid = -1;
std::optional<std::string> variant_path;
int64_t index_id = -1;
int32_t segment_id = -1;
InvertedIndexStorageFormatPB storage_format = InvertedIndexStorageFormatPB::V1;
IndexProbeSource source = IndexProbeSource::COLUMN_PREDICATE;
IndexProbeState state = IndexProbeState::NOT_ATTEMPTED;
IndexFallbackReason reason = IndexFallbackReason::NONE;
int64_t input_rows = 0;
int64_t output_rows = 0;
int64_t filtered_rows = 0;
};

struct TabletSize {
TabletSize(TTabletId in_tablet_id, size_t in_tablet_size)
: tablet_id(in_tablet_id), tablet_size(in_tablet_size) {}
Expand Down Expand Up @@ -388,6 +428,8 @@ struct OlapReaderStatistics {
int64_t inverted_index_analyzer_timer = 0;
int64_t inverted_index_lookup_timer = 0;
InvertedIndexStatistics inverted_index_stats;
bool collect_index_probe_events = false;
std::vector<IndexProbeEvent> index_probe_events;

int64_t ann_index_load_ns = 0;
int64_t ann_topn_search_ns = 0;
Expand Down
127 changes: 121 additions & 6 deletions be/src/storage/segment/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
#include "storage/index/index_query_context.h"
#include "storage/index/index_reader_helper.h"
#include "storage/index/indexed_column_reader.h"
#include "storage/index/inverted/inverted_index_iterator.h"
#include "storage/index/inverted/inverted_index_reader.h"
#include "storage/index/ordinal_page_index.h"
#include "storage/index/primary_key_index.h"
Expand Down Expand Up @@ -231,6 +232,34 @@ Status rebind_storage_exprs_to_reader_schema(const StorageReadOptions& opts, con

} // namespace

namespace {

IndexFallbackReason index_fallback_reason(const Status& status, bool need_remaining) {
if (status.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) {
return IndexFallbackReason::MISSING_INDEX;
}
if (status.is<ErrorCode::INVERTED_INDEX_BYPASS>()) {
return IndexFallbackReason::BYPASS;
}
if (status.is<ErrorCode::INVERTED_INDEX_NO_TERMS>() && need_remaining) {
return IndexFallbackReason::NO_TERMS;
}
if (status.is<ErrorCode::INVERTED_INDEX_FILE_CORRUPTED>()) {
return IndexFallbackReason::CORRUPTED;
}
if (status.is<ErrorCode::INVERTED_INDEX_EVALUATE_SKIPPED>()) {
return IndexFallbackReason::EVALUATE_SKIPPED;
}
return IndexFallbackReason::NOT_SUPPORTED;
}

int64_t last_inverted_index_id(const std::unique_ptr<IndexIterator>& iterator) {
auto* inverted_index_iterator = dynamic_cast<InvertedIndexIterator*>(iterator.get());
return inverted_index_iterator == nullptr ? -1 : inverted_index_iterator->last_read_index_id();
}

} // namespace

SegmentIterator::~SegmentIterator() = default;

void SegmentIterator::_init_row_bitmap_by_condition_cache() {
Expand Down Expand Up @@ -906,19 +935,35 @@ Status SegmentIterator::_get_row_ranges_by_column_conditions() {
}

{
if (_opts.runtime_state &&
_opts.runtime_state->query_options().enable_inverted_index_query &&
(has_index_in_iterators() || !_common_expr_ctxs_push_down.empty())) {
const bool should_collect_not_attempted = _opts.stats != nullptr &&
_opts.stats->collect_index_probe_events &&
!_col_predicates.empty();
const bool enable_inverted_index_query =
_opts.runtime_state != nullptr &&
_opts.runtime_state->query_options().enable_inverted_index_query;
if (_opts.runtime_state && (enable_inverted_index_query || should_collect_not_attempted) &&
(has_index_in_iterators() || !_common_expr_ctxs_push_down.empty() ||
should_collect_not_attempted)) {
SCOPED_RAW_TIMER(&_opts.stats->inverted_index_filter_timer);
size_t input_rows = _row_bitmap.cardinality();
// Only apply column-level inverted index if we have iterators
if (has_index_in_iterators()) {
if (enable_inverted_index_query && has_index_in_iterators()) {
RETURN_IF_ERROR(_apply_inverted_index());
} else {
for (const auto& pred : _col_predicates) {
_record_index_probe_event(pred, IndexProbeState::NOT_ATTEMPTED,
enable_inverted_index_query
? IndexFallbackReason::MISSING_INDEX
: IndexFallbackReason::QUERY_DISABLED,
input_rows, input_rows);
}
}
// Always apply expr-level index (e.g., search expressions) if we have common_expr_pushdown
// This allows search expressions with variant subcolumns to be evaluated even when
// the segment doesn't have all subcolumns
RETURN_IF_ERROR(_apply_index_expr());
if (enable_inverted_index_query) {
RETURN_IF_ERROR(_apply_index_expr());
}
for (auto it = _common_expr_ctxs_push_down.begin();
it != _common_expr_ctxs_push_down.end();) {
if ((*it)->all_expr_inverted_index_evaluated()) {
Expand Down Expand Up @@ -1445,19 +1490,82 @@ inline bool SegmentIterator::_inverted_index_not_support_pred_type(const Predica
return type == PredicateType::BF || type == PredicateType::BITMAP_FILTER;
}

void SegmentIterator::_record_index_probe_event(const std::shared_ptr<ColumnPredicate>& pred,
IndexProbeState state, IndexFallbackReason reason,
int64_t input_rows, int64_t output_rows,
int64_t index_id) {
if (_opts.stats == nullptr || !_opts.stats->collect_index_probe_events) {
return;
}
const auto pred_column_id = pred->column_id();
if (pred_column_id >= _opts.tablet_schema->num_columns()) {
return;
}

const auto& column = _opts.tablet_schema->column(pred_column_id);
int32_t column_uid = column.unique_id();
if (column_uid < 0) {
column_uid = column.parent_unique_id();
}

std::optional<std::string> variant_path;
if (column.has_path_info()) {
variant_path = column.path_info_ptr()->copy_pop_front().get_path();
}

_opts.stats->index_probe_events.push_back(IndexProbeEvent {
.column_uid = column_uid,
.variant_path = std::move(variant_path),
.index_id = index_id,
.segment_id = static_cast<int32_t>(_segment->id()),
.storage_format = _opts.tablet_schema->get_inverted_index_storage_format(),
.source = IndexProbeSource::COLUMN_PREDICATE,
.state = state,
.reason = reason,
.input_rows = input_rows,
.output_rows = output_rows,
.filtered_rows = std::max<int64_t>(0, input_rows - output_rows),
});
}

Status SegmentIterator::_apply_inverted_index_on_column_predicate(
std::shared_ptr<ColumnPredicate> pred,
std::vector<std::shared_ptr<ColumnPredicate>>& remaining_predicates, bool* continue_apply) {
const bool collect_index_probe_events =
_opts.stats != nullptr && _opts.stats->collect_index_probe_events;
if (!_check_apply_by_inverted_index(pred)) {
remaining_predicates.emplace_back(pred);
if (collect_index_probe_events) {
const auto rows = static_cast<int64_t>(_row_bitmap.cardinality());
const auto pred_column_id = pred->column_id();
IndexFallbackReason reason = IndexFallbackReason::NOT_SUPPORTED;
if (_opts.runtime_state != nullptr &&
!_opts.runtime_state->query_options().enable_inverted_index_query) {
reason = IndexFallbackReason::QUERY_DISABLED;
} else if (pred_column_id >= _index_iterators.size() ||
_index_iterators[pred_column_id] == nullptr) {
reason = IndexFallbackReason::MISSING_INDEX;
}
_record_index_probe_event(pred, IndexProbeState::NOT_ATTEMPTED, reason, rows, rows);
}
} else {
bool need_remaining_after_evaluate = _column_has_fulltext_index(pred->column_id()) &&
PredicateTypeTraits::is_equal_or_list(pred->type());
const auto input_rows =
collect_index_probe_events ? static_cast<int64_t>(_row_bitmap.cardinality()) : 0;
Status res =
pred->evaluate(_storage_name_and_type[pred->column_id()],
_index_iterators[pred->column_id()].get(), num_rows(), &_row_bitmap);
if (!res.ok()) {
if (_downgrade_without_index(res, need_remaining_after_evaluate)) {
if (collect_index_probe_events) {
const auto index_id =
last_inverted_index_id(_index_iterators[pred->column_id()]);
_record_index_probe_event(
pred, IndexProbeState::FALLBACK,
index_fallback_reason(res, need_remaining_after_evaluate), input_rows,
input_rows, index_id);
}
remaining_predicates.emplace_back(pred);
return Status::OK();
}
Expand All @@ -1472,6 +1580,13 @@ Status SegmentIterator::_apply_inverted_index_on_column_predicate(
*continue_apply = false;
}

if (collect_index_probe_events) {
const auto index_id = last_inverted_index_id(_index_iterators[pred->column_id()]);
_record_index_probe_event(pred, IndexProbeState::APPLIED, IndexFallbackReason::NONE,
input_rows, static_cast<int64_t>(_row_bitmap.cardinality()),
index_id);
}

if (need_remaining_after_evaluate) {
remaining_predicates.emplace_back(pred);
return Status::OK();
Expand Down Expand Up @@ -3504,7 +3619,7 @@ bool SegmentIterator::_no_need_read_key_data(ColumnId cid, MutableColumnPtr& col
return false;
}

if (!_check_all_conditions_passed_inverted_index_for_column(cid)) {
if (!_check_all_conditions_passed_inverted_index_for_column(cid, true)) {
return false;
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/segment/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ class SegmentIterator : public RowwiseIterator {
std::shared_ptr<ColumnPredicate> pred,
std::vector<std::shared_ptr<ColumnPredicate>>& remaining_predicates,
bool* continue_apply);
void _record_index_probe_event(const std::shared_ptr<ColumnPredicate>& pred,
IndexProbeState state, IndexFallbackReason reason,
int64_t input_rows, int64_t output_rows, int64_t index_id = -1);
[[nodiscard]] Status _apply_ann_topn_predicate();
[[nodiscard]] Status _apply_index_expr();

Expand Down
Loading
Loading