[fix](be) Keep workload group paused queries blocked when reserve still fails#62285
yiguolei merged 2 commits into apache:master
Pull request overview
Note: Copilot was unable to run its full agentic suite in this review.
This PR updates workload-group paused query handling so that queries paused due to WORKLOAD_GROUP_MEMORY_EXCEEDED remain paused when retrying the same reserve would still exceed the workload group high watermark, and adds a unit test that exercises a real reserve failure path.
Changes:
- Update `WorkloadGroupMgr::handle_single_query_` to re-check the high watermark, including the requested reserve size, before resuming a paused query.
- Extend `WorkloadGroup::check_mem_used` to optionally account for a prospective reserve size.
- Add a new unit test that triggers a real `try_reserve` failure and verifies the query remains paused.
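To make the second bullet concrete, here is a minimal sketch of a watermark check that optionally adds a prospective reserve to current usage. It assumes a workload group described by a byte limit plus low/high watermark percentages; `WorkloadGroupSketch`, its fields, and the strict `>` comparison are hypothetical and are not the actual Doris `WorkloadGroup::check_mem_used` implementation.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified stand-in for a workload group's memory accounting.
struct WorkloadGroupSketch {
    int64_t memory_limit;    // bytes
    int low_watermark_pct;   // e.g. 60 means 60% of memory_limit
    int high_watermark_pct;  // e.g. 70 means 70% of memory_limit
    int64_t used;            // bytes currently consumed by the group

    // Check based on current usage only.
    void check_mem_used(bool* exceed_low, bool* exceed_high) const {
        check_mem_used(0, exceed_low, exceed_high);
    }

    // Overload that also counts the bytes a query would reserve if it were resumed.
    void check_mem_used(int64_t size_to_reserve, bool* exceed_low, bool* exceed_high) const {
        assert(exceed_low != nullptr && exceed_high != nullptr);
        const int64_t prospective = used + size_to_reserve;
        // Integer comparison against a percentage of the limit (no floating point).
        *exceed_low = prospective * 100 > memory_limit * low_watermark_pct;
        *exceed_high = prospective * 100 > memory_limit * high_watermark_pct;
    }
};
```

With a 4096-byte limit, watermarks at 60% and 70%, and 2048 bytes in use, the plain check reports neither watermark exceeded, while the same check with a 1024-byte reserve reports both exceeded. That mirrors the shape of the new unit test: nothing looks over the watermark until the pending reserve is counted.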
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| be/test/runtime/workload_group/workload_group_manager_test.cpp | Adds a reserve-failure test and parameterizes helpers to cover the “keep paused” behavior. |
| be/src/runtime/workload_group/workload_group_manager.cpp | Changes resume/keep-paused decision logic for WORKLOAD_GROUP_MEMORY_EXCEEDED using high watermark checks. |
| be/src/runtime/workload_group/workload_group.h | Adds an overload to include reserved bytes in watermark checks. |
        CountDownLatch latch(1);
        size_t check_times = 300;
    -   while (--check_times > 0) {
    +   while (check_times-- > 0) {
`while (check_times-- > 0)` causes an unsigned underflow: the postfix decrement also runs on the final, failing check, so once `check_times` reaches 0 it wraps to `SIZE_MAX` even though the loop body does not run again (and if `check_times` starts at 0, it wraps immediately). Consider rewriting to avoid the wraparound, e.g. `while (check_times > 0) { --check_times; ... }` or a `for` loop, to keep the intent clear and avoid surprising state changes.
Suggested change:

    -   while (check_times-- > 0) {
    +   while (check_times > 0) {
    +       --check_times;
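The difference between the two loop shapes is easy to check in isolation. The sketch below uses hypothetical helper functions (not code from the PR) that run each loop and return the counter's final value: with the postfix form the final, failing test still decrements, so the `size_t` counter ends at `SIZE_MAX`; with the suggested form it stops at 0.

```cpp
#include <cassert>
#include <cstddef>

// Loop shape from the PR's helper: the postfix decrement also runs on the
// final, failing test, so the unsigned counter wraps from 0 to SIZE_MAX.
std::size_t counter_after_postfix_loop(std::size_t check_times, std::size_t* iterations) {
    assert(iterations != nullptr);
    *iterations = 0;
    while (check_times-- > 0) {
        ++*iterations;
    }
    return check_times;
}

// Shape suggested in the review: test first, decrement inside the body,
// so the counter stops at 0.
std::size_t counter_after_guarded_loop(std::size_t check_times, std::size_t* iterations) {
    assert(iterations != nullptr);
    *iterations = 0;
    while (check_times > 0) {
        --check_times;
        ++*iterations;
    }
    return check_times;
}
```

Both forms run the body exactly `check_times` times; the wraparound only matters if the counter is read after the loop. For comparison, the earlier prefix form `while (--check_times > 0)` runs the body one time fewer.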
    auto st = thread_context.thread_mem_tracker_mgr->try_reserve(2048);
    ASSERT_TRUE(st.ok()) << st.to_string();
    ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048);
    ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048);
    ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes(),
              query_context->resource_ctx()->memory_context()->mem_limit());
    ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes() + 1024,
              query_context->resource_ctx()->memory_context()->mem_limit());

    bool exceed_low_watermark = false;
    bool exceed_high_watermark = false;
    wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark);
    ASSERT_FALSE(exceed_low_watermark);
    ASSERT_FALSE(exceed_high_watermark);

    st = thread_context.thread_mem_tracker_mgr->try_reserve(1024);
    ASSERT_TRUE(st.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) << st.to_string();
    ASSERT_FALSE(st.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) << st.to_string();
    ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048);
    ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048);

    _wg_manager->add_paused_query(query_context->resource_ctx(), 1024, st);
    ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready());
    {
        std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
        ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
                << "paused queue should not be empty";
    }

    _run_checking_loop(wg, 3);

    ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready());
    ASSERT_TRUE(
            query_context->resource_ctx()->task_controller()->paused_reason().is<
                    ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>());
    ASSERT_FALSE(query_context->is_cancelled());
    {
        std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
        ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
                << "paused queue should keep the query";
    }

    {
        std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
        _wg_manager->_paused_queries_list.erase(wg);
    }
    query_context->set_memory_sufficient(true);
    thread_context.thread_mem_tracker_mgr->shrink_reserved();
    thread_context.detach_task();
This test mutates _paused_queries_list directly for cleanup and relies on reaching the bottom of the test to call shrink_reserved() / detach_task(). Any earlier ASSERT_* failure will skip this cleanup and can leak paused-query state and thread attachment into subsequent tests, causing order-dependent failures. Prefer a scope guard/RAII cleanup (or a test-only helper API on the manager) so _paused_queries_list cleanup and detach_task() always happen.
Suggested change:

    {
        using WorkloadGroupPtr = decltype(wg);
        using QueryContextPtr = decltype(query_context);
        struct ScopedTestCleanup {
            WorkloadGroupMgr* wg_manager;
            WorkloadGroupPtr workload_group;
            QueryContextPtr query_ctx;
            ThreadContext& thread_ctx;
            ~ScopedTestCleanup() {
                {
                    std::unique_lock<std::mutex> lock(wg_manager->_paused_queries_lock);
                    wg_manager->_paused_queries_list.erase(workload_group);
                }
                query_ctx->set_memory_sufficient(true);
                thread_ctx.thread_mem_tracker_mgr->shrink_reserved();
                thread_ctx.detach_task();
            }
        } cleanup {_wg_manager.get(), wg, query_context, thread_context};
        auto st = thread_context.thread_mem_tracker_mgr->try_reserve(2048);
        ASSERT_TRUE(st.ok()) << st.to_string();
        ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048);
        ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048);
        ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes(),
                  query_context->resource_ctx()->memory_context()->mem_limit());
        ASSERT_LT(query_context->resource_ctx()->memory_context()->current_memory_bytes() + 1024,
                  query_context->resource_ctx()->memory_context()->mem_limit());
        bool exceed_low_watermark = false;
        bool exceed_high_watermark = false;
        wg->check_mem_used(&exceed_low_watermark, &exceed_high_watermark);
        ASSERT_FALSE(exceed_low_watermark);
        ASSERT_FALSE(exceed_high_watermark);
        st = thread_context.thread_mem_tracker_mgr->try_reserve(1024);
        ASSERT_TRUE(st.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) << st.to_string();
        ASSERT_FALSE(st.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) << st.to_string();
        ASSERT_EQ(query_context->resource_ctx()->memory_context()->current_memory_bytes(), 2048);
        ASSERT_EQ(query_context->resource_ctx()->memory_context()->reserved_consumption(), 2048);
        _wg_manager->add_paused_query(query_context->resource_ctx(), 1024, st);
        ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready());
        {
            std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
            ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
                    << "paused queue should not be empty";
        }
        _run_checking_loop(wg, 3);
        ASSERT_FALSE(query_context->get_memory_sufficient_dependency()->ready());
        ASSERT_TRUE(
                query_context->resource_ctx()->task_controller()->paused_reason().is<
                        ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>());
        ASSERT_FALSE(query_context->is_cancelled());
        {
            std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
            ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 1)
                    << "paused queue should keep the query";
        }
    }
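The suggestion above defines a one-off local struct whose destructor performs the cleanup. A reusable alternative is a generic scope guard that takes a lambda; the sketch below shows the idea with a hypothetical `ScopeExit` class (it relies on C++17 class template argument deduction), not an existing Doris or GoogleTest utility. Because GoogleTest's fatal `ASSERT_*` macros leave the current function with `return`, a destructor-based guard still runs when an assertion fails.

```cpp
#include <cassert>
#include <utility>

// Hypothetical generic scope guard: runs the stored callable when the guard
// is destroyed, i.e. on normal fall-through, on an early `return`, or during
// stack unwinding from an exception.
template <typename F>
class ScopeExit {
public:
    explicit ScopeExit(F fn) : fn_(std::move(fn)) {}
    ScopeExit(const ScopeExit&) = delete;
    ScopeExit& operator=(const ScopeExit&) = delete;
    ~ScopeExit() {
        if (active_) {
            fn_();
        }
    }
    // Cancels the cleanup, e.g. when ownership has been handed off elsewhere.
    void dismiss() { active_ = false; }

private:
    F fn_;
    bool active_ = true;
};

// Models a test body that leaves early, the way a failed ASSERT_* does,
// and checks that the cleanup still ran.
inline void demo_early_return_still_cleans_up() {
    int cleanups = 0;
    auto body = [&cleanups](bool fail_early) {
        ScopeExit guard([&cleanups] { ++cleanups; });
        if (fail_early) {
            return;  // early exit: `guard` is still destroyed here
        }
        // ... the remaining assertions would run here ...
    };
    body(true);
    assert(cleanups == 1);
    body(false);
    assert(cleanups == 2);
}
```

In the test from this PR, the lambda would hold the four cleanup steps (erasing the workload group's entry from `_paused_queries_list` under `_paused_queries_lock`, `set_memory_sufficient(true)`, `shrink_reserved()`, and `detach_task()`), declared right after the query is attached. The local-struct version in the suggestion achieves the same effect without a reusable helper.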
            return false;
        } else {
            Status error_status = Status::MemoryLimitExceeded(
                    "Query {} workload group memory is exceeded"
In the memory_usage > limit branch, the error is constructed with the message "workload group memory is exceeded", but the branch condition indicates the query/requestor has exceeded its own limit (memory_usage > limit). This makes the cancellation reason misleading for users and logs. Consider updating the message (and/or error type) to reflect query memory limit exceeded (optionally including usage/limit values), reserving the workload-group message for the watermark/limit path.
Suggested change:

    -               "Query {} workload group memory is exceeded"
    +               "Query {} memory limit is exceeded"
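As a small illustration of the reviewer's point, the hypothetical helper below picks the message according to which limit was actually crossed and, as the comment proposes, includes the usage and limit values. The function name, signature, and any wording beyond the suggested "Query {} memory limit is exceeded" are assumptions; the real code builds the message with `Status::MemoryLimitExceeded`.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper: name the limit that was actually crossed. When the
// query's own usage is above its limit, report that (with the numbers)
// rather than blaming the workload group.
std::string cancel_message(const std::string& query_id, int64_t memory_usage, int64_t limit) {
    assert(limit >= 0);
    if (memory_usage > limit) {
        return "Query " + query_id + " memory limit is exceeded, usage " +
               std::to_string(memory_usage) + " bytes, limit " + std::to_string(limit) +
               " bytes";
    }
    return "Query " + query_id + " workload group memory is exceeded";
}
```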
        bool exceed_low_watermark = false;
        bool exceed_high_watermark = false;
        wg->check_mem_used(size_to_reserve, &exceed_low_watermark, &exceed_high_watermark);
        if (!exceed_high_watermark) {
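The hunk above resumes a query only when `check_mem_used(size_to_reserve, ...)` reports no high-watermark overflow. A simplified model of one pass of that decision over a paused list might look like the following; `PausedQuery`, `WorkloadGroupState`, `check_paused_queries`, and the percentage arithmetic are hypothetical, and the real `WorkloadGroupMgr` logic involves more cases than are modeled here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <list>
#include <string>

// Hypothetical paused-query record: the bytes it failed to reserve.
struct PausedQuery {
    std::string id;
    int64_t size_to_reserve;
};

// Hypothetical, simplified workload group state.
struct WorkloadGroupState {
    int64_t memory_limit;    // bytes
    int high_watermark_pct;  // e.g. 70 means 70% of memory_limit
    int64_t used;            // bytes currently consumed

    // True if granting the pending reservation would push usage past the high watermark.
    bool exceeds_high_watermark(int64_t size_to_reserve) const {
        assert(size_to_reserve >= 0);
        return (used + size_to_reserve) * 100 > memory_limit * high_watermark_pct;
    }
};

// One pass of a checking loop over the paused list. Returns how many queries
// were resumed (removed from the list); the rest stay paused.
std::size_t check_paused_queries(const WorkloadGroupState& wg, std::list<PausedQuery>& paused) {
    std::size_t resumed = 0;
    for (auto it = paused.begin(); it != paused.end();) {
        if (!wg.exceeds_high_watermark(it->size_to_reserve)) {
            it = paused.erase(it);  // would fit: resume (real code would also wake the query)
            ++resumed;
        } else {
            ++it;  // retrying the same reserve would still overflow: keep it paused
        }
    }
    return resumed;
}
```

With a 4096-byte limit, a 70% high watermark, and 2048 bytes in use, a query waiting on 1024 bytes stays paused on every pass, which is the behavior the new unit test checks with `_run_checking_loop(wg, 3)`; a query needing only 512 bytes would be resumed on the first pass.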
PR description currently contains duplicated template sections and placeholder references (e.g., close #xxx, Related PR: #xxx) which conflict with the earlier stated summary and make the intended linkage/closure ambiguous. Please clean up the PR description so it reflects the actual issue/PR references and the single intended problem statement.
### What problem does this PR solve?
Issue Number: None
Related PR: apache#62285
Problem Summary: Address review feedback for the workload group paused-query change by avoiding unsigned wraparound in the test helper loop, making the reserve-failure test cleanup exception-safe, and clarifying the cancel message when the query itself exceeds its memory limit during workload group pressure handling.
### Release note
None
### Check List (For Author)
- Test: No need to test (small follow-up on test cleanup and cancellation message; attempted incremental build but the sandbox hit a ccache temporary-directory permission issue)
- Behavior changed: Yes (the cancellation message now reports query memory limit exceeded instead of workload group memory exceeded in that branch)
- Does this need documentation: No
run buildall
BE UT Coverage Report: Increment line coverage; Increment coverage report
BE Regression && UT Coverage Report: Increment line coverage; Increment coverage report
PR approved by at least one committer and no changes requested.
PR approved by anyone and no changes requested.
Force-pushed the branch from e1ecfae to f9a62eb.
run buildall
…ll fails
Issue Number: None
Related PR: None
Problem Summary: Keep queries paused after WORKLOAD_GROUP_MEMORY_EXCEEDED when retrying the same reserve would still exceed the workload group high watermark, and add a manager unit test that exercises a real reserve failure path.
Release note: None
- Test: Unit Test
- Incrementally compiled , , and
- Full was not completed because linking triggers a large incremental rebuild in this worktree
- Behavior changed: Yes (queries that still cannot satisfy the same workload group reserve stay paused instead of being resumed immediately)
- Does this need documentation: No
Force-pushed the branch from f9a62eb to 52975b5.
run buildall
…ll fails (#62285)
### What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: Keep queries paused after WORKLOAD_GROUP_MEMORY_EXCEEDED when retrying the same reserve would still exceed the workload group high watermark, and add a manager unit test that exercises a real reserve failure path.
### Release note
None
### Check List (For Author)
- Test: Unit Test
- Incrementally compiled , , and
- Full was not completed because linking triggers a large incremental rebuild in this worktree
- Behavior changed: Yes (queries that still cannot satisfy the same workload group reserve stay paused instead of being resumed immediately)
- Does this need documentation: No
What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: Keep queries paused after WORKLOAD_GROUP_MEMORY_EXCEEDED when retrying the same reserve would still exceed the workload group high watermark, and add a manager unit test that exercises a real reserve failure path.
Release note
None
Check List (For Author)
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)