From 55e82f86321dd408bc514509e531865f334e7d1b Mon Sep 17 00:00:00 2001 From: Hu Shenggang Date: Thu, 9 Apr 2026 16:18:52 +0800 Subject: [PATCH 1/2] [fix](be) Keep workload group paused queries blocked when reserve still fails Issue Number: None Related PR: None Problem Summary: Keep queries paused after WORKLOAD_GROUP_MEMORY_EXCEEDED when retrying the same reserve would still exceed the workload group high watermark, and add a manager unit test that exercises a real reserve failure path. None - Test: Unit Test - Incrementally compiled `workload_group.h`, `workload_group_manager.cpp`, and `workload_group_manager_test.cpp` - A full build was not completed because linking triggers a large incremental rebuild in this worktree - Behavior changed: Yes (queries that still cannot satisfy the same workload group reserve stay paused instead of being resumed immediately) - Does this need documentation: No --- .../runtime/workload_group/workload_group.h | 8 +- .../workload_group/workload_group_manager.cpp | 11 ++- .../workload_group_manager_test.cpp | 99 +++++++++++++++++-- 3 files changed, 106 insertions(+), 12 deletions(-) diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index e18b1be00981d5..9b764664adb255 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -111,7 +111,13 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> { } void check_mem_used(bool* is_low_watermark, bool* is_high_watermark) const { - auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load(); + check_mem_used(0, is_low_watermark, is_high_watermark); + } + + void check_mem_used(size_t reserved_size, bool* is_low_watermark, + bool* is_high_watermark) const { + auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load() + + static_cast<int64_t>(reserved_size); *is_low_watermark = (realtime_total_mem_used > ((double)_memory_limit * _memory_low_watermark.load(std::memory_order_relaxed) / 100)); diff --git 
a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 3d5893b9a062d6..ed9e30df26ad17 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -763,11 +763,20 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr()) { - if (!wg->exceed_limit()) { + bool exceed_low_watermark = false; + bool exceed_high_watermark = false; + wg->check_mem_used(size_to_reserve, &exceed_low_watermark, &exceed_high_watermark); + if (!exceed_high_watermark) { LOG(INFO) << "Query: " << query_id << " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, now resume it."; requestor->task_controller()->set_memory_sufficient(true); return true; + } else if (memory_usage <= limit) { + LOG(INFO) << "Query: " << query_id + << " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, keep it paused. " + << "Reserve size: " << PrettyPrinter::print_bytes(size_to_reserve) + << ", wg info: " << wg->memory_debug_string(); + return false; } else { Status error_status = Status::MemoryLimitExceeded( "Query {} workload group memory is exceeded" diff --git a/be/test/runtime/workload_group/workload_group_manager_test.cpp b/be/test/runtime/workload_group/workload_group_manager_test.cpp index a061ab719f0c8b..219a80af9e0a79 100644 --- a/be/test/runtime/workload_group/workload_group_manager_test.cpp +++ b/be/test/runtime/workload_group/workload_group_manager_test.cpp @@ -32,11 +32,13 @@ #include "common/config.h" #include "common/status.h" +#include "exec/pipeline/dependency.h" #include "exec/pipeline/pipeline_tracing.h" #include "exec/spill/spill_file_manager.h" #include "runtime/exec_env.h" #include "runtime/query_context.h" #include "runtime/runtime_query_statistics_mgr.h" +#include "runtime/thread_context.h" #include "runtime/workload_group/workload_group.h" #include "storage/olap_define.h" #include "testutil/mock/mock_query_task_controller.h" @@ -100,10 
+102,11 @@ class WorkloadGroupManagerTest : public testing::Test { } private: - std::shared_ptr _generate_on_query(std::shared_ptr& wg) { + std::shared_ptr _generate_on_query(std::shared_ptr& wg, + int64_t mem_limit = 1024L * 1024 * 128) { TQueryOptions query_options; query_options.query_type = TQueryType::SELECT; - query_options.mem_limit = 1024L * 1024 * 128; + query_options.mem_limit = mem_limit; query_options.query_slot_count = 1; TNetworkAddress fe_address; fe_address.hostname = "127.0.0.1"; @@ -128,10 +131,9 @@ class WorkloadGroupManagerTest : public testing::Test { query_context->resource_ctx()->task_controller()); } - void _run_checking_loop(const std::shared_ptr& wg) { + void _run_checking_loop(const std::shared_ptr& wg, size_t check_times = 300) { CountDownLatch latch(1); - size_t check_times = 300; - while (--check_times > 0) { + while (check_times-- > 0) { _wg_manager->handle_paused_queries(); if (!_wg_manager->_paused_queries_list.contains(wg) || _wg_manager->_paused_queries_list[wg].empty()) { @@ -208,7 +210,11 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed1) { // TWgSlotMemoryPolicy::NONE // query_ctx->workload_group()->exceed_limit() == false TEST_F(WorkloadGroupManagerTest, wg_exceed2) { - auto wg = _wg_manager->get_or_create_workload_group({}); + WorkloadGroupInfo wg_info {.id = 1, + .memory_low_watermark = 80, + .memory_high_watermark = 95, + .slot_mem_policy = TWgSlotMemoryPolicy::NONE}; + auto wg = _wg_manager->get_or_create_workload_group(wg_info); auto query_context = _generate_on_query(wg); query_context->query_mem_tracker()->consume(1024L * 4); @@ -229,6 +235,71 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed2) { ASSERT_EQ(query_context->is_cancelled(), false) << "query should be canceled"; } +TEST_F(WorkloadGroupManagerTest, wg_reserve_failed_before_query_limit_and_high_watermark) { + WorkloadGroupInfo wg_info {.id = 1, + .memory_limit = 5000, + .memory_low_watermark = 50, + .memory_high_watermark = 60, + .slot_mem_policy = 
TWgSlotMemoryPolicy::NONE}; + auto wg = _wg_manager->get_or_create_workload_group(wg_info); + auto query_context = _generate_on_query(wg, 4096); + ThreadContext thread_context; + thread_context.attach_task(query_context->resource_ctx()); + + auto st = thread_context.thread_mem_tracker_mgr->try_reserve(2048); + ASSERT_TRUE(st.ok()) << st.to_string(); + ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048); + ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048); + ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes(), + query_context->resource_ctx()->memory_context()->mem_limit()); + ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes() + 1024, + query_context->resource_ctx()->memory_context()->mem_limit()); + + bool exceed_low_watermark = false; + bool exceed_high_watermark = false; + wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark); + ASSERT_FALSE(exceed_low_watermark); + ASSERT_FALSE(exceed_high_watermark); + + st = thread_context.thread_mem_tracker_mgr->try_reserve(1024); + ASSERT_TRUE(st.is()) << st.to_string(); + ASSERT_FALSE(st.is()) << st.to_string(); + ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048); + ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048); + + _wg_manager->add_paused_query(query_context->resource_ctx(), 1024, st); + ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready()); + { + std::unique_lock lock(_wg_manager->_paused_queries_lock); + ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1) + << "paused queue should not be empty"; + } + + _run_checking_loop(wg, 3); + + ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready()); + ASSERT_TRUE(query_context->resource_ctx() + ->task_controller() + ->paused_reason() + .is()); + ASSERT_FALSE(query_context->is_cancelled()); + { + std::unique_lock 
lock(_wg_manager->_paused_queries_lock); + ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1) + << "paused queue should keep the query"; + } + + { + std::unique_lock lock(_wg_manager->_paused_queries_lock); + _wg_manager->_paused_queries_list.erase(wg); + } + query_context->set_memory_sufficient(true); + thread_context.thread_mem_tracker_mgr->shrink_reserved(); + thread_context.detach_task(); + ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 0); + ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 0); +} + // TWgSlotMemoryPolicy::NONE // query_ctx->workload_group()->exceed_limit() == true // query limit > workload group limit @@ -352,7 +423,10 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed5) { } TEST_F(WorkloadGroupManagerTest, overcommit) { - WorkloadGroupInfo wg_info {.id = 1}; + WorkloadGroupInfo wg_info {.id = 1, + .memory_low_watermark = 80, + .memory_high_watermark = 95, + .slot_mem_policy = TWgSlotMemoryPolicy::NONE}; auto wg = _wg_manager->get_or_create_workload_group(wg_info); EXPECT_EQ(wg->id(), wg_info.id); @@ -376,7 +450,10 @@ TEST_F(WorkloadGroupManagerTest, overcommit) { } TEST_F(WorkloadGroupManagerTest, slot_memory_policy_disabled) { - WorkloadGroupInfo wg_info {.id = 1, .slot_mem_policy = TWgSlotMemoryPolicy::NONE}; + WorkloadGroupInfo wg_info {.id = 1, + .memory_low_watermark = 80, + .memory_high_watermark = 95, + .slot_mem_policy = TWgSlotMemoryPolicy::NONE}; auto wg = _wg_manager->get_or_create_workload_group(wg_info); EXPECT_EQ(wg->id(), wg_info.id); EXPECT_EQ(wg->slot_memory_policy(), TWgSlotMemoryPolicy::NONE); @@ -969,8 +1046,10 @@ TEST_F(WorkloadGroupManagerTest, phase3_skips_cancelled_queries_on_resume) { // queries in WG2, because WG memory pools are independent. Only same-WG queries // should be delayed. PROCESS_MEMORY_EXCEEDED should still be delayed globally. 
TEST_F(WorkloadGroupManagerTest, cancelled_query_does_not_block_cross_wg_mem_exceeded) { - WorkloadGroupInfo wg1_info {.id = 1, .memory_limit = 1024L * 1024 * 1000}; - WorkloadGroupInfo wg2_info {.id = 2, .memory_limit = 1024L * 1024 * 1000}; + WorkloadGroupInfo wg1_info { + .id = 1, .memory_limit = 1024L * 1024 * 1000, .memory_high_watermark = 80}; + WorkloadGroupInfo wg2_info { + .id = 2, .memory_limit = 1024L * 1024 * 1000, .memory_high_watermark = 80}; auto wg1 = _wg_manager->get_or_create_workload_group(wg1_info); auto wg2 = _wg_manager->get_or_create_workload_group(wg2_info); From 52975b52e3351521bd6ecba458b00d9a3e8a60d6 Mon Sep 17 00:00:00 2001 From: Hu Shenggang Date: Thu, 9 Apr 2026 18:25:03 +0800 Subject: [PATCH 2/2] [fix](be) Address review feedback on workload group pause handling Issue Number: None Related PR: #62285 Problem Summary: Address review feedback for the workload group paused-query change by avoiding unsigned wraparound in the test helper loop, making the reserve-failure test cleanup exception-safe, and clarifying the cancel message when the query itself exceeds its memory limit during workload group pressure handling. 
None - Test: No need to test (small follow-up on test cleanup and cancellation message; attempted incremental build but the sandbox hit a ccache temporary-directory permission issue) - Behavior changed: Yes (the cancellation message now reports query memory limit exceeded instead of workload group memory exceeded in that branch) - Does this need documentation: No --- .../workload_group/workload_group_manager.cpp | 4 +- .../workload_group_manager_test.cpp | 107 +++++++++--------- 2 files changed, 58 insertions(+), 53 deletions(-) diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index ed9e30df26ad17..6ea679eb625dc1 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -779,8 +779,8 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr& wg, size_t check_times = 300) { CountDownLatch latch(1); - while (check_times-- > 0) { + while (check_times > 0) { + --check_times; _wg_manager->handle_paused_queries(); if (!_wg_manager->_paused_queries_list.contains(wg) || _wg_manager->_paused_queries_list[wg].empty()) { @@ -243,59 +245,62 @@ TEST_F(WorkloadGroupManagerTest, wg_reserve_failed_before_query_limit_and_high_w .slot_mem_policy = TWgSlotMemoryPolicy::NONE}; auto wg = _wg_manager->get_or_create_workload_group(wg_info); auto query_context = _generate_on_query(wg, 4096); - ThreadContext thread_context; - thread_context.attach_task(query_context->resource_ctx()); - - auto st = thread_context.thread_mem_tracker_mgr->try_reserve(2048); - ASSERT_TRUE(st.ok()) << st.to_string(); - ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048); - ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048); - ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes(), - query_context->resource_ctx()->memory_context()->mem_limit()); 
- ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes() + 1024, - query_context->resource_ctx()->memory_context()->mem_limit()); - - bool exceed_low_watermark = false; - bool exceed_high_watermark = false; - wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark); - ASSERT_FALSE(exceed_low_watermark); - ASSERT_FALSE(exceed_high_watermark); - - st = thread_context.thread_mem_tracker_mgr->try_reserve(1024); - ASSERT_TRUE(st.is()) << st.to_string(); - ASSERT_FALSE(st.is()) << st.to_string(); - ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048); - ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048); - - _wg_manager->add_paused_query(query_context->resource_ctx(), 1024, st); - ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready()); { - std::unique_lock lock(_wg_manager->_paused_queries_lock); - ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1) - << "paused queue should not be empty"; - } - - _run_checking_loop(wg, 3); - - ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready()); - ASSERT_TRUE(query_context->resource_ctx() - ->task_controller() - ->paused_reason() - .is()); - ASSERT_FALSE(query_context->is_cancelled()); - { - std::unique_lock lock(_wg_manager->_paused_queries_lock); - ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1) - << "paused queue should keep the query"; - } + ThreadContext thread_context; + thread_context.attach_task(query_context->resource_ctx()); + Defer cleanup {[&]() { + { + std::unique_lock lock(_wg_manager->_paused_queries_lock); + _wg_manager->_paused_queries_list.erase(wg); + } + query_context->set_memory_sufficient(true); + thread_context.thread_mem_tracker_mgr->shrink_reserved(); + thread_context.detach_task(); + }}; + + auto st = thread_context.thread_mem_tracker_mgr->try_reserve(2048); + ASSERT_TRUE(st.ok()) << st.to_string(); + 
ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048); + ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048); + ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes(), + query_context->resource_ctx()->memory_context()->mem_limit()); + ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes() + 1024, + query_context->resource_ctx()->memory_context()->mem_limit()); + + bool exceed_low_watermark = false; + bool exceed_high_watermark = false; + wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark); + ASSERT_FALSE(exceed_low_watermark); + ASSERT_FALSE(exceed_high_watermark); + + st = thread_context.thread_mem_tracker_mgr->try_reserve(1024); + ASSERT_TRUE(st.is()) << st.to_string(); + ASSERT_FALSE(st.is()) << st.to_string(); + ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048); + ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048); + + _wg_manager->add_paused_query(query_context->resource_ctx(), 1024, st); + ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready()); + { + std::unique_lock lock(_wg_manager->_paused_queries_lock); + ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1) + << "paused queue should not be empty"; + } - { - std::unique_lock lock(_wg_manager->_paused_queries_lock); - _wg_manager->_paused_queries_list.erase(wg); + _run_checking_loop(wg, 3); + + ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready()); + ASSERT_TRUE(query_context->resource_ctx() + ->task_controller() + ->paused_reason() + .is()); + ASSERT_FALSE(query_context->is_cancelled()); + { + std::unique_lock lock(_wg_manager->_paused_queries_lock); + ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1) + << "paused queue should keep the query"; + } } - query_context->set_memory_sufficient(true); - 
thread_context.thread_mem_tracker_mgr->shrink_reserved(); - thread_context.detach_task(); ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 0); ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 0); }