Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug](exec) fix setSinkOp null flag bug #36754

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 73 additions & 47 deletions be/src/pipeline/exec/set_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
namespace doris::pipeline {

template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Block* in_block,
Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state,
vectorized::Block* in_block,
bool eos) {
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
RETURN_IF_CANCELLED(state);
Expand All @@ -36,18 +37,22 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

auto& build_block = local_state._shared_state->build_block;
auto& valid_element_in_hash_tbl = local_state._shared_state->valid_element_in_hash_tbl;
auto& valid_element_in_hash_tbl =
local_state._shared_state->valid_element_in_hash_tbl;

if (in_block->rows() != 0) {
RETURN_IF_ERROR(local_state._mutable_block.merge(*in_block));

if (local_state._mutable_block.rows() > std::numeric_limits<uint32_t>::max()) {
return Status::NotSupported("set operator do not support build table rows over:" +
std::to_string(std::numeric_limits<uint32_t>::max()));
if (local_state._mutable_block.rows() >
std::numeric_limits<uint32_t>::max()) {
return Status::NotSupported(
"set operator do not support build table rows over:" +
std::to_string(std::numeric_limits<uint32_t>::max()));
}
}

if (eos || local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
if (eos ||
local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
build_block = local_state._mutable_block.to_block();
RETURN_IF_ERROR(_process_build_block(local_state, build_block, state));
local_state._mutable_block.clear();
Expand All @@ -57,16 +62,18 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo
valid_element_in_hash_tbl = 0;
} else {
std::visit(
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
valid_element_in_hash_tbl = arg.hash_table->size();
}
},
*local_state._shared_state->hash_table_variants);
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
valid_element_in_hash_tbl = arg.hash_table->size();
}
},
*local_state._shared_state->hash_table_variants);
}
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
->set_ready();
local_state._shared_state
->probe_finished_children_dependency[_cur_child_id + 1]
->set_ready();
if (_child_quantity == 1) {
local_state._dependency->set_ready_to_read();
}
Expand All @@ -77,8 +84,9 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo

template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::_process_build_block(
SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block,
RuntimeState* state) {
SetSinkLocalState<is_intersect>& local_state,
vectorized::Block& block,
RuntimeState* state) {
size_t rows = block.rows();
if (rows == 0) {
return Status::OK();
Expand All @@ -89,32 +97,37 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
RETURN_IF_ERROR(_extract_build_column(local_state, block, raw_ptrs, rows));

std::visit(
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
vectorized::HashTableBuild<HashTableCtxType, is_intersect>
hash_table_build_process(&local_state, rows, raw_ptrs, state);
static_cast<void>(hash_table_build_process(arg, local_state._arena));
} else {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
}
},
*local_state._shared_state->hash_table_variants);
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
vectorized::HashTableBuild<HashTableCtxType, is_intersect>
hash_table_build_process(&local_state, rows, raw_ptrs,
state);
static_cast<void>(
hash_table_build_process(arg, local_state._arena));
} else {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
}
},
*local_state._shared_state->hash_table_variants);

return Status::OK();
}

