diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 5fc38f3ca706ac..9d7ff33a22fcd3 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -26,7 +26,8 @@ namespace doris::pipeline { template -Status SetSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, +Status SetSinkOperatorX::sink(RuntimeState* state, + vectorized::Block* in_block, bool eos) { constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; RETURN_IF_CANCELLED(state); @@ -36,18 +37,22 @@ Status SetSinkOperatorX::sink(RuntimeState* state, vectorized::Blo COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); auto& build_block = local_state._shared_state->build_block; - auto& valid_element_in_hash_tbl = local_state._shared_state->valid_element_in_hash_tbl; + auto& valid_element_in_hash_tbl = + local_state._shared_state->valid_element_in_hash_tbl; if (in_block->rows() != 0) { RETURN_IF_ERROR(local_state._mutable_block.merge(*in_block)); - if (local_state._mutable_block.rows() > std::numeric_limits::max()) { - return Status::NotSupported("set operator do not support build table rows over:" + - std::to_string(std::numeric_limits::max())); + if (local_state._mutable_block.rows() > + std::numeric_limits::max()) { + return Status::NotSupported( + "set operator do not support build table rows over:" + + std::to_string(std::numeric_limits::max())); } } - if (eos || local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) { + if (eos || + local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) { build_block = local_state._mutable_block.to_block(); RETURN_IF_ERROR(_process_build_block(local_state, build_block, state)); local_state._mutable_block.clear(); @@ -57,16 +62,18 @@ Status SetSinkOperatorX::sink(RuntimeState* state, vectorized::Blo valid_element_in_hash_tbl = 0; } else { std::visit( - [&](auto&& arg) { - using HashTableCtxType = std::decay_t; - if constexpr (!std::is_same_v) { - valid_element_in_hash_tbl = arg.hash_table->size(); - } - }, - *local_state._shared_state->hash_table_variants); + [&](auto&& arg) { + using HashTableCtxType = std::decay_t; + if constexpr (!std::is_same_v) { + valid_element_in_hash_tbl = arg.hash_table->size(); + } + }, + *local_state._shared_state->hash_table_variants); } - local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1] - ->set_ready(); + local_state._shared_state + ->probe_finished_children_dependency[_cur_child_id + 1] + ->set_ready(); if (_child_quantity == 1) { local_state._dependency->set_ready_to_read(); } @@ -77,8 +84,9 @@ Status SetSinkOperatorX::sink(RuntimeState* state, vectorized::Blo template Status SetSinkOperatorX::_process_build_block( - SetSinkLocalState& local_state, vectorized::Block& block, - RuntimeState* state) { + SetSinkLocalState& local_state, + vectorized::Block& block, + RuntimeState* state) { size_t rows = block.rows(); if (rows == 0) { return Status::OK(); @@ -89,32 +97,37 @@ Status SetSinkOperatorX::_process_build_block( RETURN_IF_ERROR(_extract_build_column(local_state, block, raw_ptrs, rows)); std::visit( - [&](auto&& arg) { - using HashTableCtxType = std::decay_t; - if constexpr (!std::is_same_v) { - vectorized::HashTableBuild - hash_table_build_process(&local_state, rows, raw_ptrs, state); - static_cast(hash_table_build_process(arg, local_state._arena)); - } else { - LOG(FATAL) << "FATAL: uninited hash table"; - __builtin_unreachable(); - } - }, - *local_state._shared_state->hash_table_variants); + [&](auto&& arg) { + using HashTableCtxType = std::decay_t; + if constexpr (!std::is_same_v) { + vectorized::HashTableBuild + hash_table_build_process(&local_state, rows, raw_ptrs, + state); + static_cast( + hash_table_build_process(arg, local_state._arena)); + } else { + LOG(FATAL) << "FATAL: uninited hash table"; + __builtin_unreachable(); + } + }, + *local_state._shared_state->hash_table_variants); return Status::OK(); } template Status SetSinkOperatorX::_extract_build_column( - SetSinkLocalState& local_state, vectorized::Block& block, - vectorized::ColumnRawPtrs& raw_ptrs, size_t& rows) { + SetSinkLocalState& local_state, + vectorized::Block& block, + vectorized::ColumnRawPtrs& raw_ptrs, + size_t& rows) { std::vector result_locs(_child_exprs.size(), -1); bool is_all_const = true; for (size_t i = 0; i < _child_exprs.size(); ++i) { RETURN_IF_ERROR(_child_exprs[i]->execute(&block, &result_locs[i])); - is_all_const &= is_column_const(*block.get_by_position(result_locs[i]).column); + is_all_const &= + is_column_const(*block.get_by_position(result_locs[i]).column); } rows = is_all_const ? 1 : rows; @@ -123,16 +136,17 @@ Status SetSinkOperatorX::_extract_build_column( if (is_all_const) { block.get_by_position(result_col_id).column = - assert_cast( - *block.get_by_position(result_col_id).column) - .get_data_column_ptr(); + assert_cast( + *block.get_by_position(result_col_id).column) + .get_data_column_ptr(); } else { block.get_by_position(result_col_id).column = - block.get_by_position(result_col_id).column->convert_to_full_column_if_const(); + block.get_by_position(result_col_id) + .column->convert_to_full_column_if_const(); } if (local_state._shared_state->build_not_ignore_null[i]) { block.get_by_position(result_col_id).column = - make_nullable(block.get_by_position(result_col_id).column); + make_nullable(block.get_by_position(result_col_id).column); } raw_ptrs[i] = block.get_by_position(result_col_id).column.get(); @@ -143,16 +157,19 @@ Status SetSinkOperatorX::_extract_build_column( } template -Status SetSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { +Status SetSinkLocalState::init(RuntimeState* state, + LocalSinkStateInfo& info) { RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); _build_timer = ADD_TIMER(_profile, "BuildTime"); auto& parent = _parent->cast(); - _shared_state->probe_finished_children_dependency[parent._cur_child_id] = _dependency; + _shared_state->probe_finished_children_dependency[parent._cur_child_id] = + _dependency; DCHECK(parent._cur_child_id == 0); auto& child_exprs_lists = _shared_state->child_exprs_lists; - DCHECK(child_exprs_lists.empty() || child_exprs_lists.size() == parent._child_quantity); + DCHECK(child_exprs_lists.empty() || + child_exprs_lists.size() == parent._child_quantity); if (child_exprs_lists.empty()) { child_exprs_lists.resize(parent._child_quantity); } @@ -171,16 +188,22 @@ Status SetSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(PipelineXSinkLocalState::open(state)); + auto output_data_types = vectorized::VectorizedUtils::get_data_types( + _parent->cast>()._row_descriptor); auto& parent = _parent->cast(); DCHECK(parent._cur_child_id == 0); auto& child_exprs_lists = _shared_state->child_exprs_lists; - _shared_state->build_not_ignore_null.resize(child_exprs_lists[parent._cur_child_id].size()); - _shared_state->hash_table_variants = std::make_unique(); + _shared_state->build_not_ignore_null.resize( + child_exprs_lists[parent._cur_child_id].size()); + _shared_state->hash_table_variants = + std::make_unique(); for (const auto& ctl : child_exprs_lists) { for (int i = 0; i < ctl.size(); ++i) { _shared_state->build_not_ignore_null[i] = - _shared_state->build_not_ignore_null[i] || ctl[i]->root()->is_nullable(); + (output_data_types[i]->is_nullable() || + _shared_state->build_not_ignore_null[i] || + ctl[i]->root()->is_nullable()); } } _shared_state->hash_table_init(); @@ -188,7 +211,8 @@ Status SetSinkLocalState::open(RuntimeState* state) { } template -Status SetSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { +Status SetSinkOperatorX::init(const TPlanNode& tnode, + RuntimeState* state) { Base::_name = "SET_SINK_OPERATOR"; const std::vector>* result_texpr_lists; @@ -198,7 +222,8 @@ Status SetSinkOperatorX::init(const TPlanNode& tnode, RuntimeState } else if (tnode.node_type == TPlanNodeType::type::EXCEPT_NODE) { result_texpr_lists = &(tnode.except_node.result_expr_lists); } else { - return Status::NotSupported("Not Implemented, Check The Operation Node."); + return Status::NotSupported( + "Not Implemented, Check The Operation Node."); } const auto& texpr = (*result_texpr_lists)[_cur_child_id]; @@ -210,7 +235,8 @@ Status SetSinkOperatorX::init(const TPlanNode& tnode, RuntimeState template Status SetSinkOperatorX::prepare(RuntimeState* state) { RETURN_IF_ERROR(Base::prepare(state)); - return vectorized::VExpr::prepare(_child_exprs, state, _child_x->row_desc()); + return vectorized::VExpr::prepare(_child_exprs, state, + _child_x->row_desc()); } template @@ -224,4 +250,4 @@ template class SetSinkLocalState; template class SetSinkOperatorX; template class SetSinkOperatorX; -} // namespace doris::pipeline +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 09a1fa09e7ccbf..46c75b97150b5d 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -77,7 +77,8 @@ class SetSinkOperatorX final : public DataSinkOperatorX _partition_exprs; using OperatorBase::_child_x; + const RowDescriptor _row_descriptor; }; } // namespace pipeline diff --git a/regression-test/data/correctness_p0/test_set_operation.out b/regression-test/data/correctness_p0/test_set_operation.out index 09fa8314065ac0..0e19fba622d9ab 100644 --- a/regression-test/data/correctness_p0/test_set_operation.out +++ b/regression-test/data/correctness_p0/test_set_operation.out @@ -11,3 +11,7 @@ aaaa bbbb -- !select1 -- + +-- !select1 -- +3.0 + diff --git a/regression-test/suites/correctness_p0/test_set_operation.groovy b/regression-test/suites/correctness_p0/test_set_operation.groovy index 5ee6348a037685..eadcb3cf8ad32e 100644 --- a/regression-test/suites/correctness_p0/test_set_operation.groovy +++ b/regression-test/suites/correctness_p0/test_set_operation.groovy @@ -127,4 +127,6 @@ suite("test_set_operation") { qt_select1 """ (select 0) intersect (select null); """ + qt_select1 """ select sqrt('9') except select sqrt('4'); """ + }