Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions be/src/core/string_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ class BufferWritable final {
_now_offset = 0;
}

// Returns a mutable char pointer into `_data`, advanced from the start of the
// buffer by `_now_offset` and by the last entry of `_offsets`.
// NOTE(review): presumably this is the current write position for the row
// being built — confirm against callers.
char* data() {
    auto* write_pos = _data.data() + _now_offset + _offsets.back();
    return reinterpret_cast<char*>(write_pos);
}

// Increases `_now_offset` by `len`.
// NOTE(review): presumably called after `len` bytes were written directly
// through data() — confirm against callers.
void add_offset(size_t len) {
    _now_offset = _now_offset + len;
}

// Resizes `_data` so that it holds `size` elements beyond the position given by
// `_now_offset` plus the last entry of `_offsets`.
void resize(size_t size) {
    const auto required_size = size + _now_offset + _offsets.back();
    _data.resize(required_size);
}

template <typename T>
void write_number(T data) {
fmt::memory_buffer buffer;
Expand Down Expand Up @@ -236,6 +242,10 @@ class BufferReadable {
_data += len;
}

// Returns the current value of the `_data` pointer as a read-only char pointer.
const char* data() {
    const char* current = _data;
    return current;
}

// Moves the `_data` pointer forward by `len`.
// No bounds check is performed here; the caller is responsible for staying
// within the underlying buffer.
void add_offset(size_t len) {
    _data = _data + len;
}

void read_var_uint(UInt64& x) {
x = 0;
// get length from first byte firstly
Expand Down
19 changes: 7 additions & 12 deletions be/src/exec/operator/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,32 +597,27 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, Blo
RETURN_IF_ERROR(_hash_table_init(state, raw_ptrs));

Status st = std::visit(
Overload {[&](std::monostate& arg, auto join_op,
auto short_circuit_for_null_in_build_side,
auto with_other_conjuncts) -> Status {
Overload {[&](std::monostate& arg, auto join_op) -> Status {
throw Exception(Status::FatalError("FATAL: uninited hash table"));
},
[&](auto&& arg, auto&& join_op, auto short_circuit_for_null_in_build_side,
auto with_other_conjuncts) -> Status {
[&](auto&& arg, auto&& join_op) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
using JoinOpType = std::decay_t<decltype(join_op)>;
ProcessHashTableBuild<HashTableCtxType> hash_table_build_process(
rows, raw_ptrs, this, state->batch_size(), state);
auto st = hash_table_build_process.template run<
JoinOpType::value, short_circuit_for_null_in_build_side,
with_other_conjuncts>(
auto st = hash_table_build_process.template run<JoinOpType::value>(
arg, null_map_val ? &null_map_val->get_data() : nullptr,
&_shared_state->_has_null_in_build_side);
&_shared_state->_has_null_in_build_side,
p._short_circuit_for_null_in_build_side,
p._have_other_join_conjunct);
COUNTER_SET(_memory_used_counter,
_build_blocks_memory_usage->value() +
(int64_t)(arg.hash_table->get_byte_size() +
arg.serialized_keys_size(true)));
return st;
}},
_shared_state->hash_table_variant_vector.front()->method_variant,
_shared_state->join_op_variants,
make_bool_variant(p._short_circuit_for_null_in_build_side),
make_bool_variant((p._have_other_join_conjunct)));
_shared_state->join_op_variants);
return st;
}

Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/operator/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,9 @@ struct ProcessHashTableBuild {
_batch_size(batch_size),
_state(state) {}

template <int JoinOpType, bool short_circuit_for_null, bool with_other_conjuncts>
Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) {
template <int JoinOpType>
Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key,
bool short_circuit_for_null, bool with_other_conjuncts) {
if (null_map) {
// first row is mocked and is null
if (simd::contain_one(null_map->data() + 1, _rows - 1)) {
Expand Down
45 changes: 17 additions & 28 deletions be/src/exec/operator/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<typename Derived::Parent>();
_max_pushdown_conditions_per_column = p._max_pushdown_conditions_per_column;
RETURN_IF_ERROR(_helper.init(state, p.is_serial_operator(), p.node_id(), p.operator_id(),
_filter_dependencies, p.get_name() + "_FILTER_DEPENDENCY"));
RETURN_IF_ERROR(_init_profile());
Expand Down Expand Up @@ -480,8 +481,7 @@ Status ScanLocalState<Derived>::_normalize_predicate(VExprContext* context, cons
return Status::OK();
}

template <typename Derived>
Status ScanLocalState<Derived>::_normalize_bloom_filter(
Status ScanLocalStateBase::_normalize_bloom_filter(
VExprContext* expr_ctx, const VExprSPtr& root, SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates, PushDownType* pdt) {
std::shared_ptr<ColumnPredicate> pred = nullptr;
Expand Down Expand Up @@ -509,8 +509,7 @@ Status ScanLocalState<Derived>::_normalize_bloom_filter(
return Status::OK();
}

template <typename Derived>
Status ScanLocalState<Derived>::_normalize_topn_filter(
Status ScanLocalStateBase::_normalize_topn_filter(
VExprContext* expr_ctx, const VExprSPtr& root, SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates, PushDownType* pdt) {
std::shared_ptr<ColumnPredicate> pred = nullptr;
Expand All @@ -526,18 +525,16 @@ Status ScanLocalState<Derived>::_normalize_topn_filter(
DCHECK(root->is_topn_filter());
*pdt = _should_push_down_topn_filter();
if (*pdt != PushDownType::UNACCEPTABLE) {
auto& p = _parent->cast<typename Derived::Parent>();
auto& tmp = _state->get_query_ctx()->get_runtime_predicate(
assert_cast<VTopNPred*>(root.get())->source_node_id());
if (_push_down_topn(tmp)) {
pred = tmp.get_predicate(p.node_id());
pred = tmp.get_predicate(_parent->node_id());
}
}
return Status::OK();
}

template <typename Derived>
Status ScanLocalState<Derived>::_normalize_bitmap_filter(
Status ScanLocalStateBase::_normalize_bitmap_filter(
VExprContext* expr_ctx, const VExprSPtr& root, SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates, PushDownType* pdt) {
std::shared_ptr<ColumnPredicate> pred = nullptr;
Expand Down Expand Up @@ -565,10 +562,8 @@ Status ScanLocalState<Derived>::_normalize_bitmap_filter(
return Status::OK();
}

template <typename Derived>
Status ScanLocalState<Derived>::_normalize_function_filters(VExprContext* expr_ctx,
SlotDescriptor* slot,
PushDownType* pdt) {
Status ScanLocalStateBase::_normalize_function_filters(VExprContext* expr_ctx, SlotDescriptor* slot,
PushDownType* pdt) {
auto expr = expr_ctx->root()->is_rf_wrapper() ? expr_ctx->root()->get_impl() : expr_ctx->root();
bool opposite = false;
VExpr* fn_expr = expr.get();
Expand Down Expand Up @@ -648,8 +643,7 @@ std::string ScanLocalState<Derived>::debug_string(int indentation_level) const {
return fmt::to_string(debug_string_buffer);
}

template <typename Derived>
Status ScanLocalState<Derived>::_eval_const_conjuncts(VExprContext* expr_ctx, PushDownType* pdt) {
Status ScanLocalStateBase::_eval_const_conjuncts(VExprContext* expr_ctx, PushDownType* pdt) {
auto vexpr =
expr_ctx->root()->is_rf_wrapper() ? expr_ctx->root()->get_impl() : expr_ctx->root();
// Used to handle constant expressions, such as '1 = 1' _eval_const_conjuncts does not handle cases like 'colA = 1'
Expand Down Expand Up @@ -697,9 +691,8 @@ Status ScanLocalState<Derived>::_eval_const_conjuncts(VExprContext* expr_ctx, Pu
return Status::OK();
}

template <typename Derived>
template <PrimitiveType T>
Status ScanLocalState<Derived>::_normalize_in_predicate(
Status ScanLocalStateBase::_normalize_in_predicate(
VExprContext* expr_ctx, const VExprSPtr& root, SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates, ColumnValueRange<T>& range,
PushDownType* pdt) {
Expand Down Expand Up @@ -731,8 +724,7 @@ Status ScanLocalState<Derived>::_normalize_in_predicate(
auto is_in = false;
if (hybrid_set != nullptr) {
// runtime filter produce VDirectInPredicate
if (hybrid_set->size() <=
_parent->cast<typename Derived::Parent>()._max_pushdown_conditions_per_column) {
if (hybrid_set->size() <= static_cast<size_t>(_max_pushdown_conditions_per_column)) {
iter = hybrid_set->begin();
}
is_in = true;
Expand Down Expand Up @@ -810,9 +802,8 @@ Status ScanLocalState<Derived>::_normalize_in_predicate(
return Status::OK();
}

template <typename Derived>
template <PrimitiveType T>
Status ScanLocalState<Derived>::_normalize_binary_predicate(
Status ScanLocalStateBase::_normalize_binary_predicate(
VExprContext* expr_ctx, const VExprSPtr& root, SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates, ColumnValueRange<T>& range,
PushDownType* pdt) {
Expand Down Expand Up @@ -921,13 +912,12 @@ Status ScanLocalState<Derived>::_normalize_binary_predicate(
return Status::OK();
}

template <typename Derived>
template <PrimitiveType PrimitiveType, typename ChangeFixedValueRangeFunc>
Status ScanLocalState<Derived>::_change_value_range(bool is_equal_op,
ColumnValueRange<PrimitiveType>& temp_range,
const Field& value,
const ChangeFixedValueRangeFunc& func,
const std::string& fn_name) {
Status ScanLocalStateBase::_change_value_range(bool is_equal_op,
ColumnValueRange<PrimitiveType>& temp_range,
const Field& value,
const ChangeFixedValueRangeFunc& func,
const std::string& fn_name) {
if constexpr (PrimitiveType == TYPE_DATE) {
auto tmp_value = value.template get<TYPE_DATE>();
if (is_equal_op) {
Expand Down Expand Up @@ -959,9 +949,8 @@ Status ScanLocalState<Derived>::_change_value_range(bool is_equal_op,
return Status::OK();
}

template <typename Derived>
template <PrimitiveType T>
Status ScanLocalState<Derived>::_normalize_is_null_predicate(
Status ScanLocalStateBase::_normalize_is_null_predicate(
VExprContext* expr_ctx, const VExprSPtr& root, SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates, ColumnValueRange<T>& range,
PushDownType* pdt) {
Expand Down
Loading
Loading