template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::_extract_build_column(
SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block,
vectorized::ColumnRawPtrs& raw_ptrs, size_t& rows) {
SetSinkLocalState<is_intersect>& local_state,
vectorized::Block& block,
vectorized::ColumnRawPtrs& raw_ptrs,
size_t& rows) {
std::vector<int> result_locs(_child_exprs.size(), -1);
bool is_all_const = true;

for (size_t i = 0; i < _child_exprs.size(); ++i) {
RETURN_IF_ERROR(_child_exprs[i]->execute(&block, &result_locs[i]));
is_all_const &= is_column_const(*block.get_by_position(result_locs[i]).column);
is_all_const &=
is_column_const(*block.get_by_position(result_locs[i]).column);
}
rows = is_all_const ? 1 : rows;

Expand All @@ -123,16 +136,17 @@ Status SetSinkOperatorX<is_intersect>::_extract_build_column(

if (is_all_const) {
block.get_by_position(result_col_id).column =
assert_cast<const vectorized::ColumnConst&>(
*block.get_by_position(result_col_id).column)
.get_data_column_ptr();
assert_cast<const vectorized::ColumnConst&>(
*block.get_by_position(result_col_id).column)
.get_data_column_ptr();
} else {
block.get_by_position(result_col_id).column =
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
block.get_by_position(result_col_id)
.column->convert_to_full_column_if_const();
}
if (local_state._shared_state->build_not_ignore_null[i]) {
block.get_by_position(result_col_id).column =
make_nullable(block.get_by_position(result_col_id).column);
make_nullable(block.get_by_position(result_col_id).column);
}

raw_ptrs[i] = block.get_by_position(result_col_id).column.get();
Expand All @@ -143,16 +157,19 @@ Status SetSinkOperatorX<is_intersect>::_extract_build_column(
}

template <bool is_intersect>
Status SetSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkStateInfo& info) {
Status SetSinkLocalState<is_intersect>::init(RuntimeState* state,
LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState<SetSharedState>::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_build_timer = ADD_TIMER(_profile, "BuildTime");
auto& parent = _parent->cast<Parent>();
_shared_state->probe_finished_children_dependency[parent._cur_child_id] = _dependency;
_shared_state->probe_finished_children_dependency[parent._cur_child_id] =
_dependency;
DCHECK(parent._cur_child_id == 0);
auto& child_exprs_lists = _shared_state->child_exprs_lists;
DCHECK(child_exprs_lists.empty() || child_exprs_lists.size() == parent._child_quantity);
DCHECK(child_exprs_lists.empty() ||
child_exprs_lists.size() == parent._child_quantity);
if (child_exprs_lists.empty()) {
child_exprs_lists.resize(parent._child_quantity);
}
Expand All @@ -171,24 +188,31 @@ Status SetSinkLocalState<is_intersect>::open(RuntimeState* state) {
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(PipelineXSinkLocalState<SetSharedState>::open(state));

auto output_data_types = vectorized::VectorizedUtils::get_data_types(
_parent->cast<SetSinkOperatorX<is_intersect>>()._row_descriptor);
auto& parent = _parent->cast<Parent>();
DCHECK(parent._cur_child_id == 0);
auto& child_exprs_lists = _shared_state->child_exprs_lists;
_shared_state->build_not_ignore_null.resize(child_exprs_lists[parent._cur_child_id].size());
_shared_state->hash_table_variants = std::make_unique<SetHashTableVariants>();
_shared_state->build_not_ignore_null.resize(
child_exprs_lists[parent._cur_child_id].size());
_shared_state->hash_table_variants =
std::make_unique<SetHashTableVariants>();

for (const auto& ctl : child_exprs_lists) {
for (int i = 0; i < ctl.size(); ++i) {
_shared_state->build_not_ignore_null[i] =
_shared_state->build_not_ignore_null[i] || ctl[i]->root()->is_nullable();
(output_data_types[i]->is_nullable() ||
_shared_state->build_not_ignore_null[i] ||
ctl[i]->root()->is_nullable());
}
}
_shared_state->hash_table_init();
return Status::OK();
}

template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState* state) {
Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode,
RuntimeState* state) {
Base::_name = "SET_SINK_OPERATOR";
const std::vector<std::vector<TExpr>>* result_texpr_lists;

Expand All @@ -198,7 +222,8 @@ Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState
} else if (tnode.node_type == TPlanNodeType::type::EXCEPT_NODE) {
result_texpr_lists = &(tnode.except_node.result_expr_lists);
} else {
return Status::NotSupported("Not Implemented, Check The Operation Node.");
return Status::NotSupported(
"Not Implemented, Check The Operation Node.");
}

const auto& texpr = (*result_texpr_lists)[_cur_child_id];
Expand All @@ -210,7 +235,8 @@ Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState
template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Base::prepare(state));
return vectorized::VExpr::prepare(_child_exprs, state, _child_x->row_desc());
return vectorized::VExpr::prepare(_child_exprs, state,
_child_x->row_desc());
}

template <bool is_intersect>
Expand All @@ -224,4 +250,4 @@ template class SetSinkLocalState<false>;
template class SetSinkOperatorX<true>;
template class SetSinkOperatorX<false>;

} // namespace doris::pipeline
} // namespace doris::pipeline
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/set_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class SetSinkOperatorX final : public DataSinkOperatorX<SetSinkLocalState<is_int
_is_colocate(is_intersect ? tnode.intersect_node.is_colocate
: tnode.except_node.is_colocate),
_partition_exprs(is_intersect ? tnode.intersect_node.result_expr_lists[child_id]
: tnode.except_node.result_expr_lists[child_id]) {}
: tnode.except_node.result_expr_lists[child_id]),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
~SetSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TDataSink",
Expand Down Expand Up @@ -113,6 +114,7 @@ class SetSinkOperatorX final : public DataSinkOperatorX<SetSinkLocalState<is_int
const bool _is_colocate;
const std::vector<TExpr> _partition_exprs;
using OperatorBase::_child_x;
const RowDescriptor _row_descriptor;
};

} // namespace pipeline
Expand Down
4 changes: 4 additions & 0 deletions regression-test/data/correctness_p0/test_set_operation.out
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ aaaa
bbbb

-- !select1 --

-- !select1 --
3.0

Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,6 @@ suite("test_set_operation") {

qt_select1 """ (select 0) intersect (select null); """

qt_select1 """ select sqrt('9') except select sqrt('4'); """

}
Loading