diff --git a/be/src/exec/operator/cache_source_operator.cpp b/be/src/exec/operator/cache_source_operator.cpp index 91d48d77470ec6..e0ce634e03dfc7 100644 --- a/be/src/exec/operator/cache_source_operator.cpp +++ b/be/src/exec/operator/cache_source_operator.cpp @@ -72,9 +72,11 @@ Status CacheSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { custom_profile()->add_info_string("CacheTabletId", tablet_ids_str); // 3. lookup the cache and find proper slot order - hit_cache = _global_cache->lookup(_cache_key, _version, &_query_cache_handle); + if (!cache_param.force_refresh_query_cache) { + hit_cache = _global_cache->lookup(_cache_key, _version, &_query_cache_handle); + } custom_profile()->add_info_string("HitCache", std::to_string(hit_cache)); - if (hit_cache && !cache_param.force_refresh_query_cache) { + if (hit_cache) { _hit_cache_results = _query_cache_handle.get_cache_result(); auto hit_cache_slot_orders = _query_cache_handle.get_cache_slot_orders(); diff --git a/be/src/exec/operator/olap_table_sink_operator.h b/be/src/exec/operator/olap_table_sink_operator.h index 55b45e8a55ee85..fdb3756bc023c8 100644 --- a/be/src/exec/operator/olap_table_sink_operator.h +++ b/be/src/exec/operator/olap_table_sink_operator.h @@ -38,9 +38,9 @@ class OlapTableSinkLocalState final class OlapTableSinkOperatorX final : public DataSinkOperatorX { public: using Base = DataSinkOperatorX; - OlapTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc, - const std::vector& t_output_expr) - : Base(operator_id, 0, 0), + OlapTableSinkOperatorX(ObjectPool* pool, int operator_id, int node_id, + const RowDescriptor& row_desc, const std::vector& t_output_expr) + : Base(operator_id, node_id, node_id), _row_desc(row_desc), _t_output_expr(t_output_expr), _pool(pool) {}; diff --git a/be/src/exec/operator/olap_table_sink_v2_operator.h b/be/src/exec/operator/olap_table_sink_v2_operator.h index 0426a7b59d4f14..d97ea631a08429 100644 --- a/be/src/exec/operator/olap_table_sink_v2_operator.h +++ b/be/src/exec/operator/olap_table_sink_v2_operator.h @@ -39,9 +39,9 @@ class OlapTableSinkV2LocalState final class OlapTableSinkV2OperatorX final : public DataSinkOperatorX { public: using Base = DataSinkOperatorX; - OlapTableSinkV2OperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc, - const std::vector& t_output_expr) - : Base(operator_id, 0, 0), + OlapTableSinkV2OperatorX(ObjectPool* pool, int operator_id, int node_id, + const RowDescriptor& row_desc, const std::vector& t_output_expr) + : Base(operator_id, node_id, node_id), _row_desc(row_desc), _t_output_expr(t_output_expr), _pool(pool) {}; diff --git a/be/src/exec/operator/result_sink_operator.cpp b/be/src/exec/operator/result_sink_operator.cpp index da684ffc8d18b8..162cf0b7d61de8 100644 --- a/be/src/exec/operator/result_sink_operator.cpp +++ b/be/src/exec/operator/result_sink_operator.cpp @@ -94,11 +94,11 @@ Status ResultSinkLocalState::open(RuntimeState* state) { return Status::OK(); } -ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& row_desc, +ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, int node_id, + const RowDescriptor& row_desc, const std::vector& t_output_expr, const TResultSink& sink) - : DataSinkOperatorX(operator_id, std::numeric_limits::max(), - std::numeric_limits::max()), + : DataSinkOperatorX(operator_id, node_id, node_id), _sink_type(!sink.__isset.type || sink.type == TResultSinkType::MYSQL_PROTOCOL ? TResultSinkType::MYSQL_PROTOCOL : sink.type), diff --git a/be/src/exec/operator/result_sink_operator.h b/be/src/exec/operator/result_sink_operator.h index 7fdbca0b8dc549..e8cc0fd25de90b 100644 --- a/be/src/exec/operator/result_sink_operator.h +++ b/be/src/exec/operator/result_sink_operator.h @@ -156,7 +156,7 @@ class ResultSinkLocalState final : public PipelineXSinkLocalState { public: - ResultSinkOperatorX(int operator_id, const RowDescriptor& row_desc, + ResultSinkOperatorX(int operator_id, int node_id, const RowDescriptor& row_desc, const std::vector& select_exprs, const TResultSink& sink); Status prepare(RuntimeState* state) override; diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index 265d9fa24d2388..e04745a853eb46 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -1074,8 +1074,11 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS return Status::InternalError("Missing data buffer sink."); } - _sink = std::make_shared(next_sink_operator_id(), row_desc, - output_exprs, thrift_sink.result_sink); + auto& pipeline = _pipelines[cur_pipeline_id]; + int child_node_id = pipeline->operators().back()->node_id(); + _sink = std::make_shared(next_sink_operator_id(), child_node_id + 1, + row_desc, output_exprs, + thrift_sink.result_sink); break; } case TDataSinkType::DICTIONARY_SINK: { @@ -1089,14 +1092,16 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS } case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: case TDataSinkType::OLAP_TABLE_SINK: { + auto& pipeline = _pipelines[cur_pipeline_id]; + int child_node_id = pipeline->operators().back()->node_id(); if (state->query_options().enable_memtable_on_sink_node && !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) && !config::is_cloud_mode()) { - _sink = std::make_shared(pool, next_sink_operator_id(), - row_desc, output_exprs); + _sink = std::make_shared( + pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs); } else { - _sink = std::make_shared(pool, next_sink_operator_id(), - row_desc, output_exprs); + _sink = std::make_shared( + pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs); } break; }