Skip to content

Commit 52975b5

Browse files
committed
[fix](be) Address review feedback on workload group pause handling
Issue Number: None Related PR: #62285 Problem Summary: Address review feedback for the workload group paused-query change by avoiding unsigned wraparound in the test helper loop, making the reserve-failure test cleanup exception-safe, and clarifying the cancel message when the query itself exceeds its memory limit during workload group pressure handling. None - Test: No need to test (small follow-up on test cleanup and cancellation message; attempted incremental build but the sandbox hit a ccache temporary-directory permission issue) - Behavior changed: Yes (the cancellation message now reports query memory limit exceeded instead of workload group memory exceeded in that branch) - Does this need documentation: No
1 parent 55e82f8 commit 52975b5

2 files changed

Lines changed: 58 additions & 53 deletions

File tree

be/src/runtime/workload_group/workload_group_manager.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -779,8 +779,8 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<ResourceContex
779779
return false;
780780
} else {
781781
Status error_status = Status::MemoryLimitExceeded(
782-
"Query {} workload group memory is exceeded"
783-
", and there is no cache now. And could not find task to spill, "
782+
"Query {} memory limit is exceeded while handling workload group memory "
783+
"pressure, and there is no cache now. And could not find task to spill, "
784784
"try to cancel query. "
785785
"Query memory usage: {}, limit: {}, reserved "
786786
"size: {}, try to reserve: {}, wg info: {}."

be/test/runtime/workload_group/workload_group_manager_test.cpp

Lines changed: 56 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include "runtime/workload_group/workload_group.h"
4343
#include "storage/olap_define.h"
4444
#include "testutil/mock/mock_query_task_controller.h"
45+
#include "util/defer_op.h"
4546

4647
namespace doris {
4748

@@ -133,7 +134,8 @@ class WorkloadGroupManagerTest : public testing::Test {
133134

134135
void _run_checking_loop(const std::shared_ptr<WorkloadGroup>& wg, size_t check_times = 300) {
135136
CountDownLatch latch(1);
136-
while (check_times-- > 0) {
137+
while (check_times > 0) {
138+
--check_times;
137139
_wg_manager->handle_paused_queries();
138140
if (!_wg_manager->_paused_queries_list.contains(wg) ||
139141
_wg_manager->_paused_queries_list[wg].empty()) {
@@ -243,59 +245,62 @@ TEST_F(WorkloadGroupManagerTest, wg_reserve_failed_before_query_limit_and_high_w
243245
.slot_mem_policy = TWgSlotMemoryPolicy::NONE};
244246
auto wg = _wg_manager->get_or_create_workload_group(wg_info);
245247
auto query_context = _generate_on_query(wg, 4096);
246-
ThreadContext thread_context;
247-
thread_context.attach_task(query_context->resource_ctx());
248-
249-
auto st = thread_context.thread_mem_tracker_mgr->try_reserve(2048);
250-
ASSERT_TRUE(st.ok()) << st.to_string();
251-
ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048);
252-
ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048);
253-
ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes(),
254-
query_context->resource_ctx()->memory_context()->mem_limit());
255-
ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes() + 1024,
256-
query_context->resource_ctx()->memory_context()->mem_limit());
257-
258-
bool exceed_low_watermark = false;
259-
bool exceed_high_watermark = false;
260-
wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark);
261-
ASSERT_FALSE(exceed_low_watermark);
262-
ASSERT_FALSE(exceed_high_watermark);
263-
264-
st = thread_context.thread_mem_tracker_mgr->try_reserve(1024);
265-
ASSERT_TRUE(st.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) << st.to_string();
266-
ASSERT_FALSE(st.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) << st.to_string();
267-
ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048);
268-
ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048);
269-
270-
_wg_manager->add_paused_query(query_context->resource_ctx(), 1024, st);
271-
ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready());
272248
{
273-
std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
274-
ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
275-
<< "paused queue should not be empty";
276-
}
277-
278-
_run_checking_loop(wg, 3);
279-
280-
ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready());
281-
ASSERT_TRUE(query_context->resource_ctx()
282-
->task_controller()
283-
->paused_reason()
284-
.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>());
285-
ASSERT_FALSE(query_context->is_cancelled());
286-
{
287-
std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
288-
ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
289-
<< "paused queue should keep the query";
290-
}
249+
ThreadContext thread_context;
250+
thread_context.attach_task(query_context->resource_ctx());
251+
Defer cleanup {[&]() {
252+
{
253+
std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
254+
_wg_manager->_paused_queries_list.erase(wg);
255+
}
256+
query_context->set_memory_sufficient(true);
257+
thread_context.thread_mem_tracker_mgr->shrink_reserved();
258+
thread_context.detach_task();
259+
}};
260+
261+
auto st = thread_context.thread_mem_tracker_mgr->try_reserve(2048);
262+
ASSERT_TRUE(st.ok()) << st.to_string();
263+
ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048);
264+
ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048);
265+
ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes(),
266+
query_context->resource_ctx()->memory_context()->mem_limit());
267+
ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes() + 1024,
268+
query_context->resource_ctx()->memory_context()->mem_limit());
269+
270+
bool exceed_low_watermark = false;
271+
bool exceed_high_watermark = false;
272+
wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark);
273+
ASSERT_FALSE(exceed_low_watermark);
274+
ASSERT_FALSE(exceed_high_watermark);
275+
276+
st = thread_context.thread_mem_tracker_mgr->try_reserve(1024);
277+
ASSERT_TRUE(st.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) << st.to_string();
278+
ASSERT_FALSE(st.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) << st.to_string();
279+
ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048);
280+
ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048);
281+
282+
_wg_manager->add_paused_query(query_context->resource_ctx(), 1024, st);
283+
ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready());
284+
{
285+
std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
286+
ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
287+
<< "paused queue should not be empty";
288+
}
291289

292-
{
293-
std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
294-
_wg_manager->_paused_queries_list.erase(wg);
290+
_run_checking_loop(wg, 3);
291+
292+
ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready());
293+
ASSERT_TRUE(query_context->resource_ctx()
294+
->task_controller()
295+
->paused_reason()
296+
.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>());
297+
ASSERT_FALSE(query_context->is_cancelled());
298+
{
299+
std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
300+
ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
301+
<< "paused queue should keep the query";
302+
}
295303
}
296-
query_context->set_memory_sufficient(true);
297-
thread_context.thread_mem_tracker_mgr->shrink_reserved();
298-
thread_context.detach_task();
299304
ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 0);
300305
ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 0);
301306
}

0 commit comments

Comments
 (0)