diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index e18b1be00981d5..9b764664adb255 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -111,7 +111,13 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> { } void check_mem_used(bool* is_low_watermark, bool* is_high_watermark) const { - auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load(); + check_mem_used(0, is_low_watermark, is_high_watermark); + } + + void check_mem_used(size_t reserved_size, bool* is_low_watermark, + bool* is_high_watermark) const { + auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load() + + static_cast<int64_t>(reserved_size); *is_low_watermark = (realtime_total_mem_used > ((double)_memory_limit * _memory_low_watermark.load(std::memory_order_relaxed) / 100)); diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 3d5893b9a062d6..6ea679eb625dc1 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -763,15 +763,24 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr()) { - if (!wg->exceed_limit()) { + bool exceed_low_watermark = false; + bool exceed_high_watermark = false; + wg->check_mem_used(size_to_reserve, &exceed_low_watermark, &exceed_high_watermark); + if (!exceed_high_watermark) { LOG(INFO) << "Query: " << query_id << " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, now resume it."; requestor->task_controller()->set_memory_sufficient(true); return true; + } else if (memory_usage <= limit) { + LOG(INFO) << "Query: " << query_id + << " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, keep it paused. 
" + << "Reserve size: " << PrettyPrinter::print_bytes(size_to_reserve) + << ", wg info: " << wg->memory_debug_string(); + return false; } else { Status error_status = Status::MemoryLimitExceeded( - "Query {} workload group memory is exceeded" - ", and there is no cache now. And could not find task to spill, " + "Query {} memory limit is exceeded while handling workload group memory " + "pressure, and there is no cache now. And could not find task to spill, " "try to cancel query. " "Query memory usage: {}, limit: {}, reserved " "size: {}, try to reserve: {}, wg info: {}." diff --git a/be/test/runtime/workload_group/workload_group_manager_test.cpp b/be/test/runtime/workload_group/workload_group_manager_test.cpp index a061ab719f0c8b..45afdd95e6604f 100644 --- a/be/test/runtime/workload_group/workload_group_manager_test.cpp +++ b/be/test/runtime/workload_group/workload_group_manager_test.cpp @@ -32,14 +32,17 @@ #include "common/config.h" #include "common/status.h" +#include "exec/pipeline/dependency.h" #include "exec/pipeline/pipeline_tracing.h" #include "exec/spill/spill_file_manager.h" #include "runtime/exec_env.h" #include "runtime/query_context.h" #include "runtime/runtime_query_statistics_mgr.h" +#include "runtime/thread_context.h" #include "runtime/workload_group/workload_group.h" #include "storage/olap_define.h" #include "testutil/mock/mock_query_task_controller.h" +#include "util/defer_op.h" namespace doris { @@ -100,10 +103,11 @@ class WorkloadGroupManagerTest : public testing::Test { } private: - std::shared_ptr<QueryContext> _generate_on_query(std::shared_ptr<WorkloadGroup>& wg) { + std::shared_ptr<QueryContext> _generate_on_query(std::shared_ptr<WorkloadGroup>& wg, + int64_t mem_limit = 1024L * 1024 * 128) { TQueryOptions query_options; query_options.query_type = TQueryType::SELECT; - query_options.mem_limit = 1024L * 1024 * 128; + query_options.mem_limit = mem_limit; query_options.query_slot_count = 1; TNetworkAddress fe_address; fe_address.hostname = "127.0.0.1"; @@ -128,10 +132,10 @@ class 
WorkloadGroupManagerTest : public testing::Test { query_context->resource_ctx()->task_controller()); } - void _run_checking_loop(const std::shared_ptr<WorkloadGroup>& wg) { + void _run_checking_loop(const std::shared_ptr<WorkloadGroup>& wg, size_t check_times = 300) { CountDownLatch latch(1); - size_t check_times = 300; - while (--check_times > 0) { + while (check_times > 0) { + --check_times; _wg_manager->handle_paused_queries(); if (!_wg_manager->_paused_queries_list.contains(wg) || _wg_manager->_paused_queries_list[wg].empty()) { @@ -208,7 +212,11 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed1) { // TWgSlotMemoryPolicy::NONE // query_ctx->workload_group()->exceed_limit() == false TEST_F(WorkloadGroupManagerTest, wg_exceed2) { - auto wg = _wg_manager->get_or_create_workload_group({}); + WorkloadGroupInfo wg_info {.id = 1, + .memory_low_watermark = 80, + .memory_high_watermark = 95, + .slot_mem_policy = TWgSlotMemoryPolicy::NONE}; + auto wg = _wg_manager->get_or_create_workload_group(wg_info); auto query_context = _generate_on_query(wg); query_context->query_mem_tracker()->consume(1024L * 4); @@ -229,6 +237,74 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed2) { ASSERT_EQ(query_context->is_cancelled(), false) << "query should be canceled"; } +TEST_F(WorkloadGroupManagerTest, wg_reserve_failed_before_query_limit_and_high_watermark) { + WorkloadGroupInfo wg_info {.id = 1, + .memory_limit = 5000, + .memory_low_watermark = 50, + .memory_high_watermark = 60, + .slot_mem_policy = TWgSlotMemoryPolicy::NONE}; + auto wg = _wg_manager->get_or_create_workload_group(wg_info); + auto query_context = _generate_on_query(wg, 4096); + { + ThreadContext thread_context; + thread_context.attach_task(query_context->resource_ctx()); + Defer cleanup {[&]() { + { + std::unique_lock lock(_wg_manager->_paused_queries_lock); + _wg_manager->_paused_queries_list.erase(wg); + } + query_context->set_memory_sufficient(true); + thread_context.thread_mem_tracker_mgr->shrink_reserved(); + thread_context.detach_task(); + }}; + + 
auto st = thread_context.thread_mem_tracker_mgr->try_reserve(2048); + ASSERT_TRUE(st.ok()) << st.to_string(); + ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048); + ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048); + ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes(), + query_context->resource_ctx()->memory_context()->mem_limit()); + ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes() + 1024, + query_context->resource_ctx()->memory_context()->mem_limit()); + + bool exceed_low_watermark = false; + bool exceed_high_watermark = false; + wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark); + ASSERT_FALSE(exceed_low_watermark); + ASSERT_FALSE(exceed_high_watermark); + + st = thread_context.thread_mem_tracker_mgr->try_reserve(1024); + ASSERT_TRUE(st.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) << st.to_string(); + ASSERT_FALSE(st.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) << st.to_string(); + ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048); + ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048); + + _wg_manager->add_paused_query(query_context->resource_ctx(), 1024, st); + ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready()); + { + std::unique_lock lock(_wg_manager->_paused_queries_lock); + ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1) + << "paused queue should not be empty"; + } + + _run_checking_loop(wg, 3); + + ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready()); + ASSERT_TRUE(query_context->resource_ctx() + ->task_controller() + ->paused_reason() + .is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()); + ASSERT_FALSE(query_context->is_cancelled()); + { + std::unique_lock lock(_wg_manager->_paused_queries_lock); + ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1) + << "paused queue should keep the query"; + } + } + ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 0); 
+ ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 0); +} + // TWgSlotMemoryPolicy::NONE // query_ctx->workload_group()->exceed_limit() == true // query limit > workload group limit @@ -352,7 +428,10 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed5) { } TEST_F(WorkloadGroupManagerTest, overcommit) { - WorkloadGroupInfo wg_info {.id = 1}; + WorkloadGroupInfo wg_info {.id = 1, + .memory_low_watermark = 80, + .memory_high_watermark = 95, + .slot_mem_policy = TWgSlotMemoryPolicy::NONE}; auto wg = _wg_manager->get_or_create_workload_group(wg_info); EXPECT_EQ(wg->id(), wg_info.id); @@ -376,7 +455,10 @@ TEST_F(WorkloadGroupManagerTest, overcommit) { } TEST_F(WorkloadGroupManagerTest, slot_memory_policy_disabled) { - WorkloadGroupInfo wg_info {.id = 1, .slot_mem_policy = TWgSlotMemoryPolicy::NONE}; + WorkloadGroupInfo wg_info {.id = 1, + .memory_low_watermark = 80, + .memory_high_watermark = 95, + .slot_mem_policy = TWgSlotMemoryPolicy::NONE}; auto wg = _wg_manager->get_or_create_workload_group(wg_info); EXPECT_EQ(wg->id(), wg_info.id); EXPECT_EQ(wg->slot_memory_policy(), TWgSlotMemoryPolicy::NONE); @@ -969,8 +1051,10 @@ TEST_F(WorkloadGroupManagerTest, phase3_skips_cancelled_queries_on_resume) { // queries in WG2, because WG memory pools are independent. Only same-WG queries // should be delayed. PROCESS_MEMORY_EXCEEDED should still be delayed globally. TEST_F(WorkloadGroupManagerTest, cancelled_query_does_not_block_cross_wg_mem_exceeded) { - WorkloadGroupInfo wg1_info {.id = 1, .memory_limit = 1024L * 1024 * 1000}; - WorkloadGroupInfo wg2_info {.id = 2, .memory_limit = 1024L * 1024 * 1000}; + WorkloadGroupInfo wg1_info { + .id = 1, .memory_limit = 1024L * 1024 * 1000, .memory_high_watermark = 80}; + WorkloadGroupInfo wg2_info { + .id = 2, .memory_limit = 1024L * 1024 * 1000, .memory_high_watermark = 80}; auto wg1 = _wg_manager->get_or_create_workload_group(wg1_info); auto wg2 = _wg_manager->get_or_create_workload_group(wg2_info);