Skip to content

Commit 09827b8

Browse files
authored
[fix](doris catalog) FragmentMgr should not cancel virtual doris cluster query (#62135) (#62281)
Cherry-picked from #62135
1 parent 7d94c30 commit 09827b8

4 files changed

Lines changed: 240 additions & 4 deletions

File tree

be/src/runtime/fragment_mgr.cpp

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -737,9 +737,16 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
737737

738738
// This may be a first fragment request of the query.
739739
// Create the query fragments context.
740-
query_ctx = QueryContext::create(query_id, _exec_env, params.query_options,
741-
params.coord, params.is_nereids,
742-
params.current_connect_fe, query_source);
740+
// Cross-cluster query: coordinator FE may not belong to local cluster.
741+
// In that case, cancel_worker() should not cancel it based on local FE liveness.
742+
QuerySource actual_query_source = query_source;
743+
if (query_source == QuerySource::INTERNAL_FRONTEND &&
744+
!_exec_env->get_running_frontends().contains(params.coord)) {
745+
actual_query_source = QuerySource::EXTERNAL_FRONTEND;
746+
}
747+
query_ctx = QueryContext::create(
748+
query_id, _exec_env, params.query_options, params.coord,
749+
params.is_nereids, params.current_connect_fe, actual_query_source);
743750
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker());
744751
RETURN_IF_ERROR(DescriptorTbl::create(
745752
&(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl)));
@@ -1024,6 +1031,11 @@ void FragmentMgr::cancel_worker() {
10241031
-> Status {
10251032
for (const auto& it : map) {
10261033
if (auto q_ctx = it.second.lock()) {
1034+
// Cross-cluster query: coordinator FE is not in local `running_fes`,
1035+
// we should not cancel it based on local coordinator liveness.
1036+
if (q_ctx->get_query_source() == QuerySource::EXTERNAL_FRONTEND) {
1037+
continue;
1038+
}
10271039
q_contexts.push_back(q_ctx);
10281040
const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
10291041

be/src/runtime/query_context.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ const std::string toString(QuerySource queryType) {
7272
return "ROUTINE_LOAD";
7373
case QuerySource::EXTERNAL_CONNECTOR:
7474
return "EXTERNAL_CONNECTOR";
75+
case QuerySource::EXTERNAL_FRONTEND:
76+
return "EXTERNAL_FRONTEND";
7577
default:
7678
return "UNKNOWN";
7779
}

be/src/runtime/query_context.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ enum class QuerySource {
7070
STREAM_LOAD,
7171
GROUP_COMMIT_LOAD,
7272
ROUTINE_LOAD,
73-
EXTERNAL_CONNECTOR
73+
EXTERNAL_CONNECTOR,
74+
EXTERNAL_FRONTEND
7475
};
7576

7677
const std::string toString(QuerySource query_source);
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <gen_cpp/HeartbeatService_types.h>
19+
#include <gen_cpp/PaloInternalService_types.h>
20+
#include <gtest/gtest.h>
21+
22+
#include <chrono>
23+
#include <thread>
24+
25+
#include "common/config.h"
26+
#include "runtime/descriptor_helper.h"
27+
#include "runtime/exec_env.h"
28+
#include "runtime/fragment_mgr.h"
29+
#include "runtime/workload_group/workload_group_manager.h"
30+
31+
namespace doris {
32+
33+
class FragmentMgrCrossClusterCancelTest : public testing::Test {
34+
public:
35+
void SetUp() override {
36+
_origin_cancel_worker_interval_seconds =
37+
config::fragment_mgr_cancel_worker_interval_seconds;
38+
// Make cancel_worker run quickly in UT, and restore it in TearDown.
39+
config::fragment_mgr_cancel_worker_interval_seconds = 1;
40+
41+
// Make frontends list deterministic for both this ExecEnv instance and global ExecEnv.
42+
_origin_global_frontends = ExecEnv::GetInstance()->get_frontends();
43+
ExecEnv::GetInstance()->update_frontends({});
44+
_exec_env.update_frontends({});
45+
46+
_exec_env._workload_group_manager = new WorkloadGroupMgr();
47+
// Ensure there is a "normal" workload group, otherwise WorkloadGroupMgr::get_group() will throw.
48+
WorkloadGroupInfo normal_wg_info {.id = 1, .name = "normal"};
49+
_exec_env._workload_group_manager->get_or_create_workload_group(normal_wg_info);
50+
51+
_exec_env._fragment_mgr = new FragmentMgr(&_exec_env);
52+
}
53+
54+
void TearDown() override {
55+
if (_exec_env._fragment_mgr != nullptr) {
56+
_exec_env._fragment_mgr->stop();
57+
}
58+
delete _exec_env._fragment_mgr;
59+
_exec_env._fragment_mgr = nullptr;
60+
delete _exec_env._workload_group_manager;
61+
_exec_env._workload_group_manager = nullptr;
62+
_exec_env.update_frontends({});
63+
64+
ExecEnv::GetInstance()->update_frontends(_origin_global_frontends);
65+
config::fragment_mgr_cancel_worker_interval_seconds =
66+
_origin_cancel_worker_interval_seconds;
67+
}
68+
69+
protected:
70+
static TDescriptorTable _make_min_desc_tbl() {
71+
TDescriptorTableBuilder dtb;
72+
TTupleDescriptorBuilder tuple_builder;
73+
tuple_builder.add_slot(TSlotDescriptorBuilder()
74+
.type(TYPE_INT)
75+
.nullable(true)
76+
.column_name("c1")
77+
.column_pos(1)
78+
.build());
79+
tuple_builder.build(&dtb);
80+
return dtb.desc_tbl();
81+
}
82+
83+
static TQueryOptions _make_min_query_options(int64_t fe_process_uuid) {
84+
TQueryOptions query_options;
85+
query_options.__set_query_type(TQueryType::SELECT);
86+
query_options.__set_execution_timeout(60);
87+
query_options.__set_query_timeout(60);
88+
query_options.__set_mem_limit(64L * 1024 * 1024);
89+
query_options.__set_fe_process_uuid(fe_process_uuid);
90+
return query_options;
91+
}
92+
93+
ExecEnv _exec_env;
94+
int32_t _origin_cancel_worker_interval_seconds = 0;
95+
std::vector<TFrontendInfo> _origin_global_frontends;
96+
};
97+
98+
TEST_F(FragmentMgrCrossClusterCancelTest,
99+
MarkQuerySourceAsExternalFrontendWhenCoordinatorNotLocal) {
100+
auto* fragment_mgr = _exec_env.fragment_mgr();
101+
ASSERT_NE(fragment_mgr, nullptr);
102+
103+
TUniqueId query_id;
104+
query_id.__set_hi(1);
105+
query_id.__set_lo(2);
106+
107+
TNetworkAddress coord;
108+
coord.hostname = "fe-a";
109+
coord.port = 9030;
110+
111+
TPipelineFragmentParams params;
112+
params.__set_query_id(query_id);
113+
params.__set_is_simplified_param(false);
114+
params.__set_coord(coord);
115+
params.__set_is_nereids(false);
116+
params.__set_current_connect_fe(coord);
117+
params.__set_fragment_num_on_host(1);
118+
params.__set_query_options(_make_min_query_options(/*fe_process_uuid*/ 123));
119+
params.__set_desc_tbl(_make_min_desc_tbl());
120+
121+
std::shared_ptr<QueryContext> query_ctx;
122+
TPipelineFragmentParamsList parent;
123+
auto st = fragment_mgr->_get_or_create_query_ctx(params, parent, QuerySource::INTERNAL_FRONTEND,
124+
query_ctx);
125+
ASSERT_TRUE(st.ok()) << st.to_string();
126+
ASSERT_NE(query_ctx, nullptr);
127+
EXPECT_EQ(query_ctx->get_query_source(), QuerySource::EXTERNAL_FRONTEND);
128+
}
129+
130+
TEST_F(FragmentMgrCrossClusterCancelTest, CancelWorkerSkipsExternalFrontendQuery) {
131+
auto* fragment_mgr = _exec_env.fragment_mgr();
132+
ASSERT_NE(fragment_mgr, nullptr);
133+
134+
// Make global running frontends non-empty so cancel_worker executes the invalid-query check path.
135+
// NOTE: cancel_worker uses ExecEnv::GetInstance()->get_running_frontends().
136+
TFrontendInfo global_fe;
137+
global_fe.coordinator_address.hostname = "fe-global";
138+
global_fe.coordinator_address.port = 9030;
139+
global_fe.process_uuid = 777;
140+
ExecEnv::GetInstance()->update_frontends({global_fe});
141+
142+
// Make local running frontends contain `coord_internal` so this query remains INTERNAL_FRONTEND.
143+
// NOTE: _get_or_create_query_ctx uses `_exec_env->get_running_frontends()`.
144+
TNetworkAddress coord_internal;
145+
coord_internal.hostname = "fe-local";
146+
coord_internal.port = 9030;
147+
148+
TFrontendInfo local_fe;
149+
local_fe.coordinator_address = coord_internal;
150+
local_fe.process_uuid = 999;
151+
_exec_env.update_frontends({local_fe});
152+
153+
// Create an INTERNAL_FRONTEND query (should be cancelled by cancel_worker when coordinator not found).
154+
TUniqueId internal_query_id;
155+
internal_query_id.__set_hi(3);
156+
internal_query_id.__set_lo(4);
157+
158+
TPipelineFragmentParams internal_params;
159+
internal_params.__set_query_id(internal_query_id);
160+
internal_params.__set_is_simplified_param(false);
161+
internal_params.__set_coord(coord_internal);
162+
internal_params.__set_is_nereids(false);
163+
internal_params.__set_current_connect_fe(coord_internal);
164+
internal_params.__set_fragment_num_on_host(1);
165+
internal_params.__set_query_options(_make_min_query_options(/*fe_process_uuid*/ 111));
166+
internal_params.__set_desc_tbl(_make_min_desc_tbl());
167+
168+
std::shared_ptr<QueryContext> internal_query_ctx;
169+
TPipelineFragmentParamsList parent;
170+
auto st = fragment_mgr->_get_or_create_query_ctx(
171+
internal_params, parent, QuerySource::INTERNAL_FRONTEND, internal_query_ctx);
172+
ASSERT_TRUE(st.ok()) << st.to_string();
173+
ASSERT_NE(internal_query_ctx, nullptr);
174+
ASSERT_EQ(internal_query_ctx->get_query_source(), QuerySource::INTERNAL_FRONTEND);
175+
176+
// Create a cross-cluster query (coordinator not in local frontends), it should be marked as EXTERNAL_FRONTEND.
177+
TUniqueId external_query_id;
178+
external_query_id.__set_hi(5);
179+
external_query_id.__set_lo(6);
180+
181+
TNetworkAddress coord_external;
182+
coord_external.hostname = "fe-remote";
183+
coord_external.port = 9030;
184+
185+
TPipelineFragmentParams external_params;
186+
external_params.__set_query_id(external_query_id);
187+
external_params.__set_is_simplified_param(false);
188+
external_params.__set_coord(coord_external);
189+
external_params.__set_is_nereids(false);
190+
external_params.__set_current_connect_fe(coord_external);
191+
external_params.__set_fragment_num_on_host(1);
192+
external_params.__set_query_options(_make_min_query_options(/*fe_process_uuid*/ 222));
193+
external_params.__set_desc_tbl(_make_min_desc_tbl());
194+
195+
std::shared_ptr<QueryContext> external_query_ctx;
196+
st = fragment_mgr->_get_or_create_query_ctx(external_params, parent,
197+
QuerySource::INTERNAL_FRONTEND, external_query_ctx);
198+
ASSERT_TRUE(st.ok()) << st.to_string();
199+
ASSERT_NE(external_query_ctx, nullptr);
200+
ASSERT_EQ(external_query_ctx->get_query_source(), QuerySource::EXTERNAL_FRONTEND);
201+
202+
// Wait for background cancel_worker to cancel the INTERNAL_FRONTEND query,
203+
// and keep the EXTERNAL_FRONTEND query alive.
204+
// NOTE: In BE_TEST, FragmentMgr::remove_query_context() does not erase `_query_ctx_map`,
205+
// so we validate by `is_cancelled()` instead of expecting get_query_ctx() == nullptr.
206+
constexpr int kMaxWaitMs = 5000;
207+
int waited_ms = 0;
208+
while (waited_ms < kMaxWaitMs) {
209+
if (internal_query_ctx->is_cancelled()) {
210+
break;
211+
}
212+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
213+
waited_ms += 100;
214+
}
215+
216+
EXPECT_TRUE(internal_query_ctx->is_cancelled());
217+
EXPECT_FALSE(external_query_ctx->is_cancelled());
218+
EXPECT_EQ(external_query_ctx->get_query_source(), QuerySource::EXTERNAL_FRONTEND);
219+
}
220+
221+
} // namespace doris

0 commit comments

Comments
 (0)