Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,13 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
}

void check_mem_used(bool* is_low_watermark, bool* is_high_watermark) const {
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
check_mem_used(0, is_low_watermark, is_high_watermark);
}

void check_mem_used(size_t reserved_size, bool* is_low_watermark,
bool* is_high_watermark) const {
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load() +
static_cast<int64_t>(reserved_size);
*is_low_watermark = (realtime_total_mem_used >
((double)_memory_limit *
_memory_low_watermark.load(std::memory_order_relaxed) / 100));
Expand Down
15 changes: 12 additions & 3 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -763,15 +763,24 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<ResourceContex
return true;
}
} else if (paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
if (!wg->exceed_limit()) {
bool exceed_low_watermark = false;
bool exceed_high_watermark = false;
wg->check_mem_used(size_to_reserve, &exceed_low_watermark, &exceed_high_watermark);
if (!exceed_high_watermark) {
Comment on lines +766 to +769
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description currently contains duplicated template sections and placeholder references (e.g., close #xxx, Related PR: #xxx) which conflict with the earlier stated summary and make the intended linkage/closure ambiguous. Please clean up the PR description so it reflects the actual issue/PR references and the single intended problem statement.

Copilot uses AI. Check for mistakes.
LOG(INFO) << "Query: " << query_id
<< " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, now resume it.";
requestor->task_controller()->set_memory_sufficient(true);
return true;
} else if (memory_usage <= limit) {
LOG(INFO) << "Query: " << query_id
<< " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, keep it paused. "
<< "Reserve size: " << PrettyPrinter::print_bytes(size_to_reserve)
<< ", wg info: " << wg->memory_debug_string();
return false;
} else {
Status error_status = Status::MemoryLimitExceeded(
"Query {} workload group memory is exceeded"
", and there is no cache now. And could not find task to spill, "
"Query {} memory limit is exceeded while handling workload group memory "
"pressure, and there is no cache now. And could not find task to spill, "
"try to cancel query. "
"Query memory usage: {}, limit: {}, reserved "
"size: {}, try to reserve: {}, wg info: {}."
Expand Down
104 changes: 94 additions & 10 deletions be/test/runtime/workload_group/workload_group_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@

#include "common/config.h"
#include "common/status.h"
#include "exec/pipeline/dependency.h"
#include "exec/pipeline/pipeline_tracing.h"
#include "exec/spill/spill_file_manager.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/thread_context.h"
#include "runtime/workload_group/workload_group.h"
#include "storage/olap_define.h"
#include "testutil/mock/mock_query_task_controller.h"
#include "util/defer_op.h"

namespace doris {

Expand Down Expand Up @@ -100,10 +103,11 @@ class WorkloadGroupManagerTest : public testing::Test {
}

private:
std::shared_ptr<QueryContext> _generate_on_query(std::shared_ptr<WorkloadGroup>& wg) {
std::shared_ptr<QueryContext> _generate_on_query(std::shared_ptr<WorkloadGroup>& wg,
int64_t mem_limit = 1024L * 1024 * 128) {
TQueryOptions query_options;
query_options.query_type = TQueryType::SELECT;
query_options.mem_limit = 1024L * 1024 * 128;
query_options.mem_limit = mem_limit;
query_options.query_slot_count = 1;
TNetworkAddress fe_address;
fe_address.hostname = "127.0.0.1";
Expand All @@ -128,10 +132,10 @@ class WorkloadGroupManagerTest : public testing::Test {
query_context->resource_ctx()->task_controller());
}

void _run_checking_loop(const std::shared_ptr<WorkloadGroup>& wg) {
void _run_checking_loop(const std::shared_ptr<WorkloadGroup>& wg, size_t check_times = 300) {
CountDownLatch latch(1);
size_t check_times = 300;
while (--check_times > 0) {
while (check_times > 0) {
--check_times;
_wg_manager->handle_paused_queries();
if (!_wg_manager->_paused_queries_list.contains(wg) ||
_wg_manager->_paused_queries_list[wg].empty()) {
Expand Down Expand Up @@ -208,7 +212,11 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed1) {
// TWgSlotMemoryPolicy::NONE
// query_ctx->workload_group()->exceed_limit() == false
TEST_F(WorkloadGroupManagerTest, wg_exceed2) {
auto wg = _wg_manager->get_or_create_workload_group({});
WorkloadGroupInfo wg_info {.id = 1,
.memory_low_watermark = 80,
.memory_high_watermark = 95,
.slot_mem_policy = TWgSlotMemoryPolicy::NONE};
auto wg = _wg_manager->get_or_create_workload_group(wg_info);
auto query_context = _generate_on_query(wg);

query_context->query_mem_tracker()->consume(1024L * 4);
Expand All @@ -229,6 +237,74 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed2) {
ASSERT_EQ(query_context->is_cancelled(), false) << "query should be canceled";
}

// Regression test: a reserve request that pushes the workload group past its
// high watermark — while the query itself is still under its own mem_limit —
// must leave the query PAUSED (not resumed, not cancelled) after the paused-query
// checking loop runs. Exercises the reserved_size-aware check_mem_used() overload.
TEST_F(WorkloadGroupManagerTest, wg_reserve_failed_before_query_limit_and_high_watermark) {
    // WG limit 5000 bytes; low/high watermarks at 50%/60% => 2500/3000 bytes.
    // Slot policy NONE so per-slot memory division does not apply.
    WorkloadGroupInfo wg_info {.id = 1,
                               .memory_limit = 5000,
                               .memory_low_watermark = 50,
                               .memory_high_watermark = 60,
                               .slot_mem_policy = TWgSlotMemoryPolicy::NONE};
    auto wg = _wg_manager->get_or_create_workload_group(wg_info);
    // Query mem_limit (4096) is below the WG limit, so the WG watermark — not the
    // query limit — is what the second reserve trips over.
    auto query_context = _generate_on_query(wg, 4096);
    {
        ThreadContext thread_context;
        thread_context.attach_task(query_context->resource_ctx());
        // Runs at scope exit in reverse order of the statements below: drop the
        // query from the paused list, unblock it, release the reservation, and
        // detach before thread_context is destroyed.
        Defer cleanup {[&]() {
            {
                std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
                _wg_manager->_paused_queries_list.erase(wg);
            }
            query_context->set_memory_sufficient(true);
            thread_context.thread_mem_tracker_mgr->shrink_reserved();
            thread_context.detach_task();
        }};

        // First reserve (2048) succeeds: below the query limit and below the
        // WG low watermark (2500).
        auto st = thread_context.thread_mem_tracker_mgr->try_reserve(2048);
        ASSERT_TRUE(st.ok()) << st.to_string();
        ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048);
        ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048);
        // Sanity: even 2048 + the next 1024 stays under the query's own limit,
        // proving the later failure cannot be QUERY_MEMORY_EXCEEDED.
        ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes(),
                  query_context->resource_ctx()->memory_context()->mem_limit());
        ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes() + 1024,
                  query_context->resource_ctx()->memory_context()->mem_limit());

        // With only 2048 used, neither WG watermark is exceeded yet.
        bool exceed_low_watermark = false;
        bool exceed_high_watermark = false;
        wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark);
        ASSERT_FALSE(exceed_low_watermark);
        ASSERT_FALSE(exceed_high_watermark);

        // Second reserve (1024) would bring WG usage to 3072 > high watermark
        // (3000), so it must fail with WORKLOAD_GROUP_MEMORY_EXCEEDED — and the
        // tracked/reserved amounts must be unchanged by the failed attempt.
        st = thread_context.thread_mem_tracker_mgr->try_reserve(1024);
        ASSERT_TRUE(st.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) << st.to_string();
        ASSERT_FALSE(st.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) << st.to_string();
        ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048);
        ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048);

        // Park the query in the manager's paused list with the failed reserve size.
        _wg_manager->add_paused_query(query_context->resource_ctx(), 1024, st);
        ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready());
        {
            std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
            ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
                    << "paused queue should not be empty";
        }

        // Drive handle_paused_queries() a few times; 3 iterations is enough to
        // show the query is neither resumed nor cancelled.
        _run_checking_loop(wg, 3);

        // The query must still be paused for the same reason, not cancelled,
        // and still present in the paused list.
        ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready());
        ASSERT_TRUE(query_context->resource_ctx()
                            ->task_controller()
                            ->paused_reason()
                            .is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>());
        ASSERT_FALSE(query_context->is_cancelled());
        {
            std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
            ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
                    << "paused queue should keep the query";
        }
    }
    // After the Defer cleanup ran, all tracked and reserved memory is released.
    ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 0);
    ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 0);
}

// TWgSlotMemoryPolicy::NONE
// query_ctx->workload_group()->exceed_limit() == true
// query limit > workload group limit
Expand Down Expand Up @@ -352,7 +428,10 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed5) {
}

