Skip to content

Commit

Permalink
[Bug](exec) fix setSinkOp null flag bug
Browse files Browse the repository at this point in the history
  • Loading branch information
yoko123yoko committed Jul 7, 2024
1 parent d7dc33a commit 8585fe5
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 48 deletions.
120 changes: 73 additions & 47 deletions be/src/pipeline/exec/set_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
namespace doris::pipeline {

template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Block* in_block,
Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state,
vectorized::Block* in_block,
bool eos) {
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
RETURN_IF_CANCELLED(state);
Expand All @@ -36,18 +37,22 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

auto& build_block = local_state._shared_state->build_block;
auto& valid_element_in_hash_tbl = local_state._shared_state->valid_element_in_hash_tbl;
auto& valid_element_in_hash_tbl =
local_state._shared_state->valid_element_in_hash_tbl;

if (in_block->rows() != 0) {
RETURN_IF_ERROR(local_state._mutable_block.merge(*in_block));

if (local_state._mutable_block.rows() > std::numeric_limits<uint32_t>::max()) {
return Status::NotSupported("set operator do not support build table rows over:" +
std::to_string(std::numeric_limits<uint32_t>::max()));
if (local_state._mutable_block.rows() >
std::numeric_limits<uint32_t>::max()) {
return Status::NotSupported(
"set operator do not support build table rows over:" +
std::to_string(std::numeric_limits<uint32_t>::max()));
}
}

if (eos || local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
if (eos ||
local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
build_block = local_state._mutable_block.to_block();
RETURN_IF_ERROR(_process_build_block(local_state, build_block, state));
local_state._mutable_block.clear();
Expand All @@ -57,16 +62,18 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo
valid_element_in_hash_tbl = 0;
} else {
std::visit(
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
valid_element_in_hash_tbl = arg.hash_table->size();
}
},
*local_state._shared_state->hash_table_variants);
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
valid_element_in_hash_tbl = arg.hash_table->size();
}
},
*local_state._shared_state->hash_table_variants);
}
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
->set_ready();
local_state._shared_state
->probe_finished_children_dependency[_cur_child_id + 1]
->set_ready();
if (_child_quantity == 1) {
local_state._dependency->set_ready_to_read();
}
Expand All @@ -77,8 +84,9 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo

template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::_process_build_block(
SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block,
RuntimeState* state) {
SetSinkLocalState<is_intersect>& local_state,
vectorized::Block& block,
RuntimeState* state) {
size_t rows = block.rows();
if (rows == 0) {
return Status::OK();
Expand All @@ -89,32 +97,37 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
RETURN_IF_ERROR(_extract_build_column(local_state, block, raw_ptrs, rows));

std::visit(
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
vectorized::HashTableBuild<HashTableCtxType, is_intersect>
hash_table_build_process(&local_state, rows, raw_ptrs, state);
static_cast<void>(hash_table_build_process(arg, local_state._arena));
} else {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
}
},
*local_state._shared_state->hash_table_variants);
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
vectorized::HashTableBuild<HashTableCtxType, is_intersect>
hash_table_build_process(&local_state, rows, raw_ptrs,
state);
static_cast<void>(
hash_table_build_process(arg, local_state._arena));
} else {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
}
},
*local_state._shared_state->hash_table_variants);

return Status::OK();
}