TEST_F(WorkloadGroupManagerTest, overcommit) {
WorkloadGroupInfo wg_info {.id = 1};
WorkloadGroupInfo wg_info {.id = 1,
.memory_low_watermark = 80,
.memory_high_watermark = 95,
.slot_mem_policy = TWgSlotMemoryPolicy::NONE};
auto wg = _wg_manager->get_or_create_workload_group(wg_info);
EXPECT_EQ(wg->id(), wg_info.id);

Expand All @@ -376,7 +455,10 @@ TEST_F(WorkloadGroupManagerTest, overcommit) {
}

TEST_F(WorkloadGroupManagerTest, slot_memory_policy_disabled) {
WorkloadGroupInfo wg_info {.id = 1, .slot_mem_policy = TWgSlotMemoryPolicy::NONE};
WorkloadGroupInfo wg_info {.id = 1,
.memory_low_watermark = 80,
.memory_high_watermark = 95,
.slot_mem_policy = TWgSlotMemoryPolicy::NONE};
auto wg = _wg_manager->get_or_create_workload_group(wg_info);
EXPECT_EQ(wg->id(), wg_info.id);
EXPECT_EQ(wg->slot_memory_policy(), TWgSlotMemoryPolicy::NONE);
Expand Down Expand Up @@ -969,8 +1051,10 @@ TEST_F(WorkloadGroupManagerTest, phase3_skips_cancelled_queries_on_resume) {
// queries in WG2, because WG memory pools are independent. Only same-WG queries
// should be delayed. PROCESS_MEMORY_EXCEEDED should still be delayed globally.
TEST_F(WorkloadGroupManagerTest, cancelled_query_does_not_block_cross_wg_mem_exceeded) {
WorkloadGroupInfo wg1_info {.id = 1, .memory_limit = 1024L * 1024 * 1000};
WorkloadGroupInfo wg2_info {.id = 2, .memory_limit = 1024L * 1024 * 1000};
WorkloadGroupInfo wg1_info {
.id = 1, .memory_limit = 1024L * 1024 * 1000, .memory_high_watermark = 80};
WorkloadGroupInfo wg2_info {
.id = 2, .memory_limit = 1024L * 1024 * 1000, .memory_high_watermark = 80};
auto wg1 = _wg_manager->get_or_create_workload_group(wg1_info);
auto wg2 = _wg_manager->get_or_create_workload_group(wg2_info);

Expand Down
Loading