template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::_extract_build_column(
SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block,
vectorized::ColumnRawPtrs& raw_ptrs, size_t& rows) {
SetSinkLocalState<is_intersect>& local_state,
vectorized::Block& block,
vectorized::ColumnRawPtrs& raw_ptrs,
size_t& rows) {
std::vector<int> result_locs(_child_exprs.size(), -1);
bool is_all_const = true;

for (size_t i = 0; i < _child_exprs.size(); ++i) {
RETURN_IF_ERROR(_child_exprs[i]->execute(&block, &result_locs[i]));
is_all_const &= is_column_const(*block.get_by_position(result_locs[i]).column);
is_all_const &=
is_column_const(*block.get_by_position(result_locs[i]).column);
}
rows = is_all_const ? 1 : rows;

Expand All @@ -123,16 +136,17 @@ Status SetSinkOperatorX<is_intersect>::_extract_build_column(

if (is_all_const) {
block.get_by_position(result_col_id).column =
assert_cast<const vectorized::ColumnConst&>(
*block.get_by_position(result_col_id).column)
.get_data_column_ptr();
assert_cast<const vectorized::ColumnConst&>(
*block.get_by_position(result_col_id).column)
.get_data_column_ptr();
} else {
block.get_by_position(result_col_id).column =
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
block.get_by_position(result_col_id)
.column->convert_to_full_column_if_const();
}
if (local_state._shared_state->build_not_ignore_null[i]) {
block.get_by_position(result_col_id).column =
make_nullable(block.get_by_position(result_col_id).column);
make_nullable(block.get_by_position(result_col_id).column);
}

raw_ptrs[i] = block.get_by_position(result_col_id).column.get();
Expand All @@ -143,16 +157,19 @@ Status SetSinkOperatorX<is_intersect>::_extract_build_column(
}

template <bool is_intersect>
Status SetSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkStateInfo& info) {
Status SetSinkLocalState<is_intersect>::init(RuntimeState* state,
LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState<SetSharedState>::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_build_timer = ADD_TIMER(_profile, "BuildTime");
auto& parent = _parent->cast<Parent>();
_shared_state->probe_finished_children_dependency[parent._cur_child_id] = _dependency;
_shared_state->probe_finished_children_dependency[parent._cur_child_id] =
_dependency;
DCHECK(parent._cur_child_id == 0);
auto& child_exprs_lists = _shared_state->child_exprs_lists;
DCHECK(child_exprs_lists.empty() || child_exprs_lists.size() == parent._child_quantity);
DCHECK(child_exprs_lists.empty() ||
child_exprs_lists.size() == parent._child_quantity);
if (child_exprs_lists.empty()) {
child_exprs_lists.resize(parent._child_quantity);
}
Expand All @@ -171,24 +188,31 @@ Status SetSinkLocalState<is_intersect>::open(RuntimeState* state) {
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(PipelineXSinkLocalState<SetSharedState>::open(state));

auto output_data_types = vectorized::VectorizedUtils::get_data_types(
_parent->cast<SetSinkOperatorX<is_intersect>>()._row_descriptor);
auto& parent = _parent->cast<Parent>();
DCHECK(parent._cur_child_id == 0);
auto& child_exprs_lists = _shared_state->child_exprs_lists;
_shared_state->build_not_ignore_null.resize(child_exprs_lists[parent._cur_child_id].size());
_shared_state->hash_table_variants = std::make_unique<SetHashTableVariants>();
_shared_state->build_not_ignore_null.resize(
child_exprs_lists[parent._cur_child_id].size());
_shared_state->hash_table_variants =
std::make_unique<SetHashTableVariants>();

for (const auto& ctl : child_exprs_lists) {
for (int i = 0; i < ctl.size(); ++i) {
_shared_state->build_not_ignore_null[i] =
_shared_state->build_not_ignore_null[i] || ctl[i]->root()->is_nullable();
(output_data_types[i]->is_nullable() ||
_shared_state->build_not_ignore_null[i] ||
ctl[i]->root()->is_nullable());
}
}
_shared_state->hash_table_init();
return Status::OK();
}

template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState* state) {
Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode,
RuntimeState* state) {
Base::_name = "SET_SINK_OPERATOR";
const std::vector<std::vector<TExpr>>* result_texpr_lists;

Expand All @@ -198,7 +222,8 @@ Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState
} else if (tnode.node_type == TPlanNodeType::type::EXCEPT_NODE) {
result_texpr_lists = &(tnode.except_node.result_expr_lists);
} else {
return Status::NotSupported("Not Implemented, Check The Operation Node.");
return Status::NotSupported(
"Not Implemented, Check The Operation Node.");
}

const auto& texpr = (*result_texpr_lists)[_cur_child_id];
Expand All @@ -210,7 +235,8 @@ Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState
template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Base::prepare(state));
return vectorized::VExpr::prepare(_child_exprs, state, _child_x->row_desc());
return vectorized::VExpr::prepare(_child_exprs, state,
_child_x->row_desc());
}

template <bool is_intersect>
Expand All @@ -224,4 +250,4 @@ template class SetSinkLocalState<false>;
template class SetSinkOperatorX<true>;
template class SetSinkOperatorX<false>;

} // namespace doris::pipeline
} // namespace doris::pipeline
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/set_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class SetSinkOperatorX final : public DataSinkOperatorX<SetSinkLocalState<is_int
_is_colocate(is_intersect ? tnode.intersect_node.is_colocate
: tnode.except_node.is_colocate),
_partition_exprs(is_intersect ? tnode.intersect_node.result_expr_lists[child_id]
: tnode.except_node.result_expr_lists[child_id]) {}
: tnode.except_node.result_expr_lists[child_id]),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
~SetSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TDataSink",
Expand Down Expand Up @@ -113,6 +114,7 @@ class SetSinkOperatorX final : public DataSinkOperatorX<SetSinkLocalState<is_int
const bool _is_colocate;
const std::vector<TExpr> _partition_exprs;
using OperatorBase::_child_x;
const RowDescriptor _row_descriptor;
};

} // namespace pipeline
Expand Down
4 changes: 4 additions & 0 deletions regression-test/data/correctness_p0/test_set_operation.out
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ aaaa
bbbb

-- !select1 --

-- !select1 --
3.0

Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,6 @@ suite("test_set_operation") {

qt_select1 """ (select 0) intersect (select null); """

qt_select1 """ select sqrt('9') except select sqrt('4'); """

}

0 comments on commit 8585fe5

Please sign in to comment.